summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.h
blob: c3c14233fa278174adb57a503db72fcb6a8e9143 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
// SPDX-License-Identifier: GPL-3.0-or-later

#ifndef NETDATA_RRDPUSH_H
#define NETDATA_RRDPUSH_H 1

#include "libnetdata/libnetdata.h"
#include "daemon/common.h"
#include "web/server/web_client.h"
#include "database/rrdfunctions.h"
#include "database/rrd.h"

// Usable length of sender_state.connected_to (the array is declared with one extra byte).
#define CONNECTED_TO_SIZE 100
// 16 KiB. NOTE(review): presumably the initial size of the sender's circular buffer - confirm at the call site.
#define CBUFFER_INITIAL_SIZE (16 * 1024)
// Half of CBUFFER_INITIAL_SIZE. NOTE(review): presumably the initial size of the per-thread buffer - confirm.
#define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE / 2)

// ----------------------------------------------------------------------------
// obsolete versions - do not use anymore

#define STREAM_OLD_VERSION_CLAIM 3 // v3 = claiming supported
#define STREAM_OLD_VERSION_CLABELS 4 // v4 = chart labels supported
#define STREAM_OLD_VERSION_COMPRESSION 5 // v5 = lz4 compression supported; this is production

// ----------------------------------------------------------------------------
// capabilities negotiation

// Bitmask of streaming protocol capabilities.
// Every capability owns a single bit, so capabilities can be combined with bitwise OR
// and tested with stream_has_capability().
typedef enum {
    // do not use the first 3 bits
    // they used to be versions 1, 2 and 3
    // before we introduced capabilities

    STREAM_CAP_V1               = (1 << 3), // v1 = the oldest protocol
    STREAM_CAP_V2               = (1 << 4), // v2 = the second version of the protocol (with host labels)
    STREAM_CAP_VN               = (1 << 5), // version negotiation supported (for versions 3, 4, 5 of the protocol)
                                            // v3 = claiming supported
                                            // v4 = chart labels supported
                                            // v5 = lz4 compression supported
    STREAM_CAP_VCAPS            = (1 << 6), // capabilities negotiation supported
    STREAM_CAP_HLABELS          = (1 << 7), // host labels supported
    STREAM_CAP_CLAIM            = (1 << 8), // claiming supported
    STREAM_CAP_CLABELS          = (1 << 9), // chart labels supported
    STREAM_CAP_COMPRESSION      = (1 << 10), // lz4 compression supported
    STREAM_CAP_FUNCTIONS        = (1 << 11), // plugin functions supported
    STREAM_CAP_REPLICATION      = (1 << 12), // replication supported
    STREAM_CAP_BINARY           = (1 << 13), // streaming supports binary data
    STREAM_CAP_INTERPOLATED     = (1 << 14), // streaming supports interpolated streaming of values
    STREAM_CAP_IEEE754          = (1 << 15), // streaming supports binary/hex transfer of double values
    STREAM_CAP_DATA_WITH_ML     = (1 << 16), // streaming supports transferring anomaly bit
    STREAM_CAP_DYNCFG           = (1 << 17), // dynamic configuration of plugins through streaming

    STREAM_CAP_INVALID          = (1 << 30), // used as an invalid value for capabilities when this is set
    // this must be signed int, so don't use the last bit
    // needed for negotiating errors between parent and child
} STREAM_CAPABILITIES;

// STREAM_HAS_COMPRESSION is the compression capability bit when compression support is
// compiled in (ENABLE_RRDPUSH_COMPRESSION), and 0 (no capability bit) otherwise.
#ifdef  ENABLE_RRDPUSH_COMPRESSION
#define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION
#else
#define STREAM_HAS_COMPRESSION 0
#endif  // ENABLE_RRDPUSH_COMPRESSION

// Returns a STREAM_CAPABILITIES bitmask for `host`.
// NOTE(review): presumably the capabilities this agent advertises, with `sender` selecting
// the sending side - the logic is in the implementation file; confirm there.
STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender);

// Evaluates to true when `rpt` is non-NULL and ALL the bits of `capability` are set in
// (rpt)->capabilities. Both arguments are evaluated more than once, so they must be free
// of side effects.
#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability))

// ----------------------------------------------------------------------------
// stream handshake

// NOTE(review): presumably the size of the buffer used for the handshake HTTP request/response - confirm.
#define HTTP_HEADER_SIZE 8192

#define STREAMING_PROTOCOL_VERSION "1.1"

// Prompt strings, one per protocol generation (v1, v2, and version-negotiated).
// The VN prompt ends with "version="; presumably the negotiated version number follows it - confirm.
#define START_STREAMING_PROMPT_V1 "Hit me baby, push them over..."
#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
#define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version="

