1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
|
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_RRDENGINE_H
#define NETDATA_RRDENGINE_H
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <fcntl.h>
#include <lz4.h>
#include <Judy.h>
#include <openssl/sha.h>
#include <openssl/evp.h>
#include "daemon/common.h"
#include "../rrd.h"
#include "rrddiskprotocol.h"
#include "rrdenginelib.h"
#include "datafile.h"
#include "journalfile.h"
#include "rrdengineapi.h"
#include "pagecache.h"
#include "rrdenglocking.h"
#ifdef NETDATA_RRD_INTERNALS
#endif /* NETDATA_RRD_INTERNALS */
/* Declared here only; the definition lives in another translation unit. */
extern unsigned rrdeng_pages_per_extent;

/* Forward declarations */
struct rrdengine_instance;

/* TODO: can go higher only when journal supports bigger than 4KiB transactions */
#define MAX_PAGES_PER_EXTENT (64)

/*
 * Scan/print format templates for a file number: an unsigned of width 1,
 * a dash, then an unsigned of width 10 (zero-padded to 10 digits on print).
 */
#define RRDENG_FILE_NUMBER_SCAN_TMPL "%1u-%10u"
#define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u"
/*
 * Collection handle: the state kept for one dimension while it is being
 * collected (see unaligned_page, which is tracked per dimension).
 */
struct rrdeng_collect_handle {
    struct pg_cache_page_index *page_index;
    struct rrdeng_page_descr *descr;
    unsigned long page_correlation_id;

    /* set to 1 when this dimension is not page aligned with the other dimensions in the chart */
    uint8_t unaligned_page;

    struct pg_alignment *alignment;
};
/*
 * Query handle: the state kept while a query reads data.
 * NOTE(review): the grouping comments below are inferred from the field
 * names and types only -- confirm against the query implementation.
 */
struct rrdeng_query_handle {
    /* what is being read and from which engine instance */
    struct rrdeng_page_descr *descr;
    struct rrdengine_instance *ctx;
    struct pg_cache_page_index *page_index;

    /* time_t values (the _s suffix suggests seconds) */
    time_t wanted_start_time_s;
    time_t now_s;

    /* cursor into the page pointed to by 'page' */
    unsigned position;
    unsigned entries;
    storage_number *page;
    usec_t page_end_time_ut;
    uint32_t page_length;

    time_t dt_s;
};
/*
 * Initialization states, numbered sequentially from 0:
 * uninitialized -> initializing -> initialized.
 */
typedef enum {
    RRDENGINE_STATUS_UNINITIALIZED = 0,
    RRDENGINE_STATUS_INITIALIZING,
    RRDENGINE_STATUS_INITIALIZED
} rrdengine_state_t;
/* Opcodes stored in the 'opcode' member of struct rrdeng_cmd. */
enum rrdeng_opcode {
    /* can be used to return empty status or flush the command queue */
    RRDENG_NOOP = 0,

    RRDENG_READ_PAGE,
    RRDENG_READ_EXTENT,
    RRDENG_COMMIT_PAGE,
    RRDENG_FLUSH_PAGES,
    RRDENG_SHUTDOWN,
    RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE,
    RRDENG_QUIESCE,

    /* sentinel: equals the number of opcodes listed above; keep it last */
    RRDENG_MAX_OPCODE
};
/*
 * Single-page read payload; embedded in the union of struct rrdeng_cmd
 * (presumably used with RRDENG_READ_PAGE -- confirm against the worker).
 */
struct rrdeng_read_page {
    struct rrdeng_page_descr *page_cache_descr;
};
/*
 * Multi-page read payload; embedded in the union of struct rrdeng_cmd
 * (presumably used with RRDENG_READ_EXTENT -- confirm against the worker).
 */
struct rrdeng_read_extent {
    /* capacity is MAX_PAGES_PER_EXTENT descriptors */
    struct rrdeng_page_descr *page_cache_descr[MAX_PAGES_PER_EXTENT];
    /* presumably the number of entries in use in page_cache_descr[] -- TODO confirm */
    int page_count;
};
/*
 * One command: an opcode plus an anonymous union holding the payload.
 * Only one union member is meaningful at a time; which one applies to
 * which opcode is decided by the code that produces and consumes commands.
 */
struct rrdeng_cmd {
    enum rrdeng_opcode opcode;

    union {
        struct rrdeng_read_page read_page;
        struct rrdeng_read_extent read_extent;
        struct completion *completion;
    };
};
/* Number of command slots in struct rrdeng_cmdqueue. */
#define RRDENG_CMD_Q_MAX_SIZE (2048)

/*
 * Fixed-capacity command storage with head and tail indices into cmd_array[].
 * The array is embedded by value, so the structure is large:
 * RRDENG_CMD_Q_MAX_SIZE * sizeof(struct rrdeng_cmd) bytes plus the indices.
 */
