diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 11:08:07 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 11:08:07 +0000 |
commit | c69cb8cc094cc916adbc516b09e944cd3d137c01 (patch) | |
tree | f2878ec41fb6d0e3613906c6722fc02b934eeb80 /exporting/exporting_engine.h | |
parent | Initial commit. (diff) | |
download | netdata-upstream.tar.xz netdata-upstream.zip |
Adding upstream version 1.29.3.upstream/1.29.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | exporting/exporting_engine.h | 304 |
1 files changed, 304 insertions, 0 deletions
diff --git a/exporting/exporting_engine.h b/exporting/exporting_engine.h new file mode 100644 index 0000000..1d9feb7 --- /dev/null +++ b/exporting/exporting_engine.h @@ -0,0 +1,304 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_EXPORTING_ENGINE_H +#define NETDATA_EXPORTING_ENGINE_H 1 + +#include "daemon/common.h" + +#include <uv.h> + +#define exporter_get(section, name, value) expconfig_get(&exporting_config, section, name, value) +#define exporter_get_number(section, name, value) expconfig_get_number(&exporting_config, section, name, value) +#define exporter_get_boolean(section, name, value) expconfig_get_boolean(&exporting_config, section, name, value) + +extern struct config exporting_config; + +#define EXPORTING_UPDATE_EVERY_OPTION_NAME "update every" +#define EXPORTING_UPDATE_EVERY_DEFAULT 10 + +typedef enum exporting_options { + EXPORTING_OPTION_NON = 0, + + EXPORTING_SOURCE_DATA_AS_COLLECTED = (1 << 0), + EXPORTING_SOURCE_DATA_AVERAGE = (1 << 1), + EXPORTING_SOURCE_DATA_SUM = (1 << 2), + + EXPORTING_OPTION_SEND_CONFIGURED_LABELS = (1 << 3), + EXPORTING_OPTION_SEND_AUTOMATIC_LABELS = (1 << 4), + EXPORTING_OPTION_USE_TLS = (1 << 5), + + EXPORTING_OPTION_SEND_NAMES = (1 << 16) +} EXPORTING_OPTIONS; + +#define EXPORTING_OPTIONS_SOURCE_BITS \ + (EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_SOURCE_DATA_AVERAGE | EXPORTING_SOURCE_DATA_SUM) +#define EXPORTING_OPTIONS_DATA_SOURCE(exporting_options) (exporting_options & EXPORTING_OPTIONS_SOURCE_BITS) + +#define sending_labels_configured(instance) \ + (instance->config.options & (EXPORTING_OPTION_SEND_CONFIGURED_LABELS | EXPORTING_OPTION_SEND_AUTOMATIC_LABELS)) + +#define should_send_label(instance, label) \ + ((instance->config.options & EXPORTING_OPTION_SEND_CONFIGURED_LABELS && \ + label->label_source == LABEL_SOURCE_NETDATA_CONF) || \ + (instance->config.options & EXPORTING_OPTION_SEND_AUTOMATIC_LABELS && \ + label->label_source != LABEL_SOURCE_NETDATA_CONF)) + +typedef enum exporting_connector_types { + EXPORTING_CONNECTOR_TYPE_UNKNOWN, // Invalid type + EXPORTING_CONNECTOR_TYPE_GRAPHITE, // Send plain text to Graphite + EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP, // Send data to Graphite using HTTP API + EXPORTING_CONNECTOR_TYPE_JSON, // Send data in JSON format + EXPORTING_CONNECTOR_TYPE_JSON_HTTP, // Send data in JSON format using HTTP API + EXPORTING_CONNECTOR_TYPE_OPENTSDB, // Send data to OpenTSDB using telnet API + EXPORTING_CONNECTOR_TYPE_OPENTSDB_HTTP, // Send data to OpenTSDB using HTTP API + EXPORTING_CONNECTOR_TYPE_PROMETHEUS_REMOTE_WRITE, // User selected to use Prometheus backend + EXPORTING_CONNECTOR_TYPE_KINESIS, // Send message to AWS Kinesis + EXPORTING_CONNECTOR_TYPE_PUBSUB, // Send message to Google Cloud Pub/Sub + EXPORTING_CONNECTOR_TYPE_MONGODB, // Send data to MongoDB collection + EXPORTING_CONNECTOR_TYPE_NUM // Number of backend types +} EXPORTING_CONNECTOR_TYPE; + +struct engine; + +struct instance_config { + EXPORTING_CONNECTOR_TYPE type; + const char *type_name; + + const char *name; + const char *destination; + const char *prefix; + const char *hostname; + + int update_every; + int buffer_on_failures; + long timeoutms; + + EXPORTING_OPTIONS options; + SIMPLE_PATTERN *charts_pattern; + SIMPLE_PATTERN *hosts_pattern; + + void *connector_specific_config; +}; + +struct simple_connector_config { + int default_port; +}; + +struct simple_connector_buffer { + BUFFER *header; + BUFFER *buffer; + + size_t buffered_metrics; + size_t buffered_bytes; + + int used; + + struct simple_connector_buffer *next; +}; + +struct simple_connector_data { + void *connector_specific_data; + + size_t total_buffered_metrics; + + BUFFER *header; + BUFFER *buffer; + size_t buffered_metrics; + size_t buffered_bytes; + + struct simple_connector_buffer *previous_buffer; + struct simple_connector_buffer *first_buffer; + struct simple_connector_buffer *last_buffer; + +#ifdef ENABLE_HTTPS + SSL *conn; //SSL connection + int flags; //The flags for SSL connection +#endif +}; + +struct prometheus_remote_write_specific_config { + char *remote_write_path; +}; + +struct aws_kinesis_specific_config { + char *stream_name; + char *auth_key_id; + char *secure_key; +}; + +struct pubsub_specific_config { + char *credentials_file; + char *project_id; + char *topic_id; +}; + +struct mongodb_specific_config { + char *database; + char *collection; +}; + +struct engine_config { + const char *hostname; + int update_every; +}; + +struct stats { + collected_number buffered_metrics; + collected_number lost_metrics; + collected_number sent_metrics; + collected_number buffered_bytes; + collected_number lost_bytes; + collected_number sent_bytes; + collected_number received_bytes; + collected_number transmission_successes; + collected_number data_lost_events; + collected_number reconnects; + collected_number transmission_failures; + collected_number receptions; + + int initialized; + + RRDSET *st_metrics; + RRDDIM *rd_buffered_metrics; + RRDDIM *rd_lost_metrics; + RRDDIM *rd_sent_metrics; + + RRDSET *st_bytes; + RRDDIM *rd_buffered_bytes; + RRDDIM *rd_lost_bytes; + RRDDIM *rd_sent_bytes; + RRDDIM *rd_received_bytes; + + RRDSET *st_ops; + RRDDIM *rd_transmission_successes; + RRDDIM *rd_data_lost_events; + RRDDIM *rd_reconnects; + RRDDIM *rd_transmission_failures; + RRDDIM *rd_receptions; + + RRDSET *st_rusage; + RRDDIM *rd_user; + RRDDIM *rd_system; +}; + +struct instance { + struct instance_config config; + void *buffer; + void (*worker)(void *instance_p); + struct stats stats; + + int scheduled; + int disabled; + int skip_host; + int skip_chart; + + BUFFER *labels; + + time_t after; + time_t before; + + uv_thread_t thread; + uv_mutex_t mutex; + uv_cond_t cond_var; + int data_is_ready; + + int (*start_batch_formatting)(struct instance *instance); + int (*start_host_formatting)(struct instance *instance, RRDHOST *host); + int (*start_chart_formatting)(struct instance *instance, RRDSET *st); + int (*metric_formatting)(struct instance *instance, RRDDIM *rd); + int (*end_chart_formatting)(struct instance *instance, RRDSET *st); + int (*end_host_formatting)(struct instance *instance, RRDHOST *host); + int (*end_batch_formatting)(struct instance *instance); + + void (*prepare_header)(struct instance *instance); + int (*check_response)(BUFFER *buffer, struct instance *instance); + + void *connector_specific_data; + + size_t index; + struct instance *next; + struct engine *engine; + + volatile sig_atomic_t exited; +}; + +struct engine { + struct engine_config config; + + size_t instance_num; + time_t now; + + int aws_sdk_initialized; + int protocol_buffers_initialized; + int mongoc_initialized; + + struct instance *instance_root; + + volatile sig_atomic_t exit; +}; + +extern struct instance *prometheus_exporter_instance; + +void *exporting_main(void *ptr); + +struct engine *read_exporting_config(); +EXPORTING_CONNECTOR_TYPE exporting_select_type(const char *type); + +int init_connectors(struct engine *engine); +void simple_connector_init(struct instance *instance); + +int mark_scheduled_instances(struct engine *engine); +void prepare_buffers(struct engine *engine); + +size_t exporting_name_copy(char *dst, const char *src, size_t max_len); + +int rrdhost_is_exportable(struct instance *instance, RRDHOST *host); +int rrdset_is_exportable(struct instance *instance, RRDSET *st); + +calculated_number exporting_calculate_value_from_stored_data( + struct instance *instance, + RRDDIM *rd, + time_t *last_timestamp); + +void start_batch_formatting(struct engine *engine); +void start_host_formatting(struct engine *engine, RRDHOST *host); +void start_chart_formatting(struct engine *engine, RRDSET *st); +void metric_formatting(struct engine *engine, RRDDIM *rd); +void end_chart_formatting(struct engine *engine, RRDSET *st); +void end_host_formatting(struct engine *engine, RRDHOST *host); +void end_batch_formatting(struct engine *engine); +int flush_host_labels(struct instance *instance, RRDHOST *host); +int simple_connector_end_batch(struct instance *instance); + +int exporting_discard_response(BUFFER *buffer, struct instance *instance); +void simple_connector_receive_response(int *sock, struct instance *instance); +void simple_connector_send_buffer( + int *sock, int *failures, struct instance *instance, BUFFER *header, BUFFER *buffer, size_t buffered_metrics); +void simple_connector_worker(void *instance_p); + +void create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system); +void send_main_rusage(RRDSET *st_rusage, RRDDIM *rd_user, RRDDIM *rd_system); +void send_internal_metrics(struct instance *instance); + +extern void clean_instance(struct instance *ptr); +void simple_connector_cleanup(struct instance *instance); + +static inline void disable_instance(struct instance *instance) +{ + instance->disabled = 1; + instance->scheduled = 0; + uv_mutex_unlock(&instance->mutex); + error("EXPORTING: Instance %s disabled", instance->config.name); +} + +#include "exporting/prometheus/prometheus.h" +#include "exporting/opentsdb/opentsdb.h" +#if ENABLE_PROMETHEUS_REMOTE_WRITE +#include "exporting/prometheus/remote_write/remote_write.h" +#endif + +#if HAVE_KINESIS +#include "exporting/aws_kinesis/aws_kinesis.h" +#endif + +#endif /* NETDATA_EXPORTING_ENGINE_H */ |