// Source: exporting/exporting_engine.h (blob 2141caa41a4cf316d993881f2aac580552dc2af0)
// SPDX-License-Identifier: GPL-3.0-or-later

#ifndef NETDATA_EXPORTING_ENGINE_H
#define NETDATA_EXPORTING_ENGINE_H 1

#include "daemon/common.h"
#include <uv.h>

// Convenience wrappers: each forwards to the matching expconfig_get*() helper, always passing
// the address of the global exporting_config as the first argument.
#define exporter_get(section, name, value) expconfig_get(&exporting_config, section, name, value)
#define exporter_get_number(section, name, value) expconfig_get_number(&exporting_config, section, name, value)
#define exporter_get_boolean(section, name, value) expconfig_get_boolean(&exporting_config, section, name, value)

// Configuration object used by the exporter_get*() macros; defined outside this header.
extern struct config exporting_config;

// Name of the "update every" configuration option and its default value.
// NOTE(review): the unit of the default (presumably seconds) is not visible here -- confirm.
#define EXPORTING_UPDATE_EVERY_OPTION_NAME "update every"
#define EXPORTING_UPDATE_EVERY_DEFAULT 10

// Bit flags stored in an EXPORTING_OPTIONS value.
// Bits 0-2 are the data-source bits (see EXPORTING_OPTIONS_SOURCE_BITS), bits 3-5 and 16-17 are
// independent option flags. Bits 6-15 are not assigned in this enum.
typedef enum exporting_options {
    EXPORTING_OPTION_NON                    = 0,

    EXPORTING_SOURCE_DATA_AS_COLLECTED      = (1 << 0),
    EXPORTING_SOURCE_DATA_AVERAGE           = (1 << 1),
    EXPORTING_SOURCE_DATA_SUM               = (1 << 2),

    EXPORTING_OPTION_SEND_CONFIGURED_LABELS = (1 << 3),
    EXPORTING_OPTION_SEND_AUTOMATIC_LABELS  = (1 << 4),
    EXPORTING_OPTION_USE_TLS                = (1 << 5),

    EXPORTING_OPTION_SEND_NAMES             = (1 << 16),
    EXPORTING_OPTION_SEND_VARIABLES         = (1 << 17)
} EXPORTING_OPTIONS;

// Mask covering the three EXPORTING_SOURCE_DATA_* bits.
#define EXPORTING_OPTIONS_SOURCE_BITS                                                                                  \
    (EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_SOURCE_DATA_AVERAGE | EXPORTING_SOURCE_DATA_SUM)

// Extracts only the data-source bits from an options value.
// The argument is parenthesized so that compound expressions such as (a | b) are masked as a
// whole; without the parentheses `a | b & MASK` would parse as `a | (b & MASK)`.
#define EXPORTING_OPTIONS_DATA_SOURCE(exporting_options) ((exporting_options) & EXPORTING_OPTIONS_SOURCE_BITS)

// Global options value and prefix string; both are defined outside this header.
extern EXPORTING_OPTIONS global_exporting_options;
extern const char *global_exporting_prefix;

// Label/variable option tests for an instance pointer.
// Every macro parameter is parenthesized so that arguments such as `&inst`, `*pp` or
// `src_a | src_b` are evaluated as a unit before `->` or `&` is applied.
// Note: should_send_label() evaluates `instance` and `label_source` up to twice, so do not pass
// expressions with side effects.

// Non-zero when the instance has either of the two label-sending options set.
#define sending_labels_configured(instance)                                                                            \
    ((instance)->config.options & (EXPORTING_OPTION_SEND_CONFIGURED_LABELS | EXPORTING_OPTION_SEND_AUTOMATIC_LABELS))