// Error strings for a refused streaming request.
#define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back"
#define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server"
#define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info."
#define START_STREAMING_ERROR_BUSY_TRY_LATER "The server is too busy now to accept this request. Try later."
#define START_STREAMING_ERROR_INTERNAL_ERROR "The server encountered an internal error. Try later."
#define START_STREAMING_ERROR_INITIALIZATION "The server is initializing. Try later."

// Result of a streaming handshake, also used as the recorded reason of a disconnection
// (see the `reason` members of sender_state.exit, receiver_state.exit and rrdpush_destinations).
//   > 0 : handshake succeeded, the value identifies the protocol generation
//   = 0 : no connection was ever attempted
//   < 0 : handshake errors and disconnect reasons
typedef enum {
    STREAM_HANDSHAKE_OK_V3 = 3, // v3+
    STREAM_HANDSHAKE_OK_V2 = 2, // v2
    STREAM_HANDSHAKE_OK_V1 = 1, // v1
    STREAM_HANDSHAKE_NEVER = 0, // never tried to connect
    STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE = -1,
    STREAM_HANDSHAKE_ERROR_LOCALHOST = -2,
    STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED = -3,
    STREAM_HANDSHAKE_ERROR_DENIED = -4,
    STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT = -5,
    STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT = -6,
    STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE = -7,
    STREAM_HANDSHAKE_ERROR_SSL_ERROR = -8,
    STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9,
    STREAM_HANDSHAKE_BUSY_TRY_LATER = -10,
    STREAM_HANDSHAKE_INTERNAL_ERROR = -11,
    STREAM_HANDSHAKE_INITIALIZATION = -12,
    STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP = -13,
    STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER = -14,
    STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN = -15,
    STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT = -16,
    STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT = -17,
    STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR = -18,
    STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED = -19,
    STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT = -20,
    STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST = -21,
    STREAM_HANDSHAKE_NON_STREAMABLE_HOST = -22,

} STREAM_HANDSHAKE;


// ----------------------------------------------------------------------------

// Bundle of OS / kernel identification strings.
// NOTE(review): the type name suggests these hold encoded (e.g. URL-encoded) copies of the
// host's system info for the handshake request - confirm against the sender code.
typedef struct {
    char *os_name;
    char *os_id;
    char *os_version;
    char *kernel_name;
    char *kernel_version;
} stream_encoded_t;

#ifdef ENABLE_RRDPUSH_COMPRESSION
// Header that precedes every compressed message: a 32-bit word, laid out as
//   bits  0-7  : 'z' | 0x80
//   bits  8-14 : low 7 bits of the compressed length,  bit 15 : always set
//   bits 16-22 : high 7 bits of the compressed length, bit 23 : always set
//   bits 24-31 : '\n'  (the signature MUST end with a newline)
// The MASK selects the fixed bits, i.e. everything except the 14 length bits.
//
// Every operand is converted to uint32_t BEFORE shifting: (0xff << 24) on a plain int
// overflows a signed 32-bit int, which is undefined behaviour in C.
#define RRDPUSH_COMPRESSION_SIGNATURE ((uint32_t)('z' | 0x80) | ((uint32_t)0x80 << 8) | ((uint32_t)0x80 << 16) | ((uint32_t)'\n' << 24))
#define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((uint32_t)0xff | ((uint32_t)0x80 << 8) | ((uint32_t)0x80 << 16) | ((uint32_t)0xff << 24))
#define RRDPUSH_COMPRESSION_SIGNATURE_SIZE 4

// State of a stream compressor (embedded in sender_state when compression is compiled in).
// NOTE(review): the per-field notes below are inferred from the field names and types only;
// confirm against the compression implementation.
struct compressor_state {
    bool initialized;
    char *compression_result_buffer;        // buffer holding compressed output
    size_t compression_result_buffer_size;  // size of compression_result_buffer
    struct {
        void *lz4_stream;                   // opaque handle of the underlying lz4 stream
        char *input_ring_buffer;            // ring buffer of input data
        size_t input_ring_buffer_size;
        size_t input_ring_buffer_pos;
    } stream;
    // callbacks; note that destroy() takes a pointer-to-pointer,
    // unlike rrdpush_compressor_destroy() declared below this struct
    size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer);
    void (*destroy)(struct compressor_state **state);
};

// Compressor API (implemented elsewhere).
// NOTE(review): presumably rrdpush_compress() returns the number of compressed bytes and
// stores a pointer to them in *out - confirm against the implementation.
void rrdpush_compressor_reset(struct compressor_state *state);
void rrdpush_compressor_destroy(struct compressor_state *state);
size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, char **out);

