From 5da14042f70711ea5cf66e034699730335462f66 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 5 May 2024 14:08:03 +0200 Subject: Merging upstream version 1.45.3+dfsg. Signed-off-by: Daniel Baumann --- src/exporting/pubsub/README.md | 1 + .../pubsub/integrations/google_cloud_pub_sub.md | 145 ++++++++++++ src/exporting/pubsub/metadata.yaml | 152 ++++++++++++ src/exporting/pubsub/pubsub.c | 195 ++++++++++++++++ src/exporting/pubsub/pubsub.h | 14 ++ src/exporting/pubsub/pubsub_publish.cc | 258 +++++++++++++++++++++ src/exporting/pubsub/pubsub_publish.h | 37 +++ 7 files changed, 802 insertions(+) create mode 120000 src/exporting/pubsub/README.md create mode 100644 src/exporting/pubsub/integrations/google_cloud_pub_sub.md create mode 100644 src/exporting/pubsub/metadata.yaml create mode 100644 src/exporting/pubsub/pubsub.c create mode 100644 src/exporting/pubsub/pubsub.h create mode 100644 src/exporting/pubsub/pubsub_publish.cc create mode 100644 src/exporting/pubsub/pubsub_publish.h (limited to 'src/exporting/pubsub') diff --git a/src/exporting/pubsub/README.md b/src/exporting/pubsub/README.md new file mode 120000 index 000000000..8633f1725 --- /dev/null +++ b/src/exporting/pubsub/README.md @@ -0,0 +1 @@ +integrations/google_cloud_pub_sub.md \ No newline at end of file diff --git a/src/exporting/pubsub/integrations/google_cloud_pub_sub.md b/src/exporting/pubsub/integrations/google_cloud_pub_sub.md new file mode 100644 index 000000000..ce2ba2865 --- /dev/null +++ b/src/exporting/pubsub/integrations/google_cloud_pub_sub.md @@ -0,0 +1,145 @@ + + +# Google Cloud Pub Sub + + + + + +Export metrics to Google Cloud Pub/Sub Service + + + + + +## Setup + +### Prerequisites + +#### + +- First [install](https://github.com/googleapis/google-cloud-cpp/) install Google Cloud Platform C++ Client Libraries +- Pub/Sub support is also dependent on the dependencies of those libraries, like `protobuf`, `protoc`, and `grpc` +- Next, Netdata should be re-installed from the source. The installer will detect that the required libraries are now available. + + + +### Configuration + +#### File + +The configuration file name for this integration is `exporting.conf`. + + +You can edit the configuration file using the `edit-config` script from the +Netdata [config directory](https://github.com/netdata/netdata/blob/master/docs/netdata-agent/configuration.md#the-netdata-config-directory). + +```bash +cd /etc/netdata 2>/dev/null || cd /opt/netdata/etc/netdata +sudo ./edit-config exporting.conf +``` +#### Options + +The following options can be defined for this exporter. + + +
Config options + +| Name | Description | Default | Required | +|:----|:-----------|:-------|:--------:| +| enabled | Enables or disables an exporting connector instance (yes/no). | no | yes | +| destination | Accepts a space separated list of hostnames, IPs (IPv4 and IPv6) and ports to connect to. Netdata will use the first available to send the metrics. | pubsub.googleapis.com | yes | +| username | Username for HTTP authentication | my_username | no | +| password | Password for HTTP authentication | my_password | no | +| data source | Selects the kind of data that will be sent to the external database. (as collected/average/sum) | | no | +| hostname | The hostname to be used for sending data to the external database server. | [global].hostname | no | +| prefix | The prefix to add to all metrics. | Netdata | no | +| update every | Frequency of sending sending data to the external database, in seconds. | 10 | no | +| buffer on failures | The number of iterations (`update every` seconds) to buffer data, when the external database server is not available. | 10 | no | +| timeout ms | The timeout in milliseconds to wait for the external database server to process the data. | 2 * update_every * 1000 | no | +| send hosts matching | Hosts filter. Determines which hosts will be sent to the external database. The syntax is [simple patterns](https://github.com/netdata/netdata/tree/master/src/libnetdata/simple_pattern#simple-patterns). | localhost * | no | +| send charts matching | One or more space separated patterns (use * as wildcard) checked against both chart id and chart name. | * | no | +| send names instead of ids | Controls the metric names Netdata should send to the external database (yes/no). | | no | +| send configured labels | Controls if host labels defined in the `[host labels]` section in `netdata.conf` should be sent to the external database (yes/no). | | no | +| send automatic labels | Controls if automatically created labels, like `_os_name` or `_architecture` should be sent to the external database (yes/no). | | no | + +##### destination + +The format of each item in this list, is: [PROTOCOL:]IP[:PORT]. +- PROTOCOL can be udp or tcp. tcp is the default and only supported by the current exporting engine. +- IP can be XX.XX.XX.XX (IPv4), or [XX:XX...XX:XX] (IPv6). For IPv6 you can to enclose the IP in [] to separate it from the port. +- PORT can be a number of a service name. If omitted, the default port for the exporting connector will be used. + +Example IPv4: + ```yaml + destination = pubsub.googleapis.com + ``` +When multiple servers are defined, Netdata will try the next one when the previous one fails. + + +##### update every + +Netdata will add some randomness to this number, to prevent stressing the external server when many Netdata servers +send data to the same database. This randomness does not affect the quality of the data, only the time they are sent. + + +##### buffer on failures + +If the server fails to receive the data after that many failures, data loss on the connector instance is expected (Netdata will also log it). + + +##### send hosts matching + +Includes one or more space separated patterns, using * as wildcard (any number of times within each pattern). +The patterns are checked against the hostname (the localhost is always checked as localhost), allowing us to +filter which hosts will be sent to the external database when this Netdata is a central Netdata aggregating multiple hosts. + +A pattern starting with `!` gives a negative match. So to match all hosts named `*db*` except hosts containing `*child*`, +use `!*child* *db*` (so, the order is important: the first pattern matching the hostname will be used - positive or negative). + + +##### send charts matching + +A pattern starting with ! gives a negative match. So to match all charts named apps.* except charts ending in *reads, +use !*reads apps.* (so, the order is important: the first pattern matching the chart id or the chart name will be used, +positive or negative). There is also a URL parameter filter that can be used while querying allmetrics. The URL parameter +has a higher priority than the configuration option. + + +##### send names instead of ids + +Netdata supports names and IDs for charts and dimensions. Usually IDs are unique identifiers as read by the system and names +are human friendly labels (also unique). Most charts and metrics have the same ID and name, but in several cases they are +different : disks with device-mapper, interrupts, QoS classes, statsd synthetic charts, etc. + + +
+ +#### Examples + +##### Basic configuration + +- Set the destination option to a Pub/Sub service endpoint. pubsub.googleapis.com is the default one. +- Create the credentials JSON file by following Google Cloud's authentication guide. +- The user running the Agent (typically netdata) needs read access to google_cloud_credentials.json, which you can set + `chmod 400 google_cloud_credentials.json; chown netdata google_cloud_credentials.json` +- Set the credentials file option to the full path of the file. + + +```yaml +[pubsub:my_instance] + enabled = yes + destination = pubsub.googleapis.com + credentials file = /etc/netdata/google_cloud_credentials.json + project id = my_project + topic id = my_topic + +``` + diff --git a/src/exporting/pubsub/metadata.yaml b/src/exporting/pubsub/metadata.yaml new file mode 100644 index 000000000..48ec02c47 --- /dev/null +++ b/src/exporting/pubsub/metadata.yaml @@ -0,0 +1,152 @@ +# yamllint disable rule:line-length +--- +id: 'export-google-pubsub' +meta: + name: 'Google Cloud Pub Sub' + link: 'https://cloud.google.com/pubsub' + categories: + - export + icon_filename: 'pubsub.png' +keywords: + - exporter + - Google Cloud + - Pub Sub +overview: + exporter_description: | + Export metrics to Google Cloud Pub/Sub Service + exporter_limitations: '' +setup: + prerequisites: + list: + - title: '' + description: | + - First [install](https://github.com/googleapis/google-cloud-cpp/) install Google Cloud Platform C++ Client Libraries + - Pub/Sub support is also dependent on the dependencies of those libraries, like `protobuf`, `protoc`, and `grpc` + - Next, Netdata should be re-installed from the source. The installer will detect that the required libraries are now available. + configuration: + file: + name: 'exporting.conf' + options: + description: | + The following options can be defined for this exporter. + folding: + title: 'Config options' + enabled: true + list: + - name: 'enabled' + default_value: 'no' + description: 'Enables or disables an exporting connector instance (yes|no).' + required: true + - name: 'destination' + default_value: 'pubsub.googleapis.com' + description: 'Accepts a space separated list of hostnames, IPs (IPv4 and IPv6) and ports to connect to. Netdata will use the first available to send the metrics.' + required: true + detailed_description: | + The format of each item in this list, is: [PROTOCOL:]IP[:PORT]. + - PROTOCOL can be udp or tcp. tcp is the default and only supported by the current exporting engine. + - IP can be XX.XX.XX.XX (IPv4), or [XX:XX...XX:XX] (IPv6). For IPv6 you can to enclose the IP in [] to separate it from the port. + - PORT can be a number of a service name. If omitted, the default port for the exporting connector will be used. + + Example IPv4: + ```yaml + destination = pubsub.googleapis.com + ``` + When multiple servers are defined, Netdata will try the next one when the previous one fails. + - name: 'username' + default_value: 'my_username' + description: 'Username for HTTP authentication' + required: false + - name: 'password' + default_value: 'my_password' + description: 'Password for HTTP authentication' + required: false + - name: 'data source' + default_value: '' + description: 'Selects the kind of data that will be sent to the external database. (as collected|average|sum)' + required: false + - name: 'hostname' + default_value: '[global].hostname' + description: 'The hostname to be used for sending data to the external database server.' + required: false + - name: 'prefix' + default_value: 'Netdata' + description: 'The prefix to add to all metrics.' + required: false + - name: 'update every' + default_value: '10' + description: | + Frequency of sending sending data to the external database, in seconds. + required: false + detailed_description: | + Netdata will add some randomness to this number, to prevent stressing the external server when many Netdata servers + send data to the same database. This randomness does not affect the quality of the data, only the time they are sent. + - name: 'buffer on failures' + default_value: '10' + description: | + The number of iterations (`update every` seconds) to buffer data, when the external database server is not available. + required: false + detailed_description: | + If the server fails to receive the data after that many failures, data loss on the connector instance is expected (Netdata will also log it). + - name: 'timeout ms' + default_value: '2 * update_every * 1000' + description: 'The timeout in milliseconds to wait for the external database server to process the data.' + required: false + - name: 'send hosts matching' + default_value: 'localhost *' + description: | + Hosts filter. Determines which hosts will be sent to the external database. The syntax is [simple patterns](https://github.com/netdata/netdata/tree/master/src/libnetdata/simple_pattern#simple-patterns). + required: false + detailed_description: | + Includes one or more space separated patterns, using * as wildcard (any number of times within each pattern). + The patterns are checked against the hostname (the localhost is always checked as localhost), allowing us to + filter which hosts will be sent to the external database when this Netdata is a central Netdata aggregating multiple hosts. + + A pattern starting with `!` gives a negative match. So to match all hosts named `*db*` except hosts containing `*child*`, + use `!*child* *db*` (so, the order is important: the first pattern matching the hostname will be used - positive or negative). + - name: 'send charts matching' + default_value: '*' + description: | + One or more space separated patterns (use * as wildcard) checked against both chart id and chart name. + required: false + detailed_description: | + A pattern starting with ! gives a negative match. So to match all charts named apps.* except charts ending in *reads, + use !*reads apps.* (so, the order is important: the first pattern matching the chart id or the chart name will be used, + positive or negative). There is also a URL parameter filter that can be used while querying allmetrics. The URL parameter + has a higher priority than the configuration option. + - name: 'send names instead of ids' + default_value: '' + description: 'Controls the metric names Netdata should send to the external database (yes|no).' + required: false + detailed_description: | + Netdata supports names and IDs for charts and dimensions. Usually IDs are unique identifiers as read by the system and names + are human friendly labels (also unique). Most charts and metrics have the same ID and name, but in several cases they are + different : disks with device-mapper, interrupts, QoS classes, statsd synthetic charts, etc. + - name: 'send configured labels' + default_value: '' + description: 'Controls if host labels defined in the `[host labels]` section in `netdata.conf` should be sent to the external database (yes|no).' + required: false + - name: 'send automatic labels' + default_value: '' + description: 'Controls if automatically created labels, like `_os_name` or `_architecture` should be sent to the external database (yes|no).' + required: false + examples: + folding: + enabled: true + title: '' + list: + - name: 'Basic configuration' + folding: + enabled: false + description: | + - Set the destination option to a Pub/Sub service endpoint. pubsub.googleapis.com is the default one. + - Create the credentials JSON file by following Google Cloud's authentication guide. + - The user running the Agent (typically netdata) needs read access to google_cloud_credentials.json, which you can set + `chmod 400 google_cloud_credentials.json; chown netdata google_cloud_credentials.json` + - Set the credentials file option to the full path of the file. + config: | + [pubsub:my_instance] + enabled = yes + destination = pubsub.googleapis.com + credentials file = /etc/netdata/google_cloud_credentials.json + project id = my_project + topic id = my_topic diff --git a/src/exporting/pubsub/pubsub.c b/src/exporting/pubsub/pubsub.c new file mode 100644 index 000000000..4989160a4 --- /dev/null +++ b/src/exporting/pubsub/pubsub.c @@ -0,0 +1,195 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "pubsub.h" + +/** + * Initialize Pub/Sub connector instance + * + * @param instance an instance data structure. + * @return Returns 0 on success, 1 on failure. + */ +int init_pubsub_instance(struct instance *instance) +{ + instance->worker = pubsub_connector_worker; + + instance->start_batch_formatting = NULL; + instance->start_host_formatting = format_host_labels_json_plaintext; + instance->start_chart_formatting = NULL; + + + if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED) + instance->metric_formatting = format_dimension_collected_json_plaintext; + else + instance->metric_formatting = format_dimension_stored_json_plaintext; + + instance->end_chart_formatting = NULL; + instance->variables_formatting = NULL; + instance->end_host_formatting = flush_host_labels; + instance->end_batch_formatting = NULL; + + instance->prepare_header = NULL; + instance->check_response = NULL; + + instance->buffer = (void *)buffer_create(0, &netdata_buffers_statistics.buffers_exporters); + if (!instance->buffer) { + netdata_log_error("EXPORTING: cannot create buffer for Pub/Sub exporting connector instance %s", instance->config.name); + return 1; + } + uv_mutex_init(&instance->mutex); + uv_cond_init(&instance->cond_var); + + struct pubsub_specific_data *connector_specific_data = callocz(1, sizeof(struct pubsub_specific_data)); + instance->connector_specific_data = (void *)connector_specific_data; + + struct pubsub_specific_config *connector_specific_config = + (struct pubsub_specific_config *)instance->config.connector_specific_config; + char error_message[ERROR_LINE_MAX + 1] = ""; + if (pubsub_init( + (void *)connector_specific_data, error_message, instance->config.destination, + connector_specific_config->credentials_file, connector_specific_config->project_id, + connector_specific_config->topic_id)) { + netdata_log_error( + "EXPORTING: Cannot initialize a Pub/Sub publisher for instance %s: %s", + instance->config.name, error_message); + return 1; + } + + return 0; +} + +/** + * Clean a PubSub connector instance + * + * @param instance an instance data structure. + */ +void clean_pubsub_instance(struct instance *instance) +{ + netdata_log_info("EXPORTING: cleaning up instance %s ...", instance->config.name); + + struct pubsub_specific_data *connector_specific_data = + (struct pubsub_specific_data *)instance->connector_specific_data; + pubsub_cleanup(connector_specific_data); + freez(connector_specific_data); + + buffer_free(instance->buffer); + + struct pubsub_specific_config *connector_specific_config = + (struct pubsub_specific_config *)instance->config.connector_specific_config; + freez(connector_specific_config->credentials_file); + freez(connector_specific_config->project_id); + freez(connector_specific_config->topic_id); + freez(connector_specific_config); + + netdata_log_info("EXPORTING: instance %s exited", instance->config.name); + instance->exited = 1; + + return; +} + +/** + * Pub/Sub connector worker + * + * Runs in a separate thread for every instance. + * + * @param instance_p an instance data structure. + */ +void pubsub_connector_worker(void *instance_p) +{ + struct instance *instance = (struct instance *)instance_p; + struct pubsub_specific_config *connector_specific_config = instance->config.connector_specific_config; + struct pubsub_specific_data *connector_specific_data = instance->connector_specific_data; + + while (!instance->engine->exit) { + struct stats *stats = &instance->stats; + char error_message[ERROR_LINE_MAX + 1] = ""; + + uv_mutex_lock(&instance->mutex); + while (!instance->data_is_ready) + uv_cond_wait(&instance->cond_var, &instance->mutex); + instance->data_is_ready = 0; + + + if (unlikely(instance->engine->exit)) { + uv_mutex_unlock(&instance->mutex); + break; + } + + // reset the monitoring chart counters + stats->received_bytes = + stats->sent_bytes = + stats->sent_metrics = + stats->lost_metrics = + stats->receptions = + stats->transmission_successes = + stats->transmission_failures = + stats->data_lost_events = + stats->lost_bytes = + stats->reconnects = 0; + + BUFFER *buffer = (BUFFER *)instance->buffer; + size_t buffer_len = buffer_strlen(buffer); + + stats->buffered_bytes = buffer_len; + + if (pubsub_add_message(instance->connector_specific_data, (char *)buffer_tostring(buffer))) { + netdata_log_error("EXPORTING: Instance %s: Cannot add data to a message", instance->config.name); + + stats->data_lost_events++; + stats->lost_metrics += stats->buffered_metrics; + stats->lost_bytes += buffer_len; + + goto cleanup; + } + + netdata_log_debug( + D_EXPORTING, "EXPORTING: pubsub_publish(): project = %s, topic = %s, buffer = %zu", + connector_specific_config->project_id, connector_specific_config->topic_id, buffer_len); + + if (pubsub_publish((void *)connector_specific_data, error_message, stats->buffered_metrics, buffer_len)) { + netdata_log_error("EXPORTING: Instance: %s: Cannot publish a message: %s", instance->config.name, error_message); + + stats->transmission_failures++; + stats->data_lost_events++; + stats->lost_metrics += stats->buffered_metrics; + stats->lost_bytes += buffer_len; + + goto cleanup; + } + + stats->sent_bytes = buffer_len; + stats->transmission_successes++; + + size_t sent_metrics = 0, lost_metrics = 0, sent_bytes = 0, lost_bytes = 0; + + if (unlikely(pubsub_get_result( + connector_specific_data, error_message, &sent_metrics, &sent_bytes, &lost_metrics, &lost_bytes))) { + // oops! we couldn't send (all or some of the) data + netdata_log_error("EXPORTING: %s", error_message); + netdata_log_error( + "EXPORTING: failed to write data to service '%s'. Willing to write %zu bytes, wrote %zu bytes.", + instance->config.destination, lost_bytes, sent_bytes); + + stats->transmission_failures++; + stats->data_lost_events++; + stats->lost_metrics += lost_metrics; + stats->lost_bytes += lost_bytes; + } else { + stats->receptions++; + stats->sent_metrics = sent_metrics; + } + + cleanup: + send_internal_metrics(instance); + + buffer_flush(buffer); + stats->buffered_metrics = 0; + + uv_mutex_unlock(&instance->mutex); + +#ifdef UNIT_TESTING + return; +#endif + } + + clean_pubsub_instance(instance); +} diff --git a/src/exporting/pubsub/pubsub.h b/src/exporting/pubsub/pubsub.h new file mode 100644 index 000000000..0bcb76f9b --- /dev/null +++ b/src/exporting/pubsub/pubsub.h @@ -0,0 +1,14 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_EXPORTING_PUBSUB_H +#define NETDATA_EXPORTING_PUBSUB_H + +#include "exporting/exporting_engine.h" +#include "exporting/json/json.h" +#include "pubsub_publish.h" + +int init_pubsub_instance(struct instance *instance); +void clean_pubsub_instance(struct instance *instance); +void pubsub_connector_worker(void *instance_p); + +#endif //NETDATA_EXPORTING_PUBSUB_H diff --git a/src/exporting/pubsub/pubsub_publish.cc b/src/exporting/pubsub/pubsub_publish.cc new file mode 100644 index 000000000..cc14154f8 --- /dev/null +++ b/src/exporting/pubsub/pubsub_publish.cc @@ -0,0 +1,258 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include +#include +#include +#include "pubsub_publish.h" + +#define EVENT_CHECK_TIMEOUT 50 + +struct response { + grpc::ClientContext *context; + google::pubsub::v1::PublishResponse *publish_response; + size_t tag; + grpc::Status *status; + + size_t published_metrics; + size_t published_bytes; +}; + +static inline void copy_error_message(char *error_message_dst, const char *error_message_src) +{ + std::strncpy(error_message_dst, error_message_src, ERROR_LINE_MAX); + error_message_dst[ERROR_LINE_MAX] = '\0'; +} + +/** + * Initialize a Pub/Sub client and a data structure for responses. + * + * @param pubsub_specific_data_p a pointer to a structure with instance-wide data. + * @param error_message report error message to a caller. + * @param destination a Pub/Sub service endpoint. + * @param credentials_file a full path for a file with google application credentials. + * @param project_id a project ID. + * @param topic_id a topic ID. + * @return Returns 0 on success, 1 on failure. + */ +int pubsub_init( + void *pubsub_specific_data_p, char *error_message, const char *destination, const char *credentials_file, + const char *project_id, const char *topic_id) +{ + struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p; + + try { + setenv("GOOGLE_APPLICATION_CREDENTIALS", credentials_file, 0); + + std::shared_ptr credentials = grpc::GoogleDefaultCredentials(); + if (credentials == nullptr) { + copy_error_message(error_message, "Can't load credentials"); + return 1; + } + + std::shared_ptr channel = grpc::CreateChannel(destination, credentials); + + google::pubsub::v1::Publisher::Stub *stub = new google::pubsub::v1::Publisher::Stub(channel); + if (!stub) { + copy_error_message(error_message, "Can't create a publisher stub"); + return 1; + } + + connector_specific_data->stub = stub; + + google::pubsub::v1::PublishRequest *request = new google::pubsub::v1::PublishRequest; + connector_specific_data->request = request; + ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request)) + ->set_topic(std::string("projects/") + project_id + "/topics/" + topic_id); + + grpc::CompletionQueue *cq = new grpc::CompletionQueue; + connector_specific_data->completion_queue = cq; + + connector_specific_data->responses = new std::list; + + return 0; + } catch (std::exception const &ex) { + std::string em(std::string("Standard exception raised: ") + ex.what()); + copy_error_message(error_message, em.c_str()); + return 1; + } + + return 0; +} + +/** + * Clean the PubSub connector instance specific data + */ +void pubsub_cleanup(void *pubsub_specific_data_p) +{ + struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p; + + std::list *responses = (std::list *)connector_specific_data->responses; + std::list::iterator response; + for (response = responses->begin(); response != responses->end(); ++response) { + // TODO: If we do this, there are a huge amount of possibly lost records. We need to find a right way of + // cleaning up contexts + // delete response->context; + delete response->publish_response; + delete response->status; + } + delete responses; + + ((grpc::CompletionQueue *)connector_specific_data->completion_queue)->Shutdown(); + delete (grpc::CompletionQueue *)connector_specific_data->completion_queue; + delete (google::pubsub::v1::PublishRequest *)connector_specific_data->request; + delete (google::pubsub::v1::Publisher::Stub *)connector_specific_data->stub; + + // TODO: Find how to shutdown grpc gracefully. grpc_shutdown() doesn't seem to work. + // grpc_shutdown(); + + return; +} + +/** + * Add data to a Pub/Sub request message. + * + * @param pubsub_specific_data_p a pointer to a structure with instance-wide data. + * @param data a text buffer with metrics. + * @return Returns 0 on success, 1 on failure. + */ +int pubsub_add_message(void *pubsub_specific_data_p, char *data) +{ + struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p; + + try { + google::pubsub::v1::PubsubMessage *message = + ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request))->add_messages(); + if (!message) + return 1; + + message->set_data(data); + } catch (std::exception const &ex) { + return 1; + } + + return 0; +} + +/** + * Send data to the Pub/Sub service + * + * @param pubsub_specific_data_p a pointer to a structure with client and request outcome information. + * @param error_message report error message to a caller. + * @param buffered_metrics the number of metrics we are going to send. + * @param buffered_bytes the number of bytes we are going to send. + * @return Returns 0 on success, 1 on failure. + */ +int pubsub_publish(void *pubsub_specific_data_p, char *error_message, size_t buffered_metrics, size_t buffered_bytes) +{ + struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p; + + try { + grpc::ClientContext *context = new grpc::ClientContext; + + std::unique_ptr > rpc( + ((google::pubsub::v1::Publisher::Stub *)(connector_specific_data->stub)) + ->AsyncPublish( + context, (*(google::pubsub::v1::PublishRequest *)(connector_specific_data->request)), + ((grpc::CompletionQueue *)(connector_specific_data->completion_queue)))); + + struct response response; + response.context = context; + response.publish_response = new google::pubsub::v1::PublishResponse; + response.tag = connector_specific_data->last_tag++; + response.status = new grpc::Status; + response.published_metrics = buffered_metrics; + response.published_bytes = buffered_bytes; + + rpc->Finish(response.publish_response, response.status, (void *)response.tag); + + ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request))->clear_messages(); + + ((std::list *)(connector_specific_data->responses))->push_back(response); + } catch (std::exception const &ex) { + std::string em(std::string("Standard exception raised: ") + ex.what()); + copy_error_message(error_message, em.c_str()); + return 1; + } + + return 0; +} + +/** + * Get results from service responses + * + * @param pubsub_specific_data_p a pointer to a structure with instance-wide data. + * @param error_message report error message to a caller. + * @param sent_metrics report to a caller how many metrics was successfully sent. + * @param sent_bytes report to a caller how many bytes was successfully sent. + * @param lost_metrics report to a caller how many metrics was lost during transmission. + * @param lost_bytes report to a caller how many bytes was lost during transmission. + * @return Returns 0 if all data was sent successfully, 1 when data was lost on transmission. + */ +int pubsub_get_result( + void *pubsub_specific_data_p, char *error_message, + size_t *sent_metrics, size_t *sent_bytes, size_t *lost_metrics, size_t *lost_bytes) +{ + struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p; + std::list *responses = (std::list *)connector_specific_data->responses; + grpc::CompletionQueue::NextStatus next_status; + + *sent_metrics = 0; + *sent_bytes = 0; + *lost_metrics = 0; + *lost_bytes = 0; + + try { + do { + std::list::iterator response; + void *got_tag; + bool ok = false; + + auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(50); + next_status = (*(grpc::CompletionQueue *)(connector_specific_data->completion_queue)) + .AsyncNext(&got_tag, &ok, deadline); + + if (next_status == grpc::CompletionQueue::GOT_EVENT) { + for (response = responses->begin(); response != responses->end(); ++response) { + if ((void *)response->tag == got_tag) + break; + } + + if (response == responses->end()) { + copy_error_message(error_message, "Cannot get Pub/Sub response"); + return 1; + } + + if (ok && response->publish_response->message_ids_size()) { + *sent_metrics += response->published_metrics; + *sent_bytes += response->published_bytes; + } else { + *lost_metrics += response->published_metrics; + *lost_bytes += response->published_bytes; + response->status->error_message().copy(error_message, ERROR_LINE_MAX); + error_message[ERROR_LINE_MAX] = '\0'; + } + + delete response->context; + delete response->publish_response; + delete response->status; + responses->erase(response); + } + + if (next_status == grpc::CompletionQueue::SHUTDOWN) { + copy_error_message(error_message, "Completion queue shutdown"); + return 1; + } + + } while (next_status == grpc::CompletionQueue::GOT_EVENT); + } catch (std::exception const &ex) { + std::string em(std::string("Standard exception raised: ") + ex.what()); + copy_error_message(error_message, em.c_str()); + return 1; + } + + if (*lost_metrics) { + return 1; + } + + return 0; +} diff --git a/src/exporting/pubsub/pubsub_publish.h b/src/exporting/pubsub/pubsub_publish.h new file mode 100644 index 000000000..567a262f0 --- /dev/null +++ b/src/exporting/pubsub/pubsub_publish.h @@ -0,0 +1,37 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_EXPORTING_PUBSUB_PUBLISH_H +#define NETDATA_EXPORTING_PUBSUB_PUBLISH_H + +#define ERROR_LINE_MAX 1023 + +#ifdef __cplusplus +extern "C" { +#endif + +struct pubsub_specific_data { + void *stub; + void *request; + void *completion_queue; + + void *responses; + size_t last_tag; +}; + +int pubsub_init( + void *pubsub_specific_data_p, char *error_message, const char *destination, const char *credentials_file, + const char *project_id, const char *topic_id); +void pubsub_cleanup(void *pubsub_specific_data_p); + +int pubsub_add_message(void *pubsub_specific_data_p, char *data); + +int pubsub_publish(void *pubsub_specific_data_p, char *error_message, size_t buffered_metrics, size_t buffered_bytes); +int pubsub_get_result( + void *pubsub_specific_data_p, char *error_message, + size_t *sent_metrics, size_t *sent_bytes, size_t *lost_metrics, size_t *lost_bytes); + +#ifdef __cplusplus +} +#endif + +#endif //NETDATA_EXPORTING_PUBSUB_PUBLISH_H -- cgit v1.2.3