// True when the label source matches an enabled option:
// RRDLABEL_SRC_CONFIG with SEND_CONFIGURED_LABELS, or RRDLABEL_SRC_AUTO with SEND_AUTOMATIC_LABELS.
#define should_send_label(instance, label_source)                                                                      \
    ((((instance)->config.options & EXPORTING_OPTION_SEND_CONFIGURED_LABELS) &&                                        \
      ((label_source) & RRDLABEL_SRC_CONFIG)) ||                                                                       \
     (((instance)->config.options & EXPORTING_OPTION_SEND_AUTOMATIC_LABELS) &&                                         \
      ((label_source) & RRDLABEL_SRC_AUTO)))

// Non-zero when the instance has EXPORTING_OPTION_SEND_VARIABLES set.
#define should_send_variables(instance) ((instance)->config.options & EXPORTING_OPTION_SEND_VARIABLES)

/*
 * Connector types known to the exporting engine.
 *
 * Enumerators are numbered consecutively from zero, so the trailing
 * EXPORTING_CONNECTOR_TYPE_NUM entry doubles as the count of entries before it.
 */
typedef enum exporting_connector_types {
    /* Invalid type */
    EXPORTING_CONNECTOR_TYPE_UNKNOWN = 0,

    /* Send plain text to Graphite */
    EXPORTING_CONNECTOR_TYPE_GRAPHITE,

    /* Send data to Graphite using HTTP API */
    EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP,

    /* Send data in JSON format */
    EXPORTING_CONNECTOR_TYPE_JSON,

    /* Send data in JSON format using HTTP API */
    EXPORTING_CONNECTOR_TYPE_JSON_HTTP,

    /* Send data to OpenTSDB using telnet API */
    EXPORTING_CONNECTOR_TYPE_OPENTSDB,

    /* Send data to OpenTSDB using HTTP API */
    EXPORTING_CONNECTOR_TYPE_OPENTSDB_HTTP,

    /* Send data using Prometheus remote write protocol */
    EXPORTING_CONNECTOR_TYPE_PROMETHEUS_REMOTE_WRITE,

    /* Send message to AWS Kinesis */
    EXPORTING_CONNECTOR_TYPE_KINESIS,

    /* Send message to Google Cloud Pub/Sub */
    EXPORTING_CONNECTOR_TYPE_PUBSUB,

    /* Send data to MongoDB collection */
    EXPORTING_CONNECTOR_TYPE_MONGODB,

    /* Number of exporting connector types */
    EXPORTING_CONNECTOR_TYPE_NUM
} EXPORTING_CONNECTOR_TYPE;

// Forward declaration; struct engine is defined further down in this header.
struct engine;

// Configuration of one exporting instance. Embedded by value in struct instance as `config`.
struct instance_config {
    EXPORTING_CONNECTOR_TYPE type; // connector type of this instance
    const char *type_name;

    const char *name; // printed by disable_instance() in its log message
    const char *destination;
    const char *username;
    const char *password;
    const char *prefix;
    const char *hostname;

    int update_every;
    int buffer_on_failures;
    long timeoutms; // NOTE(review): name suggests a timeout in milliseconds -- confirm

    EXPORTING_OPTIONS options; // EXPORTING_OPTIONS bit flags; read by the should_send_*() macros
    SIMPLE_PATTERN *charts_pattern;
    SIMPLE_PATTERN *hosts_pattern;

    int initialized;

    // Opaque pointer to connector-specific settings.
    // NOTE(review): presumably points to one of the *_specific_config or simple_connector_config
    // structs declared in this header, depending on `type` -- confirm.
    void *connector_specific_config;
};

// Connector-specific configuration that carries only a default port number.
struct simple_connector_config {
    int default_port;
};

// List node (linked through `next`) pairing a header BUFFER and a data BUFFER with
// metric and byte counters.
struct simple_connector_buffer {
    BUFFER *header;
    BUFFER *buffer;

    size_t buffered_metrics;
    size_t buffered_bytes;

    int used; // NOTE(review): presumably non-zero while the node holds pending data -- confirm

    struct simple_connector_buffer *next;
};