struct rrdeng_cmdqueue {
    unsigned head, tail;
    struct rrdeng_cmd cmd_array[RRDENG_CMD_Q_MAX_SIZE];
};
/*
 * State kept for one extent I/O request: the libuv request objects, the
 * buffer and file position, and the page descriptors the request covers.
 */
struct extent_io_descriptor {
    /* libuv request objects and I/O vector */
    uv_fs_t req;
    uv_work_t req_worker;
    uv_buf_t iov;
    uv_file file;

    /* data buffer and mapping */
    void *buf;
    void *map_base;
    size_t map_length;

    /* file position and size of the request */
    uint64_t pos;
    unsigned bytes;

    struct completion *completion;

    /* presumably the number of entries in use in the arrays below -- TODO confirm */
    unsigned descr_count;
    int release_descr;

    /* per-page bookkeeping, each with capacity MAX_PAGES_PER_EXTENT */
    struct rrdeng_page_descr *descr_array[MAX_PAGES_PER_EXTENT];
    struct rrdeng_page_descr descr_read_array[MAX_PAGES_PER_EXTENT];
    Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT];

    /* multiple requests to be served by the same cached extent */
    struct extent_io_descriptor *next;
};
/*
 * State kept for one plain I/O request. It carries only the request,
 * buffer, position, size and completion fields; there is no per-page
 * bookkeeping here (compare struct extent_io_descriptor).
 */
struct generic_io_descriptor {
    /* libuv request object and I/O vector */
    uv_fs_t req;
    uv_buf_t iov;

    void *buf;

    /* file position and size of the request */
    uint64_t pos;
    unsigned bytes;

    struct completion *completion;
};
/* One slot of the extent cache (see struct extent_cache). */
struct extent_cache_element {
    /* The ABA problem is avoided with the help of fileno below */
    struct extent_info *extent;
    unsigned fileno;

    /* LRU list links */
    struct extent_cache_element *prev;
    struct extent_cache_element *next;

    /* I/O descriptor for in-flight extent */
    struct extent_io_descriptor *inflight_io_descr;

    /* room for MAX_PAGES_PER_EXTENT blocks of RRDENG_BLOCK_SIZE bytes each */
    uint8_t pages[MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE];
};
/* cannot be over 32 to fit in 32-bit architectures */
#define MAX_CACHED_EXTENTS 16

/*
 * Extent cache: a fixed array of slots, two bitmaps with one bit per slot,
 * and the ends of the replacement queue.
 *
 * Initialize by setting the structure to zero.
 */
struct extent_cache {
    struct extent_cache_element extent_array[MAX_CACHED_EXTENTS];

    /* 1 if the corresponding position in the extent_array is allocated */
    unsigned allocation_bitmap;
    /* 1 if the corresponding position in the extent_array is waiting for I/O */
    unsigned inflight_bitmap;

    struct extent_cache_element *replaceQ_head; /* LRU */
    struct extent_cache_element *replaceQ_tail; /* MRU */
};
/*
 * Per-instance worker state: the libuv thread/loop/async handles, the
 * helper-thread bookkeeping, the command queue and the extent cache.
 */
struct rrdengine_worker_config {
    struct rrdengine_instance *ctx;

    uv_thread_t thread;
    uv_loop_t *loop;
    uv_async_t async;

    /* file deletion thread */
    uv_thread_t *now_deleting_files;
    /* set to 0 when now_deleting_files is still running */
    unsigned long cleanup_thread_deleting_files;

    /* dirty page deletion thread */
    uv_thread_t *now_invalidating_dirty_pages;
    /* set to 0 when now_invalidating_dirty_pages is still running */
    unsigned long cleanup_thread_invalidating_dirty_pages;
    unsigned inflight_dirty_pages;

    /* FIFO command queue */
    uv_mutex_t cmd_mutex;
    uv_cond_t cmd_cond;
    volatile unsigned queue_size;
    struct rrdeng_cmdqueue cmd_queue;

    struct extent_cache xt_cache;

    int error;
};
/*
 * Debug statistics not used by code logic.
 * They only describe operations since DB engine instance load time.
 */
struct rrdengine_statistics {
    /* metric API usage */
    rrdeng_stats_t metric_API_producers;
    rrdeng_stats_t metric_API_consumers;

    /* page cache activity */
    rrdeng_stats_t pg_cache_insertions;
    rrdeng_stats_t pg_cache_deletions;
    rrdeng_stats_t pg_cache_hits;
    rrdeng_stats_t pg_cache_misses;
    rrdeng_stats_t pg_cache_backfills;
    rrdeng_stats_t pg_cache_evictions;

    /* compression / decompression byte counts */
    rrdeng_stats_t before_decompress_bytes;
    rrdeng_stats_t after_decompress_bytes;
    rrdeng_stats_t before_compress_bytes;
    rrdeng_stats_t after_compress_bytes;

    /* I/O bytes and requests */
    rrdeng_stats_t io_write_bytes;
    rrdeng_stats_t io_write_requests;
    rrdeng_stats_t io_read_bytes;
    rrdeng_stats_t io_read_requests;
    rrdeng_stats_t io_write_extent_bytes;
    rrdeng_stats_t io_write_extents;
    rrdeng_stats_t io_read_extent_bytes;
    rrdeng_stats_t io_read_extents;

