summaryrefslogtreecommitdiffstats
path: root/database/engine
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine')
-rw-r--r--database/engine/README.md40
-rw-r--r--database/engine/datafile.c224
-rw-r--r--database/engine/datafile.h7
-rw-r--r--database/engine/journalfile.c117
-rw-r--r--database/engine/journalfile.h2
-rw-r--r--database/engine/pagecache.c317
-rw-r--r--database/engine/pagecache.h89
-rw-r--r--database/engine/rrdengine.c234
-rw-r--r--database/engine/rrdengine.h22
-rw-r--r--database/engine/rrdengineapi.c277
-rw-r--r--database/engine/rrdengineapi.h12
-rw-r--r--database/engine/rrdenginelib.c97
-rw-r--r--database/engine/rrdenginelib.h21
-rw-r--r--database/engine/rrdenglocking.c233
-rw-r--r--database/engine/rrdenglocking.h17
15 files changed, 1254 insertions, 455 deletions
diff --git a/database/engine/README.md b/database/engine/README.md
index 28a2528cb..adc69ffd7 100644
--- a/database/engine/README.md
+++ b/database/engine/README.md
@@ -96,9 +96,9 @@ There are explicit memory requirements **per** DB engine **instance**, meaning *
- `page cache size` must be at least `#dimensions-being-collected x 4096 x 2` bytes.
-- an additional `#pages-on-disk x 4096 x 0.06` bytes of RAM are allocated for metadata.
+- an additional `#pages-on-disk x 4096 x 0.03` bytes of RAM are allocated for metadata.
- - roughly speaking this is 6% of the uncompressed disk space taken by the DB files.
+ - roughly speaking this is 3% of the uncompressed disk space taken by the DB files.
- for very highly compressible data (compression ratio > 90%) this RAM overhead
is comparable to the disk space footprint.
@@ -106,4 +106,40 @@ There are explicit memory requirements **per** DB engine **instance**, meaning *
An important observation is that RAM usage depends on both the `page cache size` and the
`dbengine disk space` options.
+## File descriptor requirements
+
+The Database Engine may keep a **significant** amount of files open per instance (e.g. per streaming
+slave or master server). When configuring your system you should make sure there are at least 50
+file descriptors available per `dbengine` instance.
+
+Netdata allocates 25% of the available file descriptors to its Database Engine instances. This means that only 25%
+of the file descriptors that are available to the Netdata service are accessible by dbengine instances.
+You should take that into account when configuring your service
+or system-wide file descriptor limits. You can roughly estimate that the netdata service needs 2048 file
+descriptors for every 10 streaming slave hosts when streaming is configured to use `memory mode = dbengine`.
+
+If, for example, one wants to allocate 65536 file descriptors to the netdata service on a systemd system
+one needs to override the netdata service by running `sudo systemctl edit netdata` and creating a
+file with contents:
+
+```
+[Service]
+LimitNOFILE=65536
+```
+
+For other types of services one can add the line:
+```
+ulimit -n 65536
+```
+at the beginning of the service file. Alternatively, you can change the system-wide limits of the kernel by changing `/etc/sysctl.conf`. For Linux that would be:
+```
+fs.file-max = 65536
+```
+On FreeBSD and OS X you would change the corresponding lines like this:
+```
+kern.maxfilesperproc=65536
+kern.maxfiles=65536
+```
+You can apply the settings by running `sysctl -p` or by rebooting.
+
[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fdatabase%2Fengine%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)]()
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
index 2d17d05e4..8ef4ed599 100644
--- a/database/engine/datafile.c
+++ b/database/engine/datafile.c
@@ -49,44 +49,69 @@ static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_
datafile->ctx = ctx;
}
-static void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintf(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION,
datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
}
+int close_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_datafilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_close(NULL, &req, datafile->file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ return ret;
+}
+
+
int destroy_data_file(struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
- int ret, fd;
- char path[1024];
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_datafilepath(datafile, path, sizeof(path));
ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL);
if (ret < 0) {
- fatal("uv_fs_ftruncate: %s", uv_strerror(ret));
+ error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
ret = uv_fs_close(NULL, &req, datafile->file, NULL);
if (ret < 0) {
- fatal("uv_fs_close: %s", uv_strerror(ret));
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
- generate_datafilepath(datafile, path, sizeof(path));
- fd = uv_fs_unlink(NULL, &req, path, NULL);
- if (fd < 0) {
- fatal("uv_fs_fsunlink: %s", uv_strerror(fd));
+ ret = uv_fs_unlink(NULL, &req, path, NULL);
+ if (ret < 0) {
+ error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
++ctx->stats.datafile_deletions;
- return 0;
+ return ret;
}
int create_data_file(struct rrdengine_datafile *datafile)
@@ -97,21 +122,17 @@ int create_data_file(struct rrdengine_datafile *datafile)
int ret, fd;
struct rrdeng_df_sb *superblock;
uv_buf_t iov;
- char path[1024];
+ char path[RRDENG_PATH_MAX];
generate_datafilepath(datafile, path, sizeof(path));
- fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC,
- S_IRUSR | S_IWUSR, NULL);
+ fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
if (fd < 0) {
- fatal("uv_fs_fsopen: %s", uv_strerror(fd));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return fd;
}
- assert(req.result >= 0);
- file = req.result;
- uv_fs_req_cleanup(&req);
-#ifdef __APPLE__
- info("Disabling OS X caching for file \"%s\".", path);
- fcntl(fd, F_NOCACHE, 1);
-#endif
+ datafile->file = file;
+ ++ctx->stats.datafile_creations;
ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
if (unlikely(ret)) {
@@ -125,19 +146,21 @@ int create_data_file(struct rrdengine_datafile *datafile)
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
- fatal("uv_fs_write: %s", uv_strerror(ret));
- }
- if (req.result < 0) {
- fatal("uv_fs_write: %s", uv_strerror((int)req.result));
+ assert(req.result < 0);
+ error("uv_fs_write: %s", uv_strerror(ret));
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
}
uv_fs_req_cleanup(&req);
free(superblock);
+ if (ret < 0) {
+ destroy_data_file(datafile);
+ return ret;
+ }
- datafile->file = file;
datafile->pos = sizeof(*superblock);
ctx->stats.io_write_bytes += sizeof(*superblock);
++ctx->stats.io_write_requests;
- ++ctx->stats.datafile_creations;
return 0;
}
@@ -182,25 +205,17 @@ static int load_data_file(struct rrdengine_datafile *datafile)
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
uv_file file;
- int ret, fd;
+ int ret, fd, error;
uint64_t file_size;
- char path[1024];
+ char path[RRDENG_PATH_MAX];
generate_datafilepath(datafile, path, sizeof(path));
- fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL);
+ fd = open_file_direct_io(path, O_RDWR, &file);
if (fd < 0) {
- /* if (UV_ENOENT != fd) */
- error("uv_fs_fsopen: %s", uv_strerror(fd));
- uv_fs_req_cleanup(&req);
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
return fd;
}
- assert(req.result >= 0);
- file = req.result;
- uv_fs_req_cleanup(&req);
-#ifdef __APPLE__
- info("Disabling OS X caching for file \"%s\".", path);
- fcntl(fd, F_NOCACHE, 1);
-#endif
info("Initializing data file \"%s\".", path);
ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
@@ -221,15 +236,21 @@ static int load_data_file(struct rrdengine_datafile *datafile)
return 0;
error:
- (void) uv_fs_close(NULL, &req, file, NULL);
+ error = ret;
+ ret = uv_fs_close(NULL, &req, file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
uv_fs_req_cleanup(&req);
- return ret;
+ return error;
}
static int scan_data_files_cmp(const void *a, const void *b)
{
struct rrdengine_datafile *file1, *file2;
- char path1[1024], path2[1024];
+ char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX];
file1 = *(struct rrdengine_datafile **)a;
file2 = *(struct rrdengine_datafile **)b;
@@ -238,7 +259,7 @@ static int scan_data_files_cmp(const void *a, const void *b)
return strcmp(path1, path2);
}
-/* Returns number of datafiles that were loaded */
+/* Returns number of datafiles that were loaded or < 0 on error */
static int scan_data_files(struct rrdengine_instance *ctx)
{
int ret;
@@ -249,16 +270,22 @@ static int scan_data_files(struct rrdengine_instance *ctx)
struct rrdengine_journalfile *journalfile;
ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL);
- assert(ret >= 0);
- assert(req.result >= 0);
+ if (ret < 0) {
+ assert(req.result < 0);
+ uv_fs_req_cleanup(&req);
+ error("uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return ret;
+ }
info("Found %d files in path %s", ret, ctx->dbfiles_path);
datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
- info("Scanning file \"%s\"", dent.name);
+ info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name);
ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no);
if (2 == ret) {
- info("Matched file \"%s\"", dent.name);
+ info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name);
datafile = mallocz(sizeof(*datafile));
datafile_init(datafile, ctx, tier, no);
datafiles[matched_files++] = datafile;
@@ -266,70 +293,133 @@ static int scan_data_files(struct rrdengine_instance *ctx)
}
uv_fs_req_cleanup(&req);
+ if (0 == matched_files) {
+ freez(datafiles);
+ return 0;
+ }
if (matched_files == MAX_DATAFILES) {
error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
}
qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp);
+ /* TODO: change this when tiering is implemented */
+ ctx->last_fileno = datafiles[matched_files - 1]->fileno;
+
for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
datafile = datafiles[i];
ret = load_data_file(datafile);
if (0 != ret) {
- free(datafile);
+ freez(datafile);
++failed_to_load;
- continue;
+ break;
}
journalfile = mallocz(sizeof(*journalfile));
datafile->journalfile = journalfile;
journalfile_init(journalfile, datafile);
ret = load_journal_file(ctx, journalfile, datafile);
if (0 != ret) {
- free(datafile);
- free(journalfile);
+ close_data_file(datafile);
+ freez(datafile);
+ freez(journalfile);
++failed_to_load;
- continue;
+ break;
}
datafile_list_insert(ctx, datafile);
ctx->disk_space += datafile->pos + journalfile->pos;
}
+ freez(datafiles);
if (failed_to_load) {
- error("%u files failed to load.", failed_to_load);
+ error("%u datafiles failed to load.", failed_to_load);
+ finalize_data_files(ctx);
+ return UV_EIO;
}
- free(datafiles);
- return matched_files - failed_to_load;
+ return matched_files;
}
/* Creates a datafile and a journalfile pair */
-void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno)
+int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno)
{
struct rrdengine_datafile *datafile;
struct rrdengine_journalfile *journalfile;
int ret;
+ char path[RRDENG_PATH_MAX];
- info("Creating new data and journal files.");
+ info("Creating new data and journal files in path %s", ctx->dbfiles_path);
datafile = mallocz(sizeof(*datafile));
datafile_init(datafile, ctx, tier, fileno);
ret = create_data_file(datafile);
- assert(!ret);
+ if (!ret) {
+ generate_datafilepath(datafile, path, sizeof(path));
+ info("Created data file \"%s\".", path);
+ } else {
+ goto error_after_datafile;
+ }
journalfile = mallocz(sizeof(*journalfile));
datafile->journalfile = journalfile;
journalfile_init(journalfile, datafile);
ret = create_journal_file(journalfile, datafile);
- assert(!ret);
+ if (!ret) {
+ generate_journalfilepath(datafile, path, sizeof(path));
+ info("Created journal file \"%s\".", path);
+ } else {
+ goto error_after_journalfile;
+ }
datafile_list_insert(ctx, datafile);
ctx->disk_space += datafile->pos + journalfile->pos;
+
+ return 0;
+
+error_after_journalfile:
+ destroy_data_file(datafile);
+ freez(journalfile);
+error_after_datafile:
+ freez(datafile);
+ return ret;
}
-/* Page cache must already be initialized. */
+/* Page cache must already be initialized.
+ * Return 0 on success.
+ */
int init_data_files(struct rrdengine_instance *ctx)
{
int ret;
ret = scan_data_files(ctx);
- if (0 == ret) {
- info("Data files not found, creating.");
- create_new_datafile_pair(ctx, 1, 1);
+ if (ret < 0) {
+ error("Failed to scan path \"%s\".", ctx->dbfiles_path);
+ return ret;
+ } else if (0 == ret) {
+ info("Data files not found, creating in path \"%s\".", ctx->dbfiles_path);
+ ret = create_new_datafile_pair(ctx, 1, 1);
+ if (ret) {
+ error("Failed to create data and journal files in path \"%s\".", ctx->dbfiles_path);
+ return ret;
+ }
+ ctx->last_fileno = 1;
}
+
return 0;
+}
+
+void finalize_data_files(struct rrdengine_instance *ctx)
+{
+ struct rrdengine_datafile *datafile, *next_datafile;
+ struct rrdengine_journalfile *journalfile;
+ struct extent_info *extent, *next_extent;
+
+ for (datafile = ctx->datafiles.first ; datafile != NULL ; datafile = next_datafile) {
+ journalfile = datafile->journalfile;
+ next_datafile = datafile->next;
+
+ for (extent = datafile->extents.first ; extent != NULL ; extent = next_extent) {
+ next_extent = extent->next;
+ freez(extent);
+ }
+ close_journal_file(journalfile, datafile);
+ close_data_file(datafile);
+ freez(journalfile);
+ freez(datafile);
+
+ }
} \ No newline at end of file
diff --git a/database/engine/datafile.h b/database/engine/datafile.h
index c5c8f31f3..eeb11310b 100644
--- a/database/engine/datafile.h
+++ b/database/engine/datafile.h
@@ -26,7 +26,7 @@ struct extent_info {
uint8_t number_of_pages;
struct rrdengine_datafile *datafile;
struct extent_info *next;
- struct rrdeng_page_cache_descr *pages[];
+ struct rrdeng_page_descr *pages[];
};
struct rrdengine_df_extents {
@@ -55,9 +55,12 @@ struct rrdengine_datafile_list {
extern void df_extent_insert(struct extent_info *extent);
extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
+extern void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
+extern int close_data_file(struct rrdengine_datafile *datafile);
extern int destroy_data_file(struct rrdengine_datafile *datafile);
extern int create_data_file(struct rrdengine_datafile *datafile);
-extern void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
+extern int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
extern int init_data_files(struct rrdengine_instance *ctx);
+extern void finalize_data_files(struct rrdengine_instance *ctx);
#endif /* NETDATA_DATAFILE_H */ \ No newline at end of file
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 44d8461db..30eaa0ec6 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -13,7 +13,7 @@ static void flush_transaction_buffer_cb(uv_fs_t* req)
uv_fs_req_cleanup(req);
free(io_descr->buf);
- free(io_descr);
+ freez(io_descr);
}
/* Careful to always call this before creating a new journal file */
@@ -87,7 +87,7 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s
return ctx->commit_log.buf + buf_pos;
}
-static void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintf(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
@@ -100,39 +100,62 @@ void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengin
journalfile->datafile = datafile;
}
+int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ return ret;
+}
+
int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
- int ret, fd;
- char path[1024];
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
if (ret < 0) {
- fatal("uv_fs_ftruncate: %s", uv_strerror(ret));
+ error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
if (ret < 0) {
- fatal("uv_fs_close: %s", uv_strerror(ret));
- exit(ret);
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
- generate_journalfilepath(datafile, path, sizeof(path));
- fd = uv_fs_unlink(NULL, &req, path, NULL);
- if (fd < 0) {
- fatal("uv_fs_fsunlink: %s", uv_strerror(fd));
+ ret = uv_fs_unlink(NULL, &req, path, NULL);
+ if (ret < 0) {
+ error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
++ctx->stats.journalfile_deletions;
- return 0;
+ return ret;
}
int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
@@ -143,21 +166,17 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
int ret, fd;
struct rrdeng_jf_sb *superblock;
uv_buf_t iov;
- char path[1024];
+ char path[RRDENG_PATH_MAX];
generate_journalfilepath(datafile, path, sizeof(path));
- fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC,
- S_IRUSR | S_IWUSR, NULL);
+ fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
if (fd < 0) {
- fatal("uv_fs_fsopen: %s", uv_strerror(fd));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return fd;
}
- assert(req.result >= 0);
- file = req.result;
- uv_fs_req_cleanup(&req);
-#ifdef __APPLE__
- info("Disabling OS X caching for file \"%s\".", path);
- fcntl(fd, F_NOCACHE, 1);
-#endif
+ journalfile->file = file;
+ ++ctx->stats.journalfile_creations;
ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
if (unlikely(ret)) {
@@ -170,19 +189,21 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
- fatal("uv_fs_write: %s", uv_strerror(ret));
- }
- if (req.result < 0) {
- fatal("uv_fs_write: %s", uv_strerror((int)req.result));
+ assert(req.result < 0);
+ error("uv_fs_write: %s", uv_strerror(ret));
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
}
uv_fs_req_cleanup(&req);
free(superblock);
+ if (ret < 0) {
+ destroy_journal_file(journalfile, datafile);
+ return ret;
+ }
- journalfile->file = file;
journalfile->pos = sizeof(*superblock);
ctx->stats.io_write_bytes += sizeof(*superblock);
++ctx->stats.io_write_requests;
- ++ctx->stats.journalfile_creations;
return 0;
}
@@ -226,7 +247,7 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
{
struct page_cache *pg_cache = &ctx->pg_cache;
unsigned i, count, payload_length, descr_size, valid_pages;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
struct extent_info *extent;
/* persistent structures */
struct rrdeng_jf_store_data *jf_metric_data;
@@ -271,6 +292,8 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
assert(NULL == *PValue); /* TODO: figure out concurrency model */
*PValue = page_index = create_page_index(temp_id);
+ page_index->prev = pg_cache->metrics_index.last_page_index;
+ pg_cache->metrics_index.last_page_index = page_index;
uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
}
@@ -406,25 +429,17 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi
{
uv_fs_t req;
uv_file file;
- int ret, fd;
+ int ret, fd, error;
uint64_t file_size, max_id;
- char path[1024];
+ char path[RRDENG_PATH_MAX];
generate_journalfilepath(datafile, path, sizeof(path));
- fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL);
+ fd = open_file_direct_io(path, O_RDWR, &file);
if (fd < 0) {
- /* if (UV_ENOENT != fd) */
- error("uv_fs_fsopen: %s", uv_strerror(fd));
- uv_fs_req_cleanup(&req);
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
return fd;
}
- assert(req.result >= 0);
- file = req.result;
- uv_fs_req_cleanup(&req);
-#ifdef __APPLE__
- info("Disabling OS X caching for file \"%s\".", path);
- fcntl(fd, F_NOCACHE, 1);
-#endif
info("Loading journal file \"%s\".", path);
ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
@@ -449,9 +464,15 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi
return 0;
error:
- (void) uv_fs_close(NULL, &req, file, NULL);
+ error = ret;
+ ret = uv_fs_close(NULL, &req, file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
uv_fs_req_cleanup(&req);
- return ret;
+ return error;
}
void init_commit_log(struct rrdengine_instance *ctx)
diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h
index 50489aeee..0df66304d 100644
--- a/database/engine/journalfile.h
+++ b/database/engine/journalfile.h
@@ -33,9 +33,11 @@ struct transaction_commit_log {
unsigned buf_size;
};
+extern void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
extern void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
extern void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size);
extern void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc);
+extern int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
extern int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
extern int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
extern int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index c90947a67..124f2448b 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -8,28 +8,29 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx);
/* always inserts into tail */
static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
+ struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
if (likely(NULL != pg_cache->replaceQ.tail)) {
- descr->prev = pg_cache->replaceQ.tail;
- pg_cache->replaceQ.tail->next = descr;
+ pg_cache_descr->prev = pg_cache->replaceQ.tail;
+ pg_cache->replaceQ.tail->next = pg_cache_descr;
}
if (unlikely(NULL == pg_cache->replaceQ.head)) {
- pg_cache->replaceQ.head = descr;
+ pg_cache->replaceQ.head = pg_cache_descr;
}
- pg_cache->replaceQ.tail = descr;
+ pg_cache->replaceQ.tail = pg_cache_descr;
}
static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
+ struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_cache_descr *prev, *next;
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr, *prev, *next;
- prev = descr->prev;
- next = descr->next;
+ prev = pg_cache_descr->prev;
+ next = pg_cache_descr->next;
if (likely(NULL != prev)) {
prev->next = next;
@@ -37,17 +38,17 @@ static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ct
if (likely(NULL != next)) {
next->prev = prev;
}
- if (unlikely(descr == pg_cache->replaceQ.head)) {
+ if (unlikely(pg_cache_descr == pg_cache->replaceQ.head)) {
pg_cache->replaceQ.head = next;
}
- if (unlikely(descr == pg_cache->replaceQ.tail)) {
+ if (unlikely(pg_cache_descr == pg_cache->replaceQ.tail)) {
pg_cache->replaceQ.tail = prev;
}
- descr->prev = descr->next = NULL;
+ pg_cache_descr->prev = pg_cache_descr->next = NULL;
}
void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
+ struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -57,7 +58,7 @@ void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
}
void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
+ struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -66,7 +67,7 @@ void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}
void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
+ struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -76,40 +77,28 @@ void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}
-struct rrdeng_page_cache_descr *pg_cache_create_descr(void)
+struct rrdeng_page_descr *pg_cache_create_descr(void)
{
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
descr = mallocz(sizeof(*descr));
- descr->page = NULL;
descr->page_length = 0;
descr->start_time = INVALID_TIME;
descr->end_time = INVALID_TIME;
descr->id = NULL;
descr->extent = NULL;
- descr->flags = 0;
- descr->prev = descr->next = descr->private = NULL;
- descr->refcnt = 0;
- descr->waiters = 0;
- descr->handle = NULL;
- assert(0 == uv_cond_init(&descr->cond));
- assert(0 == uv_mutex_init(&descr->mutex));
+ descr->pg_cache_descr_state = 0;
+ descr->pg_cache_descr = NULL;
return descr;
}
-void pg_cache_destroy_descr(struct rrdeng_page_cache_descr *descr)
-{
- uv_cond_destroy(&descr->cond);
- uv_mutex_destroy(&descr->mutex);
- free(descr);
-}
-
/* The caller must hold page descriptor lock. */
-void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr)
+void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr)
{
- if (descr->waiters)
- uv_cond_broadcast(&descr->cond);
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+ if (pg_cache_descr->waiters)
+ uv_cond_broadcast(&pg_cache_descr->cond);
}
/*
@@ -117,11 +106,13 @@ void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr)
* The lock will be released and re-acquired. The descriptor is not guaranteed
* to exist after this function returns.
*/
-void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr)
+void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr)
{
- ++descr->waiters;
- uv_cond_wait(&descr->cond, &descr->mutex);
- --descr->waiters;
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ ++pg_cache_descr->waiters;
+ uv_cond_wait(&pg_cache_descr->cond, &pg_cache_descr->mutex);
+ --pg_cache_descr->waiters;
}
/*
@@ -129,14 +120,15 @@ void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr)
* The lock will be released and re-acquired. The descriptor is not guaranteed
* to exist after this function returns.
*/
-unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr)
+unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
unsigned long flags;
- uv_mutex_lock(&descr->mutex);
+ rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_wait_event_unsafe(descr);
- flags = descr->flags;
- uv_mutex_unlock(&descr->mutex);
+ flags = pg_cache_descr->flags;
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
return flags;
}
@@ -146,15 +138,17 @@ unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr)
* Gets a reference to the page descriptor.
* Returns 1 on success and 0 on failure.
*/
-int pg_cache_try_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive_access)
+int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
- if ((descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
- (exclusive_access && descr->refcnt)) {
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
+ (exclusive_access && pg_cache_descr->refcnt)) {
return 0;
}
if (exclusive_access)
- descr->flags |= RRD_PAGE_LOCKED;
- ++descr->refcnt;
+ pg_cache_descr->flags |= RRD_PAGE_LOCKED;
+ ++pg_cache_descr->refcnt;
return 1;
}
@@ -163,10 +157,12 @@ int pg_cache_try_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive
* The caller must hold page descriptor lock.
* Same return values as pg_cache_try_get_unsafe() without doing anything.
*/
-int pg_cache_can_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive_access)
+int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
- if ((descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
- (exclusive_access && descr->refcnt)) {
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
+ (exclusive_access && pg_cache_descr->refcnt)) {
return 0;
}
@@ -177,23 +173,24 @@ int pg_cache_can_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive
* The caller must hold the page descriptor lock.
* This function may block doing cleanup.
*/
-void pg_cache_put_unsafe(struct rrdeng_page_cache_descr *descr)
+void pg_cache_put_unsafe(struct rrdeng_page_descr *descr)
{
- descr->flags &= ~RRD_PAGE_LOCKED;
- if (0 == --descr->refcnt) {
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
+ if (0 == --pg_cache_descr->refcnt) {
pg_cache_wake_up_waiters_unsafe(descr);
}
- /* TODO: perform cleanup */
}
/*
* This function may block doing cleanup.
*/
-void pg_cache_put(struct rrdeng_page_cache_descr *descr)
+void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
- uv_mutex_lock(&descr->mutex);
+ rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_put_unsafe(descr);
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
}
/* The caller must hold the page cache lock */
@@ -224,7 +221,7 @@ static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned numb
uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
if (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1)
- debug(D_RRDENGINE, "=================================\nPage cache full. Reserving %u pages.\n=================================",
+ debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==",
number);
while (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1) {
if (!pg_cache_try_evict_one_page_unsafe(ctx)) {
@@ -266,7 +263,7 @@ static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned n
uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
if (pg_cache->populated_pages + number >= ctx->cache_pages_low_watermark + 1) {
debug(D_RRDENGINE,
- "=================================\nPage cache full. Trying to reserve %u pages.\n=================================",
+ "==Page cache full. Trying to reserve %u pages.==",
number);
do {
if (!pg_cache_try_evict_one_page_unsafe(ctx))
@@ -286,18 +283,20 @@ static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned n
}
/* The caller must hold the page cache and the page descriptor locks in that order */
-static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr)
+static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
- free(descr->page);
- descr->page = NULL;
- descr->flags &= ~RRD_PAGE_POPULATED;
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ freez(pg_cache_descr->page);
+ pg_cache_descr->page = NULL;
+ pg_cache_descr->flags &= ~RRD_PAGE_POPULATED;
pg_cache_release_pages_unsafe(ctx, 1);
++ctx->stats.pg_cache_evictions;
}
/*
* The caller must hold the page cache lock.
- * Lock order: page cache -> replaceQ -> descriptor
+ * Lock order: page cache -> replaceQ -> page descriptor
* This function iterates all pages and tries to evict one.
* If it fails it sets in_flight_descr to the oldest descriptor that has write-back in progress,
* or it sets it to NULL if no write-back is in progress.
@@ -308,36 +307,40 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
{
struct page_cache *pg_cache = &ctx->pg_cache;
unsigned long old_flags;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr = NULL;
uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
- for (descr = pg_cache->replaceQ.head ; NULL != descr ; descr = descr->next) {
- uv_mutex_lock(&descr->mutex);
- old_flags = descr->flags;
+ for (pg_cache_descr = pg_cache->replaceQ.head ; NULL != pg_cache_descr ; pg_cache_descr = pg_cache_descr->next) {
+ descr = pg_cache_descr->descr;
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ old_flags = pg_cache_descr->flags;
if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) {
/* must evict */
pg_cache_evict_unsafe(ctx, descr);
pg_cache_put_unsafe(descr);
- uv_mutex_unlock(&descr->mutex);
pg_cache_replaceQ_delete_unsafe(ctx, descr);
+
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+ rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
+
return 1;
}
- uv_mutex_unlock(&descr->mutex);
- };
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ }
uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
/* failed to evict */
return 0;
}
-/*
- * TODO: last waiter frees descriptor ?
- */
-void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr)
+void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty)
{
struct page_cache *pg_cache = &ctx->pg_cache;
+ struct page_cache_descr *pg_cache_descr = NULL;
Pvoid_t *PValue;
struct pg_cache_page_index *page_index;
int ret;
@@ -353,8 +356,9 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cach
uv_rwlock_wrunlock(&page_index->lock);
if (unlikely(0 == ret)) {
error("Page under deletion was not in index.");
- if (unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
+ if (unlikely(debug_flags & D_RRDENGINE)) {
+ print_page_descr(descr);
+ }
goto destroy;
}
assert(1 == ret);
@@ -364,23 +368,26 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cach
--pg_cache->page_descriptors;
uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
- uv_mutex_lock(&descr->mutex);
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
while (!pg_cache_try_get_unsafe(descr, 1)) {
debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
- if(unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
- pg_cache_wait_event_unsafe(descr);
- }
- /* even a locked page could be dirty */
- while (unlikely(descr->flags & RRD_PAGE_DIRTY)) {
- debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
if (unlikely(debug_flags & D_RRDENGINE))
print_page_cache_descr(descr);
pg_cache_wait_event_unsafe(descr);
}
- uv_mutex_unlock(&descr->mutex);
+ if (!remove_dirty) {
+ /* even a locked page could be dirty */
+ while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) {
+ debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_wait_event_unsafe(descr);
+ }
+ }
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
- if (descr->flags & RRD_PAGE_POPULATED) {
+ if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
/* only after locking can it be safely deleted from LRU */
pg_cache_replaceQ_delete(ctx, descr);
@@ -388,13 +395,15 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cach
pg_cache_evict_unsafe(ctx, descr);
uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}
- pg_cache_put(descr);
+ pg_cache_put(ctx, descr);
+
+ rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
destroy:
- pg_cache_destroy_descr(descr);
+ freez(descr);
pg_cache_update_metric_times(page_index);
}
-static inline int is_page_in_time_range(struct rrdeng_page_cache_descr *descr, usec_t start_time, usec_t end_time)
+static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time)
{
usec_t pg_start, pg_end;
@@ -405,13 +414,13 @@ static inline int is_page_in_time_range(struct rrdeng_page_cache_descr *descr, u
(pg_start >= start_time && pg_start <= end_time);
}
-static inline int is_point_in_time_in_page(struct rrdeng_page_cache_descr *descr, usec_t point_in_time)
+static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time)
{
return (point_in_time >= descr->start_time && point_in_time <= descr->end_time);
}
/* Update metric oldest and latest timestamps efficiently when adding new values */
-void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_cache_descr *descr)
+void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr)
{
usec_t oldest_time = page_index->oldest_time;
usec_t latest_time = page_index->latest_time;
@@ -429,7 +438,7 @@ void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
{
Pvoid_t *firstPValue, *lastPValue;
Word_t firstIndex, lastIndex;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
usec_t oldest_time = INVALID_TIME;
usec_t latest_time = INVALID_TIME;
@@ -460,16 +469,23 @@ void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
/* If index is NULL lookup by UUID (descr->id) */
void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
- struct rrdeng_page_cache_descr *descr)
+ struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
Pvoid_t *PValue;
struct pg_cache_page_index *page_index;
+ unsigned long pg_cache_descr_state = descr->pg_cache_descr_state;
+
+ if (0 != pg_cache_descr_state) {
+        /* a page cache descriptor state has been pre-allocated */
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
- if (descr->flags & RRD_PAGE_POPULATED) {
- pg_cache_reserve_pages(ctx, 1);
- if (!(descr->flags & RRD_PAGE_DIRTY))
- pg_cache_replaceQ_insert(ctx, descr);
+ assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED);
+ if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
+ pg_cache_reserve_pages(ctx, 1);
+ if (!(pg_cache_descr->flags & RRD_PAGE_DIRTY))
+ pg_cache_replaceQ_insert(ctx, descr);
+ }
}
if (unlikely(NULL == index)) {
@@ -503,7 +519,8 @@ struct pg_cache_page_index *
pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
{
struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_cache_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
+ struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
+ struct page_cache_descr *pg_cache_descr = NULL;
int i, j, k, count, found;
unsigned long flags;
Pvoid_t *PValue;
@@ -557,12 +574,13 @@ struct pg_cache_page_index *
if (unlikely(0 == descr->page_length))
continue;
- uv_mutex_lock(&descr->mutex);
- flags = descr->flags;
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ flags = pg_cache_descr->flags;
if (pg_cache_can_get_unsafe(descr, 0)) {
if (flags & RRD_PAGE_POPULATED) {
/* success */
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
continue;
}
@@ -570,19 +588,19 @@ struct pg_cache_page_index *
if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
preload_array[count++] = descr;
if (PAGE_CACHE_MAX_PRELOAD_PAGES == count) {
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
break;
}
}
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
- };
+ }
uv_rwlock_rdunlock(&page_index->lock);
failed_to_reserve = 0;
for (i = 0 ; i < count && !failed_to_reserve ; ++i) {
struct rrdeng_cmd cmd;
- struct rrdeng_page_cache_descr *next;
+ struct rrdeng_page_descr *next;
descr = preload_array[i];
if (NULL == descr) {
@@ -622,7 +640,7 @@ struct pg_cache_page_index *
if (NULL == descr) {
continue;
}
- pg_cache_put(descr);
+ pg_cache_put(ctx, descr);
}
}
if (!count) {
@@ -637,12 +655,13 @@ struct pg_cache_page_index *
* When point_in_time is INVALID_TIME get any page.
* If index is NULL lookup by UUID (id).
*/
-struct rrdeng_page_cache_descr *
+struct rrdeng_page_descr *
pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
usec_t point_in_time)
{
struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_cache_descr *descr = NULL;
+ struct rrdeng_page_descr *descr = NULL;
+ struct page_cache_descr *pg_cache_descr = NULL;
unsigned long flags;
Pvoid_t *PValue;
struct pg_cache_page_index *page_index;
@@ -682,11 +701,12 @@ struct rrdeng_page_cache_descr *
pg_cache_release_pages(ctx, 1);
return NULL;
}
- uv_mutex_lock(&descr->mutex);
- flags = descr->flags;
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ flags = pg_cache_descr->flags;
if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
/* success */
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
break;
}
@@ -702,14 +722,14 @@ struct rrdeng_page_cache_descr *
debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
if(unlikely(debug_flags & D_RRDENGINE))
print_page_cache_descr(descr);
- while (!(descr->flags & RRD_PAGE_POPULATED)) {
+ while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
pg_cache_wait_event_unsafe(descr);
}
/* success */
/* Downgrade exclusive reference to allow other readers */
- descr->flags &= ~RRD_PAGE_LOCKED;
+ pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
pg_cache_wake_up_waiters_unsafe(descr);
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
return descr;
}
@@ -720,7 +740,7 @@ struct rrdeng_page_cache_descr *
if (!(flags & RRD_PAGE_POPULATED))
page_not_in_cache = 1;
pg_cache_wait_event_unsafe(descr);
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
/* reset scan to find again */
uv_rwlock_rdlock(&page_index->lock);
@@ -747,6 +767,7 @@ struct pg_cache_page_index *create_page_index(uuid_t *id)
assert(0 == uv_rwlock_init(&page_index->lock));
page_index->oldest_time = INVALID_TIME;
page_index->latest_time = INVALID_TIME;
+ page_index->prev = NULL;
return page_index;
}
@@ -756,6 +777,7 @@ static void init_metrics_index(struct rrdengine_instance *ctx)
struct page_cache *pg_cache = &ctx->pg_cache;
pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL;
+ pg_cache->metrics_index.last_page_index = NULL;
assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
}
@@ -789,4 +811,65 @@ void init_page_cache(struct rrdengine_instance *ctx)
init_metrics_index(ctx);
init_replaceQ(ctx);
init_commited_page_index(ctx);
+}
+
+void free_page_cache(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ Word_t ret_Judy, bytes_freed = 0;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index, *prev_page_index;
+ Word_t Index;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+
+ /* Free commited page index */
+ ret_Judy = JudyLFreeArray(&pg_cache->commited_page_index.JudyL_array, PJE0);
+ assert(NULL == pg_cache->commited_page_index.JudyL_array);
+ bytes_freed += ret_Judy;
+
+ for (page_index = pg_cache->metrics_index.last_page_index ;
+ page_index != NULL ;
+ page_index = prev_page_index) {
+ prev_page_index = page_index->prev;
+
+ /* Find first page in range */
+ Index = (Word_t) 0;
+ PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ }
+ while (descr != NULL) {
+ /* Iterate all page descriptors of this metric */
+
+ if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
+            /* See rrdenglocking.c for the descriptor allocation protocol */
+ pg_cache_descr = descr->pg_cache_descr;
+ if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
+ freez(pg_cache_descr->page);
+ bytes_freed += RRDENG_BLOCK_SIZE;
+ }
+ rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+ bytes_freed += sizeof(*pg_cache_descr);
+ }
+ freez(descr);
+ bytes_freed += sizeof(*descr);
+
+ PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0);
+ descr = unlikely(NULL == PValue) ? NULL : *PValue;
+ }
+
+ /* Free page index */
+ ret_Judy = JudyLFreeArray(&page_index->JudyL_array, PJE0);
+ assert(NULL == page_index->JudyL_array);
+ bytes_freed += ret_Judy;
+ freez(page_index);
+ bytes_freed += sizeof(*page_index);
+ }
+ /* Free metrics index */
+ ret_Judy = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0);
+ assert(NULL == pg_cache->metrics_index.JudyHS_array);
+ bytes_freed += ret_Judy;
+
+ info("Freed %lu bytes of memory from page cache.", bytes_freed);
} \ No newline at end of file
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
index d1e29aaab..b5670f82a 100644
--- a/database/engine/pagecache.h
+++ b/database/engine/pagecache.h
@@ -5,9 +5,10 @@
#include "rrdengine.h"
-/* Forward declerations */
+/* Forward declarations */
struct rrdengine_instance;
struct extent_info;
+struct rrdeng_page_descr;
#define INVALID_TIME (0)
@@ -18,24 +19,46 @@ struct extent_info;
#define RRD_PAGE_WRITE_PENDING (1LU << 3)
#define RRD_PAGE_POPULATED (1LU << 4)
-struct rrdeng_page_cache_descr {
+struct page_cache_descr {
+ struct rrdeng_page_descr *descr; /* parent descriptor */
void *page;
- uint32_t page_length;
- usec_t start_time;
- usec_t end_time;
- uuid_t *id; /* never changes */
- struct extent_info *extent;
unsigned long flags;
- void *private;
- struct rrdeng_page_cache_descr *prev;
- struct rrdeng_page_cache_descr *next;
+ struct page_cache_descr *prev; /* LRU */
+ struct page_cache_descr *next; /* LRU */
- /* TODO: move waiter logic to concurrency table */
unsigned refcnt;
uv_mutex_t mutex; /* always take it after the page cache lock or after the commit lock */
uv_cond_t cond;
unsigned waiters;
- struct rrdeng_collect_handle *handle; /* API user */
+};
+
+/* Page cache descriptor flags, state = 0 means no descriptor */
+#define PG_CACHE_DESCR_ALLOCATED (1LU << 0)
+#define PG_CACHE_DESCR_DESTROY (1LU << 1)
+#define PG_CACHE_DESCR_LOCKED (1LU << 2)
+#define PG_CACHE_DESCR_SHIFT (3)
+#define PG_CACHE_DESCR_USERS_MASK (((unsigned long)-1) << PG_CACHE_DESCR_SHIFT)
+#define PG_CACHE_DESCR_FLAGS_MASK (((unsigned long)-1) >> (BITS_PER_ULONG - PG_CACHE_DESCR_SHIFT))
+
+/*
+ * Page cache descriptor state bits (works for both 32-bit and 64-bit architectures):
+ *
+ * 63 ... 31 ... 3 | 2 | 1 | 0|
+ * -----------------------------+------------+------------+-----------|
+ * number of descriptor users | DESTROY | LOCKED | ALLOCATED |
+ */
+struct rrdeng_page_descr {
+ uint32_t page_length;
+ usec_t start_time;
+ usec_t end_time;
+ uuid_t *id; /* never changes */
+ struct extent_info *extent;
+
+ /* points to ephemeral page cache descriptor if the page resides in the cache */
+ struct page_cache_descr *pg_cache_descr;
+
+ /* Compare-And-Swap target for page cache descriptor allocation algorithm */
+ volatile unsigned long pg_cache_descr_state;
};
#define PAGE_CACHE_MAX_PRELOAD_PAGES (256)
@@ -61,12 +84,15 @@ struct pg_cache_page_index {
* It's also written by the data deletion workqueue when data collection is disabled for this metric.
*/
usec_t latest_time;
+
+ struct pg_cache_page_index *prev;
};
/* maps UUIDs to page indices */
struct pg_cache_metrics_index {
uv_rwlock_t lock;
Pvoid_t JudyHS_array;
+ struct pg_cache_page_index *last_page_index;
};
/* gathers dirty pages to be written on disk */
@@ -85,12 +111,15 @@ struct pg_cache_commited_page_index {
unsigned nr_commited_pages;
};
-/* gathers populated pages to be evicted */
+/*
+ * Gathers populated pages to be evicted.
+ * Relies on page cache descriptors being there as it uses their memory.
+ */
struct pg_cache_replaceQ {
uv_rwlock_t lock; /* LRU lock */
- struct rrdeng_page_cache_descr *head; /* LRU */
- struct rrdeng_page_cache_descr *tail; /* MRU */
+ struct page_cache_descr *head; /* LRU */
+ struct page_cache_descr *tail; /* MRU */
};
struct page_cache { /* TODO: add statistics */
@@ -104,29 +133,31 @@ struct page_cache { /* TODO: add statistics */
unsigned populated_pages;
};
-extern void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr);
-extern void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr);
-extern unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr);
+extern void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr);
+extern void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr);
+extern unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
extern void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr);
+ struct rrdeng_page_descr *descr);
extern void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr);
+ struct rrdeng_page_descr *descr);
extern void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr);
-extern struct rrdeng_page_cache_descr *pg_cache_create_descr(void);
-extern void pg_cache_put_unsafe(struct rrdeng_page_cache_descr *descr);
-extern void pg_cache_put(struct rrdeng_page_cache_descr *descr);
+ struct rrdeng_page_descr *descr);
+extern struct rrdeng_page_descr *pg_cache_create_descr(void);
+extern int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access);
+extern void pg_cache_put_unsafe(struct rrdeng_page_descr *descr);
+extern void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
extern void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
- struct rrdeng_page_cache_descr *descr);
-extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr);
+ struct rrdeng_page_descr *descr);
+extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty);
extern struct pg_cache_page_index *
pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time);
-extern struct rrdeng_page_cache_descr *
+extern struct rrdeng_page_descr *
pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
usec_t point_in_time);
extern struct pg_cache_page_index *create_page_index(uuid_t *id);
extern void init_page_cache(struct rrdengine_instance *ctx);
-extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_cache_descr *descr);
+extern void free_page_cache(struct rrdengine_instance *ctx);
+extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr);
extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index);
-#endif /* NETDATA_PAGECACHE_H */ \ No newline at end of file
+#endif /* NETDATA_PAGECACHE_H */
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index b8e4eba01..0f2dceaa4 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -3,6 +3,10 @@
#include "rrdengine.h"
+rrdeng_stats_t global_io_errors = 0;
+rrdeng_stats_t global_fs_errors = 0;
+rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
+
void sanity_check(void)
{
/* Magic numbers must fit in the super-blocks */
@@ -27,12 +31,12 @@ void read_extent_cb(uv_fs_t* req)
struct rrdengine_worker_config* wc = req->loop->data;
struct rrdengine_instance *ctx = wc->ctx;
struct extent_io_descriptor *xt_io_descr;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
int ret;
unsigned i, j, count;
void *page, *uncompressed_buf = NULL;
uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length;
- struct rrdengine_datafile *datafile;
/* persistent structures */
struct rrdeng_df_extent_header *header;
struct rrdeng_df_extent_trailer *trailer;
@@ -54,9 +58,13 @@ void read_extent_cb(uv_fs_t* req)
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer));
ret = crc32cmp(trailer->checksum, crc);
- datafile = xt_io_descr->descr_array[0]->extent->datafile;
- debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__,
- xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED");
+#ifdef NETDATA_INTERNAL_CHECKS
+ {
+ struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
+ debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__,
+ xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED");
+ }
+#endif
if (unlikely(ret)) {
/* TODO: handle errors */
exit(UV_EIO);
@@ -97,36 +105,38 @@ void read_extent_cb(uv_fs_t* req)
(void) memcpy(page, uncompressed_buf + page_offset, descr->page_length);
}
pg_cache_replaceQ_insert(ctx, descr);
- uv_mutex_lock(&descr->mutex);
- descr->page = page;
- descr->flags |= RRD_PAGE_POPULATED;
- descr->flags &= ~RRD_PAGE_READ_PENDING;
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ pg_cache_descr->page = page;
+ pg_cache_descr->flags |= RRD_PAGE_POPULATED;
+ pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING;
debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
if (xt_io_descr->release_descr) {
pg_cache_put_unsafe(descr);
} else {
pg_cache_wake_up_waiters_unsafe(descr);
}
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
}
if (RRD_NO_COMPRESSION != header->compression_algorithm) {
- free(uncompressed_buf);
+ freez(uncompressed_buf);
}
if (xt_io_descr->completion)
complete(xt_io_descr->completion);
cleanup:
uv_fs_req_cleanup(req);
free(xt_io_descr->buf);
- free(xt_io_descr);
+ freez(xt_io_descr);
}
static void do_read_extent(struct rrdengine_worker_config* wc,
- struct rrdeng_page_cache_descr **descr,
+ struct rrdeng_page_descr **descr,
unsigned count,
uint8_t release_descr)
{
struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache_descr *pg_cache_descr;
int ret;
unsigned i, size_bytes, pos, real_io_size;
// uint32_t payload_length;
@@ -141,14 +151,15 @@ static void do_read_extent(struct rrdengine_worker_config* wc,
ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
if (unlikely(ret)) {
fatal("posix_memalign:%s", strerror(ret));
- /* free(xt_io_descr);
+ /* freez(xt_io_descr);
return;*/
}
for (i = 0 ; i < count; ++i) {
- uv_mutex_lock(&descr[i]->mutex);
- descr[i]->flags |= RRD_PAGE_READ_PENDING;
+ rrdeng_page_descr_mutex_lock(ctx, descr[i]);
+ pg_cache_descr = descr[i]->pg_cache_descr;
+ pg_cache_descr->flags |= RRD_PAGE_READ_PENDING;
// payload_length = descr[i]->page_length;
- uv_mutex_unlock(&descr[i]->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr[i]);
xt_io_descr->descr_array[i] = descr[i];
}
@@ -227,8 +238,8 @@ void flush_pages_cb(uv_fs_t* req)
struct rrdengine_instance *ctx = wc->ctx;
struct page_cache *pg_cache = &ctx->pg_cache;
struct extent_io_descriptor *xt_io_descr;
- struct rrdeng_page_cache_descr *descr;
- struct rrdengine_datafile *datafile;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
int ret;
unsigned i, count;
Word_t commit_id;
@@ -238,10 +249,13 @@ void flush_pages_cb(uv_fs_t* req)
error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
goto cleanup;
}
- datafile = xt_io_descr->descr_array[0]->extent->datafile;
- debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.",
- __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
-
+#ifdef NETDATA_INTERNAL_CHECKS
+ {
+ struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
+ debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.",
+ __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
+ }
+#endif
count = xt_io_descr->descr_count;
for (i = 0 ; i < count ; ++i) {
/* care, we don't hold the descriptor mutex */
@@ -256,18 +270,19 @@ void flush_pages_cb(uv_fs_t* req)
pg_cache_replaceQ_insert(ctx, descr);
- uv_mutex_lock(&descr->mutex);
- descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING);
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ pg_cache_descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING);
/* wake up waiters, care no reference being held */
pg_cache_wake_up_waiters_unsafe(descr);
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
}
if (xt_io_descr->completion)
complete(xt_io_descr->completion);
cleanup:
uv_fs_req_cleanup(req);
free(xt_io_descr->buf);
- free(xt_io_descr);
+ freez(xt_io_descr);
}
/*
@@ -283,7 +298,8 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
int compressed_size, max_compressed_size = 0;
unsigned i, count, size_bytes, pos, real_io_size;
uint32_t uncompressed_payload_length, payload_offset;
- struct rrdeng_page_cache_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT];
+ struct rrdeng_page_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT];
+ struct page_cache_descr *pg_cache_descr;
struct extent_io_descriptor *xt_io_descr;
void *compressed_buf = NULL;
Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT];
@@ -311,15 +327,16 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
descr = unlikely(NULL == PValue) ? NULL : *PValue) {
assert(0 != descr->page_length);
- uv_mutex_lock(&descr->mutex);
- if (!(descr->flags & RRD_PAGE_WRITE_PENDING)) {
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING)) {
/* care, no reference being held */
- descr->flags |= RRD_PAGE_WRITE_PENDING;
+ pg_cache_descr->flags |= RRD_PAGE_WRITE_PENDING;
uncompressed_payload_length += descr->page_length;
descr_commit_idx_array[count] = Index;
eligible_pages[count++] = descr;
}
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
}
uv_rwlock_rdunlock(&pg_cache->commited_page_index.lock);
@@ -345,9 +362,9 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
if (unlikely(ret)) {
fatal("posix_memalign:%s", strerror(ret));
- /* free(xt_io_descr);*/
+ /* freez(xt_io_descr);*/
}
- (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_cache_descr *) * count);
+ (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_descr *) * count);
xt_io_descr->descr_count = count;
pos = 0;
@@ -378,7 +395,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
for (i = 0 ; i < count ; ++i) {
descr = xt_io_descr->descr_array[i];
/* care, we don't hold the descriptor mutex */
- (void) memcpy(xt_io_descr->buf + pos, descr->page, descr->page_length);
+ (void) memcpy(xt_io_descr->buf + pos, descr->pg_cache_descr->page, descr->page_length);
descr->extent = extent;
extent->pages[i] = descr;
@@ -397,7 +414,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
ctx->stats.after_compress_bytes += compressed_size;
debug(D_RRDENGINE, "LZ4 compressed %"PRIu32" bytes to %d bytes.", uncompressed_payload_length, compressed_size);
(void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size);
- free(compressed_buf);
+ freez(compressed_buf);
size_bytes = payload_offset + compressed_size + sizeof(*trailer);
header->payload_length = compressed_size;
break;
@@ -435,23 +452,36 @@ static void after_delete_old_data(uv_work_t *req, int status)
struct rrdengine_worker_config* wc = &ctx->worker_config;
struct rrdengine_datafile *datafile;
struct rrdengine_journalfile *journalfile;
- unsigned bytes;
+ unsigned deleted_bytes, journalfile_bytes, datafile_bytes;
+ int ret;
+ char path[RRDENG_PATH_MAX];
(void)status;
datafile = ctx->datafiles.first;
journalfile = datafile->journalfile;
- bytes = datafile->pos + journalfile->pos;
+ datafile_bytes = datafile->pos;
+ journalfile_bytes = journalfile->pos;
+ deleted_bytes = 0;
+ info("Deleting data and journal file pair.");
datafile_list_delete(ctx, datafile);
- destroy_journal_file(journalfile, datafile);
- destroy_data_file(datafile);
- info("Deleted data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
- datafile->tier, datafile->fileno);
- free(journalfile);
- free(datafile);
+ ret = destroy_journal_file(journalfile, datafile);
+ if (!ret) {
+ generate_journalfilepath(datafile, path, sizeof(path));
+ info("Deleted journal file \"%s\".", path);
+ deleted_bytes += journalfile_bytes;
+ }
+ ret = destroy_data_file(datafile);
+ if (!ret) {
+ generate_datafilepath(datafile, path, sizeof(path));
+ info("Deleted data file \"%s\".", path);
+ deleted_bytes += datafile_bytes;
+ }
+ freez(journalfile);
+ freez(datafile);
- ctx->disk_space -= bytes;
- info("Reclaimed %u bytes of disk space.", bytes);
+ ctx->disk_space -= deleted_bytes;
+ info("Reclaimed %u bytes of disk space.", deleted_bytes);
/* unfreeze command processing */
wc->now_deleting.data = NULL;
@@ -464,7 +494,7 @@ static void delete_old_data(uv_work_t *req)
struct rrdengine_instance *ctx = req->data;
struct rrdengine_datafile *datafile;
struct extent_info *extent, *next;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
unsigned count, i;
/* Safe to use since it will be deleted after we are done */
@@ -474,10 +504,10 @@ static void delete_old_data(uv_work_t *req)
count = extent->number_of_pages;
for (i = 0 ; i < count ; ++i) {
descr = extent->pages[i];
- pg_cache_punch_hole(ctx, descr);
+ pg_cache_punch_hole(ctx, descr, 0);
}
next = extent->next;
- free(extent);
+ freez(extent);
}
}
@@ -487,6 +517,7 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
struct rrdengine_datafile *datafile;
unsigned current_size, target_size;
uint8_t out_of_space, only_one_datafile;
+ int ret;
out_of_space = 0;
if (unlikely(ctx->disk_space > ctx->max_disk_space)) {
@@ -501,7 +532,10 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
if (unlikely(current_size >= target_size || (out_of_space && only_one_datafile))) {
/* Finalize data and journal file and create a new pair */
wal_flush_transaction_buffer(wc);
- create_new_datafile_pair(ctx, 1, datafile->fileno + 1);
+ ret = create_new_datafile_pair(ctx, 1, ctx->last_fileno + 1);
+ if (likely(!ret)) {
+ ++ctx->last_fileno;
+ }
}
if (unlikely(out_of_space)) {
/* delete old data */
@@ -509,18 +543,30 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
/* already deleting data */
return;
}
- info("Deleting data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
- ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
+ if (NULL == ctx->datafiles.first->next) {
+ error("Cannot delete data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\""
+ " to reclaim space, there are no other file pairs left.",
+ ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
+ return;
+ }
+ info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
+ ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
wc->now_deleting.data = ctx;
- uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data);
+ assert(0 == uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data));
}
}
+/* return 0 on success */
int init_rrd_files(struct rrdengine_instance *ctx)
{
return init_data_files(ctx);
}
+void finalize_rrd_files(struct rrdengine_instance *ctx)
+{
+ return finalize_data_files(ctx);
+}
+
void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc)
{
wc->cmd_queue.head = wc->cmd_queue.tail = 0;
@@ -588,7 +634,6 @@ void async_cb(uv_async_t *handle)
void timer_cb(uv_timer_t* handle)
{
struct rrdengine_worker_config* wc = handle->data;
- struct rrdengine_instance *ctx = wc->ctx;
uv_stop(handle->loop);
uv_update_time(handle->loop);
@@ -608,7 +653,7 @@ void timer_cb(uv_timer_t* handle)
#ifdef NETDATA_INTERNAL_CHECKS
{
char buf[4096];
- debug(D_RRDENGINE, "%s", get_rrdeng_statistics(ctx, buf, sizeof(buf)));
+ debug(D_RRDENGINE, "%s", get_rrdeng_statistics(wc->ctx, buf, sizeof(buf)));
}
#endif
}
@@ -623,7 +668,7 @@ void rrdeng_worker(void* arg)
struct rrdengine_worker_config* wc = arg;
struct rrdengine_instance *ctx = wc->ctx;
uv_loop_t* loop;
- int shutdown;
+ int shutdown, ret;
enum rrdeng_opcode opcode;
uv_timer_t timer_req;
struct rrdeng_cmd cmd;
@@ -631,22 +676,35 @@ void rrdeng_worker(void* arg)
rrdeng_init_cmd_queue(wc);
loop = wc->loop = mallocz(sizeof(uv_loop_t));
- uv_loop_init(loop);
+ ret = uv_loop_init(loop);
+ if (ret) {
+ error("uv_loop_init(): %s", uv_strerror(ret));
+ goto error_after_loop_init;
+ }
loop->data = wc;
- uv_async_init(wc->loop, &wc->async, async_cb);
+ ret = uv_async_init(wc->loop, &wc->async, async_cb);
+ if (ret) {
+ error("uv_async_init(): %s", uv_strerror(ret));
+ goto error_after_async_init;
+ }
wc->async.data = wc;
wc->now_deleting.data = NULL;
/* dirty page flushing timer */
- uv_timer_init(loop, &timer_req);
+ ret = uv_timer_init(loop, &timer_req);
+ if (ret) {
+ error("uv_timer_init(): %s", uv_strerror(ret));
+ goto error_after_timer_init;
+ }
timer_req.data = wc;
+ wc->error = 0;
/* wake up initialization thread */
complete(&ctx->rrdengine_completion);
- uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS);
+ assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
shutdown = 0;
while (shutdown == 0 || uv_loop_alive(loop)) {
uv_run(loop, UV_RUN_DEFAULT);
@@ -661,12 +719,6 @@ void rrdeng_worker(void* arg)
break;
case RRDENG_SHUTDOWN:
shutdown = 1;
- if (unlikely(wc->now_deleting.data)) {
- /* postpone shutdown until after deletion */
- info("Postponing shutting RRD engine event loop down until after datafile deletion is finished.");
- rrdeng_enq_cmd(wc, &cmd);
- break;
- }
/*
* uv_async_send after uv_close does not seem to crash in linux at the moment,
* it is however undocumented behaviour and we need to be aware if this becomes
@@ -675,10 +727,6 @@ void rrdeng_worker(void* arg)
uv_close((uv_handle_t *)&wc->async, NULL);
assert(0 == uv_timer_stop(&timer_req));
uv_close((uv_handle_t *)&timer_req, NULL);
- info("Shutting down RRD engine event loop.");
- while (do_flush_pages(wc, 1, NULL)) {
- ; /* Force flushing of all commited pages. */
- }
break;
case RRDENG_READ_PAGE:
do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0);
@@ -690,14 +738,14 @@ void rrdeng_worker(void* arg)
do_commit_transaction(wc, STORE_DATA, NULL);
break;
case RRDENG_FLUSH_PAGES: {
- unsigned total_bytes, bytes_written;
+ unsigned bytes_written;
/* First I/O should be enough to call completion */
bytes_written = do_flush_pages(wc, 1, cmd.completion);
- for (total_bytes = bytes_written ;
- bytes_written && (total_bytes < DATAFILE_IDEAL_IO_SIZE) ;
- total_bytes += bytes_written) {
- bytes_written = do_flush_pages(wc, 1, NULL);
+ if (bytes_written) {
+ while (do_flush_pages(wc, 1, NULL)) {
+ ; /* Force flushing of all commited pages. */
+ }
}
break;
}
@@ -708,6 +756,13 @@ void rrdeng_worker(void* arg)
} while (opcode != RRDENG_NOOP);
}
/* cleanup operations of the event loop */
+ if (unlikely(wc->now_deleting.data)) {
+ info("Postponing shutting RRD engine event loop down until after datafile deletion is finished.");
+ }
+ info("Shutting down RRD engine event loop.");
+ while (do_flush_pages(wc, 1, NULL)) {
+ ; /* Force flushing of all commited pages. */
+ }
wal_flush_transaction_buffer(wc);
uv_run(loop, UV_RUN_DEFAULT);
@@ -716,7 +771,20 @@ void rrdeng_worker(void* arg)
uv_cond_destroy(&wc->cmd_cond);
/* uv_mutex_destroy(&wc->cmd_mutex); */
assert(0 == uv_loop_close(loop));
- free(loop);
+ freez(loop);
+
+ return;
+
+error_after_timer_init:
+ uv_close((uv_handle_t *)&wc->async, NULL);
+error_after_async_init:
+ assert(0 == uv_loop_close(loop));
+error_after_loop_init:
+ freez(loop);
+
+ wc->error = UV_EAGAIN;
+ /* wake up initialization thread */
+ complete(&ctx->rrdengine_completion);
}
@@ -726,19 +794,19 @@ static void basic_functional_test(struct rrdengine_instance *ctx)
int i, j, failed_validations;
uuid_t uuid[NR_PAGES];
void *buf;
- struct rrdeng_page_cache_descr *handle[NR_PAGES];
- char uuid_str[37];
- char backup[NR_PAGES][37 * 100]; /* backup storage for page data verification */
+ struct rrdeng_page_descr *handle[NR_PAGES];
+ char uuid_str[UUID_STR_LEN];
+ char backup[NR_PAGES][UUID_STR_LEN * 100]; /* backup storage for page data verification */
for (i = 0 ; i < NR_PAGES ; ++i) {
uuid_generate(uuid[i]);
uuid_unparse_lower(uuid[i], uuid_str);
// fprintf(stderr, "Generated uuid[%d]=%s\n", i, uuid_str);
- buf = rrdeng_create_page(&uuid[i], &handle[i]);
+ buf = rrdeng_create_page(ctx, &uuid[i], &handle[i]);
/* Each page contains 10 times its own UUID stringified */
for (j = 0 ; j < 100 ; ++j) {
- strcpy(buf + 37 * j, uuid_str);
- strcpy(backup[i] + 37 * j, uuid_str);
+ strcpy(buf + UUID_STR_LEN * j, uuid_str);
+ strcpy(backup[i] + UUID_STR_LEN * j, uuid_str);
}
rrdeng_commit_page(ctx, handle[i], (Word_t)i);
}
@@ -750,7 +818,7 @@ static void basic_functional_test(struct rrdengine_instance *ctx)
++failed_validations;
fprintf(stderr, "Page %d was LOST.\n", i);
}
- if (memcmp(backup[i], buf, 37 * 100)) {
+ if (memcmp(backup[i], buf, UUID_STR_LEN * 100)) {
++failed_validations;
fprintf(stderr, "Page %d data comparison with backup FAILED validation.\n", i);
}
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
index 141bb9c63..6f6a6f8ff 100644
--- a/database/engine/rrdengine.h
+++ b/database/engine/rrdengine.h
@@ -22,6 +22,7 @@
#include "journalfile.h"
#include "rrdengineapi.h"
#include "pagecache.h"
+#include "rrdenglocking.h"
#ifdef NETDATA_RRD_INTERNALS
@@ -59,10 +60,10 @@ struct rrdeng_cmd {
enum rrdeng_opcode opcode;
union {
struct rrdeng_read_page {
- struct rrdeng_page_cache_descr *page_cache_descr;
+ struct rrdeng_page_descr *page_cache_descr;
} read_page;
struct rrdeng_read_extent {
- struct rrdeng_page_cache_descr *page_cache_descr[MAX_PAGES_PER_EXTENT];
+ struct rrdeng_page_descr *page_cache_descr[MAX_PAGES_PER_EXTENT];
int page_count;
} read_extent;
struct completion *completion;
@@ -85,7 +86,7 @@ struct extent_io_descriptor {
struct completion *completion;
unsigned descr_count;
int release_descr;
- struct rrdeng_page_cache_descr *descr_array[MAX_PAGES_PER_EXTENT];
+ struct rrdeng_page_descr *descr_array[MAX_PAGES_PER_EXTENT];
Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT];
};
@@ -111,6 +112,8 @@ struct rrdengine_worker_config {
uv_cond_t cmd_cond;
volatile unsigned queue_size;
struct rrdeng_cmdqueue cmd_queue;
+
+ int error;
};
/*
@@ -142,10 +145,19 @@ struct rrdengine_statistics {
rrdeng_stats_t datafile_deletions;
rrdeng_stats_t journalfile_creations;
rrdeng_stats_t journalfile_deletions;
+ rrdeng_stats_t page_cache_descriptors;
+ rrdeng_stats_t io_errors;
+ rrdeng_stats_t fs_errors;
};
+/* I/O errors global counter */
+extern rrdeng_stats_t global_io_errors;
+/* File-System errors global counter */
+extern rrdeng_stats_t global_fs_errors;
+/* number of File-Descriptors that have been reserved by dbengine */
+extern rrdeng_stats_t rrdeng_reserved_file_descriptors;
+
struct rrdengine_instance {
- rrdengine_state_t rrdengine_state;
struct rrdengine_worker_config worker_config;
struct completion rrdengine_completion;
struct page_cache pg_cache;
@@ -155,6 +167,7 @@ struct rrdengine_instance {
char dbfiles_path[FILENAME_MAX+1];
uint64_t disk_space;
uint64_t max_disk_space;
+ unsigned last_fileno; /* newest index of datafile and journalfile */
unsigned long max_cache_pages;
unsigned long cache_pages_low_watermark;
@@ -163,6 +176,7 @@ struct rrdengine_instance {
extern void sanity_check(void);
extern int init_rrd_files(struct rrdengine_instance *ctx);
+extern void finalize_rrd_files(struct rrdengine_instance *ctx);
extern void rrdeng_test_quota(struct rrdengine_worker_config* wc);
extern void rrdeng_worker(void* arg);
extern void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd);
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index a4e711554..a87ce6d64 100644
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -41,6 +41,7 @@ void rrdeng_store_metric_init(RRDDIM *rd)
handle->descr = NULL;
handle->prev_descr = NULL;
+ handle->unaligned_page = 0;
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t));
@@ -54,59 +55,140 @@ void rrdeng_store_metric_init(RRDDIM *rd)
PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t), PJE0);
assert(NULL == *PValue); /* TODO: figure out concurrency model */
*PValue = page_index = create_page_index(&temp_id);
+ page_index->prev = pg_cache->metrics_index.last_page_index;
+ pg_cache->metrics_index.last_page_index = page_index;
uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
}
rd->state->rrdeng_uuid = &page_index->id;
handle->page_index = page_index;
}
+/* The page must be populated and referenced */
+static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr)
+{
+ unsigned i;
+ uint8_t has_only_empty_metrics = 1;
+ storage_number *page;
+
+ page = descr->pg_cache_descr->page;
+ for (i = 0 ; i < descr->page_length / sizeof(storage_number); ++i) {
+ if (SN_EMPTY_SLOT != page[i]) {
+ has_only_empty_metrics = 0;
+ break;
+ }
+ }
+ return has_only_empty_metrics;
+}
+
+void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
+{
+ struct rrdeng_collect_handle *handle;
+ struct rrdengine_instance *ctx;
+ struct rrdeng_page_descr *descr;
+
+ handle = &rd->state->handle.rrdeng;
+ ctx = handle->ctx;
+ descr = handle->descr;
+ if (unlikely(NULL == descr)) {
+ return;
+ }
+ if (likely(descr->page_length)) {
+ int ret, page_is_empty;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
+#endif
+ if (handle->prev_descr) {
+ /* unpin old second page */
+ pg_cache_put(ctx, handle->prev_descr);
+ }
+ page_is_empty = page_has_only_empty_metrics(descr);
+ if (page_is_empty) {
+ debug(D_RRDENGINE, "Page has empty metrics only, deleting:");
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_put(ctx, descr);
+ pg_cache_punch_hole(ctx, descr, 1);
+ handle->prev_descr = NULL;
+ } else {
+ /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ ret = pg_cache_try_get_unsafe(descr, 0);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ assert (1 == ret);
+
+ rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
+ handle->prev_descr = descr;
+ }
+ } else {
+ freez(descr->pg_cache_descr->page);
+ rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);
+ freez(descr);
+ }
+ handle->descr = NULL;
+}
+
void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number)
{
struct rrdeng_collect_handle *handle;
struct rrdengine_instance *ctx;
struct page_cache *pg_cache;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
storage_number *page;
+ uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0;
handle = &rd->state->handle.rrdeng;
ctx = handle->ctx;
pg_cache = &ctx->pg_cache;
descr = handle->descr;
- if (unlikely(NULL == descr || descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE)) {
- if (descr) {
- descr->handle = NULL;
- if (descr->page_length) {
-#ifdef NETDATA_INTERNAL_CHECKS
- rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
-#endif
- /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */
- ++descr->refcnt;
- rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
- if (handle->prev_descr) {
- /* unpin old second page */
- pg_cache_put(handle->prev_descr);
- }
- handle->prev_descr = descr;
- } else {
- free(descr->page);
- free(descr);
- handle->descr = NULL;
- }
+
+ if (descr) {
+ /* Make alignment decisions */
+
+ if (descr->page_length == rd->rrdset->rrddim_page_alignment) {
+ /* this is the leading dimension that defines chart alignment */
+ perfect_page_alignment = 1;
+ }
+ /* is the metric far enough out of alignment with the others? */
+ if (unlikely(descr->page_length + sizeof(number) < rd->rrdset->rrddim_page_alignment)) {
+ handle->unaligned_page = 1;
+ debug(D_RRDENGINE, "Metric page is not aligned with chart:");
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
}
- page = rrdeng_create_page(&handle->page_index->id, &descr);
+ if (unlikely(handle->unaligned_page &&
+ /* did the other metrics change page? */
+ rd->rrdset->rrddim_page_alignment <= sizeof(number))) {
+ debug(D_RRDENGINE, "Flushing unaligned metric page.");
+ must_flush_unaligned_page = 1;
+ handle->unaligned_page = 0;
+ }
+ }
+ if (unlikely(NULL == descr ||
+ descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE ||
+ must_flush_unaligned_page)) {
+ rrdeng_store_metric_flush_current_page(rd);
+
+ page = rrdeng_create_page(ctx, &handle->page_index->id, &descr);
assert(page);
- handle->prev_descr = handle->descr;
+
handle->descr = descr;
- descr->handle = handle;
+
uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
handle->page_correlation_id = pg_cache->commited_page_index.latest_corr_id++;
uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
- }
- page = descr->page;
+ if (0 == rd->rrdset->rrddim_page_alignment) {
+ /* this is the leading dimension that defines chart alignment */
+ perfect_page_alignment = 1;
+ }
+ }
+ page = descr->pg_cache_descr->page;
page[descr->page_length / sizeof(number)] = number;
descr->end_time = point_in_time;
descr->page_length += sizeof(number);
+ if (perfect_page_alignment)
+ rd->rrdset->rrddim_page_alignment = descr->page_length;
if (unlikely(INVALID_TIME == descr->start_time)) {
descr->start_time = point_in_time;
@@ -126,26 +208,13 @@ void rrdeng_store_metric_finalize(RRDDIM *rd)
{
struct rrdeng_collect_handle *handle;
struct rrdengine_instance *ctx;
- struct rrdeng_page_cache_descr *descr;
handle = &rd->state->handle.rrdeng;
ctx = handle->ctx;
- descr = handle->descr;
- if (descr) {
- descr->handle = NULL;
- if (descr->page_length) {
-#ifdef NETDATA_INTERNAL_CHECKS
- rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
-#endif
- rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
- if (handle->prev_descr) {
- /* unpin old second page */
- pg_cache_put(handle->prev_descr);
- }
- } else {
- free(descr->page);
- free(descr);
- }
+ rrdeng_store_metric_flush_current_page(rd);
+ if (handle->prev_descr) {
+ /* unpin old second page */
+ pg_cache_put(ctx, handle->prev_descr);
}
}
@@ -174,7 +243,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
{
struct rrdeng_query_handle *handle;
struct rrdengine_instance *ctx;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
storage_number *page, ret;
unsigned position;
usec_t point_in_time;
@@ -198,7 +267,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
- pg_cache_put(descr);
+ pg_cache_put(ctx, descr);
handle->descr = NULL;
}
descr = pg_cache_lookup(ctx, handle->page_index, &handle->page_index->id, point_in_time);
@@ -216,7 +285,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
ret = SN_EMPTY_SLOT;
goto out;
}
- page = descr->page;
+ page = descr->pg_cache_descr->page;
if (unlikely(descr->start_time == descr->end_time)) {
ret = page[0];
goto out;
@@ -248,7 +317,7 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
{
struct rrdeng_query_handle *handle;
struct rrdengine_instance *ctx;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
handle = &rrdimm_handle->rrdeng;
ctx = handle->ctx;
@@ -257,7 +326,7 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
- pg_cache_put(descr);
+ pg_cache_put(ctx, descr);
}
}
@@ -283,30 +352,32 @@ time_t rrdeng_metric_oldest_time(RRDDIM *rd)
}
/* Also gets a reference for the page */
-void *rrdeng_create_page(uuid_t *id, struct rrdeng_page_cache_descr **ret_descr)
+void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr)
{
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
void *page;
- int ret;
-
/* TODO: check maximum number of pages in page cache limit */
- page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */
descr = pg_cache_create_descr();
- descr->page = page;
descr->id = id; /* TODO: add page type: metric, log, something? */
- descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */;
- descr->refcnt = 1;
-
- debug(D_RRDENGINE, "-----------------\nCreated new page:\n-----------------");
- if(unlikely(debug_flags & D_RRDENGINE))
+ page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ pg_cache_descr->page = page;
+ pg_cache_descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */;
+ pg_cache_descr->refcnt = 1;
+
+ debug(D_RRDENGINE, "Created new page:");
+ if (unlikely(debug_flags & D_RRDENGINE))
print_page_cache_descr(descr);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
*ret_descr = descr;
return page;
}
/* The page must not be empty */
-void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr,
+void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
Word_t page_correlation_id)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -324,15 +395,16 @@ void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache
++pg_cache->commited_page_index.nr_commited_pages;
uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
- pg_cache_put(descr);
+ pg_cache_put(ctx, descr);
}
/* Gets a reference for the page */
void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle)
{
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
- debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------");
+ debug(D_RRDENGINE, "Reading existing page:");
descr = pg_cache_lookup(ctx, NULL, id, INVALID_TIME);
if (NULL == descr) {
*handle = NULL;
@@ -340,16 +412,18 @@ void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **
return NULL;
}
*handle = descr;
+ pg_cache_descr = descr->pg_cache_descr;
- return descr->page;
+ return pg_cache_descr->page;
}
/* Gets a reference for the page */
void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle)
{
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
- debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------");
+ debug(D_RRDENGINE, "Reading existing page:");
descr = pg_cache_lookup(ctx, NULL, id, point_in_time);
if (NULL == descr) {
*handle = NULL;
@@ -357,11 +431,18 @@ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_i
return NULL;
}
*handle = descr;
+ pg_cache_descr = descr->pg_cache_descr;
- return descr->page;
+ return pg_cache_descr->page;
}
-void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
+/*
+ * Gathers Database Engine statistics.
+ * Careful when modifying this function.
+ * You must not change the indices of the statistics or user code will break.
+ * You must not exceed RRDENG_NR_STATS or it will crash.
+ */
+void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -392,24 +473,46 @@ void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long
array[24] = (uint64_t)ctx->stats.datafile_deletions;
array[25] = (uint64_t)ctx->stats.journalfile_creations;
array[26] = (uint64_t)ctx->stats.journalfile_deletions;
+ array[27] = (uint64_t)ctx->stats.page_cache_descriptors;
+ array[28] = (uint64_t)ctx->stats.io_errors;
+ array[29] = (uint64_t)ctx->stats.fs_errors;
+ array[30] = (uint64_t)global_io_errors;
+ array[31] = (uint64_t)global_fs_errors;
+ array[32] = (uint64_t)rrdeng_reserved_file_descriptors;
+ assert(RRDENG_NR_STATS == 33);
}
/* Releases reference to page */
void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle)
{
(void)ctx;
- pg_cache_put((struct rrdeng_page_cache_descr *)handle);
+ pg_cache_put(ctx, (struct rrdeng_page_descr *)handle);
}
/*
- * Returns 0 on success, 1 on error
+ * Returns 0 on success, negative on error
*/
int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, unsigned disk_space_mb)
{
struct rrdengine_instance *ctx;
int error;
+ uint32_t max_open_files;
sanity_check();
+
+ max_open_files = rlimit_nofile.rlim_cur / 4;
+
+ /* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE);
+ if (rrdeng_reserved_file_descriptors > max_open_files) {
+ error("Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.",
+ (unsigned)rrdeng_reserved_file_descriptors, (unsigned)max_open_files);
+
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
+ return UV_EMFILE;
+ }
+
if (NULL == ctxp) {
/* for testing */
ctx = &default_global_ctx;
@@ -417,10 +520,6 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
} else {
*ctxp = ctx = callocz(1, sizeof(*ctx));
}
- if (ctx->rrdengine_state != RRDENGINE_STATUS_UNINITIALIZED) {
- return 1;
- }
- ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZING;
ctx->global_compress_alg = RRD_LZ4;
if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB)
page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
@@ -439,11 +538,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
init_commit_log(ctx);
error = init_rrd_files(ctx);
if (error) {
- ctx->rrdengine_state = RRDENGINE_STATUS_UNINITIALIZED;
- if (ctx != &default_global_ctx) {
- freez(ctx);
- }
- return 1;
+ goto error_after_init_rrd_files;
}
init_completion(&ctx->rrdengine_completion);
@@ -451,9 +546,21 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
/* wait for worker thread to initialize */
wait_for_completion(&ctx->rrdengine_completion);
destroy_completion(&ctx->rrdengine_completion);
-
- ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZED;
+ if (ctx->worker_config.error) {
+ goto error_after_rrdeng_worker;
+ }
return 0;
+
+error_after_rrdeng_worker:
+ finalize_rrd_files(ctx);
+error_after_init_rrd_files:
+ free_page_cache(ctx);
+ if (ctx != &default_global_ctx) {
+ freez(ctx);
+ *ctxp = NULL;
+ }
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
+ return UV_EIO;
}
/*
@@ -464,10 +571,6 @@ int rrdeng_exit(struct rrdengine_instance *ctx)
struct rrdeng_cmd cmd;
if (NULL == ctx) {
- /* TODO: move to per host basis */
- ctx = &default_global_ctx;
- }
- if (ctx->rrdengine_state != RRDENGINE_STATUS_INITIALIZED) {
return 1;
}
@@ -477,8 +580,12 @@ int rrdeng_exit(struct rrdengine_instance *ctx)
assert(0 == uv_thread_join(&ctx->worker_config.thread));
+ finalize_rrd_files(ctx);
+ free_page_cache(ctx);
+
if (ctx != &default_global_ctx) {
freez(ctx);
}
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
return 0;
} \ No newline at end of file
diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h
index e76629a4b..e52aabcbe 100644
--- a/database/engine/rrdengineapi.h
+++ b/database/engine/rrdengineapi.h
@@ -7,16 +7,22 @@
#define RRDENG_MIN_PAGE_CACHE_SIZE_MB (32)
#define RRDENG_MIN_DISK_SPACE_MB (256)
+
+#define RRDENG_NR_STATS (33)
+
+#define RRDENG_FD_BUDGET_PER_INSTANCE (50)
+
extern int default_rrdeng_page_cache_mb;
extern int default_rrdeng_disk_quota_mb;
-extern void *rrdeng_create_page(uuid_t *id, struct rrdeng_page_cache_descr **ret_descr);
-extern void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr,
+extern void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr);
+extern void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
Word_t page_correlation_id);
extern void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle);
extern void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle);
extern void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle);
extern void rrdeng_store_metric_init(RRDDIM *rd);
+extern void rrdeng_store_metric_flush_current_page(RRDDIM *rd);
extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number);
extern void rrdeng_store_metric_finalize(RRDDIM *rd);
extern void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle,
@@ -26,7 +32,7 @@ extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_han
extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle);
extern time_t rrdeng_metric_latest_time(RRDDIM *rd);
extern time_t rrdeng_metric_oldest_time(RRDDIM *rd);
-extern void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
+extern void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
/* must call once before using anything */
extern int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c
index 25f57ba1b..96504b275 100644
--- a/database/engine/rrdenginelib.c
+++ b/database/engine/rrdenginelib.c
@@ -1,25 +1,52 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
-void print_page_cache_descr(struct rrdeng_page_cache_descr *page_cache_descr)
+#define BUFSIZE (512)
+
+/* Caller must hold descriptor lock */
+/* Renders the descriptor's identity, time range, extent offset and its page
+ * cache state (flags/refcnt) into one buffer and emits it via debug(). */
+void print_page_cache_descr(struct rrdeng_page_descr *descr)
 {
-    char uuid_str[37];
-    char str[512];
+    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+    char uuid_str[UUID_STR_LEN];
+    char str[BUFSIZE];
     int pos = 0;

-    uuid_unparse_lower(*page_cache_descr->id, uuid_str);
-    pos += snprintfz(str, 512 - pos, "page(%p) id=%s\n"
+    uuid_unparse_lower(*descr->id, uuid_str);
+    pos += snprintfz(str, BUFSIZE - pos, "page(%p) id=%s\n"
                      "--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:",
-                     page_cache_descr->page, uuid_str,
-                     page_cache_descr->page_length,
-                     (uint64_t)page_cache_descr->start_time,
-                     (uint64_t)page_cache_descr->end_time);
-    if (!page_cache_descr->extent) {
-        pos += snprintfz(str + pos, 512 - pos, "N/A");
+                     pg_cache_descr->page, uuid_str,
+                     descr->page_length,
+                     (uint64_t)descr->start_time,
+                     (uint64_t)descr->end_time);
+    if (!descr->extent) {
+        pos += snprintfz(str + pos, BUFSIZE - pos, "N/A");
+    } else {
+        pos += snprintfz(str + pos, BUFSIZE - pos, "%"PRIu64, descr->extent->offset);
+    }
+
+    /* flags/refcnt live in the page cache descriptor, hence the lock requirement above */
+    snprintfz(str + pos, BUFSIZE - pos, " flags:0x%2.2lX refcnt:%u\n\n", pg_cache_descr->flags, pg_cache_descr->refcnt);
+    debug(D_RRDENGINE, "%s", str);
+}
+
+/* Like print_page_cache_descr() but without the page cache state (no lock
+ * needed for flags/refcnt) and printed to stderr instead of the debug log. */
+void print_page_descr(struct rrdeng_page_descr *descr)
+{
+    char uuid_str[UUID_STR_LEN];
+    char str[BUFSIZE];
+    int pos = 0;
+
+    uuid_unparse_lower(*descr->id, uuid_str);
+    pos += snprintfz(str, BUFSIZE - pos, "id=%s\n"
+                     "--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:",
+                     uuid_str,
+                     descr->page_length,
+                     (uint64_t)descr->start_time,
+                     (uint64_t)descr->end_time);
+    if (!descr->extent) {
+        pos += snprintfz(str + pos, BUFSIZE - pos, "N/A");
     } else {
-        pos += snprintfz(str + pos, 512 - pos, "%"PRIu64, page_cache_descr->extent->offset);
+        pos += snprintfz(str + pos, BUFSIZE - pos, "%"PRIu64, descr->extent->offset);
     }
-    snprintfz(str + pos, 512 - pos, " flags:0x%2.2lX refcnt:%u\n\n", page_cache_descr->flags, page_cache_descr->refcnt);
+    snprintfz(str + pos, BUFSIZE - pos, "\n\n");

     fputs(str, stderr);
 }
@@ -51,6 +78,48 @@ int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size)
return 0;
}
+/*
+ * Tries to open a file in direct I/O mode, falls back to buffered mode if not possible.
+ * Returns UV error number that is < 0 on failure.
+ * On success sets (*file) to be the uv_file that was opened.
+ */
+int open_file_direct_io(char *path, int flags, uv_file *file)
+{
+    uv_fs_t req;
+    int fd, current_flags, direct;
+
+    /* First pass (direct == 1) attempts O_DIRECT; second pass (direct == 0) retries
+     * with plain buffered I/O. The loop body decrements `direct` itself to break out
+     * early on success or on a non-recoverable open error. */
+    for (direct = 1 ; direct >= 0 ; --direct) {
+#ifdef __APPLE__
+        /* Apple OS does not support O_DIRECT */
+        direct = 0;
+#endif
+        current_flags = flags;
+        if (direct) {
+            current_flags |= O_DIRECT;
+        }
+        fd = uv_fs_open(NULL, &req, path, current_flags, S_IRUSR | S_IWUSR, NULL);
+        if (fd < 0) {
+            if ((direct) && (UV_EINVAL == fd)) {
+                /* filesystem rejected O_DIRECT; fall through and retry buffered */
+                error("File \"%s\" does not support direct I/O, falling back to buffered I/O.", path);
+            } else {
+                error("Failed to open file \"%s\".", path);
+                --direct; /* break the loop */
+            }
+        } else {
+            assert(req.result >= 0);
+            *file = req.result;
+#ifdef __APPLE__
+            info("Disabling OS X caching for file \"%s\".", path);
+            /* NOTE(review): fcntl() return value is ignored — best-effort cache
+             * disabling; confirm whether a failure here should be reported. */
+            fcntl(fd, F_NOCACHE, 1);
+#endif
+            --direct; /* break the loop */
+        }
+        uv_fs_req_cleanup(&req);
+    }
+
+    /* fd is the opened uv_file on success, or the last UV error (< 0) on failure */
+    return fd;
+}
+
char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size)
{
struct page_cache *pg_cache;
@@ -60,6 +129,7 @@ char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t si
"metric_API_producers: %ld\n"
"metric_API_consumers: %ld\n"
"page_cache_total_pages: %ld\n"
+ "page_cache_descriptors: %ld\n"
"page_cache_populated_pages: %ld\n"
"page_cache_commited_pages: %ld\n"
"page_cache_insertions: %ld\n"
@@ -87,6 +157,7 @@ char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t si
(long)ctx->stats.metric_API_producers,
(long)ctx->stats.metric_API_consumers,
(long)pg_cache->page_descriptors,
+ (long)ctx->stats.page_cache_descriptors,
(long)pg_cache->populated_pages,
(long)pg_cache->commited_page_index.nr_commited_pages,
(long)ctx->stats.pg_cache_insertions,
diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h
index bb6f072bf..36d414e89 100644
--- a/database/engine/rrdenginelib.h
+++ b/database/engine/rrdenginelib.h
@@ -6,11 +6,17 @@
#include "rrdengine.h"
/* Forward declarations */
-struct rrdeng_page_cache_descr;
+struct rrdeng_page_descr;
#define STR_HELPER(x) #x
#define STR(x) STR_HELPER(x)
+#define BITS_PER_ULONG (sizeof(unsigned long) * 8)
+
+#ifndef UUID_STR_LEN
+#define UUID_STR_LEN (37)
+#endif
+
/* Taken from linux kernel */
#define BUILD_BUG_ON(condition) ((void)sizeof(char[1 - 2*!!(condition)]))
@@ -25,6 +31,15 @@ typedef uintptr_t rrdeng_stats_t;
#define rrd_stat_atomic_add(p, n) do {(void) __sync_fetch_and_add(p, n);} while(0)
#endif
+#define RRDENG_PATH_MAX (4096)
+
+/* returns old *ptr value */
+static inline unsigned long ulong_compare_and_swap(volatile unsigned long *ptr,
+ unsigned long oldval, unsigned long newval)
+{
+ return __sync_val_compare_and_swap(ptr, oldval, newval);
+}
+
#ifndef O_DIRECT
/* Workaround for OS X */
#define O_DIRECT (0)
@@ -77,8 +92,10 @@ static inline void crc32set(void *crcp, uLong crc)
*(uint32_t *)crcp = crc;
}
-extern void print_page_cache_descr(struct rrdeng_page_cache_descr *page_cache_descr);
+extern void print_page_cache_descr(struct rrdeng_page_descr *page_cache_descr);
+extern void print_page_descr(struct rrdeng_page_descr *descr);
extern int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size);
+extern int open_file_direct_io(char *path, int flags, uv_file *file);
extern char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size);
#endif /* NETDATA_RRDENGINELIB_H */ \ No newline at end of file
diff --git a/database/engine/rrdenglocking.c b/database/engine/rrdenglocking.c
new file mode 100644
index 000000000..0eb9019b4
--- /dev/null
+++ b/database/engine/rrdenglocking.c
@@ -0,0 +1,233 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+/* Allocates and initializes a fresh page cache descriptor and accounts for it
+ * in the instance statistics. Counterpart of rrdeng_destroy_pg_cache_descr(). */
+struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance *ctx)
+{
+    struct page_cache_descr *pg_cache_descr;
+    int ret;
+
+    pg_cache_descr = mallocz(sizeof(*pg_cache_descr));
+    rrd_stat_atomic_add(&ctx->stats.page_cache_descriptors, 1);
+    pg_cache_descr->page = NULL;
+    pg_cache_descr->flags = 0;
+    pg_cache_descr->prev = pg_cache_descr->next = NULL;
+    pg_cache_descr->refcnt = 0;
+    pg_cache_descr->waiters = 0;
+    /* Initialize the synchronization primitives outside of assert() so the
+     * calls are not compiled out when NDEBUG is defined. */
+    ret = uv_cond_init(&pg_cache_descr->cond);
+    assert(0 == ret);
+    ret = uv_mutex_init(&pg_cache_descr->mutex);
+    assert(0 == ret);
+    (void)ret; /* silence unused-variable warning under NDEBUG */
+
+    return pg_cache_descr;
+}
+
+/* Tears down a page cache descriptor created by rrdeng_create_pg_cache_descr()
+ * and updates the instance statistics accordingly. */
+void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr)
+{
+    /* release the synchronization primitives before freeing the descriptor */
+    uv_mutex_destroy(&pg_cache_descr->mutex);
+    uv_cond_destroy(&pg_cache_descr->cond);
+    rrd_stat_atomic_add(&ctx->stats.page_cache_descriptors, -1);
+    freez(pg_cache_descr);
+}
+
+/* also allocates page cache descriptor if missing */
+/* On return the caller holds descr->pg_cache_descr->mutex and is registered as
+ * a user of the page cache descriptor in descr->pg_cache_descr_state. */
+void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+    unsigned long old_state, old_users, new_state, ret_state;
+    struct page_cache_descr *pg_cache_descr = NULL;
+    uint8_t we_locked;
+
+    /* descr->pg_cache_descr_state is a single word manipulated with CAS only:
+     * the bits below PG_CACHE_DESCR_SHIFT are flags (LOCKED / ALLOCATED / ...)
+     * and the bits from PG_CACHE_DESCR_SHIFT upwards count the users of the
+     * attached page cache descriptor. LOCKED is a transient spinlock bit. */
+    we_locked = 0;
+    while (1) { /* spin */
+        old_state = descr->pg_cache_descr_state;
+        old_users = old_state >> PG_CACHE_DESCR_SHIFT;
+
+        if (unlikely(we_locked)) {
+            /* we attached a new descriptor in a previous iteration while holding
+             * LOCKED; publish it as ALLOCATED with ourselves as the single user */
+            assert(old_state & PG_CACHE_DESCR_LOCKED);
+            new_state = (1 << PG_CACHE_DESCR_SHIFT) | PG_CACHE_DESCR_ALLOCATED;
+            ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+            if (old_state == ret_state) {
+                /* success */
+                break;
+            }
+            continue; /* spin */
+        }
+        if (old_state & PG_CACHE_DESCR_LOCKED) {
+            /* another context holds the transient spinlock bit; wait for it */
+            assert(0 == old_users);
+            continue; /* spin */
+        }
+        if (0 == old_state) {
+            /* no page cache descriptor has been allocated */
+
+            if (NULL == pg_cache_descr) {
+                pg_cache_descr = rrdeng_create_pg_cache_descr(ctx);
+            }
+            new_state = PG_CACHE_DESCR_LOCKED;
+            ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, 0, new_state);
+            if (0 == ret_state) {
+                we_locked = 1;
+                descr->pg_cache_descr = pg_cache_descr;
+                pg_cache_descr->descr = descr;
+                pg_cache_descr = NULL; /* make sure we don't free pg_cache_descr */
+                /* retry */
+                continue;
+            }
+            continue; /* spin */
+        }
+        /* page cache descriptor is already allocated */
+        assert(old_state & PG_CACHE_DESCR_ALLOCATED);
+
+        /* become one more user while keeping the flag bits intact */
+        new_state = (old_users + 1) << PG_CACHE_DESCR_SHIFT;
+        new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK;
+
+        ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+        if (old_state == ret_state) {
+            /* success */
+            break;
+        }
+        /* spin */
+    }
+
+    if (pg_cache_descr) {
+        /* we pre-allocated a descriptor but lost the race to another context */
+        rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+    }
+    pg_cache_descr = descr->pg_cache_descr;
+    uv_mutex_lock(&pg_cache_descr->mutex);
+}
+
+/* Releases the mutex taken by rrdeng_page_descr_mutex_lock() and drops this
+ * context's user reference from descr->pg_cache_descr_state. If we were the
+ * last user and PG_CACHE_DESCR_DESTROY was planted, the page cache descriptor
+ * is deallocated here (deferred deallocation on behalf of another context). */
+void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+    unsigned long old_state, new_state, ret_state, old_users;
+    struct page_cache_descr *pg_cache_descr;
+    uint8_t we_locked;
+
+    uv_mutex_unlock(&descr->pg_cache_descr->mutex);
+
+    we_locked = 0;
+    while (1) { /* spin */
+        old_state = descr->pg_cache_descr_state;
+        old_users = old_state >> PG_CACHE_DESCR_SHIFT;
+
+        if (unlikely(we_locked)) {
+            /* we already destroyed the descriptor under LOCKED; clear the state word */
+            assert(0 == old_users);
+
+            ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, 0);
+            if (old_state == ret_state) {
+                /* success */
+                break;
+            }
+            continue; /* spin */
+        }
+        if (old_state & PG_CACHE_DESCR_LOCKED) {
+            /* another context holds the transient spinlock bit; wait for it */
+            assert(0 == old_users);
+            continue; /* spin */
+        }
+        assert(old_state & PG_CACHE_DESCR_ALLOCATED);
+        pg_cache_descr = descr->pg_cache_descr;
+        /* caller is the only page cache descriptor user and there are no pending references on the page */
+        if ((old_state & PG_CACHE_DESCR_DESTROY) && (1 == old_users) &&
+            !pg_cache_descr->flags && !pg_cache_descr->refcnt) {
+            new_state = PG_CACHE_DESCR_LOCKED;
+            ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+            if (old_state == ret_state) {
+                we_locked = 1;
+                rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+                /* retry */
+                continue;
+            }
+            continue; /* spin */
+        }
+        /* common path: just decrement the user count, keeping the flag bits */
+        assert(old_users > 0);
+        new_state = (old_users - 1) << PG_CACHE_DESCR_SHIFT;
+        new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK;
+
+        ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+        if (old_state == ret_state) {
+            /* success */
+            break;
+        }
+        /* spin */
+    }
+
+}
+
+/*
+ * Tries to deallocate page cache descriptor. If it fails, it postpones deallocation by setting the
+ * PG_CACHE_DESCR_DESTROY flag which will be eventually cleared by a different context after doing
+ * the deallocation.
+ */
+void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+    unsigned long old_state, new_state, ret_state, old_users;
+    struct page_cache_descr *pg_cache_descr;
+    uint8_t just_locked, we_freed, must_unlock;
+
+    /* Three-phase CAS protocol: (1) take the transient LOCKED bit when there
+     * are no users, (2) free the descriptor if it has no flags/references,
+     * (3) release LOCKED — clearing the whole state word if we freed it, or
+     * planting PG_CACHE_DESCR_DESTROY otherwise. */
+    just_locked = 0;
+    we_freed = 0;
+    must_unlock = 0;
+    while (1) { /* spin */
+        old_state = descr->pg_cache_descr_state;
+        old_users = old_state >> PG_CACHE_DESCR_SHIFT;
+
+        if (unlikely(just_locked)) {
+            /* phase 2: we hold LOCKED, so no user can race with us */
+            assert(0 == old_users);
+
+            must_unlock = 1;
+            just_locked = 0;
+            /* Try deallocate if there are no pending references on the page */
+            if (!pg_cache_descr->flags && !pg_cache_descr->refcnt) {
+                rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+                we_freed = 1;
+                /* success */
+                continue;
+            }
+            continue; /* spin */
+        }
+        if (unlikely(must_unlock)) {
+            /* phase 3: release the LOCKED bit */
+            assert(0 == old_users);
+
+            if (we_freed) {
+                /* success */
+                new_state = 0;
+            } else {
+                /* could not free yet: defer by planting DESTROY for later contexts */
+                new_state = old_state | PG_CACHE_DESCR_DESTROY;
+                new_state &= ~PG_CACHE_DESCR_LOCKED;
+            }
+            ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+            if (old_state == ret_state) {
+                /* unlocked */
+                return;
+            }
+            continue; /* spin */
+        }
+        if (!(old_state & PG_CACHE_DESCR_ALLOCATED)) {
+            /* don't do anything */
+            return;
+        }
+        if (old_state & PG_CACHE_DESCR_LOCKED) {
+            /* another context holds the transient spinlock bit; wait for it */
+            assert(0 == old_users);
+            continue; /* spin */
+        }
+        pg_cache_descr = descr->pg_cache_descr;
+        /* caller is the only page cache descriptor user */
+        if (0 == old_users) {
+            /* phase 1: grab the LOCKED bit so we can inspect/free safely */
+            new_state = old_state | PG_CACHE_DESCR_LOCKED;
+            ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+            if (old_state == ret_state) {
+                just_locked = 1;
+                /* retry */
+                continue;
+            }
+            continue; /* spin */
+        }
+        if (old_state & PG_CACHE_DESCR_DESTROY) {
+            /* don't do anything */
+            return;
+        }
+        /* plant PG_CACHE_DESCR_DESTROY so that other contexts eventually free the page cache descriptor */
+        new_state = old_state | PG_CACHE_DESCR_DESTROY;
+
+        ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+        if (old_state == ret_state) {
+            /* success */
+            return;
+        }
+        /* spin */
+    }
+}
diff --git a/database/engine/rrdenglocking.h b/database/engine/rrdenglocking.h
new file mode 100644
index 000000000..127ddc90c
--- /dev/null
+++ b/database/engine/rrdenglocking.h
@@ -0,0 +1,17 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDENGLOCKING_H
+#define NETDATA_RRDENGLOCKING_H
+
+#include "rrdengine.h"
+
+/* Forward declarations */
+struct page_cache_descr;
+
+extern struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance *ctx);
+extern void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr);
+extern void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+extern void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+extern void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+
+#endif /* NETDATA_RRDENGLOCKING_H */ \ No newline at end of file