// Size, in bytes, of the simple_connector_data.connected_to array.
#define CONNECTED_TO_MAX 1024

// Runtime state of a simple connector: a current header/buffer pair with its counters, plus
// pointers into a list of struct simple_connector_buffer nodes.
struct simple_connector_data {
    void *connector_specific_data; // opaque, connector-defined

    char connected_to[CONNECTED_TO_MAX];
    
    char *auth_string;

    size_t total_buffered_metrics;

    BUFFER *header;
    BUFFER *buffer;
    size_t buffered_metrics;
    size_t buffered_bytes;

    // NOTE(review): the exact roles of previous/first/last are not visible in this header --
    // confirm against the connector implementation.
    struct simple_connector_buffer *previous_buffer;
    struct simple_connector_buffer *first_buffer;
    struct simple_connector_buffer *last_buffer;

// The SSL members exist only when the build defines ENABLE_HTTPS, so the struct layout
// differs between builds.
#ifdef ENABLE_HTTPS
    SSL *conn; //SSL connection
    int flags; //The flags for SSL connection
#endif
};

// Settings specific to the Prometheus remote write connector.
struct prometheus_remote_write_specific_config {
    char *remote_write_path;
};

// Settings specific to the AWS Kinesis connector.
// NOTE(review): auth_key_id and secure_key look like credentials -- handle accordingly.
struct aws_kinesis_specific_config {
    char *stream_name;
    char *auth_key_id;
    char *secure_key;
};

// Settings specific to the Google Cloud Pub/Sub connector.
struct pubsub_specific_config {
    char *credentials_file;
    char *project_id;
    char *topic_id;
};

// Settings specific to the MongoDB connector.
struct mongodb_specific_config {
    char *database;
    char *collection;
};

// Engine-wide configuration. Embedded by value in struct engine as `config`.
struct engine_config {
    const char *hostname;
    int update_every;
};

// Per-instance statistics (embedded in struct instance as `stats`): a set of counters followed
// by the RRDSET/RRDDIM pointers grouped under st_metrics, st_bytes, st_ops and st_rusage.
// NOTE(review): presumably these charts publish the counters above -- confirm in
// send_internal_metrics().
struct stats {
    // Counters
    collected_number buffered_metrics;
    collected_number lost_metrics;
    collected_number sent_metrics;
    collected_number buffered_bytes;
    collected_number lost_bytes;
    collected_number sent_bytes;
    collected_number received_bytes;
    collected_number transmission_successes;
    collected_number data_lost_events;
    collected_number reconnects;
    collected_number transmission_failures;
    collected_number receptions;

    int initialized;

    // Metrics chart and its dimensions
    RRDSET *st_metrics;
    RRDDIM *rd_buffered_metrics;
    RRDDIM *rd_lost_metrics;
    RRDDIM *rd_sent_metrics;

    // Bytes chart and its dimensions
    RRDSET *st_bytes;
    RRDDIM *rd_buffered_bytes;
    RRDDIM *rd_lost_bytes;
    RRDDIM *rd_sent_bytes;
    RRDDIM *rd_received_bytes;

    // Operations chart and its dimensions
    RRDSET *st_ops;
    RRDDIM *rd_transmission_successes;
    RRDDIM *rd_data_lost_events;
    RRDDIM *rd_reconnects;
    RRDDIM *rd_transmission_failures;
    RRDDIM *rd_receptions;

    // Resource usage chart and its dimensions
    RRDSET *st_rusage;
    RRDDIM *rd_user;
    RRDDIM *rd_system;
};

// One exporting instance: its configuration, statistics, state flags, libuv threading
// primitives and the set of connector-provided callbacks. Instances are chained through `next`
// and point back to their owning engine.
struct instance {
    struct instance_config config;
    void *buffer;
    void (*worker)(void *instance_p); // worker routine; receives the instance as a void pointer
    struct stats stats;

    int scheduled; // cleared by disable_instance()
    int disabled;  // set to 1 by disable_instance()
    int skip_host;
    int skip_chart;