    /* datafile and journalfile lifecycle */
    rrdeng_stats_t datafile_creations;
    rrdeng_stats_t datafile_deletions;
    rrdeng_stats_t journalfile_creations;
    rrdeng_stats_t journalfile_deletions;

    rrdeng_stats_t page_cache_descriptors;

    /* error counters */
    rrdeng_stats_t io_errors;
    rrdeng_stats_t fs_errors;

    /* flushing pressure */
    rrdeng_stats_t pg_cache_over_half_dirty_events;
    rrdeng_stats_t flushing_pressure_page_deletions;
};
/*
 * Global counters. They are only declared here; the definitions live in
 * another translation unit.
 */

/* I/O errors global counter */
extern rrdeng_stats_t global_io_errors;

/* File-System errors global counter */
extern rrdeng_stats_t global_fs_errors;

/* number of File-Descriptors that have been reserved by dbengine */
extern rrdeng_stats_t rrdeng_reserved_file_descriptors;

/* inability to flush global counters */
extern rrdeng_stats_t global_pg_cache_over_half_dirty_events;
/* number of deleted pages */
extern rrdeng_stats_t global_flushing_pressure_page_deletions;
/* Quiesce states; the values fit the uint8_t 'quiesce' member of struct rrdengine_instance. */

/* initial state when all operations function normally */
#define NO_QUIESCE (0)
/* set it before shutting down the instance, quiesce long running operations */
#define SET_QUIESCE (1)
/* is set after all threads have finished running */
#define QUIESCED (2)
/*
 * Load-error categories. struct rrdengine_instance keeps one load_errors[]
 * slot per enumerator, so the values must stay dense and start at 0.
 */
typedef enum {
    LOAD_ERRORS_PAGE_FLIPPED_TIME = 0,
    LOAD_ERRORS_PAGE_EQUAL_TIME = 1,
    LOAD_ERRORS_PAGE_ZERO_ENTRIES = 2,
    LOAD_ERRORS_PAGE_UPDATE_ZERO = 3,
    LOAD_ERRORS_PAGE_FLEXY_TIME = 4,
    /* keep this one last: it determines the size of load_errors[] below */
    LOAD_ERRORS_DROPPED_EXTENT = 5,
} INVALID_PAGE_ID;

/*
 * One DB engine instance: its worker state, page cache, commit log,
 * datafile list, configuration limits and statistics.
 */
struct rrdengine_instance {
    struct rrdengine_worker_config worker_config;
    struct completion rrdengine_completion;
    struct page_cache pg_cache;

    uint8_t drop_metrics_under_page_cache_pressure; /* boolean */
    uint8_t global_compress_alg;

    struct transaction_commit_log commit_log;
    struct rrdengine_datafile_list datafiles;

    RRDHOST *host; /* the legacy host, or NULL for multi-host DB */
    char dbfiles_path[FILENAME_MAX + 1];
    char machine_guid[GUID_LEN + 1]; /* the unique ID of the corresponding host, or localhost for multihost DB */

    uint64_t disk_space;
    uint64_t max_disk_space;
    int tier;
    unsigned last_fileno; /* newest index of datafile and journalfile */

    unsigned long max_cache_pages;
    unsigned long cache_pages_low_watermark;
    unsigned long metric_API_max_producers;

    uint8_t quiesce; /* set to SET_QUIESCE before shutdown of the engine */
    uint8_t page_type; /* Default page type for this context */

    struct rrdengine_statistics stats;

    /*
     * One slot per INVALID_PAGE_ID value. The size is derived from the last
     * enumerator instead of a hard-coded 6, so the array and the enum cannot
     * silently drift apart.
     */
    struct {
        size_t counter;
        usec_t latest_end_time_ut;
    } load_errors[LOAD_ERRORS_DROPPED_EXTENT + 1];
};
/*
 * Functions implemented elsewhere.
 * NOTE(review): the one-line descriptions below are inferred from the names
 * and signatures only -- confirm against the definitions.
 */

/* page buffer allocation and release (presumably a matched pair) */
void *dbengine_page_alloc(void);
void dbengine_page_free(void *page);

/* set-up / tear-down of the files of an instance; init returns an int status */
int init_rrd_files(struct rrdengine_instance *ctx);
void finalize_rrd_files(struct rrdengine_instance *ctx);

void rrdeng_test_quota(struct rrdengine_worker_config *wc);

/* takes an untyped argument; presumably a thread entry point -- TODO confirm */
void rrdeng_worker(void *arg);

/* command queue access: enqueue takes a pointer, dequeue returns the command by value */
void rrdeng_enq_cmd(struct rrdengine_worker_config *wc, struct rrdeng_cmd *cmd);
struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config *wc);
#endif /* NETDATA_RRDENGINE_H */
|