// State of a stream decompressor (embedded in receiver_state when compression is compiled in).
struct decompressor_state {
    bool initialized;
    // NOTE(review): the counters below look like statistics - confirm where they are updated
    size_t signature_size;
    size_t total_compressed;
    size_t total_uncompressed;
    size_t packet_count;
    struct {
        void *lz4_stream;   // opaque handle of the underlying lz4 stream (presumably)
        char *buffer;       // decompressed data; consumers read from buffer + read_at
        size_t size;        // NOTE(review): presumably the allocated size of buffer - confirm
        size_t write_at;    // end of the decompressed data available in buffer
        size_t read_at;     // next byte to hand out; write_at - read_at bytes are still unread
    } stream;
};

// Decompressor API (implemented elsewhere).
// NOTE(review): presumably rrdpush_decompress() returns the number of decompressed bytes made
// available in state->stream - confirm against the implementation.
void rrdpush_decompressor_destroy(struct decompressor_state *state);
void rrdpush_decompressor_reset(struct decompressor_state *state);
size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);

// Decode the compression header and return the length encoded in it.
//
// data      - pointer to the header bytes (may be unaligned)
// data_size - number of bytes at `data`; must be exactly RRDPUSH_COMPRESSION_SIGNATURE_SIZE
//
// Returns the 14-bit length carried by the header, or 0 when `data` is NULL, `data_size`
// is not the signature size, or the fixed bits of the header do not match the signature.
static inline size_t rrdpush_decompress_decode_header(const char *data, size_t data_size) {
    if (unlikely(!data || data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE))
        return 0;

    // Copy the bytes out instead of dereferencing a casted pointer: `data` points into a
    // char buffer at an arbitrary offset, so *(uint32_t *)data would be a misaligned,
    // aliasing-violating load (undefined behaviour, and a bus error on strict-alignment CPUs).
    uint32_t sign;
    memcpy(&sign, data, sizeof(sign));

    if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE))
        return 0;

    // bits 8-14 hold the low 7 bits of the length, bits 16-22 hold the high 7 bits
    size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7));
    return length;
}

// Begin decoding a new compressed message.
// Calls fatal() when previously decompressed data has not been fully consumed yet
// (read_at != write_at); otherwise returns whatever rrdpush_decompress_decode_header()
// returns for the given header.
static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) {
    const bool has_unread_data = (state->stream.read_at != state->stream.write_at);

    if(unlikely(has_unread_data))
        fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");

    size_t decoded_length = rrdpush_decompress_decode_header(header, header_size);
    return decoded_length;
}

// Number of decompressed bytes not yet consumed: write_at - read_at.
// Calls fatal() when the read position is ahead of the write position.
static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) {
    const size_t read_pos = state->stream.read_at;
    const size_t write_pos = state->stream.write_at;

    if(unlikely(write_pos < read_pos))
        fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");

    return write_pos - read_pos;
}

// Copy up to `size` decompressed bytes into `dst` and advance the read position.
//
// Returns the number of bytes copied: the smaller of `size` and the unread bytes in the
// buffer. Returns 0 when `state` or `dst` is NULL, `size` is 0, or nothing is left to read.
static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) {
    if (unlikely(state == NULL || size == 0 || dst == NULL))
        return 0;

    const size_t available = rrdpush_decompressed_bytes_in_buffer(state);
    if(unlikely(available == 0))
        return 0;

    // never hand out more than what is still unread
    const size_t copied = (size > available) ? available : size;
    const char *src = state->stream.buffer + state->stream.read_at;

    memcpy(dst, src, copied);
    state->stream.read_at += copied;

    // sanity check of the positions after advancing
    if(unlikely(state->stream.read_at > state->stream.write_at))
        fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");

    return copied;
}
#endif

// Thread-local storage
// Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.

// Classification of the traffic sent over a stream. The values are consecutive from 0 and
// are used to index the sent_bytes_on_this_connection_per_type[] arrays, whose size is
// STREAM_TRAFFIC_TYPE_MAX.
typedef enum __attribute__((packed)) {
    STREAM_TRAFFIC_TYPE_REPLICATION = 0,
    STREAM_TRAFFIC_TYPE_FUNCTIONS,
    STREAM_TRAFFIC_TYPE_METADATA,
    STREAM_TRAFFIC_TYPE_DATA,

    // terminator - number of traffic types; keep it last
    STREAM_TRAFFIC_TYPE_MAX,
} STREAM_TRAFFIC_TYPE;

// Bit flags stored in sender_state.flags.
typedef enum __attribute__((packed)) {
    SENDER_FLAG_OVERFLOW     = (1 << 0), // The buffer has been overflown
    SENDER_FLAG_COMPRESSION  = (1 << 1), // The stream needs to have and has compression
} SENDER_FLAGS;