    BUFFER *labels_buffer;

    time_t after;
    time_t before;

    uv_thread_t thread;
    uv_mutex_t mutex; // unlocked by disable_instance()
    uv_cond_t cond_var;
    int data_is_ready; // NOTE(review): presumably the predicate guarded by mutex/cond_var -- confirm

    // Formatting callbacks. The engine-level functions of the same names declared later in this
    // header take a struct engine instead.
    int (*start_batch_formatting)(struct instance *instance);
    int (*start_host_formatting)(struct instance *instance, RRDHOST *host);
    int (*start_chart_formatting)(struct instance *instance, RRDSET *st);
    int (*metric_formatting)(struct instance *instance, RRDDIM *rd);
    int (*end_chart_formatting)(struct instance *instance, RRDSET *st);
    int (*variables_formatting)(struct instance *instance, RRDHOST *host);
    int (*end_host_formatting)(struct instance *instance, RRDHOST *host);
    int (*end_batch_formatting)(struct instance *instance);

    void (*prepare_header)(struct instance *instance);
    int (*check_response)(BUFFER *buffer, struct instance *instance);

    void *connector_specific_data; // opaque, connector-defined

    size_t index;
    struct instance *next;
    struct engine *engine;

    volatile sig_atomic_t exited; // NOTE(review): presumably set when the worker has finished -- confirm
};

// Top-level exporting engine state.
struct engine {
    struct engine_config config;

    size_t instance_num;
    time_t now;

    // Flags for third-party library initialization.
    // NOTE(review): presumably set once the corresponding library has been initialized -- confirm.
    int aws_sdk_initialized;
    int protocol_buffers_initialized;
    int mongoc_initialized;

    // NOTE(review): presumably the head of the instance list linked through instance->next -- confirm.
    struct instance *instance_root;

    volatile sig_atomic_t exit; // NOTE(review): presumably a shutdown request flag -- confirm
};

// Instance pointer defined outside this header.
// NOTE(review): presumably the instance used by the Prometheus exporter -- confirm.
extern struct instance *prometheus_exporter_instance;

// Entry point of the exporting engine. It has the void *(void *) shape of a thread start routine.
// NOTE(review): presumably run in its own thread; the meaning of `ptr` is not visible here.
void *exporting_main(void *ptr);

// Returns a pointer to a struct engine.
// NOTE(review): presumably builds it from the exporting configuration; ownership and the
// failure return value are not visible here -- confirm.
// Declared with an explicit (void) parameter list: in C (before C23) an empty `()` declares a
// function without a prototype, so calls with stray arguments would not be diagnosed.
struct engine *read_exporting_config(void);
// Maps a type string to an EXPORTING_CONNECTOR_TYPE value.
// NOTE(review): presumably returns EXPORTING_CONNECTOR_TYPE_UNKNOWN for unrecognized input -- confirm.
EXPORTING_CONNECTOR_TYPE exporting_select_type(const char *type);

// Connector initialization for a whole engine and for a single simple-connector instance.
// NOTE(review): the meaning of init_connectors()' int return value is not visible here.
int init_connectors(struct engine *engine);
void simple_connector_init(struct instance *instance);

// Per-iteration engine steps.
// NOTE(review): presumably mark_scheduled_instances() sets instance->scheduled and returns a
// count or flag -- confirm.
int mark_scheduled_instances(struct engine *engine);
void prepare_buffers(struct engine *engine);

// Copies a name from src to dst, bounded by max_len, and returns a size_t.
// NOTE(review): presumably the number of characters written; whether max_len includes the
// terminating NUL is not visible here -- confirm.
size_t exporting_name_copy(char *dst, const char *src, size_t max_len);

// Filters deciding whether a host or chart is exported by the given instance.
// NOTE(review): presumably non-zero means exportable -- confirm.
int rrdhost_is_exportable(struct instance *instance, RRDHOST *host);
int rrdset_is_exportable(struct instance *instance, RRDSET *st);

