diff options
Diffstat (limited to 'exporting/aws_kinesis/aws_kinesis.c')
-rw-r--r-- | exporting/aws_kinesis/aws_kinesis.c | 219 |
1 files changed, 0 insertions, 219 deletions
diff --git a/exporting/aws_kinesis/aws_kinesis.c b/exporting/aws_kinesis/aws_kinesis.c deleted file mode 100644 index 498d9ee23..000000000 --- a/exporting/aws_kinesis/aws_kinesis.c +++ /dev/null @@ -1,219 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "aws_kinesis.h" - -/** - * Clean AWS Kinesis * - */ -void aws_kinesis_cleanup(struct instance *instance) -{ - netdata_log_info("EXPORTING: cleaning up instance %s ...", instance->config.name); - kinesis_shutdown(instance->connector_specific_data); - - freez(instance->connector_specific_data); - - struct aws_kinesis_specific_config *connector_specific_config = instance->config.connector_specific_config; - if (connector_specific_config) { - freez(connector_specific_config->auth_key_id); - freez(connector_specific_config->secure_key); - freez(connector_specific_config->stream_name); - - freez(connector_specific_config); - } - - netdata_log_info("EXPORTING: instance %s exited", instance->config.name); - instance->exited = 1; -} - -/** - * Initialize AWS Kinesis connector instance - * - * @param instance an instance data structure. - * @return Returns 0 on success, 1 on failure. - */ -int init_aws_kinesis_instance(struct instance *instance) -{ - instance->worker = aws_kinesis_connector_worker; - - instance->start_batch_formatting = NULL; - instance->start_host_formatting = format_host_labels_json_plaintext; - instance->start_chart_formatting = NULL; - - if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED) - instance->metric_formatting = format_dimension_collected_json_plaintext; - else - instance->metric_formatting = format_dimension_stored_json_plaintext; - - instance->end_chart_formatting = NULL; - instance->variables_formatting = NULL; - instance->end_host_formatting = flush_host_labels; - instance->end_batch_formatting = NULL; - - instance->prepare_header = NULL; - instance->check_response = NULL; - - instance->buffer = (void *)buffer_create(0, &netdata_buffers_statistics.buffers_exporters); - if (!instance->buffer) { - netdata_log_error("EXPORTING: cannot create buffer for AWS Kinesis exporting connector instance %s", - instance->config.name); - return 1; - } - if (uv_mutex_init(&instance->mutex)) - return 1; - if (uv_cond_init(&instance->cond_var)) - return 1; - - if (!instance->engine->aws_sdk_initialized) { - aws_sdk_init(); - instance->engine->aws_sdk_initialized = 1; - } - - struct aws_kinesis_specific_config *connector_specific_config = instance->config.connector_specific_config; - struct aws_kinesis_specific_data *connector_specific_data = callocz(1, sizeof(struct aws_kinesis_specific_data)); - instance->connector_specific_data = (void *)connector_specific_data; - - if (!strcmp(connector_specific_config->stream_name, "")) { - netdata_log_error("stream name is a mandatory Kinesis parameter but it is not configured"); - return 1; - } - - kinesis_init( - (void *)connector_specific_data, - instance->config.destination, - connector_specific_config->auth_key_id, - connector_specific_config->secure_key, - instance->config.timeoutms); - - return 0; -} - -/** - * AWS Kinesis connector worker - * - * Runs in a separate thread for every instance. - * - * @param instance_p an instance data structure. - */ -void aws_kinesis_connector_worker(void *instance_p) -{ - struct instance *instance = (struct instance *)instance_p; - struct aws_kinesis_specific_config *connector_specific_config = instance->config.connector_specific_config; - struct aws_kinesis_specific_data *connector_specific_data = instance->connector_specific_data; - - while (!instance->engine->exit) { - unsigned long long partition_key_seq = 0; - struct stats *stats = &instance->stats; - - uv_mutex_lock(&instance->mutex); - while (!instance->data_is_ready) - uv_cond_wait(&instance->cond_var, &instance->mutex); - instance->data_is_ready = 0; - - if (unlikely(instance->engine->exit)) { - uv_mutex_unlock(&instance->mutex); - break; - } - - // reset the monitoring chart counters - stats->received_bytes = - stats->sent_bytes = - stats->sent_metrics = - stats->lost_metrics = - stats->receptions = - stats->transmission_successes = - stats->transmission_failures = - stats->data_lost_events = - stats->lost_bytes = - stats->reconnects = 0; - - BUFFER *buffer = (BUFFER *)instance->buffer; - size_t buffer_len = buffer_strlen(buffer); - - stats->buffered_bytes = buffer_len; - - size_t sent = 0; - - while (sent < buffer_len) { - char partition_key[KINESIS_PARTITION_KEY_MAX + 1]; - snprintf(partition_key, KINESIS_PARTITION_KEY_MAX, "netdata_%llu", partition_key_seq++); - size_t partition_key_len = strnlen(partition_key, KINESIS_PARTITION_KEY_MAX); - - const char *first_char = buffer_tostring(buffer) + sent; - - size_t record_len = 0; - - // split buffer into chunks of maximum allowed size - if (buffer_len - sent < KINESIS_RECORD_MAX - partition_key_len) { - record_len = buffer_len - sent; - } else { - record_len = KINESIS_RECORD_MAX - partition_key_len; - while (record_len && *(first_char + record_len - 1) != '\n') - record_len--; - } - char error_message[ERROR_LINE_MAX + 1] = ""; - - netdata_log_debug(D_EXPORTING, - "EXPORTING: kinesis_put_record(): dest = %s, id = %s, key = %s, stream = %s, partition_key = %s, " - "buffer = %zu, record = %zu", - instance->config.destination, - connector_specific_config->auth_key_id, - connector_specific_config->secure_key, - connector_specific_config->stream_name, - partition_key, - buffer_len, - record_len); - - kinesis_put_record( - connector_specific_data, connector_specific_config->stream_name, partition_key, first_char, record_len); - - sent += record_len; - stats->transmission_successes++; - - size_t sent_bytes = 0, lost_bytes = 0; - - if (unlikely(kinesis_get_result( - connector_specific_data->request_outcomes, error_message, &sent_bytes, &lost_bytes))) { - // oops! we couldn't send (all or some of the) data - netdata_log_error("EXPORTING: %s", error_message); - netdata_log_error("EXPORTING: failed to write data to external database '%s'. Willing to write %zu bytes, wrote %zu bytes.", - instance->config.destination, - sent_bytes, - sent_bytes - lost_bytes); - - stats->transmission_failures++; - stats->data_lost_events++; - stats->lost_bytes += lost_bytes; - - // estimate the number of lost metrics - stats->lost_metrics += (collected_number)( - stats->buffered_metrics * - (buffer_len && (lost_bytes > buffer_len) ? (double)lost_bytes / buffer_len : 1)); - - break; - } else { - stats->receptions++; - } - - if (unlikely(instance->engine->exit)) - break; - } - - stats->sent_bytes += sent; - if (likely(sent == buffer_len)) - stats->sent_metrics = stats->buffered_metrics; - - buffer_flush(buffer); - - send_internal_metrics(instance); - - stats->buffered_metrics = 0; - - uv_mutex_unlock(&instance->mutex); - -#ifdef UNIT_TESTING - return; -#endif - } - - aws_kinesis_cleanup(instance); -} |