// State kept by the sender while it receives a function call that carries a payload
// (see sender_state.function_payload).
// NOTE(review): field meanings are inferred from their names - confirm against the parser.
struct function_payload_state {
    BUFFER *payload;    // the payload
    char *txid;         // transaction id of the function call
    char *fn_name;      // name of the function
    char *timeout;      // timeout, kept as a string
};

// Per-host state of the streaming sender (the side that pushes metrics to a parent).
// The members of the `atomic` sub-structs are read and written through the
// rrdpush_sender_*() accessor macros defined right after this struct.
struct sender_state {
    RRDHOST *host;
    pid_t tid;                              // the thread id of the sender, from gettid()
    SENDER_FLAGS flags;
    int timeout;
    int default_port;
    uint32_t reconnect_delay;
    char connected_to[CONNECTED_TO_SIZE + 1];   // We don't know which proxy we connect to, passed back from socket.c
    size_t begin;
    size_t reconnects_counter;
    size_t sent_bytes;
    size_t sent_bytes_on_this_connection;
    size_t send_attempts;
    time_t last_traffic_seen_t;
    time_t last_state_since_t;              // the timestamp of the last state (online/offline) change
    size_t not_connected_loops;
    // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
    // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
    SPINLOCK spinlock;                      // taken/released with sender_lock() / sender_unlock()
    struct circular_buffer *buffer;
    char read_buffer[PLUGINSD_LINE_MAX + 1];
    ssize_t read_len;
    STREAM_CAPABILITIES capabilities;

    size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];    // indexed by STREAM_TRAFFIC_TYPE

    int rrdpush_sender_pipe[2];                     // collector to sender thread signaling
    int rrdpush_sender_socket;

    int receiving_function_payload;
    struct function_payload_state function_payload; // state when receiving function with payload

    uint16_t hops;

#ifdef ENABLE_RRDPUSH_COMPRESSION
    struct compressor_state compressor;
#endif // ENABLE_RRDPUSH_COMPRESSION

#ifdef ENABLE_HTTPS
    NETDATA_SSL ssl;                     // structure used to encrypt the connection
#endif

    struct {
        bool shutdown;
        STREAM_HANDSHAKE reason;
    } exit;

    struct {
        DICTIONARY *requests;                   // de-duplication of replication requests, per chart
        time_t oldest_request_after_t;          // the timestamp of the oldest replication request
        time_t latest_completed_before_t;       // the timestamp of the latest replication request

        struct {
            size_t pending_requests;            // the currently outstanding replication requests
            size_t charts_replicating;          // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart)
            bool reached_max;                   // true when the sender buffer should not get more replication responses
        } atomic;

    } replication;

    struct {
        bool pending_data;
        size_t buffer_used_percentage;          // the current utilization of the sending buffer
        usec_t last_flush_time_ut;              // the last time the sender flushed the sending buffer in USEC
        time_t last_buffer_recreate_s;          // NOTE(review): a time_t, presumably the time (in seconds) the sender buffer was last re-created; the previous comment described it as a boolean - confirm
    } atomic;
};

// Lock / unlock the sender's spinlock.
#define sender_lock(sender) spinlock_lock(&(sender)->spinlock)
#define sender_unlock(sender) spinlock_unlock(&(sender)->spinlock)

// atomic.pending_data flag (relaxed memory order)
#define rrdpush_sender_pipe_has_pending_data(sender) __atomic_load_n(&(sender)->atomic.pending_data, __ATOMIC_RELAXED)
#define rrdpush_sender_pipe_set_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, true, __ATOMIC_RELAXED)
#define rrdpush_sender_pipe_clear_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, false, __ATOMIC_RELAXED)

// atomic.last_buffer_recreate_s (relaxed memory order)
#define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED)
#define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, value, __ATOMIC_RELAXED)

// replication.atomic.reached_max - the only accessors here using sequentially consistent ordering
#define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), value, __ATOMIC_SEQ_CST)
#define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST)

// atomic.buffer_used_percentage (relaxed memory order)
#define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), value, __ATOMIC_RELAXED)
#define rrdpush_sender_get_buffer_used_percent(sender) __atomic_load_n(&((sender)->atomic.buffer_used_percentage), __ATOMIC_RELAXED)

// atomic.last_flush_time_ut; the setter stores now_realtime_usec() (relaxed memory order)
#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED)
#define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->atomic.last_flush_time_ut), __ATOMIC_RELAXED)

// replication.atomic.charts_replicating counter; plus_one / minus_one evaluate to the updated value
#define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication.atomic.charts_replicating), __ATOMIC_RELAXED)
#define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
#define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
#define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication.atomic.charts_replicating), 0, __ATOMIC_RELAXED)

