summaryrefslogtreecommitdiffstats
path: root/exporting/pubsub
diff options
context:
space:
mode:
Diffstat (limited to 'exporting/pubsub')
-rw-r--r--exporting/pubsub/Makefile.am8
l---------exporting/pubsub/README.md1
-rw-r--r--exporting/pubsub/integrations/google_cloud_pub_sub.md145
-rw-r--r--exporting/pubsub/metadata.yaml152
-rw-r--r--exporting/pubsub/pubsub.c195
-rw-r--r--exporting/pubsub/pubsub.h14
-rw-r--r--exporting/pubsub/pubsub_publish.cc258
-rw-r--r--exporting/pubsub/pubsub_publish.h37
8 files changed, 0 insertions, 810 deletions
diff --git a/exporting/pubsub/Makefile.am b/exporting/pubsub/Makefile.am
deleted file mode 100644
index 161784b8..00000000
--- a/exporting/pubsub/Makefile.am
+++ /dev/null
@@ -1,8 +0,0 @@
-# SPDX-License-Identifier: GPL-3.0-or-later
-
-AUTOMAKE_OPTIONS = subdir-objects
-MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
-
-dist_noinst_DATA = \
- README.md \
- $(NULL)
diff --git a/exporting/pubsub/README.md b/exporting/pubsub/README.md
deleted file mode 120000
index 8633f172..00000000
--- a/exporting/pubsub/README.md
+++ /dev/null
@@ -1 +0,0 @@
-integrations/google_cloud_pub_sub.md \ No newline at end of file
diff --git a/exporting/pubsub/integrations/google_cloud_pub_sub.md b/exporting/pubsub/integrations/google_cloud_pub_sub.md
deleted file mode 100644
index c2483314..00000000
--- a/exporting/pubsub/integrations/google_cloud_pub_sub.md
+++ /dev/null
@@ -1,145 +0,0 @@
-<!--startmeta
-custom_edit_url: "https://github.com/netdata/netdata/edit/master/exporting/pubsub/README.md"
-meta_yaml: "https://github.com/netdata/netdata/edit/master/exporting/pubsub/metadata.yaml"
-sidebar_label: "Google Cloud Pub Sub"
-learn_status: "Published"
-learn_rel_path: "Exporting"
-message: "DO NOT EDIT THIS FILE DIRECTLY, IT IS GENERATED BY THE EXPORTER'S metadata.yaml FILE"
-endmeta-->
-
-# Google Cloud Pub Sub
-
-
-<img src="https://netdata.cloud/img/pubsub.png" width="150"/>
-
-
-Export metrics to Google Cloud Pub/Sub Service
-
-
-
-<img src="https://img.shields.io/badge/maintained%20by-Netdata-%2300ab44" />
-
-## Setup
-
-### Prerequisites
-
-####
-
-- First [install](https://github.com/googleapis/google-cloud-cpp/) install Google Cloud Platform C++ Client Libraries
-- Pub/Sub support is also dependent on the dependencies of those libraries, like `protobuf`, `protoc`, and `grpc`
-- Next, Netdata should be re-installed from the source. The installer will detect that the required libraries are now available.
-
-
-
-### Configuration
-
-#### File
-
-The configuration file name for this integration is `exporting.conf`.
-
-
-You can edit the configuration file using the `edit-config` script from the
-Netdata [config directory](https://github.com/netdata/netdata/blob/master/docs/configure/nodes.md#the-netdata-config-directory).
-
-```bash
-cd /etc/netdata 2>/dev/null || cd /opt/netdata/etc/netdata
-sudo ./edit-config exporting.conf
-```
-#### Options
-
-The following options can be defined for this exporter.
-
-
-<details><summary>Config options</summary>
-
-| Name | Description | Default | Required |
-|:----|:-----------|:-------|:--------:|
-| enabled | Enables or disables an exporting connector instance (yes/no). | no | yes |
-| destination | Accepts a space separated list of hostnames, IPs (IPv4 and IPv6) and ports to connect to. Netdata will use the first available to send the metrics. | pubsub.googleapis.com | yes |
-| username | Username for HTTP authentication | my_username | no |
-| password | Password for HTTP authentication | my_password | no |
-| data source | Selects the kind of data that will be sent to the external database. (as collected/average/sum) | | no |
-| hostname | The hostname to be used for sending data to the external database server. | [global].hostname | no |
-| prefix | The prefix to add to all metrics. | Netdata | no |
-| update every | Frequency of sending sending data to the external database, in seconds. | 10 | no |
-| buffer on failures | The number of iterations (`update every` seconds) to buffer data, when the external database server is not available. | 10 | no |
-| timeout ms | The timeout in milliseconds to wait for the external database server to process the data. | 2 * update_every * 1000 | no |
-| send hosts matching | Hosts filter. Determines which hosts will be sent to the external database. The syntax is [simple patterns](https://github.com/netdata/netdata/tree/master/libnetdata/simple_pattern#simple-patterns). | localhost * | no |
-| send charts matching | One or more space separated patterns (use * as wildcard) checked against both chart id and chart name. | * | no |
-| send names instead of ids | Controls the metric names Netdata should send to the external database (yes/no). | | no |
-| send configured labels | Controls if host labels defined in the `[host labels]` section in `netdata.conf` should be sent to the external database (yes/no). | | no |
-| send automatic labels | Controls if automatically created labels, like `_os_name` or `_architecture` should be sent to the external database (yes/no). | | no |
-
-##### destination
-
-The format of each item in this list, is: [PROTOCOL:]IP[:PORT].
-- PROTOCOL can be udp or tcp. tcp is the default and only supported by the current exporting engine.
-- IP can be XX.XX.XX.XX (IPv4), or [XX:XX...XX:XX] (IPv6). For IPv6 you can to enclose the IP in [] to separate it from the port.
-- PORT can be a number of a service name. If omitted, the default port for the exporting connector will be used.
-
-Example IPv4:
- ```yaml
- destination = pubsub.googleapis.com
- ```
-When multiple servers are defined, Netdata will try the next one when the previous one fails.
-
-
-##### update every
-
-Netdata will add some randomness to this number, to prevent stressing the external server when many Netdata servers
-send data to the same database. This randomness does not affect the quality of the data, only the time they are sent.
-
-
-##### buffer on failures
-
-If the server fails to receive the data after that many failures, data loss on the connector instance is expected (Netdata will also log it).
-
-
-##### send hosts matching
-
-Includes one or more space separated patterns, using * as wildcard (any number of times within each pattern).
-The patterns are checked against the hostname (the localhost is always checked as localhost), allowing us to
-filter which hosts will be sent to the external database when this Netdata is a central Netdata aggregating multiple hosts.
-
-A pattern starting with `!` gives a negative match. So to match all hosts named `*db*` except hosts containing `*child*`,
-use `!*child* *db*` (so, the order is important: the first pattern matching the hostname will be used - positive or negative).
-
-
-##### send charts matching
-
-A pattern starting with ! gives a negative match. So to match all charts named apps.* except charts ending in *reads,
-use !*reads apps.* (so, the order is important: the first pattern matching the chart id or the chart name will be used,
-positive or negative). There is also a URL parameter filter that can be used while querying allmetrics. The URL parameter
-has a higher priority than the configuration option.
-
-
-##### send names instead of ids
-
-Netdata supports names and IDs for charts and dimensions. Usually IDs are unique identifiers as read by the system and names
-are human friendly labels (also unique). Most charts and metrics have the same ID and name, but in several cases they are
-different : disks with device-mapper, interrupts, QoS classes, statsd synthetic charts, etc.
-
-
-</details>
-
-#### Examples
-
-##### Basic configuration
-
-- Set the destination option to a Pub/Sub service endpoint. pubsub.googleapis.com is the default one.
-- Create the credentials JSON file by following Google Cloud's authentication guide.
-- The user running the Agent (typically netdata) needs read access to google_cloud_credentials.json, which you can set
- `chmod 400 google_cloud_credentials.json; chown netdata google_cloud_credentials.json`
-- Set the credentials file option to the full path of the file.
-
-
-```yaml
-[pubsub:my_instance]
- enabled = yes
- destination = pubsub.googleapis.com
- credentials file = /etc/netdata/google_cloud_credentials.json
- project id = my_project
- topic id = my_topic
-
-```
-
diff --git a/exporting/pubsub/metadata.yaml b/exporting/pubsub/metadata.yaml
deleted file mode 100644
index 7f57bb80..00000000
--- a/exporting/pubsub/metadata.yaml
+++ /dev/null
@@ -1,152 +0,0 @@
-# yamllint disable rule:line-length
----
-id: 'export-google-pubsub'
-meta:
- name: 'Google Cloud Pub Sub'
- link: 'https://cloud.google.com/pubsub'
- categories:
- - export
- icon_filename: 'pubsub.png'
-keywords:
- - exporter
- - Google Cloud
- - Pub Sub
-overview:
- exporter_description: |
- Export metrics to Google Cloud Pub/Sub Service
- exporter_limitations: ''
-setup:
- prerequisites:
- list:
- - title: ''
- description: |
- - First [install](https://github.com/googleapis/google-cloud-cpp/) install Google Cloud Platform C++ Client Libraries
- - Pub/Sub support is also dependent on the dependencies of those libraries, like `protobuf`, `protoc`, and `grpc`
- - Next, Netdata should be re-installed from the source. The installer will detect that the required libraries are now available.
- configuration:
- file:
- name: 'exporting.conf'
- options:
- description: |
- The following options can be defined for this exporter.
- folding:
- title: 'Config options'
- enabled: true
- list:
- - name: 'enabled'
- default_value: 'no'
- description: 'Enables or disables an exporting connector instance (yes|no).'
- required: true
- - name: 'destination'
- default_value: 'pubsub.googleapis.com'
- description: 'Accepts a space separated list of hostnames, IPs (IPv4 and IPv6) and ports to connect to. Netdata will use the first available to send the metrics.'
- required: true
- detailed_description: |
- The format of each item in this list, is: [PROTOCOL:]IP[:PORT].
- - PROTOCOL can be udp or tcp. tcp is the default and only supported by the current exporting engine.
- - IP can be XX.XX.XX.XX (IPv4), or [XX:XX...XX:XX] (IPv6). For IPv6 you can to enclose the IP in [] to separate it from the port.
- - PORT can be a number of a service name. If omitted, the default port for the exporting connector will be used.
-
- Example IPv4:
- ```yaml
- destination = pubsub.googleapis.com
- ```
- When multiple servers are defined, Netdata will try the next one when the previous one fails.
- - name: 'username'
- default_value: 'my_username'
- description: 'Username for HTTP authentication'
- required: false
- - name: 'password'
- default_value: 'my_password'
- description: 'Password for HTTP authentication'
- required: false
- - name: 'data source'
- default_value: ''
- description: 'Selects the kind of data that will be sent to the external database. (as collected|average|sum)'
- required: false
- - name: 'hostname'
- default_value: '[global].hostname'
- description: 'The hostname to be used for sending data to the external database server.'
- required: false
- - name: 'prefix'
- default_value: 'Netdata'
- description: 'The prefix to add to all metrics.'
- required: false
- - name: 'update every'
- default_value: '10'
- description: |
- Frequency of sending sending data to the external database, in seconds.
- required: false
- detailed_description: |
- Netdata will add some randomness to this number, to prevent stressing the external server when many Netdata servers
- send data to the same database. This randomness does not affect the quality of the data, only the time they are sent.
- - name: 'buffer on failures'
- default_value: '10'
- description: |
- The number of iterations (`update every` seconds) to buffer data, when the external database server is not available.
- required: false
- detailed_description: |
- If the server fails to receive the data after that many failures, data loss on the connector instance is expected (Netdata will also log it).
- - name: 'timeout ms'
- default_value: '2 * update_every * 1000'
- description: 'The timeout in milliseconds to wait for the external database server to process the data.'
- required: false
- - name: 'send hosts matching'
- default_value: 'localhost *'
- description: |
- Hosts filter. Determines which hosts will be sent to the external database. The syntax is [simple patterns](https://github.com/netdata/netdata/tree/master/libnetdata/simple_pattern#simple-patterns).
- required: false
- detailed_description: |
- Includes one or more space separated patterns, using * as wildcard (any number of times within each pattern).
- The patterns are checked against the hostname (the localhost is always checked as localhost), allowing us to
- filter which hosts will be sent to the external database when this Netdata is a central Netdata aggregating multiple hosts.
-
- A pattern starting with `!` gives a negative match. So to match all hosts named `*db*` except hosts containing `*child*`,
- use `!*child* *db*` (so, the order is important: the first pattern matching the hostname will be used - positive or negative).
- - name: 'send charts matching'
- default_value: '*'
- description: |
- One or more space separated patterns (use * as wildcard) checked against both chart id and chart name.
- required: false
- detailed_description: |
- A pattern starting with ! gives a negative match. So to match all charts named apps.* except charts ending in *reads,
- use !*reads apps.* (so, the order is important: the first pattern matching the chart id or the chart name will be used,
- positive or negative). There is also a URL parameter filter that can be used while querying allmetrics. The URL parameter
- has a higher priority than the configuration option.
- - name: 'send names instead of ids'
- default_value: ''
- description: 'Controls the metric names Netdata should send to the external database (yes|no).'
- required: false
- detailed_description: |
- Netdata supports names and IDs for charts and dimensions. Usually IDs are unique identifiers as read by the system and names
- are human friendly labels (also unique). Most charts and metrics have the same ID and name, but in several cases they are
- different : disks with device-mapper, interrupts, QoS classes, statsd synthetic charts, etc.
- - name: 'send configured labels'
- default_value: ''
- description: 'Controls if host labels defined in the `[host labels]` section in `netdata.conf` should be sent to the external database (yes|no).'
- required: false
- - name: 'send automatic labels'
- default_value: ''
- description: 'Controls if automatically created labels, like `_os_name` or `_architecture` should be sent to the external database (yes|no).'
- required: false
- examples:
- folding:
- enabled: true
- title: ''
- list:
- - name: 'Basic configuration'
- folding:
- enabled: false
- description: |
- - Set the destination option to a Pub/Sub service endpoint. pubsub.googleapis.com is the default one.
- - Create the credentials JSON file by following Google Cloud's authentication guide.
- - The user running the Agent (typically netdata) needs read access to google_cloud_credentials.json, which you can set
- `chmod 400 google_cloud_credentials.json; chown netdata google_cloud_credentials.json`
- - Set the credentials file option to the full path of the file.
- config: |
- [pubsub:my_instance]
- enabled = yes
- destination = pubsub.googleapis.com
- credentials file = /etc/netdata/google_cloud_credentials.json
- project id = my_project
- topic id = my_topic
diff --git a/exporting/pubsub/pubsub.c b/exporting/pubsub/pubsub.c
deleted file mode 100644
index 4989160a..00000000
--- a/exporting/pubsub/pubsub.c
+++ /dev/null
@@ -1,195 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include "pubsub.h"
-
-/**
- * Initialize Pub/Sub connector instance
- *
- * @param instance an instance data structure.
- * @return Returns 0 on success, 1 on failure.
- */
-int init_pubsub_instance(struct instance *instance)
-{
- instance->worker = pubsub_connector_worker;
-
- instance->start_batch_formatting = NULL;
- instance->start_host_formatting = format_host_labels_json_plaintext;
- instance->start_chart_formatting = NULL;
-
-
- if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
- instance->metric_formatting = format_dimension_collected_json_plaintext;
- else
- instance->metric_formatting = format_dimension_stored_json_plaintext;
-
- instance->end_chart_formatting = NULL;
- instance->variables_formatting = NULL;
- instance->end_host_formatting = flush_host_labels;
- instance->end_batch_formatting = NULL;
-
- instance->prepare_header = NULL;
- instance->check_response = NULL;
-
- instance->buffer = (void *)buffer_create(0, &netdata_buffers_statistics.buffers_exporters);
- if (!instance->buffer) {
- netdata_log_error("EXPORTING: cannot create buffer for Pub/Sub exporting connector instance %s", instance->config.name);
- return 1;
- }
- uv_mutex_init(&instance->mutex);
- uv_cond_init(&instance->cond_var);
-
- struct pubsub_specific_data *connector_specific_data = callocz(1, sizeof(struct pubsub_specific_data));
- instance->connector_specific_data = (void *)connector_specific_data;
-
- struct pubsub_specific_config *connector_specific_config =
- (struct pubsub_specific_config *)instance->config.connector_specific_config;
- char error_message[ERROR_LINE_MAX + 1] = "";
- if (pubsub_init(
- (void *)connector_specific_data, error_message, instance->config.destination,
- connector_specific_config->credentials_file, connector_specific_config->project_id,
- connector_specific_config->topic_id)) {
- netdata_log_error(
- "EXPORTING: Cannot initialize a Pub/Sub publisher for instance %s: %s",
- instance->config.name, error_message);
- return 1;
- }
-
- return 0;
-}
-
-/**
- * Clean a PubSub connector instance
- *
- * @param instance an instance data structure.
- */
-void clean_pubsub_instance(struct instance *instance)
-{
- netdata_log_info("EXPORTING: cleaning up instance %s ...", instance->config.name);
-
- struct pubsub_specific_data *connector_specific_data =
- (struct pubsub_specific_data *)instance->connector_specific_data;
- pubsub_cleanup(connector_specific_data);
- freez(connector_specific_data);
-
- buffer_free(instance->buffer);
-
- struct pubsub_specific_config *connector_specific_config =
- (struct pubsub_specific_config *)instance->config.connector_specific_config;
- freez(connector_specific_config->credentials_file);
- freez(connector_specific_config->project_id);
- freez(connector_specific_config->topic_id);
- freez(connector_specific_config);
-
- netdata_log_info("EXPORTING: instance %s exited", instance->config.name);
- instance->exited = 1;
-
- return;
-}
-
-/**
- * Pub/Sub connector worker
- *
- * Runs in a separate thread for every instance.
- *
- * @param instance_p an instance data structure.
- */
-void pubsub_connector_worker(void *instance_p)
-{
- struct instance *instance = (struct instance *)instance_p;
- struct pubsub_specific_config *connector_specific_config = instance->config.connector_specific_config;
- struct pubsub_specific_data *connector_specific_data = instance->connector_specific_data;
-
- while (!instance->engine->exit) {
- struct stats *stats = &instance->stats;
- char error_message[ERROR_LINE_MAX + 1] = "";
-
- uv_mutex_lock(&instance->mutex);
- while (!instance->data_is_ready)
- uv_cond_wait(&instance->cond_var, &instance->mutex);
- instance->data_is_ready = 0;
-
-
- if (unlikely(instance->engine->exit)) {
- uv_mutex_unlock(&instance->mutex);
- break;
- }
-
- // reset the monitoring chart counters
- stats->received_bytes =
- stats->sent_bytes =
- stats->sent_metrics =
- stats->lost_metrics =
- stats->receptions =
- stats->transmission_successes =
- stats->transmission_failures =
- stats->data_lost_events =
- stats->lost_bytes =
- stats->reconnects = 0;
-
- BUFFER *buffer = (BUFFER *)instance->buffer;
- size_t buffer_len = buffer_strlen(buffer);
-
- stats->buffered_bytes = buffer_len;
-
- if (pubsub_add_message(instance->connector_specific_data, (char *)buffer_tostring(buffer))) {
- netdata_log_error("EXPORTING: Instance %s: Cannot add data to a message", instance->config.name);
-
- stats->data_lost_events++;
- stats->lost_metrics += stats->buffered_metrics;
- stats->lost_bytes += buffer_len;
-
- goto cleanup;
- }
-
- netdata_log_debug(
- D_EXPORTING, "EXPORTING: pubsub_publish(): project = %s, topic = %s, buffer = %zu",
- connector_specific_config->project_id, connector_specific_config->topic_id, buffer_len);
-
- if (pubsub_publish((void *)connector_specific_data, error_message, stats->buffered_metrics, buffer_len)) {
- netdata_log_error("EXPORTING: Instance: %s: Cannot publish a message: %s", instance->config.name, error_message);
-
- stats->transmission_failures++;
- stats->data_lost_events++;
- stats->lost_metrics += stats->buffered_metrics;
- stats->lost_bytes += buffer_len;
-
- goto cleanup;
- }
-
- stats->sent_bytes = buffer_len;
- stats->transmission_successes++;
-
- size_t sent_metrics = 0, lost_metrics = 0, sent_bytes = 0, lost_bytes = 0;
-
- if (unlikely(pubsub_get_result(
- connector_specific_data, error_message, &sent_metrics, &sent_bytes, &lost_metrics, &lost_bytes))) {
- // oops! we couldn't send (all or some of the) data
- netdata_log_error("EXPORTING: %s", error_message);
- netdata_log_error(
- "EXPORTING: failed to write data to service '%s'. Willing to write %zu bytes, wrote %zu bytes.",
- instance->config.destination, lost_bytes, sent_bytes);
-
- stats->transmission_failures++;
- stats->data_lost_events++;
- stats->lost_metrics += lost_metrics;
- stats->lost_bytes += lost_bytes;
- } else {
- stats->receptions++;
- stats->sent_metrics = sent_metrics;
- }
-
- cleanup:
- send_internal_metrics(instance);
-
- buffer_flush(buffer);
- stats->buffered_metrics = 0;
-
- uv_mutex_unlock(&instance->mutex);
-
-#ifdef UNIT_TESTING
- return;
-#endif
- }
-
- clean_pubsub_instance(instance);
-}
diff --git a/exporting/pubsub/pubsub.h b/exporting/pubsub/pubsub.h
deleted file mode 100644
index 0bcb76f9..00000000
--- a/exporting/pubsub/pubsub.h
+++ /dev/null
@@ -1,14 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef NETDATA_EXPORTING_PUBSUB_H
-#define NETDATA_EXPORTING_PUBSUB_H
-
-#include "exporting/exporting_engine.h"
-#include "exporting/json/json.h"
-#include "pubsub_publish.h"
-
-int init_pubsub_instance(struct instance *instance);
-void clean_pubsub_instance(struct instance *instance);
-void pubsub_connector_worker(void *instance_p);
-
-#endif //NETDATA_EXPORTING_PUBSUB_H
diff --git a/exporting/pubsub/pubsub_publish.cc b/exporting/pubsub/pubsub_publish.cc
deleted file mode 100644
index cc14154f..00000000
--- a/exporting/pubsub/pubsub_publish.cc
+++ /dev/null
@@ -1,258 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include <google/pubsub/v1/pubsub.grpc.pb.h>
-#include <grpcpp/grpcpp.h>
-#include <stdexcept>
-#include "pubsub_publish.h"
-
-#define EVENT_CHECK_TIMEOUT 50
-
-struct response {
- grpc::ClientContext *context;
- google::pubsub::v1::PublishResponse *publish_response;
- size_t tag;
- grpc::Status *status;
-
- size_t published_metrics;
- size_t published_bytes;
-};
-
-static inline void copy_error_message(char *error_message_dst, const char *error_message_src)
-{
- std::strncpy(error_message_dst, error_message_src, ERROR_LINE_MAX);
- error_message_dst[ERROR_LINE_MAX] = '\0';
-}
-
-/**
- * Initialize a Pub/Sub client and a data structure for responses.
- *
- * @param pubsub_specific_data_p a pointer to a structure with instance-wide data.
- * @param error_message report error message to a caller.
- * @param destination a Pub/Sub service endpoint.
- * @param credentials_file a full path for a file with google application credentials.
- * @param project_id a project ID.
- * @param topic_id a topic ID.
- * @return Returns 0 on success, 1 on failure.
- */
-int pubsub_init(
- void *pubsub_specific_data_p, char *error_message, const char *destination, const char *credentials_file,
- const char *project_id, const char *topic_id)
-{
- struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
-
- try {
- setenv("GOOGLE_APPLICATION_CREDENTIALS", credentials_file, 0);
-
- std::shared_ptr<grpc::ChannelCredentials> credentials = grpc::GoogleDefaultCredentials();
- if (credentials == nullptr) {
- copy_error_message(error_message, "Can't load credentials");
- return 1;
- }
-
- std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(destination, credentials);
-
- google::pubsub::v1::Publisher::Stub *stub = new google::pubsub::v1::Publisher::Stub(channel);
- if (!stub) {
- copy_error_message(error_message, "Can't create a publisher stub");
- return 1;
- }
-
- connector_specific_data->stub = stub;
-
- google::pubsub::v1::PublishRequest *request = new google::pubsub::v1::PublishRequest;
- connector_specific_data->request = request;
- ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request))
- ->set_topic(std::string("projects/") + project_id + "/topics/" + topic_id);
-
- grpc::CompletionQueue *cq = new grpc::CompletionQueue;
- connector_specific_data->completion_queue = cq;
-
- connector_specific_data->responses = new std::list<struct response>;
-
- return 0;
- } catch (std::exception const &ex) {
- std::string em(std::string("Standard exception raised: ") + ex.what());
- copy_error_message(error_message, em.c_str());
- return 1;
- }
-
- return 0;
-}
-
-/**
- * Clean the PubSub connector instance specific data
- */
-void pubsub_cleanup(void *pubsub_specific_data_p)
-{
- struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
-
- std::list<struct response> *responses = (std::list<struct response> *)connector_specific_data->responses;
- std::list<struct response>::iterator response;
- for (response = responses->begin(); response != responses->end(); ++response) {
- // TODO: If we do this, there are a huge amount of possibly lost records. We need to find a right way of
- // cleaning up contexts
- // delete response->context;
- delete response->publish_response;
- delete response->status;
- }
- delete responses;
-
- ((grpc::CompletionQueue *)connector_specific_data->completion_queue)->Shutdown();
- delete (grpc::CompletionQueue *)connector_specific_data->completion_queue;
- delete (google::pubsub::v1::PublishRequest *)connector_specific_data->request;
- delete (google::pubsub::v1::Publisher::Stub *)connector_specific_data->stub;
-
- // TODO: Find how to shutdown grpc gracefully. grpc_shutdown() doesn't seem to work.
- // grpc_shutdown();
-
- return;
-}
-
-/**
- * Add data to a Pub/Sub request message.
- *
- * @param pubsub_specific_data_p a pointer to a structure with instance-wide data.
- * @param data a text buffer with metrics.
- * @return Returns 0 on success, 1 on failure.
- */
-int pubsub_add_message(void *pubsub_specific_data_p, char *data)
-{
- struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
-
- try {
- google::pubsub::v1::PubsubMessage *message =
- ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request))->add_messages();
- if (!message)
- return 1;
-
- message->set_data(data);
- } catch (std::exception const &ex) {
- return 1;
- }
-
- return 0;
-}
-
-/**
- * Send data to the Pub/Sub service
- *
- * @param pubsub_specific_data_p a pointer to a structure with client and request outcome information.
- * @param error_message report error message to a caller.
- * @param buffered_metrics the number of metrics we are going to send.
- * @param buffered_bytes the number of bytes we are going to send.
- * @return Returns 0 on success, 1 on failure.
- */
-int pubsub_publish(void *pubsub_specific_data_p, char *error_message, size_t buffered_metrics, size_t buffered_bytes)
-{
- struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
-
- try {
- grpc::ClientContext *context = new grpc::ClientContext;
-
- std::unique_ptr<grpc::ClientAsyncResponseReader<google::pubsub::v1::PublishResponse> > rpc(
- ((google::pubsub::v1::Publisher::Stub *)(connector_specific_data->stub))
- ->AsyncPublish(
- context, (*(google::pubsub::v1::PublishRequest *)(connector_specific_data->request)),
- ((grpc::CompletionQueue *)(connector_specific_data->completion_queue))));
-
- struct response response;
- response.context = context;
- response.publish_response = new google::pubsub::v1::PublishResponse;
- response.tag = connector_specific_data->last_tag++;
- response.status = new grpc::Status;
- response.published_metrics = buffered_metrics;
- response.published_bytes = buffered_bytes;
-
- rpc->Finish(response.publish_response, response.status, (void *)response.tag);
-
- ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request))->clear_messages();
-
- ((std::list<struct response> *)(connector_specific_data->responses))->push_back(response);
- } catch (std::exception const &ex) {
- std::string em(std::string("Standard exception raised: ") + ex.what());
- copy_error_message(error_message, em.c_str());
- return 1;
- }
-
- return 0;
-}
-
-/**
- * Get results from service responses
- *
- * @param pubsub_specific_data_p a pointer to a structure with instance-wide data.
- * @param error_message report error message to a caller.
- * @param sent_metrics report to a caller how many metrics was successfully sent.
- * @param sent_bytes report to a caller how many bytes was successfully sent.
- * @param lost_metrics report to a caller how many metrics was lost during transmission.
- * @param lost_bytes report to a caller how many bytes was lost during transmission.
- * @return Returns 0 if all data was sent successfully, 1 when data was lost on transmission.
- */
-int pubsub_get_result(
- void *pubsub_specific_data_p, char *error_message,
- size_t *sent_metrics, size_t *sent_bytes, size_t *lost_metrics, size_t *lost_bytes)
-{
- struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
- std::list<struct response> *responses = (std::list<struct response> *)connector_specific_data->responses;
- grpc::CompletionQueue::NextStatus next_status;
-
- *sent_metrics = 0;
- *sent_bytes = 0;
- *lost_metrics = 0;
- *lost_bytes = 0;
-
- try {
- do {
- std::list<struct response>::iterator response;
- void *got_tag;
- bool ok = false;
-
- auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(50);
- next_status = (*(grpc::CompletionQueue *)(connector_specific_data->completion_queue))
- .AsyncNext(&got_tag, &ok, deadline);
-
- if (next_status == grpc::CompletionQueue::GOT_EVENT) {
- for (response = responses->begin(); response != responses->end(); ++response) {
- if ((void *)response->tag == got_tag)
- break;
- }
-
- if (response == responses->end()) {
- copy_error_message(error_message, "Cannot get Pub/Sub response");
- return 1;
- }
-
- if (ok && response->publish_response->message_ids_size()) {
- *sent_metrics += response->published_metrics;
- *sent_bytes += response->published_bytes;
- } else {
- *lost_metrics += response->published_metrics;
- *lost_bytes += response->published_bytes;
- response->status->error_message().copy(error_message, ERROR_LINE_MAX);
- error_message[ERROR_LINE_MAX] = '\0';
- }
-
- delete response->context;
- delete response->publish_response;
- delete response->status;
- responses->erase(response);
- }
-
- if (next_status == grpc::CompletionQueue::SHUTDOWN) {
- copy_error_message(error_message, "Completion queue shutdown");
- return 1;
- }
-
- } while (next_status == grpc::CompletionQueue::GOT_EVENT);
- } catch (std::exception const &ex) {
- std::string em(std::string("Standard exception raised: ") + ex.what());
- copy_error_message(error_message, em.c_str());
- return 1;
- }
-
- if (*lost_metrics) {
- return 1;
- }
-
- return 0;
-}
diff --git a/exporting/pubsub/pubsub_publish.h b/exporting/pubsub/pubsub_publish.h
deleted file mode 100644
index 567a262f..00000000
--- a/exporting/pubsub/pubsub_publish.h
+++ /dev/null
@@ -1,37 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef NETDATA_EXPORTING_PUBSUB_PUBLISH_H
-#define NETDATA_EXPORTING_PUBSUB_PUBLISH_H
-
-#define ERROR_LINE_MAX 1023
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-struct pubsub_specific_data {
- void *stub;
- void *request;
- void *completion_queue;
-
- void *responses;
- size_t last_tag;
-};
-
-int pubsub_init(
- void *pubsub_specific_data_p, char *error_message, const char *destination, const char *credentials_file,
- const char *project_id, const char *topic_id);
-void pubsub_cleanup(void *pubsub_specific_data_p);
-
-int pubsub_add_message(void *pubsub_specific_data_p, char *data);
-
-int pubsub_publish(void *pubsub_specific_data_p, char *error_message, size_t buffered_metrics, size_t buffered_bytes);
-int pubsub_get_result(
- void *pubsub_specific_data_p, char *error_message,
- size_t *sent_metrics, size_t *sent_bytes, size_t *lost_metrics, size_t *lost_bytes);
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif //NETDATA_EXPORTING_PUBSUB_PUBLISH_H