// SPDX-License-Identifier: GPL-3.0-or-later #ifndef NETDATA_EXPORTING_ENGINE_H #define NETDATA_EXPORTING_ENGINE_H 1 #include "daemon/common.h" #include #define exporter_get(section, name, value) expconfig_get(&exporting_config, section, name, value) #define exporter_get_number(section, name, value) expconfig_get_number(&exporting_config, section, name, value) #define exporter_get_boolean(section, name, value) expconfig_get_boolean(&exporting_config, section, name, value) extern struct config exporting_config; #define EXPORTING_UPDATE_EVERY_OPTION_NAME "update every" #define EXPORTING_UPDATE_EVERY_DEFAULT 10 typedef enum exporting_options { EXPORTING_OPTION_NON = 0, EXPORTING_SOURCE_DATA_AS_COLLECTED = (1 << 0), EXPORTING_SOURCE_DATA_AVERAGE = (1 << 1), EXPORTING_SOURCE_DATA_SUM = (1 << 2), EXPORTING_OPTION_SEND_CONFIGURED_LABELS = (1 << 3), EXPORTING_OPTION_SEND_AUTOMATIC_LABELS = (1 << 4), EXPORTING_OPTION_USE_TLS = (1 << 5), EXPORTING_OPTION_SEND_NAMES = (1 << 16), EXPORTING_OPTION_SEND_VARIABLES = (1 << 17) } EXPORTING_OPTIONS; #define EXPORTING_OPTIONS_SOURCE_BITS \ (EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_SOURCE_DATA_AVERAGE | EXPORTING_SOURCE_DATA_SUM) #define EXPORTING_OPTIONS_DATA_SOURCE(exporting_options) ((exporting_options) & EXPORTING_OPTIONS_SOURCE_BITS) extern EXPORTING_OPTIONS global_exporting_options; extern const char *global_exporting_prefix; #define sending_labels_configured(instance) \ ((instance)->config.options & (EXPORTING_OPTION_SEND_CONFIGURED_LABELS | EXPORTING_OPTION_SEND_AUTOMATIC_LABELS)) #define should_send_label(instance, label_source) \ (((instance)->config.options & EXPORTING_OPTION_SEND_CONFIGURED_LABELS && (label_source)&RRDLABEL_SRC_CONFIG) || \ ((instance)->config.options & EXPORTING_OPTION_SEND_AUTOMATIC_LABELS && (label_source)&RRDLABEL_SRC_AUTO)) #define should_send_variables(instance) ((instance)->config.options & EXPORTING_OPTION_SEND_VARIABLES) typedef enum exporting_connector_types { EXPORTING_CONNECTOR_TYPE_UNKNOWN, // Invalid type EXPORTING_CONNECTOR_TYPE_GRAPHITE, // Send plain text to Graphite EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP, // Send data to Graphite using HTTP API EXPORTING_CONNECTOR_TYPE_JSON, // Send data in JSON format EXPORTING_CONNECTOR_TYPE_JSON_HTTP, // Send data in JSON format using HTTP API EXPORTING_CONNECTOR_TYPE_OPENTSDB, // Send data to OpenTSDB using telnet API EXPORTING_CONNECTOR_TYPE_OPENTSDB_HTTP, // Send data to OpenTSDB using HTTP API EXPORTING_CONNECTOR_TYPE_PROMETHEUS_REMOTE_WRITE, // Send data using Prometheus remote write protocol EXPORTING_CONNECTOR_TYPE_KINESIS, // Send message to AWS Kinesis EXPORTING_CONNECTOR_TYPE_PUBSUB, // Send message to Google Cloud Pub/Sub EXPORTING_CONNECTOR_TYPE_MONGODB, // Send data to MongoDB collection EXPORTING_CONNECTOR_TYPE_NUM // Number of exporting connector types } EXPORTING_CONNECTOR_TYPE; struct engine; struct instance_config { EXPORTING_CONNECTOR_TYPE type; const char *type_name; const char *name; const char *destination; const char *username; const char *password; const char *prefix; const char *label_prefix; const char *hostname; int update_every; int buffer_on_failures; long timeoutms; EXPORTING_OPTIONS options; SIMPLE_PATTERN *charts_pattern; SIMPLE_PATTERN *hosts_pattern; int initialized; void *connector_specific_config; }; struct simple_connector_config { int default_port; }; struct simple_connector_buffer { BUFFER *header; BUFFER *buffer; size_t buffered_metrics; size_t buffered_bytes; int used; struct simple_connector_buffer *next; }; #define CONNECTED_TO_MAX 1024 struct simple_connector_data { void *connector_specific_data; char connected_to[CONNECTED_TO_MAX]; char *auth_string; size_t total_buffered_metrics; BUFFER *header; BUFFER *buffer; size_t buffered_metrics; size_t buffered_bytes; struct simple_connector_buffer *previous_buffer; struct simple_connector_buffer *first_buffer; struct simple_connector_buffer *last_buffer; #ifdef ENABLE_HTTPS NETDATA_SSL ssl; #endif }; struct prometheus_remote_write_specific_config { char *remote_write_path; }; struct aws_kinesis_specific_config { char *stream_name; char *auth_key_id; char *secure_key; }; struct pubsub_specific_config { char *credentials_file; char *project_id; char *topic_id; }; struct mongodb_specific_config { char *database; char *collection; }; struct engine_config { const char *hostname; int update_every; }; struct stats { collected_number buffered_metrics; collected_number lost_metrics; collected_number sent_metrics; collected_number buffered_bytes; collected_number lost_bytes; collected_number sent_bytes; collected_number received_bytes; collected_number transmission_successes; collected_number data_lost_events; collected_number reconnects; collected_number transmission_failures; collected_number receptions; int initialized; RRDSET *st_metrics; RRDDIM *rd_buffered_metrics; RRDDIM *rd_lost_metrics; RRDDIM *rd_sent_metrics; RRDSET *st_bytes; RRDDIM *rd_buffered_bytes; RRDDIM *rd_lost_bytes; RRDDIM *rd_sent_bytes; RRDDIM *rd_received_bytes; RRDSET *st_ops; RRDDIM *rd_transmission_successes; RRDDIM *rd_data_lost_events; RRDDIM *rd_reconnects; RRDDIM *rd_transmission_failures; RRDDIM *rd_receptions; RRDSET *st_rusage; RRDDIM *rd_user; RRDDIM *rd_system; }; struct instance { struct instance_config config; void *buffer; void (*worker)(void *instance_p); struct stats stats; int scheduled; int disabled; int skip_host; int skip_chart; BUFFER *labels_buffer; time_t after; time_t before; uv_thread_t thread; uv_mutex_t mutex; uv_cond_t cond_var; int data_is_ready; int (*start_batch_formatting)(struct instance *instance); int (*start_host_formatting)(struct instance *instance, RRDHOST *host); int (*start_chart_formatting)(struct instance *instance, RRDSET *st); int (*metric_formatting)(struct instance *instance, RRDDIM *rd); int (*end_chart_formatting)(struct instance *instance, RRDSET *st); int (*variables_formatting)(struct instance *instance, RRDHOST *host); int (*end_host_formatting)(struct instance *instance, RRDHOST *host); int (*end_batch_formatting)(struct instance *instance); void (*prepare_header)(struct instance *instance); int (*check_response)(BUFFER *buffer, struct instance *instance); void *connector_specific_data; size_t index; struct instance *next; struct engine *engine; volatile sig_atomic_t exited; }; struct engine { struct engine_config config; size_t instance_num; time_t now; int aws_sdk_initialized; int protocol_buffers_initialized; int mongoc_initialized; struct instance *instance_root; volatile sig_atomic_t exit; }; extern struct instance *prometheus_exporter_instance; void *exporting_main(void *ptr); struct engine *read_exporting_config(); EXPORTING_CONNECTOR_TYPE exporting_select_type(const char *type); int init_connectors(struct engine *engine); void simple_connector_init(struct instance *instance); int mark_scheduled_instances(struct engine *engine); void prepare_buffers(struct engine *engine); size_t exporting_name_copy(char *dst, const char *src, size_t max_len); int rrdhost_is_exportable(struct instance *instance, RRDHOST *host); int rrdset_is_exportable(struct instance *instance, RRDSET *st); EXPORTING_OPTIONS exporting_parse_data_source(const char *source, EXPORTING_OPTIONS exporting_options); NETDATA_DOUBLE exporting_calculate_value_from_stored_data( struct instance *instance, RRDDIM *rd, time_t *last_timestamp); void start_batch_formatting(struct engine *engine); void start_host_formatting(struct engine *engine, RRDHOST *host); void start_chart_formatting(struct engine *engine, RRDSET *st); void metric_formatting(struct engine *engine, RRDDIM *rd); void end_chart_formatting(struct engine *engine, RRDSET *st); void variables_formatting(struct engine *engine, RRDHOST *host); void end_host_formatting(struct engine *engine, RRDHOST *host); void end_batch_formatting(struct engine *engine); int flush_host_labels(struct instance *instance, RRDHOST *host); int simple_connector_end_batch(struct instance *instance); int exporting_discard_response(BUFFER *buffer, struct instance *instance); void simple_connector_receive_response(int *sock, struct instance *instance); void simple_connector_send_buffer( int *sock, int *failures, struct instance *instance, BUFFER *header, BUFFER *buffer, size_t buffered_metrics); void simple_connector_worker(void *instance_p); void create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system); void send_main_rusage(RRDSET *st_rusage, RRDDIM *rd_user, RRDDIM *rd_system); void send_internal_metrics(struct instance *instance); void clean_instance(struct instance *ptr); void simple_connector_cleanup(struct instance *instance); static inline void disable_instance(struct instance *instance) { instance->disabled = 1; instance->scheduled = 0; uv_mutex_unlock(&instance->mutex); netdata_log_error("EXPORTING: Instance %s disabled", instance->config.name); } #include "exporting/prometheus/prometheus.h" #include "exporting/opentsdb/opentsdb.h" #if ENABLE_PROMETHEUS_REMOTE_WRITE #include "exporting/prometheus/remote_write/remote_write.h" #endif #if HAVE_KINESIS #include "exporting/aws_kinesis/aws_kinesis.h" #endif #endif /* NETDATA_EXPORTING_ENGINE_H */