// replication.atomic.pending_requests counter; plus_one / minus_one evaluate to the updated value
#define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication.atomic.pending_requests), __ATOMIC_RELAXED)
#define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED)

/*
typedef enum {
    STREAM_NODE_INSTANCE_FEATURE_CLOUD_ONLINE   = (1 << 0),
    STREAM_NODE_INSTANCE_FEATURE_VIRTUAL_HOST   = (1 << 1),
    STREAM_NODE_INSTANCE_FEATURE_HEALTH_ENABLED = (1 << 2),
    STREAM_NODE_INSTANCE_FEATURE_ML_SELF        = (1 << 3),
    STREAM_NODE_INSTANCE_FEATURE_ML_RECEIVED    = (1 << 4),
    STREAM_NODE_INSTANCE_FEATURE_SSL            = (1 << 5),
} STREAM_NODE_INSTANCE_FEATURES;

typedef struct stream_node_instance {
    uuid_t uuid;
    STRING *agent;
    STREAM_NODE_INSTANCE_FEATURES features;
    uint32_t hops;

    // receiver information on that agent
    int32_t capabilities;
    uint32_t local_port;
    uint32_t remote_port;
    STRING *local_ip;
    STRING *remote_ip;
} STREAM_NODE_INSTANCE;
*/

// Read buffer with a consumption cursor, used to extract lines from received data
// (embedded in receiver_state).
// NOTE(review): presumably read_len is the number of valid bytes in read_buffer and pos the
// offset of the next unconsumed byte - confirm in buffered_reader_next_line().
struct buffered_reader {
    ssize_t read_len;
    ssize_t pos;
    char read_buffer[PLUGINSD_LINE_MAX + 1];
};

// NOTE(review): presumably moves the next line from `reader` into `dst` and returns true when
// a line was produced - implemented elsewhere; confirm.
bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst);
// Reset `reader` to the empty state: cursor and length at zero, buffer holding an empty string.
static inline void buffered_reader_init(struct buffered_reader *reader) {
    reader->pos = 0;
    reader->read_len = 0;
    reader->read_buffer[0] = '\0';
}

// Per-connection state of the streaming receiver (the side that accepts metrics from a child).
struct receiver_state {
    RRDHOST *host;
    pid_t tid;
    netdata_thread_t thread;
    int fd;
    char *key;
    char *hostname;
    char *registry_hostname;
    char *machine_guid;
    char *os;
    char *timezone;         // Unused?
    char *abbrev_timezone;
    int32_t utc_offset;
    char *tags;
    char *client_ip;        // Duplicated in pluginsd
    char *client_port;        // Duplicated in pluginsd
    char *program_name;        // Duplicated in pluginsd
    char *program_version;
    struct rrdhost_system_info *system_info;
    STREAM_CAPABILITIES capabilities;
    time_t last_msg_t;

    struct buffered_reader reader;

    uint16_t hops;

    struct {
        bool shutdown;      // signal the streaming parser to exit
        STREAM_HANDSHAKE reason;
    } exit;

    // configuration values for this receiver
    struct {
        RRD_MEMORY_MODE mode;
        int history;
        int update_every;
        int health_enabled; // CONFIG_BOOLEAN_YES, CONFIG_BOOLEAN_NO, CONFIG_BOOLEAN_AUTO
        time_t alarms_delay;
        uint32_t alarms_history;
        int rrdpush_enabled;
        char *rrdpush_api_key; // DONT FREE - it is allocated in appconfig
        char *rrdpush_send_charts_matching; // DONT FREE - it is allocated in appconfig
        bool rrdpush_enable_replication;
        time_t rrdpush_seconds_to_replicate;
        time_t rrdpush_replication_step;
        char *rrdpush_destination;  // DONT FREE - it is allocated in appconfig
        unsigned int rrdpush_compression;
    } config;

#ifdef ENABLE_HTTPS
    NETDATA_SSL ssl;
#endif

    time_t replication_first_time_t;

#ifdef ENABLE_RRDPUSH_COMPRESSION
    struct decompressor_state decompressor;
#endif // ENABLE_RRDPUSH_COMPRESSION
/*
    struct {
        uint32_t count;
        STREAM_NODE_INSTANCE *array;
    } instances;
*/
};

// One streaming destination; nodes are chained in a doubly-linked list through prev/next.
// NOTE(review): field notes are inferred from names and types - confirm in the sender code.
struct rrdpush_destinations {
    STRING *destination;
    bool ssl;
    uint32_t attempts;                      // connection attempts
    time_t since;
    time_t postpone_reconnection_until;     // do not try this destination before this time
    STREAM_HANDSHAKE reason;                // handshake result / disconnect reason for this destination

