summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengine.h
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengine.h')
-rw-r--r--database/engine/rrdengine.h569
1 files changed, 402 insertions, 167 deletions
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
index 521d2521a..492666815 100644
--- a/database/engine/rrdengine.h
+++ b/database/engine/rrdengine.h
@@ -19,202 +19,315 @@
#include "journalfile.h"
#include "rrdengineapi.h"
#include "pagecache.h"
-#include "rrdenglocking.h"
-
-#ifdef NETDATA_RRD_INTERNALS
-
-#endif /* NETDATA_RRD_INTERNALS */
+#include "metric.h"
+#include "cache.h"
+#include "pdc.h"
extern unsigned rrdeng_pages_per_extent;
/* Forward declarations */
struct rrdengine_instance;
+struct rrdeng_cmd;
#define MAX_PAGES_PER_EXTENT (64) /* TODO: can go higher only when journal supports bigger than 4KiB transactions */
#define RRDENG_FILE_NUMBER_SCAN_TMPL "%1u-%10u"
#define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u"
+typedef struct page_details_control {
+ struct rrdengine_instance *ctx;
+ struct metric *metric;
+
+ struct completion prep_completion;
+ struct completion page_completion; // sync between the query thread and the workers
+
+ Pvoid_t page_list_JudyL; // the list of page details
+ unsigned completed_jobs; // the number of jobs completed last time the query thread checked
+ bool workers_should_stop; // true when the query thread left and the workers should stop
+ bool prep_done;
+
+ SPINLOCK refcount_spinlock; // spinlock to protect refcount
+ int32_t refcount; // the number of workers currently working on this request + 1 for the query thread
+ size_t executed_with_gaps;
+
+ time_t start_time_s;
+ time_t end_time_s;
+ STORAGE_PRIORITY priority;
+
+ time_t optimal_end_time_s;
+} PDC;
+
+PDC *pdc_get(void);
+
+typedef enum __attribute__ ((__packed__)) {
+ // final status for all pages
+ // if a page does not have one of these, it is considered unroutable
+ PDC_PAGE_READY = (1 << 0), // ready to be processed (pd->page is not null)
+ PDC_PAGE_FAILED = (1 << 1), // failed to be loaded (pd->page is null)
+ PDC_PAGE_SKIP = (1 << 2), // don't use this page, it is not good for us
+ PDC_PAGE_INVALID = (1 << 3), // don't use this page, it is invalid
+ PDC_PAGE_EMPTY = (1 << 4), // the page is empty, does not have any data
+
+ // other statuses for tracking issues
+ PDC_PAGE_PREPROCESSED = (1 << 5), // used during preprocessing
+ PDC_PAGE_PROCESSED = (1 << 6), // processed by the query caller
+ PDC_PAGE_RELEASED = (1 << 7), // already released
+
+ // data found in cache (preloaded) or on disk?
+ PDC_PAGE_PRELOADED = (1 << 8), // data found in memory
+ PDC_PAGE_DISK_PENDING = (1 << 9), // data need to be loaded from disk
+
+ // worker related statuses
+ PDC_PAGE_FAILED_INVALID_EXTENT = (1 << 10),
+ PDC_PAGE_FAILED_NOT_IN_EXTENT = (1 << 11),
+ PDC_PAGE_FAILED_TO_MAP_EXTENT = (1 << 12),
+ PDC_PAGE_FAILED_TO_ACQUIRE_DATAFILE= (1 << 13),
+
+ PDC_PAGE_EXTENT_FROM_CACHE = (1 << 14),
+ PDC_PAGE_EXTENT_FROM_DISK = (1 << 15),
+
+ PDC_PAGE_CANCELLED = (1 << 16), // the query thread had left when we try to load the page
+
+ PDC_PAGE_SOURCE_MAIN_CACHE = (1 << 17),
+ PDC_PAGE_SOURCE_OPEN_CACHE = (1 << 18),
+ PDC_PAGE_SOURCE_JOURNAL_V2 = (1 << 19),
+ PDC_PAGE_PRELOADED_PASS4 = (1 << 20),
+
+ // datafile acquired
+ PDC_PAGE_DATAFILE_ACQUIRED = (1 << 30),
+} PDC_PAGE_STATUS;
+
+#define PDC_PAGE_QUERY_GLOBAL_SKIP_LIST (PDC_PAGE_FAILED | PDC_PAGE_SKIP | PDC_PAGE_INVALID | PDC_PAGE_RELEASED)
+
+struct page_details {
+ struct {
+ struct rrdengine_datafile *ptr;
+ uv_file file;
+ unsigned fileno;
+
+ struct {
+ uint64_t pos;
+ uint32_t bytes;
+ } extent;
+ } datafile;
+
+ struct pgc_page *page;
+ Word_t metric_id;
+ time_t first_time_s;
+ time_t last_time_s;
+ uint32_t update_every_s;
+ uint16_t page_length;
+ PDC_PAGE_STATUS status;
+
+ struct {
+ struct page_details *prev;
+ struct page_details *next;
+ } load;
+};
+
+struct page_details *page_details_get(void);
+
+#define pdc_page_status_check(pd, flag) (__atomic_load_n(&((pd)->status), __ATOMIC_ACQUIRE) & (flag))
+#define pdc_page_status_set(pd, flag) __atomic_or_fetch(&((pd)->status), flag, __ATOMIC_RELEASE)
+#define pdc_page_status_clear(pd, flag) __atomic_and_fetch(&((od)->status), ~(flag), __ATOMIC_RELEASE)
+
+struct jv2_extents_info {
+ size_t index;
+ uint64_t pos;
+ unsigned bytes;
+ size_t number_of_pages;
+};
+
+struct jv2_metrics_info {
+ uuid_t *uuid;
+ uint32_t page_list_header;
+ time_t first_time_s;
+ time_t last_time_s;
+ size_t number_of_pages;
+ Pvoid_t JudyL_pages_by_start_time;
+};
+
+struct jv2_page_info {
+ time_t start_time_s;
+ time_t end_time_s;
+ time_t update_every_s;
+ size_t page_length;
+ uint32_t extent_index;
+ void *custom_data;
+
+ // private
+ struct pgc_page *page;
+};
+
+typedef enum __attribute__ ((__packed__)) {
+ RRDENG_CHO_UNALIGNED = (1 << 0), // set when this metric is not page aligned according to page alignment
+ RRDENG_FIRST_PAGE_ALLOCATED = (1 << 1), // set when this metric has allocated its first page
+ RRDENG_1ST_METRIC_WRITER = (1 << 2),
+} RRDENG_COLLECT_HANDLE_OPTIONS;
+
+typedef enum __attribute__ ((__packed__)) {
+ RRDENG_PAGE_PAST_COLLECTION = (1 << 0),
+ RRDENG_PAGE_REPEATED_COLLECTION = (1 << 1),
+ RRDENG_PAGE_BIG_GAP = (1 << 2),
+ RRDENG_PAGE_GAP = (1 << 3),
+ RRDENG_PAGE_FUTURE_POINT = (1 << 4),
+ RRDENG_PAGE_CREATED_IN_FUTURE = (1 << 5),
+ RRDENG_PAGE_COMPLETED_IN_FUTURE = (1 << 6),
+ RRDENG_PAGE_UNALIGNED = (1 << 7),
+ RRDENG_PAGE_CONFLICT = (1 << 8),
+ RRDENG_PAGE_FULL = (1 << 9),
+ RRDENG_PAGE_COLLECT_FINALIZE = (1 << 10),
+ RRDENG_PAGE_UPDATE_EVERY_CHANGE = (1 << 11),
+ RRDENG_PAGE_STEP_TOO_SMALL = (1 << 12),
+ RRDENG_PAGE_STEP_UNALIGNED = (1 << 13),
+} RRDENG_COLLECT_PAGE_FLAGS;
+
struct rrdeng_collect_handle {
- struct pg_cache_page_index *page_index;
- struct rrdeng_page_descr *descr;
- unsigned long page_correlation_id;
- // set to 1 when this dimension is not page aligned with the other dimensions in the chart
- uint8_t unaligned_page;
+ struct metric *metric;
+ struct pgc_page *page;
struct pg_alignment *alignment;
+ RRDENG_COLLECT_HANDLE_OPTIONS options;
+ uint8_t type;
+ RRDENG_COLLECT_PAGE_FLAGS page_flags;
+ uint32_t page_entries_max;
+ uint32_t page_position; // keep track of the current page size, to make sure we don't exceed it
+ usec_t page_start_time_ut;
+ usec_t page_end_time_ut;
+ usec_t update_every_ut;
};
struct rrdeng_query_handle {
- struct rrdeng_page_descr *descr;
+ struct metric *metric;
+ struct pgc_page *page;
struct rrdengine_instance *ctx;
- struct pg_cache_page_index *page_index;
- time_t wanted_start_time_s;
+ storage_number *metric_data;
+ struct page_details_control *pdc;
+
+ // the request
+ time_t start_time_s;
+ time_t end_time_s;
+ STORAGE_PRIORITY priority;
+
+ // internal data
time_t now_s;
+ time_t dt_s;
+
unsigned position;
unsigned entries;
- storage_number *page;
- usec_t page_end_time_ut;
- uint32_t page_length;
- time_t dt_s;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ usec_t started_time_s;
+ pid_t query_pid;
+ struct rrdeng_query_handle *prev, *next;
+#endif
};
-typedef enum {
- RRDENGINE_STATUS_UNINITIALIZED = 0,
- RRDENGINE_STATUS_INITIALIZING,
- RRDENGINE_STATUS_INITIALIZED
-} rrdengine_state_t;
+struct rrdeng_query_handle *rrdeng_query_handle_get(void);
+void rrdeng_query_handle_release(struct rrdeng_query_handle *handle);
enum rrdeng_opcode {
/* can be used to return empty status or flush the command queue */
- RRDENG_NOOP = 0,
-
- RRDENG_READ_PAGE,
- RRDENG_READ_EXTENT,
- RRDENG_COMMIT_PAGE,
- RRDENG_FLUSH_PAGES,
- RRDENG_SHUTDOWN,
- RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE,
- RRDENG_QUIESCE,
-
- RRDENG_MAX_OPCODE
-};
-
-struct rrdeng_read_page {
- struct rrdeng_page_descr *page_cache_descr;
+ RRDENG_OPCODE_NOOP = 0,
+
+ RRDENG_OPCODE_QUERY,
+ RRDENG_OPCODE_EXTENT_WRITE,
+ RRDENG_OPCODE_EXTENT_READ,
+ RRDENG_OPCODE_FLUSHED_TO_OPEN,
+ RRDENG_OPCODE_DATABASE_ROTATE,
+ RRDENG_OPCODE_JOURNAL_INDEX,
+ RRDENG_OPCODE_FLUSH_INIT,
+ RRDENG_OPCODE_EVICT_INIT,
+ RRDENG_OPCODE_CTX_SHUTDOWN,
+ RRDENG_OPCODE_CTX_QUIESCE,
+ RRDENG_OPCODE_CTX_POPULATE_MRG,
+ RRDENG_OPCODE_CLEANUP,
+
+ RRDENG_OPCODE_MAX
};
-struct rrdeng_read_extent {
- struct rrdeng_page_descr *page_cache_descr[MAX_PAGES_PER_EXTENT];
- int page_count;
-};
-
-struct rrdeng_cmd {
- enum rrdeng_opcode opcode;
- union {
- struct rrdeng_read_page read_page;
- struct rrdeng_read_extent read_extent;
- struct completion *completion;
- };
-};
-
-#define RRDENG_CMD_Q_MAX_SIZE (2048)
-
-struct rrdeng_cmdqueue {
- unsigned head, tail;
- struct rrdeng_cmd cmd_array[RRDENG_CMD_Q_MAX_SIZE];
+// WORKERS IDS:
+// RRDENG_MAX_OPCODE : reserved for the cleanup
+// RRDENG_MAX_OPCODE + opcode : reserved for the callbacks of each opcode
+// RRDENG_MAX_OPCODE + RRDENG_MAX_OPCODE : reserved for the timer
+#define RRDENG_TIMER_CB (RRDENG_OPCODE_MAX + RRDENG_OPCODE_MAX)
+#define RRDENG_FLUSH_TRANSACTION_BUFFER_CB (RRDENG_TIMER_CB + 1)
+#define RRDENG_OPCODES_WAITING (RRDENG_TIMER_CB + 2)
+#define RRDENG_WORKS_DISPATCHED (RRDENG_TIMER_CB + 3)
+#define RRDENG_WORKS_EXECUTING (RRDENG_TIMER_CB + 4)
+
+struct extent_io_data {
+ unsigned fileno;
+ uv_file file;
+ uint64_t pos;
+ unsigned bytes;
+ uint16_t page_length;
};
struct extent_io_descriptor {
- uv_fs_t req;
- uv_work_t req_worker;
+ struct rrdengine_instance *ctx;
+ uv_fs_t uv_fs_request;
uv_buf_t iov;
uv_file file;
void *buf;
- void *map_base;
- size_t map_length;
+ struct wal *wal;
uint64_t pos;
unsigned bytes;
struct completion *completion;
unsigned descr_count;
- int release_descr;
- struct rrdeng_page_descr *descr_array[MAX_PAGES_PER_EXTENT];
- struct rrdeng_page_descr descr_read_array[MAX_PAGES_PER_EXTENT];
- Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT];
+ struct page_descr_with_data *descr_array[MAX_PAGES_PER_EXTENT];
+ struct rrdengine_datafile *datafile;
struct extent_io_descriptor *next; /* multiple requests to be served by the same cached extent */
};
struct generic_io_descriptor {
+ struct rrdengine_instance *ctx;
uv_fs_t req;
uv_buf_t iov;
void *buf;
+ void *data;
uint64_t pos;
unsigned bytes;
struct completion *completion;
};
-struct extent_cache_element {
- struct extent_info *extent; /* The ABA problem is avoided with the help of fileno below */
- unsigned fileno;
- struct extent_cache_element *prev; /* LRU */
- struct extent_cache_element *next; /* LRU */
- struct extent_io_descriptor *inflight_io_descr; /* I/O descriptor for in-flight extent */
- uint8_t pages[MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE];
-};
-
-#define MAX_CACHED_EXTENTS 16 /* cannot be over 32 to fit in 32-bit architectures */
-
-/* Initialize by setting the structure to zero */
-struct extent_cache {
- struct extent_cache_element extent_array[MAX_CACHED_EXTENTS];
- unsigned allocation_bitmap; /* 1 if the corresponding position in the extent_array is allocated */
- unsigned inflight_bitmap; /* 1 if the corresponding position in the extent_array is waiting for I/O */
-
- struct extent_cache_element *replaceQ_head; /* LRU */
- struct extent_cache_element *replaceQ_tail; /* MRU */
-};
-
-struct rrdengine_worker_config {
- struct rrdengine_instance *ctx;
-
- uv_thread_t thread;
- uv_loop_t* loop;
- uv_async_t async;
-
- /* file deletion thread */
- uv_thread_t *now_deleting_files;
- unsigned long cleanup_thread_deleting_files; /* set to 0 when now_deleting_files is still running */
-
- /* dirty page deletion thread */
- uv_thread_t *now_invalidating_dirty_pages;
- /* set to 0 when now_invalidating_dirty_pages is still running */
- unsigned long cleanup_thread_invalidating_dirty_pages;
- unsigned inflight_dirty_pages;
-
- /* FIFO command queue */
- uv_mutex_t cmd_mutex;
- uv_cond_t cmd_cond;
- volatile unsigned queue_size;
- struct rrdeng_cmdqueue cmd_queue;
+typedef struct wal {
+ uint64_t transaction_id;
+ void *buf;
+ size_t size;
+ size_t buf_size;
+ struct generic_io_descriptor io_descr;
- struct extent_cache xt_cache;
+ struct {
+ struct wal *prev;
+ struct wal *next;
+ } cache;
+} WAL;
- int error;
-};
+WAL *wal_get(struct rrdengine_instance *ctx, unsigned size);
+void wal_release(WAL *wal);
/*
* Debug statistics not used by code logic.
* They only describe operations since DB engine instance load time.
*/
struct rrdengine_statistics {
- rrdeng_stats_t metric_API_producers;
- rrdeng_stats_t metric_API_consumers;
- rrdeng_stats_t pg_cache_insertions;
- rrdeng_stats_t pg_cache_deletions;
- rrdeng_stats_t pg_cache_hits;
- rrdeng_stats_t pg_cache_misses;
- rrdeng_stats_t pg_cache_backfills;
- rrdeng_stats_t pg_cache_evictions;
rrdeng_stats_t before_decompress_bytes;
rrdeng_stats_t after_decompress_bytes;
rrdeng_stats_t before_compress_bytes;
rrdeng_stats_t after_compress_bytes;
+
rrdeng_stats_t io_write_bytes;
rrdeng_stats_t io_write_requests;
rrdeng_stats_t io_read_bytes;
rrdeng_stats_t io_read_requests;
- rrdeng_stats_t io_write_extent_bytes;
- rrdeng_stats_t io_write_extents;
- rrdeng_stats_t io_read_extent_bytes;
- rrdeng_stats_t io_read_extents;
+
rrdeng_stats_t datafile_creations;
rrdeng_stats_t datafile_deletions;
rrdeng_stats_t journalfile_creations;
rrdeng_stats_t journalfile_deletions;
- rrdeng_stats_t page_cache_descriptors;
+
rrdeng_stats_t io_errors;
rrdeng_stats_t fs_errors;
- rrdeng_stats_t pg_cache_over_half_dirty_events;
- rrdeng_stats_t flushing_pressure_page_deletions;
};
/* I/O errors global counter */
@@ -227,57 +340,179 @@ extern rrdeng_stats_t rrdeng_reserved_file_descriptors;
extern rrdeng_stats_t global_pg_cache_over_half_dirty_events;
extern rrdeng_stats_t global_flushing_pressure_page_deletions; /* number of deleted pages */
-#define NO_QUIESCE (0) /* initial state when all operations function normally */
-#define SET_QUIESCE (1) /* set it before shutting down the instance, quiesce long running operations */
-#define QUIESCED (2) /* is set after all threads have finished running */
+struct rrdengine_instance {
+ struct {
+ bool legacy; // true when the db is autonomous for a single host
-typedef enum {
- LOAD_ERRORS_PAGE_FLIPPED_TIME = 0,
- LOAD_ERRORS_PAGE_EQUAL_TIME = 1,
- LOAD_ERRORS_PAGE_ZERO_ENTRIES = 2,
- LOAD_ERRORS_PAGE_UPDATE_ZERO = 3,
- LOAD_ERRORS_PAGE_FLEXY_TIME = 4,
- LOAD_ERRORS_DROPPED_EXTENT = 5,
-} INVALID_PAGE_ID;
+ int tier; // the tier of this ctx
+ uint8_t page_type; // default page type for this context
-struct rrdengine_instance {
- struct rrdengine_worker_config worker_config;
- struct completion rrdengine_completion;
- struct page_cache pg_cache;
- uint8_t drop_metrics_under_page_cache_pressure; /* boolean */
- uint8_t global_compress_alg;
- struct transaction_commit_log commit_log;
- struct rrdengine_datafile_list datafiles;
- RRDHOST *host; /* the legacy host, or NULL for multi-host DB */
- char dbfiles_path[FILENAME_MAX + 1];
- char machine_guid[GUID_LEN + 1]; /* the unique ID of the corresponding host, or localhost for multihost DB */
- uint64_t disk_space;
- uint64_t max_disk_space;
- int tier;
- unsigned last_fileno; /* newest index of datafile and journalfile */
- unsigned long max_cache_pages;
- unsigned long cache_pages_low_watermark;
- unsigned long metric_API_max_producers;
-
- uint8_t quiesce; /* set to SET_QUIESCE before shutdown of the engine */
- uint8_t page_type; /* Default page type for this context */
+ uint64_t max_disk_space; // the max disk space this ctx is allowed to use
+ uint8_t global_compress_alg; // the wanted compression algorithm
- struct rrdengine_statistics stats;
+ char dbfiles_path[FILENAME_MAX + 1];
+ } config;
struct {
- size_t counter;
- usec_t latest_end_time_ut;
- } load_errors[6];
+ uv_rwlock_t rwlock; // the linked list of datafiles is protected by this lock
+ struct rrdengine_datafile *first; // oldest - the newest with ->first->prev
+ } datafiles;
+
+ struct {
+ unsigned last_fileno; // newest index of datafile and journalfile
+ unsigned last_flush_fileno; // newest index of datafile received data
+
+ size_t collectors_running;
+ size_t collectors_running_duplicate;
+ size_t inflight_queries; // the number of queries currently running
+ uint64_t current_disk_space; // the current disk space size used
+
+ uint64_t transaction_id; // the transaction id of the next extent flushing
+
+ bool migration_to_v2_running;
+ bool now_deleting_files;
+ unsigned extents_currently_being_flushed; // non-zero until we commit data to disk (both datafile and journal file)
+ } atomic;
+
+ struct {
+ bool exit_mode;
+ bool enabled; // when set (before shutdown), queries are prohibited
+ struct completion completion;
+ } quiesce;
+
+ struct {
+ struct {
+ size_t size;
+ struct completion *array;
+ } populate_mrg;
+
+ bool create_new_datafile_pair;
+ } loading;
+
+ struct rrdengine_statistics stats;
};
-void *dbengine_page_alloc(void);
-void dbengine_page_free(void *page);
+#define ctx_current_disk_space_get(ctx) __atomic_load_n(&(ctx)->atomic.current_disk_space, __ATOMIC_RELAXED)
+#define ctx_current_disk_space_increase(ctx, size) __atomic_add_fetch(&(ctx)->atomic.current_disk_space, size, __ATOMIC_RELAXED)
+#define ctx_current_disk_space_decrease(ctx, size) __atomic_sub_fetch(&(ctx)->atomic.current_disk_space, size, __ATOMIC_RELAXED)
+
+static inline void ctx_io_read_op_bytes(struct rrdengine_instance *ctx, size_t bytes) {
+ __atomic_add_fetch(&ctx->stats.io_read_bytes, bytes, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&ctx->stats.io_read_requests, 1, __ATOMIC_RELAXED);
+}
+
+static inline void ctx_io_write_op_bytes(struct rrdengine_instance *ctx, size_t bytes) {
+ __atomic_add_fetch(&ctx->stats.io_write_bytes, bytes, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&ctx->stats.io_write_requests, 1, __ATOMIC_RELAXED);
+}
+static inline void ctx_io_error(struct rrdengine_instance *ctx) {
+ __atomic_add_fetch(&ctx->stats.io_errors, 1, __ATOMIC_RELAXED);
+ rrd_stat_atomic_add(&global_io_errors, 1);
+}
+
+static inline void ctx_fs_error(struct rrdengine_instance *ctx) {
+ __atomic_add_fetch(&ctx->stats.fs_errors, 1, __ATOMIC_RELAXED);
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+}
+
+#define ctx_last_fileno_get(ctx) __atomic_load_n(&(ctx)->atomic.last_fileno, __ATOMIC_RELAXED)
+#define ctx_last_fileno_increment(ctx) __atomic_add_fetch(&(ctx)->atomic.last_fileno, 1, __ATOMIC_RELAXED)
+
+#define ctx_last_flush_fileno_get(ctx) __atomic_load_n(&(ctx)->atomic.last_flush_fileno, __ATOMIC_RELAXED)
+static inline void ctx_last_flush_fileno_set(struct rrdengine_instance *ctx, unsigned fileno) {
+ unsigned old_fileno = ctx_last_flush_fileno_get(ctx);
+
+ do {
+ if(old_fileno >= fileno)
+ return;
+
+ } while(!__atomic_compare_exchange_n(&ctx->atomic.last_flush_fileno, &old_fileno, fileno, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+}
+
+#define ctx_is_available_for_queries(ctx) (__atomic_load_n(&(ctx)->quiesce.enabled, __ATOMIC_RELAXED) == false && __atomic_load_n(&(ctx)->quiesce.exit_mode, __ATOMIC_RELAXED) == false)
+
+void *dbengine_page_alloc(size_t size);
+void dbengine_page_free(void *page, size_t size);
+
+void *dbengine_extent_alloc(size_t size);
+void dbengine_extent_free(void *extent, size_t size);
+
+bool rrdeng_ctx_exceeded_disk_quota(struct rrdengine_instance *ctx);
int init_rrd_files(struct rrdengine_instance *ctx);
void finalize_rrd_files(struct rrdengine_instance *ctx);
-void rrdeng_test_quota(struct rrdengine_worker_config* wc);
-void rrdeng_worker(void* arg);
-void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd);
-struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc);
+bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx);
+void dbengine_event_loop(void *arg);
+
+typedef void (*enqueue_callback_t)(struct rrdeng_cmd *cmd);
+typedef void (*dequeue_callback_t)(struct rrdeng_cmd *cmd);
+
+void rrdeng_enqueue_epdl_cmd(struct rrdeng_cmd *cmd);
+void rrdeng_dequeue_epdl_cmd(struct rrdeng_cmd *cmd);
+
+typedef struct rrdeng_cmd *(*requeue_callback_t)(void *data);
+void rrdeng_req_cmd(requeue_callback_t get_cmd_cb, void *data, STORAGE_PRIORITY priority);
+
+void rrdeng_enq_cmd(struct rrdengine_instance *ctx, enum rrdeng_opcode opcode, void *data,
+ struct completion *completion, enum storage_priority priority,
+ enqueue_callback_t enqueue_cb, dequeue_callback_t dequeue_cb);
+
+void pdc_route_asynchronously(struct rrdengine_instance *ctx, struct page_details_control *pdc);
+void pdc_route_synchronously(struct rrdengine_instance *ctx, struct page_details_control *pdc);
+
+void pdc_acquire(PDC *pdc);
+bool pdc_release_and_destroy_if_unreferenced(PDC *pdc, bool worker, bool router);
+
+unsigned rrdeng_target_data_file_size(struct rrdengine_instance *ctx);
+
+struct page_descr_with_data *page_descriptor_get(void);
+
+typedef struct validated_page_descriptor {
+ time_t start_time_s;
+ time_t end_time_s;
+ time_t update_every_s;
+ size_t page_length;
+ size_t point_size;
+ size_t entries;
+ uint8_t type;
+ bool is_valid;
+} VALIDATED_PAGE_DESCRIPTOR;
+
+#define DBENGINE_EMPTY_PAGE (void *)(-1)
+
+#define page_entries_by_time(start_time_s, end_time_s, update_every_s) \
+ ((update_every_s) ? (((end_time_s) - ((start_time_s) - (update_every_s))) / (update_every_s)) : 1)
+
+#define page_entries_by_size(page_length_in_bytes, point_size_in_bytes) \
+ ((page_length_in_bytes) / (point_size_in_bytes))
+
+VALIDATED_PAGE_DESCRIPTOR validate_page(uuid_t *uuid,
+ time_t start_time_s,
+ time_t end_time_s,
+ time_t update_every_s,
+ size_t page_length,
+ uint8_t page_type,
+ size_t entries,
+ time_t now_s,
+ time_t overwrite_zero_update_every_s,
+ bool have_read_error,
+ const char *msg,
+ RRDENG_COLLECT_PAGE_FLAGS flags);
+VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, time_t overwrite_zero_update_every_s, bool have_read_error);
+void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags);
+
+typedef enum {
+ PAGE_IS_IN_THE_PAST = -1,
+ PAGE_IS_IN_RANGE = 0,
+ PAGE_IS_IN_THE_FUTURE = 1,
+} TIME_RANGE_COMPARE;
+
+TIME_RANGE_COMPARE is_page_in_time_range(time_t page_first_time_s, time_t page_last_time_s, time_t wanted_start_time_s, time_t wanted_end_time_s);
+
+static inline time_t max_acceptable_collected_time(void) {
+ return now_realtime_sec() + 1;
+}
+
+void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool update_retention, bool worker);
#endif /* NETDATA_RRDENGINE_H */