// SPDX-License-Identifier: GPL-3.0-or-later #include #include #include #include "pubsub_publish.h" #define EVENT_CHECK_TIMEOUT 50 struct response { grpc::ClientContext *context; google::pubsub::v1::PublishResponse *publish_response; size_t tag; grpc::Status *status; size_t published_metrics; size_t published_bytes; }; static inline void copy_error_message(char *error_message_dst, const char *error_message_src) { std::strncpy(error_message_dst, error_message_src, ERROR_LINE_MAX); error_message_dst[ERROR_LINE_MAX] = '\0'; } /** * Initialize a Pub/Sub client and a data structure for responses. * * @param pubsub_specific_data_p a pointer to a structure with instance-wide data. * @param error_message report error message to a caller. * @param destination a Pub/Sub service endpoint. * @param credentials_file a full path for a file with google application credentials. * @param project_id a project ID. * @param topic_id a topic ID. * @return Returns 0 on success, 1 on failure. */ int pubsub_init( void *pubsub_specific_data_p, char *error_message, const char *destination, const char *credentials_file, const char *project_id, const char *topic_id) { struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p; try { setenv("GOOGLE_APPLICATION_CREDENTIALS", credentials_file, 0); std::shared_ptr credentials = grpc::GoogleDefaultCredentials(); if (credentials == nullptr) { copy_error_message(error_message, "Can't load credentials"); return 1; } std::shared_ptr channel = grpc::CreateChannel(destination, credentials); google::pubsub::v1::Publisher::Stub *stub = new google::pubsub::v1::Publisher::Stub(channel); if (!stub) { copy_error_message(error_message, "Can't create a publisher stub"); return 1; } connector_specific_data->stub = stub; google::pubsub::v1::PublishRequest *request = new google::pubsub::v1::PublishRequest; connector_specific_data->request = request; ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request)) ->set_topic(std::string("projects/") + project_id + "/topics/" + topic_id); grpc::CompletionQueue *cq = new grpc::CompletionQueue; connector_specific_data->completion_queue = cq; connector_specific_data->responses = new std::list; return 0; } catch (std::exception const &ex) { std::string em(std::string("Standard exception raised: ") + ex.what()); copy_error_message(error_message, em.c_str()); return 1; } return 0; } /** * Clean the PubSub connector instance specific data */ void pubsub_cleanup(void *pubsub_specific_data_p) { struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p; std::list *responses = (std::list *)connector_specific_data->responses; std::list::iterator response; for (response = responses->begin(); response != responses->end(); ++response) { // TODO: If we do this, there are a huge amount of possibly lost records. We need to find a right way of // cleaning up contexts // delete response->context; delete response->publish_response; delete response->status; } delete responses; ((grpc::CompletionQueue *)connector_specific_data->completion_queue)->Shutdown(); delete (grpc::CompletionQueue *)connector_specific_data->completion_queue; delete (google::pubsub::v1::PublishRequest *)connector_specific_data->request; delete (google::pubsub::v1::Publisher::Stub *)connector_specific_data->stub; // TODO: Find how to shutdown grpc gracefully. grpc_shutdown() doesn't seem to work. // grpc_shutdown(); return; } /** * Add data to a Pub/Sub request message. * * @param pubsub_specific_data_p a pointer to a structure with instance-wide data. * @param data a text buffer with metrics. * @return Returns 0 on success, 1 on failure. */ int pubsub_add_message(void *pubsub_specific_data_p, char *data) { struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p; try { google::pubsub::v1::PubsubMessage *message = ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request))->add_messages(); if (!message) return 1; message->set_data(data); } catch (std::exception const &ex) { return 1; } return 0; } /** * Send data to the Pub/Sub service * * @param pubsub_specific_data_p a pointer to a structure with client and request outcome information. * @param error_message report error message to a caller. * @param buffered_metrics the number of metrics we are going to send. * @param buffered_bytes the number of bytes we are going to send. * @return Returns 0 on success, 1 on failure. */ int pubsub_publish(void *pubsub_specific_data_p, char *error_message, size_t buffered_metrics, size_t buffered_bytes) { struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p; try { grpc::ClientContext *context = new grpc::ClientContext; std::unique_ptr > rpc( ((google::pubsub::v1::Publisher::Stub *)(connector_specific_data->stub)) ->AsyncPublish( context, (*(google::pubsub::v1::PublishRequest *)(connector_specific_data->request)), ((grpc::CompletionQueue *)(connector_specific_data->completion_queue)))); struct response response; response.context = context; response.publish_response = new google::pubsub::v1::PublishResponse; response.tag = connector_specific_data->last_tag++; response.status = new grpc::Status; response.published_metrics = buffered_metrics; response.published_bytes = buffered_bytes; rpc->Finish(response.publish_response, response.status, (void *)response.tag); ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request))->clear_messages(); ((std::list *)(connector_specific_data->responses))->push_back(response); } catch (std::exception const &ex) { std::string em(std::string("Standard exception raised: ") + ex.what()); copy_error_message(error_message, em.c_str()); return 1; } return 0; } /** * Get results from service responses * * @param pubsub_specific_data_p a pointer to a structure with instance-wide data. * @param error_message report error message to a caller. * @param sent_metrics report to a caller how many metrics was successfully sent. * @param sent_bytes report to a caller how many bytes was successfully sent. * @param lost_metrics report to a caller how many metrics was lost during transmission. * @param lost_bytes report to a caller how many bytes was lost during transmission. * @return Returns 0 if all data was sent successfully, 1 when data was lost on transmission. */ int pubsub_get_result( void *pubsub_specific_data_p, char *error_message, size_t *sent_metrics, size_t *sent_bytes, size_t *lost_metrics, size_t *lost_bytes) { struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p; std::list *responses = (std::list *)connector_specific_data->responses; grpc::CompletionQueue::NextStatus next_status; *sent_metrics = 0; *sent_bytes = 0; *lost_metrics = 0; *lost_bytes = 0; try { do { std::list::iterator response; void *got_tag; bool ok = false; auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(50); next_status = (*(grpc::CompletionQueue *)(connector_specific_data->completion_queue)) .AsyncNext(&got_tag, &ok, deadline); if (next_status == grpc::CompletionQueue::GOT_EVENT) { for (response = responses->begin(); response != responses->end(); ++response) { if ((void *)response->tag == got_tag) break; } if (response == responses->end()) { copy_error_message(error_message, "Cannot get Pub/Sub response"); return 1; } if (ok && response->publish_response->message_ids_size()) { *sent_metrics += response->published_metrics; *sent_bytes += response->published_bytes; } else { *lost_metrics += response->published_metrics; *lost_bytes += response->published_bytes; response->status->error_message().copy(error_message, ERROR_LINE_MAX); error_message[ERROR_LINE_MAX] = '\0'; } delete response->context; delete response->publish_response; delete response->status; responses->erase(response); } if (next_status == grpc::CompletionQueue::SHUTDOWN) { copy_error_message(error_message, "Completion queue shutdown"); return 1; } } while (next_status == grpc::CompletionQueue::GOT_EVENT); } catch (std::exception const &ex) { std::string em(std::string("Standard exception raised: ") + ex.what()); copy_error_message(error_message, em.c_str()); return 1; } if (*lost_metrics) { return 1; } return 0; }