    struct rrdpush_destinations *prev;
    struct rrdpush_destinations *next;
};

// Global streaming defaults, defined in the implementation file.
// NOTE(review): presumably loaded from the streaming configuration - confirm.
extern unsigned int default_rrdpush_enabled;
#ifdef ENABLE_RRDPUSH_COMPRESSION
extern unsigned int default_rrdpush_compression_enabled;
#endif // ENABLE_RRDPUSH_COMPRESSION
extern char *default_rrdpush_destination;
extern char *default_rrdpush_api_key;
extern char *default_rrdpush_send_charts_matching;
extern bool default_rrdpush_enable_replication;
extern time_t default_rrdpush_seconds_to_replicate;
extern time_t default_rrdpush_replication_step;
extern unsigned int remote_clock_resync_iterations;

// Set up / release the destinations of a host (implemented elsewhere).
void rrdpush_destinations_init(RRDHOST *host);
void rrdpush_destinations_free(RRDHOST *host);

// NOTE(review): presumably used as a pair - sender_start() returns a buffer to fill and
// sender_commit() hands it over to the sender, accounted under `type` - confirm.
BUFFER *sender_start(struct sender_state *s);
void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type);
// These functions take no arguments. They are declared with an explicit (void) parameter
// list: before C23 an empty list `()` leaves the parameters unspecified, so the compiler
// would not diagnose calls that pass arguments by mistake.
// NOTE(review): return value semantics are defined by the implementation file - confirm there.
int rrdpush_init(void);
bool rrdpush_receiver_needs_dbengine(void);
int configured_as_parent(void);

// Context for pushing the metrics of one chart: returned by rrdset_push_metric_initialize()
// and passed to the rrdset_push_metrics_*() / rrddim_push_metrics_v2() functions.
// NOTE(review): field notes are inferred from names - confirm in the implementation.
typedef struct rrdset_stream_buffer {
    STREAM_CAPABILITIES capabilities;
    bool v2;
    bool begin_v2_added;
    time_t wall_clock_time;
    uint64_t rrdset_flags; // RRDSET_FLAGS
    time_t last_point_end_time_s;
    BUFFER *wb;             // the buffer the metrics are written to
} RRDSET_STREAM_BUFFER;

// Pushing the collected metrics of a chart (implemented elsewhere).
// NOTE(review): presumably the sequence is initialize -> v1 (whole chart) or v2 (one call
// per dimension) -> finished; confirm against the callers.
RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time);
void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st);
void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st);
void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags);

bool rrdset_push_chart_definition_now(RRDSET *st);
// Entry point of the sender thread; it has the generic thread signature (void * in, void * out).
void *rrdpush_sender_thread(void *ptr);
// Functions sending host-level information over the stream of `host`.
void rrdpush_send_host_labels(RRDHOST *host);
void rrdpush_send_claimed_id(RRDHOST *host);
void rrdpush_send_global_functions(RRDHOST *host);
void rrdpush_send_dyncfg(RRDHOST *host);

// Thread name prefixes of the streaming threads.
#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended

// Receiver / sender thread control (implemented elsewhere).
int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string);
void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait);

void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
// NOTE(review): presumably tries the destinations of `host` in turn and returns the connected
// socket, writing the chosen peer into connected_to (at most connected_to_size bytes) and the
// chosen list node into *destination - confirm return value and error convention.
int connect_to_one_of_destinations(
    RRDHOST *host,
    int default_port,
    struct timeval *timeout,
    size_t *reconnects_counter,
    char *connected_to,
    size_t connected_to_size,
    struct rrdpush_destinations **destination);

void rrdpush_signal_sender_to_wake_up(struct sender_state *s);

#ifdef ENABLE_RRDPUSH_COMPRESSION
// Takes no arguments; declared with (void) so that calls passing arguments are diagnosed
// (an empty `()` list leaves the parameters unspecified before C23).
struct compressor_state *create_compressor(void);
#endif // ENABLE_RRDPUSH_COMPRESSION

// Miscellaneous streaming helpers (implemented elsewhere).
void rrdpush_reset_destinations_postpone_time(RRDHOST *host);
// Returns a string for a STREAM_HANDSHAKE value.
const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error);
void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key);
void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status);
void log_receiver_capabilities(struct receiver_state *rpt);
void log_sender_capabilities(struct sender_state *s);
// Conversions between the old numeric protocol versions and capability bitmasks.
STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender);
int32_t stream_capabilities_to_vn(uint32_t caps);

void receiver_state_free(struct receiver_state *rpt);
bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason);

void sender_thread_buffer_free(void);

#include "replication.h"

