diff options
Diffstat (limited to 'exporting/pubsub')
-rw-r--r-- | exporting/pubsub/Makefile.am | 8 | ||||
l--------- | exporting/pubsub/README.md | 1 | ||||
-rw-r--r-- | exporting/pubsub/integrations/google_cloud_pub_sub.md | 145 | ||||
-rw-r--r-- | exporting/pubsub/metadata.yaml | 152 | ||||
-rw-r--r-- | exporting/pubsub/pubsub.c | 195 | ||||
-rw-r--r-- | exporting/pubsub/pubsub.h | 14 | ||||
-rw-r--r-- | exporting/pubsub/pubsub_publish.cc | 258 | ||||
-rw-r--r-- | exporting/pubsub/pubsub_publish.h | 37 |
8 files changed, 0 insertions, 810 deletions
diff --git a/exporting/pubsub/Makefile.am b/exporting/pubsub/Makefile.am deleted file mode 100644 index 161784b8..00000000 --- a/exporting/pubsub/Makefile.am +++ /dev/null @@ -1,8 +0,0 @@ -# SPDX-License-Identifier: GPL-3.0-or-later - -AUTOMAKE_OPTIONS = subdir-objects -MAINTAINERCLEANFILES = $(srcdir)/Makefile.in - -dist_noinst_DATA = \ - README.md \ - $(NULL) diff --git a/exporting/pubsub/README.md b/exporting/pubsub/README.md deleted file mode 120000 index 8633f172..00000000 --- a/exporting/pubsub/README.md +++ /dev/null @@ -1 +0,0 @@ -integrations/google_cloud_pub_sub.md
\ No newline at end of file diff --git a/exporting/pubsub/integrations/google_cloud_pub_sub.md b/exporting/pubsub/integrations/google_cloud_pub_sub.md deleted file mode 100644 index c2483314..00000000 --- a/exporting/pubsub/integrations/google_cloud_pub_sub.md +++ /dev/null @@ -1,145 +0,0 @@ -<!--startmeta -custom_edit_url: "https://github.com/netdata/netdata/edit/master/exporting/pubsub/README.md" -meta_yaml: "https://github.com/netdata/netdata/edit/master/exporting/pubsub/metadata.yaml" -sidebar_label: "Google Cloud Pub Sub" -learn_status: "Published" -learn_rel_path: "Exporting" -message: "DO NOT EDIT THIS FILE DIRECTLY, IT IS GENERATED BY THE EXPORTER'S metadata.yaml FILE" -endmeta--> - -# Google Cloud Pub Sub - - -<img src="https://netdata.cloud/img/pubsub.png" width="150"/> - - -Export metrics to Google Cloud Pub/Sub Service - - - -<img src="https://img.shields.io/badge/maintained%20by-Netdata-%2300ab44" /> - -## Setup - -### Prerequisites - -#### - -- First [install](https://github.com/googleapis/google-cloud-cpp/) install Google Cloud Platform C++ Client Libraries -- Pub/Sub support is also dependent on the dependencies of those libraries, like `protobuf`, `protoc`, and `grpc` -- Next, Netdata should be re-installed from the source. The installer will detect that the required libraries are now available. - - - -### Configuration - -#### File - -The configuration file name for this integration is `exporting.conf`. - - -You can edit the configuration file using the `edit-config` script from the -Netdata [config directory](https://github.com/netdata/netdata/blob/master/docs/configure/nodes.md#the-netdata-config-directory). - -```bash -cd /etc/netdata 2>/dev/null || cd /opt/netdata/etc/netdata -sudo ./edit-config exporting.conf -``` -#### Options - -The following options can be defined for this exporter. - - -<details><summary>Config options</summary> - -| Name | Description | Default | Required | -|:----|:-----------|:-------|:--------:| -| enabled | Enables or disables an exporting connector instance (yes/no). | no | yes | -| destination | Accepts a space separated list of hostnames, IPs (IPv4 and IPv6) and ports to connect to. Netdata will use the first available to send the metrics. | pubsub.googleapis.com | yes | -| username | Username for HTTP authentication | my_username | no | -| password | Password for HTTP authentication | my_password | no | -| data source | Selects the kind of data that will be sent to the external database. (as collected/average/sum) | | no | -| hostname | The hostname to be used for sending data to the external database server. | [global].hostname | no | -| prefix | The prefix to add to all metrics. | Netdata | no | -| update every | Frequency of sending sending data to the external database, in seconds. | 10 | no | -| buffer on failures | The number of iterations (`update every` seconds) to buffer data, when the external database server is not available. | 10 | no | -| timeout ms | The timeout in milliseconds to wait for the external database server to process the data. | 2 * update_every * 1000 | no | -| send hosts matching | Hosts filter. Determines which hosts will be sent to the external database. The syntax is [simple patterns](https://github.com/netdata/netdata/tree/master/libnetdata/simple_pattern#simple-patterns). | localhost * | no | -| send charts matching | One or more space separated patterns (use * as wildcard) checked against both chart id and chart name. | * | no | -| send names instead of ids | Controls the metric names Netdata should send to the external database (yes/no). | | no | -| send configured labels | Controls if host labels defined in the `[host labels]` section in `netdata.conf` should be sent to the external database (yes/no). | | no | -| send automatic labels | Controls if automatically created labels, like `_os_name` or `_architecture` should be sent to the external database (yes/no). | | no | - -##### destination - -The format of each item in this list, is: [PROTOCOL:]IP[:PORT]. -- PROTOCOL can be udp or tcp. tcp is the default and only supported by the current exporting engine. -- IP can be XX.XX.XX.XX (IPv4), or [XX:XX...XX:XX] (IPv6). For IPv6 you can to enclose the IP in [] to separate it from the port. -- PORT can be a number of a service name. If omitted, the default port for the exporting connector will be used. - -Example IPv4: - ```yaml - destination = pubsub.googleapis.com - ``` -When multiple servers are defined, Netdata will try the next one when the previous one fails. - - -##### update every - -Netdata will add some randomness to this number, to prevent stressing the external server when many Netdata servers -send data to the same database. This randomness does not affect the quality of the data, only the time they are sent. - - -##### buffer on failures - -If the server fails to receive the data after that many failures, data loss on the connector instance is expected (Netdata will also log it). - - -##### send hosts matching - -Includes one or more space separated patterns, using * as wildcard (any number of times within each pattern). -The patterns are checked against the hostname (the localhost is always checked as localhost), allowing us to -filter which hosts will be sent to the external database when this Netdata is a central Netdata aggregating multiple hosts. - -A pattern starting with `!` gives a negative match. So to match all hosts named `*db*` except hosts containing `*child*`, -use `!*child* *db*` (so, the order is important: the first pattern matching the hostname will be used - positive or negative). - - -##### send charts matching - -A pattern starting with ! gives a negative match. So to match all charts named apps.* except charts ending in *reads, -use !*reads apps.* (so, the order is important: the first pattern matching the chart id or the chart name will be used, -positive or negative). There is also a URL parameter filter that can be used while querying allmetrics. The URL parameter -has a higher priority than the configuration option. - - -##### send names instead of ids - -Netdata supports names and IDs for charts and dimensions. Usually IDs are unique identifiers as read by the system and names -are human friendly labels (also unique). Most charts and metrics have the same ID and name, but in several cases they are -different : disks with device-mapper, interrupts, QoS classes, statsd synthetic charts, etc. - - -</details> - -#### Examples - -##### Basic configuration - -- Set the destination option to a Pub/Sub service endpoint. pubsub.googleapis.com is the default one. -- Create the credentials JSON file by following Google Cloud's authentication guide. -- The user running the Agent (typically netdata) needs read access to google_cloud_credentials.json, which you can set - `chmod 400 google_cloud_credentials.json; chown netdata google_cloud_credentials.json` -- Set the credentials file option to the full path of the file. - - -```yaml -[pubsub:my_instance] - enabled = yes - destination = pubsub.googleapis.com - credentials file = /etc/netdata/google_cloud_credentials.json - project id = my_project - topic id = my_topic - -``` - diff --git a/exporting/pubsub/metadata.yaml b/exporting/pubsub/metadata.yaml deleted file mode 100644 index 7f57bb80..00000000 --- a/exporting/pubsub/metadata.yaml +++ /dev/null @@ -1,152 +0,0 @@ -# yamllint disable rule:line-length ---- -id: 'export-google-pubsub' -meta: - name: 'Google Cloud Pub Sub' - link: 'https://cloud.google.com/pubsub' - categories: - - export - icon_filename: 'pubsub.png' -keywords: - - exporter - - Google Cloud - - Pub Sub -overview: - exporter_description: | - Export metrics to Google Cloud Pub/Sub Service - exporter_limitations: '' -setup: - prerequisites: - list: - - title: '' - description: | - - First [install](https://github.com/googleapis/google-cloud-cpp/) install Google Cloud Platform C++ Client Libraries - - Pub/Sub support is also dependent on the dependencies of those libraries, like `protobuf`, `protoc`, and `grpc` - - Next, Netdata should be re-installed from the source. The installer will detect that the required libraries are now available. - configuration: - file: - name: 'exporting.conf' - options: - description: | - The following options can be defined for this exporter. - folding: - title: 'Config options' - enabled: true - list: - - name: 'enabled' - default_value: 'no' - description: 'Enables or disables an exporting connector instance (yes|no).' - required: true - - name: 'destination' - default_value: 'pubsub.googleapis.com' - description: 'Accepts a space separated list of hostnames, IPs (IPv4 and IPv6) and ports to connect to. Netdata will use the first available to send the metrics.' - required: true - detailed_description: | - The format of each item in this list, is: [PROTOCOL:]IP[:PORT]. - - PROTOCOL can be udp or tcp. tcp is the default and only supported by the current exporting engine. - - IP can be XX.XX.XX.XX (IPv4), or [XX:XX...XX:XX] (IPv6). For IPv6 you can to enclose the IP in [] to separate it from the port. - - PORT can be a number of a service name. If omitted, the default port for the exporting connector will be used. - - Example IPv4: - ```yaml - destination = pubsub.googleapis.com - ``` - When multiple servers are defined, Netdata will try the next one when the previous one fails. - - name: 'username' - default_value: 'my_username' - description: 'Username for HTTP authentication' - required: false - - name: 'password' - default_value: 'my_password' - description: 'Password for HTTP authentication' - required: false - - name: 'data source' - default_value: '' - description: 'Selects the kind of data that will be sent to the external database. (as collected|average|sum)' - required: false - - name: 'hostname' - default_value: '[global].hostname' - description: 'The hostname to be used for sending data to the external database server.' - required: false - - name: 'prefix' - default_value: 'Netdata' - description: 'The prefix to add to all metrics.' - required: false - - name: 'update every' - default_value: '10' - description: | - Frequency of sending sending data to the external database, in seconds. - required: false - detailed_description: | - Netdata will add some randomness to this number, to prevent stressing the external server when many Netdata servers - send data to the same database. This randomness does not affect the quality of the data, only the time they are sent. - - name: 'buffer on failures' - default_value: '10' - description: | - The number of iterations (`update every` seconds) to buffer data, when the external database server is not available. - required: false - detailed_description: | - If the server fails to receive the data after that many failures, data loss on the connector instance is expected (Netdata will also log it). - - name: 'timeout ms' - default_value: '2 * update_every * 1000' - description: 'The timeout in milliseconds to wait for the external database server to process the data.' - required: false - - name: 'send hosts matching' - default_value: 'localhost *' - description: | - Hosts filter. Determines which hosts will be sent to the external database. The syntax is [simple patterns](https://github.com/netdata/netdata/tree/master/libnetdata/simple_pattern#simple-patterns). - required: false - detailed_description: | - Includes one or more space separated patterns, using * as wildcard (any number of times within each pattern). - The patterns are checked against the hostname (the localhost is always checked as localhost), allowing us to - filter which hosts will be sent to the external database when this Netdata is a central Netdata aggregating multiple hosts. - - A pattern starting with `!` gives a negative match. So to match all hosts named `*db*` except hosts containing `*child*`, - use `!*child* *db*` (so, the order is important: the first pattern matching the hostname will be used - positive or negative). - - name: 'send charts matching' - default_value: '*' - description: | - One or more space separated patterns (use * as wildcard) checked against both chart id and chart name. - required: false - detailed_description: | - A pattern starting with ! gives a negative match. So to match all charts named apps.* except charts ending in *reads, - use !*reads apps.* (so, the order is important: the first pattern matching the chart id or the chart name will be used, - positive or negative). There is also a URL parameter filter that can be used while querying allmetrics. The URL parameter - has a higher priority than the configuration option. - - name: 'send names instead of ids' - default_value: '' - description: 'Controls the metric names Netdata should send to the external database (yes|no).' - required: false - detailed_description: | - Netdata supports names and IDs for charts and dimensions. Usually IDs are unique identifiers as read by the system and names - are human friendly labels (also unique). Most charts and metrics have the same ID and name, but in several cases they are - different : disks with device-mapper, interrupts, QoS classes, statsd synthetic charts, etc. - - name: 'send configured labels' - default_value: '' - description: 'Controls if host labels defined in the `[host labels]` section in `netdata.conf` should be sent to the external database (yes|no).' - required: false - - name: 'send automatic labels' - default_value: '' - description: 'Controls if automatically created labels, like `_os_name` or `_architecture` should be sent to the external database (yes|no).' - required: false - examples: - folding: - enabled: true - title: '' - list: - - name: 'Basic configuration' - folding: - enabled: false - description: | - - Set the destination option to a Pub/Sub service endpoint. pubsub.googleapis.com is the default one. - - Create the credentials JSON file by following Google Cloud's authentication guide. - - The user running the Agent (typically netdata) needs read access to google_cloud_credentials.json, which you can set - `chmod 400 google_cloud_credentials.json; chown netdata google_cloud_credentials.json` - - Set the credentials file option to the full path of the file. - config: | - [pubsub:my_instance] - enabled = yes - destination = pubsub.googleapis.com - credentials file = /etc/netdata/google_cloud_credentials.json - project id = my_project - topic id = my_topic diff --git a/exporting/pubsub/pubsub.c b/exporting/pubsub/pubsub.c deleted file mode 100644 index 4989160a..00000000 --- a/exporting/pubsub/pubsub.c +++ /dev/null @@ -1,195 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "pubsub.h" - -/** - * Initialize Pub/Sub connector instance - * - * @param instance an instance data structure. - * @return Returns 0 on success, 1 on failure. - */ -int init_pubsub_instance(struct instance *instance) -{ - instance->worker = pubsub_connector_worker; - - instance->start_batch_formatting = NULL; - instance->start_host_formatting = format_host_labels_json_plaintext; - instance->start_chart_formatting = NULL; - - - if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED) - instance->metric_formatting = format_dimension_collected_json_plaintext; - else - instance->metric_formatting = format_dimension_stored_json_plaintext; - - instance->end_chart_formatting = NULL; - instance->variables_formatting = NULL; - instance->end_host_formatting = flush_host_labels; - instance->end_batch_formatting = NULL; - - instance->prepare_header = NULL; - instance->check_response = NULL; - - instance->buffer = (void *)buffer_create(0, &netdata_buffers_statistics.buffers_exporters); - if (!instance->buffer) { - netdata_log_error("EXPORTING: cannot create buffer for Pub/Sub exporting connector instance %s", instance->config.name); - return 1; - } - uv_mutex_init(&instance->mutex); - uv_cond_init(&instance->cond_var); - - struct pubsub_specific_data *connector_specific_data = callocz(1, sizeof(struct pubsub_specific_data)); - instance->connector_specific_data = (void *)connector_specific_data; - - struct pubsub_specific_config *connector_specific_config = - (struct pubsub_specific_config *)instance->config.connector_specific_config; - char error_message[ERROR_LINE_MAX + 1] = ""; - if (pubsub_init( - (void *)connector_specific_data, error_message, instance->config.destination, - connector_specific_config->credentials_file, connector_specific_config->project_id, - connector_specific_config->topic_id)) { - netdata_log_error( - "EXPORTING: Cannot initialize a Pub/Sub publisher for instance %s: %s", - instance->config.name, error_message); - return 1; - } - - return 0; -} - -/** - * Clean a PubSub connector instance - * - * @param instance an instance data structure. - */ -void clean_pubsub_instance(struct instance *instance) -{ - netdata_log_info("EXPORTING: cleaning up instance %s ...", instance->config.name); - - struct pubsub_specific_data *connector_specific_data = - (struct pubsub_specific_data *)instance->connector_specific_data; - pubsub_cleanup(connector_specific_data); - freez(connector_specific_data); - - buffer_free(instance->buffer); - - struct pubsub_specific_config *connector_specific_config = - (struct pubsub_specific_config *)instance->config.connector_specific_config; - freez(connector_specific_config->credentials_file); - freez(connector_specific_config->project_id); - freez(connector_specific_config->topic_id); - freez(connector_specific_config); - - netdata_log_info("EXPORTING: instance %s exited", instance->config.name); - instance->exited = 1; - - return; -} - -/** - * Pub/Sub connector worker - * - * Runs in a separate thread for every instance. - * - * @param instance_p an instance data structure. - */ -void pubsub_connector_worker(void *instance_p) -{ - struct instance *instance = (struct instance *)instance_p; - struct pubsub_specific_config *connector_specific_config = instance->config.connector_specific_config; - struct pubsub_specific_data *connector_specific_data = instance->connector_specific_data; - - while (!instance->engine->exit) { - struct stats *stats = &instance->stats; - char error_message[ERROR_LINE_MAX + 1] = ""; - - uv_mutex_lock(&instance->mutex); - while (!instance->data_is_ready) - uv_cond_wait(&instance->cond_var, &instance->mutex); - instance->data_is_ready = 0; - - - if (unlikely(instance->engine->exit)) { - uv_mutex_unlock(&instance->mutex); - break; - } - - // reset the monitoring chart counters - stats->received_bytes = - stats->sent_bytes = - stats->sent_metrics = - stats->lost_metrics = - stats->receptions = - stats->transmission_successes = - stats->transmission_failures = - stats->data_lost_events = - stats->lost_bytes = - stats->reconnects = 0; - - BUFFER *buffer = (BUFFER *)instance->buffer; - size_t buffer_len = buffer_strlen(buffer); - - stats->buffered_bytes = buffer_len; - - if (pubsub_add_message(instance->connector_specific_data, (char *)buffer_tostring(buffer))) { - netdata_log_error("EXPORTING: Instance %s: Cannot add data to a message", instance->config.name); - - stats->data_lost_events++; - stats->lost_metrics += stats->buffered_metrics; - stats->lost_bytes += buffer_len; - - goto cleanup; - } - - netdata_log_debug( - D_EXPORTING, "EXPORTING: pubsub_publish(): project = %s, topic = %s, buffer = %zu", - connector_specific_config->project_id, connector_specific_config->topic_id, buffer_len); - - if (pubsub_publish((void *)connector_specific_data, error_message, stats->buffered_metrics, buffer_len)) { - netdata_log_error("EXPORTING: Instance: %s: Cannot publish a message: %s", instance->config.name, error_message); - - stats->transmission_failures++; - stats->data_lost_events++; - stats->lost_metrics += stats->buffered_metrics; - stats->lost_bytes += buffer_len; - - goto cleanup; - } - - stats->sent_bytes = buffer_len; - stats->transmission_successes++; - - size_t sent_metrics = 0, lost_metrics = 0, sent_bytes = 0, lost_bytes = 0; - - if (unlikely(pubsub_get_result( - connector_specific_data, error_message, &sent_metrics, &sent_bytes, &lost_metrics, &lost_bytes))) { - // oops! we couldn't send (all or some of the) data - netdata_log_error("EXPORTING: %s", error_message); - netdata_log_error( - "EXPORTING: failed to write data to service '%s'. Willing to write %zu bytes, wrote %zu bytes.", - instance->config.destination, lost_bytes, sent_bytes); - - stats->transmission_failures++; - stats->data_lost_events++; - stats->lost_metrics += lost_metrics; - stats->lost_bytes += lost_bytes; - } else { - stats->receptions++; - stats->sent_metrics = sent_metrics; - } - - cleanup: - send_internal_metrics(instance); - - buffer_flush(buffer); - stats->buffered_metrics = 0; - - uv_mutex_unlock(&instance->mutex); - -#ifdef UNIT_TESTING - return; -#endif - } - - clean_pubsub_instance(instance); -} diff --git a/exporting/pubsub/pubsub.h b/exporting/pubsub/pubsub.h deleted file mode 100644 index 0bcb76f9..00000000 --- a/exporting/pubsub/pubsub.h +++ /dev/null @@ -1,14 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_EXPORTING_PUBSUB_H -#define NETDATA_EXPORTING_PUBSUB_H - -#include "exporting/exporting_engine.h" -#include "exporting/json/json.h" -#include "pubsub_publish.h" - -int init_pubsub_instance(struct instance *instance); -void clean_pubsub_instance(struct instance *instance); -void pubsub_connector_worker(void *instance_p); - -#endif //NETDATA_EXPORTING_PUBSUB_H diff --git a/exporting/pubsub/pubsub_publish.cc b/exporting/pubsub/pubsub_publish.cc deleted file mode 100644 index cc14154f..00000000 --- a/exporting/pubsub/pubsub_publish.cc +++ /dev/null @@ -1,258 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include <google/pubsub/v1/pubsub.grpc.pb.h> -#include <grpcpp/grpcpp.h> -#include <stdexcept> -#include "pubsub_publish.h" - -#define EVENT_CHECK_TIMEOUT 50 - -struct response { - grpc::ClientContext *context; - google::pubsub::v1::PublishResponse *publish_response; - size_t tag; - grpc::Status *status; - - size_t published_metrics; - size_t published_bytes; -}; - -static inline void copy_error_message(char *error_message_dst, const char *error_message_src) -{ - std::strncpy(error_message_dst, error_message_src, ERROR_LINE_MAX); - error_message_dst[ERROR_LINE_MAX] = '\0'; -} - -/** - * Initialize a Pub/Sub client and a data structure for responses. - * - * @param pubsub_specific_data_p a pointer to a structure with instance-wide data. - * @param error_message report error message to a caller. - * @param destination a Pub/Sub service endpoint. - * @param credentials_file a full path for a file with google application credentials. - * @param project_id a project ID. - * @param topic_id a topic ID. - * @return Returns 0 on success, 1 on failure. - */ -int pubsub_init( - void *pubsub_specific_data_p, char *error_message, const char *destination, const char *credentials_file, - const char *project_id, const char *topic_id) -{ - struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p; - - try { - setenv("GOOGLE_APPLICATION_CREDENTIALS", credentials_file, 0); - - std::shared_ptr<grpc::ChannelCredentials> credentials = grpc::GoogleDefaultCredentials(); - if (credentials == nullptr) { - copy_error_message(error_message, "Can't load credentials"); - return 1; - } - - std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(destination, credentials); - - google::pubsub::v1::Publisher::Stub *stub = new google::pubsub::v1::Publisher::Stub(channel); - if (!stub) { - copy_error_message(error_message, "Can't create a publisher stub"); - return 1; - } - - connector_specific_data->stub = stub; - - google::pubsub::v1::PublishRequest *request = new google::pubsub::v1::PublishRequest; - connector_specific_data->request = request; - ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request)) - ->set_topic(std::string("projects/") + project_id + "/topics/" + topic_id); - - grpc::CompletionQueue *cq = new grpc::CompletionQueue; - connector_specific_data->completion_queue = cq; - - connector_specific_data->responses = new std::list<struct response>; - - return 0; - } catch (std::exception const &ex) { - std::string em(std::string("Standard exception raised: ") + ex.what()); - copy_error_message(error_message, em.c_str()); - return 1; - } - - return 0; -} - -/** - * Clean the PubSub connector instance specific data - */ -void pubsub_cleanup(void *pubsub_specific_data_p) -{ - struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p; - - std::list<struct response> *responses = (std::list<struct response> *)connector_specific_data->responses; - std::list<struct response>::iterator response; - for (response = responses->begin(); response != responses->end(); ++response) { - // TODO: If we do this, there are a huge amount of possibly lost records. We need to find a right way of - // cleaning up contexts - // delete response->context; - delete response->publish_response; - delete response->status; - } - delete responses; - - ((grpc::CompletionQueue *)connector_specific_data->completion_queue)->Shutdown(); - delete (grpc::CompletionQueue *)connector_specific_data->completion_queue; - delete (google::pubsub::v1::PublishRequest *)connector_specific_data->request; - delete (google::pubsub::v1::Publisher::Stub *)connector_specific_data->stub; - - // TODO: Find how to shutdown grpc gracefully. grpc_shutdown() doesn't seem to work. - // grpc_shutdown(); - - return; -} - -/** - * Add data to a Pub/Sub request message. - * - * @param pubsub_specific_data_p a pointer to a structure with instance-wide data. - * @param data a text buffer with metrics. - * @return Returns 0 on success, 1 on failure. - */ -int pubsub_add_message(void *pubsub_specific_data_p, char *data) -{ - struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p; - - try { - google::pubsub::v1::PubsubMessage *message = - ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request))->add_messages(); - if (!message) - return 1; - - message->set_data(data); - } catch (std::exception const &ex) { - return 1; - } - - return 0; -} - -/** - * Send data to the Pub/Sub service - * - * @param pubsub_specific_data_p a pointer to a structure with client and request outcome information. - * @param error_message report error message to a caller. - * @param buffered_metrics the number of metrics we are going to send. - * @param buffered_bytes the number of bytes we are going to send. - * @return Returns 0 on success, 1 on failure. - */ -int pubsub_publish(void *pubsub_specific_data_p, char *error_message, size_t buffered_metrics, size_t buffered_bytes) -{ - struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p; - - try { - grpc::ClientContext *context = new grpc::ClientContext; - - std::unique_ptr<grpc::ClientAsyncResponseReader<google::pubsub::v1::PublishResponse> > rpc( - ((google::pubsub::v1::Publisher::Stub *)(connector_specific_data->stub)) - ->AsyncPublish( - context, (*(google::pubsub::v1::PublishRequest *)(connector_specific_data->request)), - ((grpc::CompletionQueue *)(connector_specific_data->completion_queue)))); - - struct response response; - response.context = context; - response.publish_response = new google::pubsub::v1::PublishResponse; - response.tag = connector_specific_data->last_tag++; - response.status = new grpc::Status; - response.published_metrics = buffered_metrics; - response.published_bytes = buffered_bytes; - - rpc->Finish(response.publish_response, response.status, (void *)response.tag); - - ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request))->clear_messages(); - - ((std::list<struct response> *)(connector_specific_data->responses))->push_back(response); - } catch (std::exception const &ex) { - std::string em(std::string("Standard exception raised: ") + ex.what()); - copy_error_message(error_message, em.c_str()); - return 1; - } - - return 0; -} - -/** - * Get results from service responses - * - * @param pubsub_specific_data_p a pointer to a structure with instance-wide data. - * @param error_message report error message to a caller. - * @param sent_metrics report to a caller how many metrics was successfully sent. - * @param sent_bytes report to a caller how many bytes was successfully sent. - * @param lost_metrics report to a caller how many metrics was lost during transmission. - * @param lost_bytes report to a caller how many bytes was lost during transmission. - * @return Returns 0 if all data was sent successfully, 1 when data was lost on transmission. - */ -int pubsub_get_result( - void *pubsub_specific_data_p, char *error_message, - size_t *sent_metrics, size_t *sent_bytes, size_t *lost_metrics, size_t *lost_bytes) -{ - struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p; - std::list<struct response> *responses = (std::list<struct response> *)connector_specific_data->responses; - grpc::CompletionQueue::NextStatus next_status; - - *sent_metrics = 0; - *sent_bytes = 0; - *lost_metrics = 0; - *lost_bytes = 0; - - try { - do { - std::list<struct response>::iterator response; - void *got_tag; - bool ok = false; - - auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(50); - next_status = (*(grpc::CompletionQueue *)(connector_specific_data->completion_queue)) - .AsyncNext(&got_tag, &ok, deadline); - - if (next_status == grpc::CompletionQueue::GOT_EVENT) { - for (response = responses->begin(); response != responses->end(); ++response) { - if ((void *)response->tag == got_tag) - break; - } - - if (response == responses->end()) { - copy_error_message(error_message, "Cannot get Pub/Sub response"); - return 1; - } - - if (ok && response->publish_response->message_ids_size()) { - *sent_metrics += response->published_metrics; - *sent_bytes += response->published_bytes; - } else { - *lost_metrics += response->published_metrics; - *lost_bytes += response->published_bytes; - response->status->error_message().copy(error_message, ERROR_LINE_MAX); - error_message[ERROR_LINE_MAX] = '\0'; - } - - delete response->context; - delete response->publish_response; - delete response->status; - responses->erase(response); - } - - if (next_status == grpc::CompletionQueue::SHUTDOWN) { - copy_error_message(error_message, "Completion queue shutdown"); - return 1; - } - - } while (next_status == grpc::CompletionQueue::GOT_EVENT); - } catch (std::exception const &ex) { - std::string em(std::string("Standard exception raised: ") + ex.what()); - copy_error_message(error_message, em.c_str()); - return 1; - } - - if (*lost_metrics) { - return 1; - } - - return 0; -} diff --git a/exporting/pubsub/pubsub_publish.h b/exporting/pubsub/pubsub_publish.h deleted file mode 100644 index 567a262f..00000000 --- a/exporting/pubsub/pubsub_publish.h +++ /dev/null @@ -1,37 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_EXPORTING_PUBSUB_PUBLISH_H -#define NETDATA_EXPORTING_PUBSUB_PUBLISH_H - -#define ERROR_LINE_MAX 1023 - -#ifdef __cplusplus -extern "C" { -#endif - -struct pubsub_specific_data { - void *stub; - void *request; - void *completion_queue; - - void *responses; - size_t last_tag; -}; - -int pubsub_init( - void *pubsub_specific_data_p, char *error_message, const char *destination, const char *credentials_file, - const char *project_id, const char *topic_id); -void pubsub_cleanup(void *pubsub_specific_data_p); - -int pubsub_add_message(void *pubsub_specific_data_p, char *data); - -int pubsub_publish(void *pubsub_specific_data_p, char *error_message, size_t buffered_metrics, size_t buffered_bytes); -int pubsub_get_result( - void *pubsub_specific_data_p, char *error_message, - size_t *sent_metrics, size_t *sent_bytes, size_t *lost_metrics, size_t *lost_bytes); - -#ifdef __cplusplus -} -#endif - -#endif //NETDATA_EXPORTING_PUBSUB_PUBLISH_H |