// Takes a data source string and an existing options value and returns an options value.
// NOTE(review): presumably replaces the EXPORTING_SOURCE_DATA_* bits according to `source` -- confirm.
extern EXPORTING_OPTIONS exporting_parse_data_source(const char *source, EXPORTING_OPTIONS exporting_options);

// Computes a value for dimension `rd` on behalf of `instance`.
// `last_timestamp` is a pointer parameter.
// NOTE(review): presumably an output receiving the timestamp of the last point used -- confirm.
NETDATA_DOUBLE
exporting_calculate_value_from_stored_data(
    struct instance *instance,
    RRDDIM *rd,
    time_t *last_timestamp);

// Engine-level formatting steps. Their names mirror the per-instance callbacks in struct
// instance, but they take the engine instead of a single instance.
// NOTE(review): presumably each one walks the engine's instances and invokes the matching
// callback -- confirm.
void start_batch_formatting(struct engine *engine);
void start_host_formatting(struct engine *engine, RRDHOST *host);
void start_chart_formatting(struct engine *engine, RRDSET *st);
void metric_formatting(struct engine *engine, RRDDIM *rd);
void end_chart_formatting(struct engine *engine, RRDSET *st);
void variables_formatting(struct engine *engine, RRDHOST *host);
void end_host_formatting(struct engine *engine, RRDHOST *host);
void end_batch_formatting(struct engine *engine);

// Instance-level functions whose signatures match the end_host_formatting and
// end_batch_formatting callback types in struct instance.
int flush_host_labels(struct instance *instance, RRDHOST *host);
int simple_connector_end_batch(struct instance *instance);

// Matches the check_response callback type in struct instance.
// NOTE(review): presumably drops the response content -- confirm.
int exporting_discard_response(BUFFER *buffer, struct instance *instance);

// Simple connector I/O helpers. `sock` and `failures` are pointers.
// NOTE(review): presumably so the helpers can update the caller's socket and failure counter -- confirm.
void simple_connector_receive_response(int *sock, struct instance *instance);
void simple_connector_send_buffer(
    int *sock, int *failures, struct instance *instance, BUFFER *header, BUFFER *buffer, size_t buffered_metrics);

// Matches the `worker` member type in struct instance; instance_p carries the instance pointer.
void simple_connector_worker(void *instance_p);

// Internal statistics charts. The rusage helpers operate on a chart and its user/system
// dimensions; create_main_rusage_chart() receives them as output pointers.
void create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system);
void send_main_rusage(RRDSET *st_rusage, RRDDIM *rd_user, RRDDIM *rd_system);
void send_internal_metrics(struct instance *instance);

// Instance cleanup.
// NOTE(review): which resources each function releases is not visible in this header.
extern void clean_instance(struct instance *ptr);
void simple_connector_cleanup(struct instance *instance);

// Marks an instance as disabled and unscheduled, unlocks its mutex and logs the event.
//
// The function unlocks instance->mutex without locking it first.
// NOTE(review): this assumes the caller holds the mutex on entry -- confirm at every call site,
// because unlocking a mutex that is not held is an error.
static inline void disable_instance(struct instance *instance)
{
    // The flags are updated before the unlock, so the change happens while the mutex is still held.
    instance->disabled = 1;
    instance->scheduled = 0;
    uv_mutex_unlock(&instance->mutex);
    error("EXPORTING: Instance %s disabled", instance->config.name);
}

#include "exporting/prometheus/prometheus.h"
#include "exporting/opentsdb/opentsdb.h"
#if ENABLE_PROMETHEUS_REMOTE_WRITE
#include "exporting/prometheus/remote_write/remote_write.h"
#endif

#if HAVE_KINESIS
#include "exporting/aws_kinesis/aws_kinesis.h"
#endif

#endif /* NETDATA_EXPORTING_ENGINE_H */