// Database status of a host (RRDHOST_STATUS.db.status).
typedef enum __attribute__((packed)) {
    RRDHOST_DB_STATUS_INITIALIZING = 0,
    RRDHOST_DB_STATUS_QUERYABLE,
} RRDHOST_DB_STATUS;

// Text form of an RRDHOST_DB_STATUS.
// QUERYABLE is reported as "online"; anything else, including unknown values, as "initializing".
static inline const char *rrdhost_db_status_to_string(RRDHOST_DB_STATUS status) {
    if(status == RRDHOST_DB_STATUS_QUERYABLE)
        return "online";

    return "initializing";
}

// Liveness of the database of a host (RRDHOST_STATUS.db.liveness).
typedef enum __attribute__((packed)) {
    RRDHOST_DB_LIVENESS_STALE = 0,
    RRDHOST_DB_LIVENESS_LIVE,
} RRDHOST_DB_LIVENESS;

// Text form of an RRDHOST_DB_LIVENESS: "live" for LIVE, "stale" for everything else.
static inline const char *rrdhost_db_liveness_to_string(RRDHOST_DB_LIVENESS status) {
    return (status == RRDHOST_DB_LIVENESS_LIVE) ? "live" : "stale";
}

// Ingestion (receiving side) status of a host (RRDHOST_STATUS.ingest.status).
typedef enum __attribute__((packed)) {
    RRDHOST_INGEST_STATUS_ARCHIVED = 0,
    RRDHOST_INGEST_STATUS_INITIALIZING,
    RRDHOST_INGEST_STATUS_REPLICATING,
    RRDHOST_INGEST_STATUS_ONLINE,
    RRDHOST_INGEST_STATUS_OFFLINE,
} RRDHOST_INGEST_STATUS;

// Text form of an RRDHOST_INGEST_STATUS.
// OFFLINE and any unknown value are reported as "offline".
static inline const char *rrdhost_ingest_status_to_string(RRDHOST_INGEST_STATUS status) {
    if(status == RRDHOST_INGEST_STATUS_ARCHIVED)
        return "archived";

    if(status == RRDHOST_INGEST_STATUS_INITIALIZING)
        return "initializing";

    if(status == RRDHOST_INGEST_STATUS_REPLICATING)
        return "replicating";

    if(status == RRDHOST_INGEST_STATUS_ONLINE)
        return "online";

    return "offline";
}

// Kind of host on the ingestion side (RRDHOST_STATUS.ingest.type).
typedef enum __attribute__((packed)) {
    RRDHOST_INGEST_TYPE_LOCALHOST = 0,
    RRDHOST_INGEST_TYPE_VIRTUAL,
    RRDHOST_INGEST_TYPE_CHILD,
    RRDHOST_INGEST_TYPE_ARCHIVED,
} RRDHOST_INGEST_TYPE;

// Text form of an RRDHOST_INGEST_TYPE.
// ARCHIVED and any unknown value are reported as "archived".
static inline const char *rrdhost_ingest_type_to_string(RRDHOST_INGEST_TYPE type) {
    static const char *const names[] = {
        [RRDHOST_INGEST_TYPE_LOCALHOST] = "localhost",
        [RRDHOST_INGEST_TYPE_VIRTUAL]   = "virtual",
        [RRDHOST_INGEST_TYPE_CHILD]     = "child",
        [RRDHOST_INGEST_TYPE_ARCHIVED]  = "archived",
    };

    // out-of-range values (negative ones become huge when converted) fall back to the default
    const size_t idx = (size_t)type;
    if(idx < sizeof(names) / sizeof(names[0]))
        return names[idx];

    return "archived";
}

// Status of the outbound stream of a host (RRDHOST_STATUS.stream.status).
typedef enum __attribute__((packed)) {
    RRDHOST_STREAM_STATUS_DISABLED = 0,
    RRDHOST_STREAM_STATUS_REPLICATING,
    RRDHOST_STREAM_STATUS_ONLINE,
    RRDHOST_STREAM_STATUS_OFFLINE,
} RRDHOST_STREAMING_STATUS;

// Text form of an RRDHOST_STREAMING_STATUS.
// OFFLINE and any unknown value are reported as "offline".
static inline const char *rrdhost_streaming_status_to_string(RRDHOST_STREAMING_STATUS status) {
    const char *text = "offline";

    if(status == RRDHOST_STREAM_STATUS_DISABLED)
        text = "disabled";
    else if(status == RRDHOST_STREAM_STATUS_REPLICATING)
        text = "replicating";
    else if(status == RRDHOST_STREAM_STATUS_ONLINE)
        text = "online";

    return text;
}

// Machine-learning status of a host (RRDHOST_STATUS.ml.status).
typedef enum __attribute__((packed)) {
    RRDHOST_ML_STATUS_DISABLED = 0,
    RRDHOST_ML_STATUS_OFFLINE,
    RRDHOST_ML_STATUS_RUNNING,
} RRDHOST_ML_STATUS;

// Text form of an RRDHOST_ML_STATUS.
// RUNNING is reported as "online"; DISABLED and any unknown value as "disabled".
static inline const char *rrdhost_ml_status_to_string(RRDHOST_ML_STATUS status) {
    if(status == RRDHOST_ML_STATUS_RUNNING)
        return "online";

    if(status == RRDHOST_ML_STATUS_OFFLINE)
        return "offline";

    return "disabled";
}

// Origin of the machine-learning information of a host (RRDHOST_STATUS.ml.type).
typedef enum __attribute__((packed)) {
    RRDHOST_ML_TYPE_DISABLED = 0,
    RRDHOST_ML_TYPE_SELF,
    RRDHOST_ML_TYPE_RECEIVED,
} RRDHOST_ML_TYPE;

// Text form of an RRDHOST_ML_TYPE.
// DISABLED and any unknown value are reported as "disabled".
static inline const char *rrdhost_ml_type_to_string(RRDHOST_ML_TYPE type) {
    return (type == RRDHOST_ML_TYPE_SELF)     ? "self"
         : (type == RRDHOST_ML_TYPE_RECEIVED) ? "received"
         :                                      "disabled";
}

// Health (alerting) status of a host (RRDHOST_STATUS.health.status).
typedef enum __attribute__((packed)) {
    RRDHOST_HEALTH_STATUS_DISABLED = 0,
    RRDHOST_HEALTH_STATUS_INITIALIZING,
    RRDHOST_HEALTH_STATUS_RUNNING,
} RRDHOST_HEALTH_STATUS;

// Text form of an RRDHOST_HEALTH_STATUS.
// RUNNING is reported as "online"; DISABLED and any unknown value as "disabled".
static inline const char *rrdhost_health_status_to_string(RRDHOST_HEALTH_STATUS status) {
    if(status == RRDHOST_HEALTH_STATUS_INITIALIZING)
        return "initializing";

    if(status == RRDHOST_HEALTH_STATUS_RUNNING)
        return "online";

    return "disabled";
}

// Snapshot of the status of a host, filled in by rrdhost_status().
// It is organised in sections: db (storage), ml (machine learning), ingest (receiving side),
// stream (sending side) and health (alerts).
typedef struct rrdhost_status {
    RRDHOST *host;
    time_t now;

    // database
    struct {
        RRDHOST_DB_STATUS status;
        RRDHOST_DB_LIVENESS liveness;
        RRD_MEMORY_MODE mode;
        time_t first_time_s;
        time_t last_time_s;
        size_t metrics;
        size_t instances;
        size_t contexts;
    } db;

    // machine learning
    struct {
        RRDHOST_ML_STATUS status;
        RRDHOST_ML_TYPE type;
        struct ml_metrics_statistics metrics;
    } ml;

    // ingestion (receiving side)
    struct {
        size_t hops;
        RRDHOST_INGEST_TYPE  type;
        RRDHOST_INGEST_STATUS status;
        SOCKET_PEERS peers;
        bool ssl;
        STREAM_CAPABILITIES capabilities;
        uint32_t id;
        time_t since;
        STREAM_HANDSHAKE reason;

        struct {
            bool in_progress;
            NETDATA_DOUBLE completion;
            size_t instances;
        } replication;
    } ingest;

    // streaming (sending side)
    struct {
        size_t hops;
        RRDHOST_STREAMING_STATUS status;
        SOCKET_PEERS peers;
        bool ssl;
        bool compression;
        STREAM_CAPABILITIES capabilities;
        uint32_t id;
        time_t since;
        STREAM_HANDSHAKE reason;

        struct {
            bool in_progress;
            NETDATA_DOUBLE completion;
            size_t instances;
        } replication;

        size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];    // indexed by STREAM_TRAFFIC_TYPE
    } stream;

    // health: alert counters, one per alert state
    struct {
        RRDHOST_HEALTH_STATUS status;
        struct {
            uint32_t undefined;
            uint32_t uninitialized;
            uint32_t clear;
            uint32_t warning;
            uint32_t critical;
        } alerts;
    } health;
} RRDHOST_STATUS;

// Fills `s` with the status of `host`; `now` is the reference time of the snapshot.
// NOTE(review): inferred from the signature and the RRDHOST_STATUS.now member - confirm.
void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s);
bool rrdhost_state_cloud_emulation(RRDHOST *host);

// Dynamic configuration messages sent over the stream of `host` (implemented elsewhere).
void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job);
void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name);

void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name);
void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type);
void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags);

#endif //NETDATA_RRDPUSH_H