summaryrefslogtreecommitdiffstats
path: root/database
diff options
context:
space:
mode:
Diffstat (limited to 'database')
-rw-r--r--database/README.md10
-rw-r--r--database/engine/README.md40
-rw-r--r--database/engine/datafile.c224
-rw-r--r--database/engine/datafile.h7
-rw-r--r--database/engine/journalfile.c117
-rw-r--r--database/engine/journalfile.h2
-rw-r--r--database/engine/pagecache.c317
-rw-r--r--database/engine/pagecache.h89
-rw-r--r--database/engine/rrdengine.c234
-rw-r--r--database/engine/rrdengine.h22
-rw-r--r--database/engine/rrdengineapi.c277
-rw-r--r--database/engine/rrdengineapi.h12
-rw-r--r--database/engine/rrdenginelib.c97
-rw-r--r--database/engine/rrdenginelib.h21
-rw-r--r--database/engine/rrdenglocking.c233
-rw-r--r--database/engine/rrdenglocking.h17
-rw-r--r--database/rrd.c5
-rw-r--r--database/rrd.h32
-rw-r--r--database/rrdcalc.c108
-rw-r--r--database/rrdcalc.h25
-rw-r--r--database/rrdcalctemplate.c2
-rw-r--r--database/rrdcalctemplate.h6
-rw-r--r--database/rrddim.c6
-rw-r--r--database/rrdhost.c106
-rw-r--r--database/rrdset.c33
25 files changed, 1518 insertions, 524 deletions
diff --git a/database/README.md b/database/README.md
index dc40a3e40..de0aa9b53 100644
--- a/database/README.md
+++ b/database/README.md
@@ -87,7 +87,9 @@ server that will maintain the entire database for all nodes, and will also run h
for all nodes.
For this central netdata, memory size can be a problem. Fortunately, netdata supports several
-memory modes. One interesting option for this setup is `memory mode = map`.
+memory modes. **One interesting option** for this setup is `memory mode = map`.
+
+### map
In this mode, the database of netdata is stored in memory mapped files. netdata continues to read
and write the database in memory, but the kernel automatically loads and saves memory pages from/to
@@ -162,8 +164,10 @@ vm.dirty_ratio = 90
vm.dirty_writeback_centisecs = 0
```
-There is another memory mode to help overcome the memory size problem. What is most interesting
-for this setup is `memory mode = dbengine`.
+There is another memory mode to help overcome the memory size problem. What is **most interesting
+for this setup** is `memory mode = dbengine`.
+
+### dbengine
In this mode, the database of netdata is stored in database files. The [Database Engine](engine/)
works like a traditional database. There is some amount of RAM dedicated to data caching and
diff --git a/database/engine/README.md b/database/engine/README.md
index 28a2528cb..adc69ffd7 100644
--- a/database/engine/README.md
+++ b/database/engine/README.md
@@ -96,9 +96,9 @@ There are explicit memory requirements **per** DB engine **instance**, meaning *
- `page cache size` must be at least `#dimensions-being-collected x 4096 x 2` bytes.
-- an additional `#pages-on-disk x 4096 x 0.06` bytes of RAM are allocated for metadata.
+- an additional `#pages-on-disk x 4096 x 0.03` bytes of RAM are allocated for metadata.
- - roughly speaking this is 6% of the uncompressed disk space taken by the DB files.
+ - roughly speaking this is 3% of the uncompressed disk space taken by the DB files.
- for very highly compressible data (compression ratio > 90%) this RAM overhead
is comparable to the disk space footprint.
@@ -106,4 +106,40 @@ There are explicit memory requirements **per** DB engine **instance**, meaning *
An important observation is that RAM usage depends on both the `page cache size` and the
`dbengine disk space` options.
+## File descriptor requirements
+
+The Database Engine may keep a **significant** amount of files open per instance (e.g. per streaming
+slave or master server). When configuring your system you should make sure there are at least 50
+file descriptors available per `dbengine` instance.
+
+Netdata allocates 25% of the available file descriptors to its Database Engine instances. This means that only 25%
+of the file descriptors that are available to the Netdata service are accessible by dbengine instances.
+You should take that into account when configuring your service
+or system-wide file descriptor limits. You can roughly estimate that the netdata service needs 2048 file
+descriptors for every 10 streaming slave hosts when streaming is configured to use `memory mode = dbengine`.
+
+If for example one wants to allocate 65536 file descriptors to the netdata service on a systemd system
+one needs to override the netdata service by running `sudo systemctl edit netdata` and creating a
+file with contents:
+
+```
+[Service]
+LimitNOFILE=65536
+```
+
+For other types of services one can add the line:
+```
+ulimit -n 65536
+```
+at the beginning of the service file. Alternatively you can change the system-wide limits of the kernel by changing `/etc/sysctl.conf`. For linux that would be:
+```
+fs.file-max = 65536
+```
+In FreeBSD and OS X you change the lines like this:
+```
+kern.maxfilesperproc=65536
+kern.maxfiles=65536
+```
+You can apply the settings by running `sysctl -p` or by rebooting.
+
[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fdatabase%2Fengine%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)]()
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
index 2d17d05e4..8ef4ed599 100644
--- a/database/engine/datafile.c
+++ b/database/engine/datafile.c
@@ -49,44 +49,69 @@ static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_
datafile->ctx = ctx;
}
-static void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintf(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION,
datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
}
+int close_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_datafilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_close(NULL, &req, datafile->file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ return ret;
+}
+
+
int destroy_data_file(struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
- int ret, fd;
- char path[1024];
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_datafilepath(datafile, path, sizeof(path));
ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL);
if (ret < 0) {
- fatal("uv_fs_ftruncate: %s", uv_strerror(ret));
+ error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
ret = uv_fs_close(NULL, &req, datafile->file, NULL);
if (ret < 0) {
- fatal("uv_fs_close: %s", uv_strerror(ret));
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
- generate_datafilepath(datafile, path, sizeof(path));
- fd = uv_fs_unlink(NULL, &req, path, NULL);
- if (fd < 0) {
- fatal("uv_fs_fsunlink: %s", uv_strerror(fd));
+ ret = uv_fs_unlink(NULL, &req, path, NULL);
+ if (ret < 0) {
+ error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
++ctx->stats.datafile_deletions;
- return 0;
+ return ret;
}
int create_data_file(struct rrdengine_datafile *datafile)
@@ -97,21 +122,17 @@ int create_data_file(struct rrdengine_datafile *datafile)
int ret, fd;
struct rrdeng_df_sb *superblock;
uv_buf_t iov;
- char path[1024];
+ char path[RRDENG_PATH_MAX];
generate_datafilepath(datafile, path, sizeof(path));
- fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC,
- S_IRUSR | S_IWUSR, NULL);
+ fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
if (fd < 0) {
- fatal("uv_fs_fsopen: %s", uv_strerror(fd));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return fd;
}
- assert(req.result >= 0);
- file = req.result;
- uv_fs_req_cleanup(&req);
-#ifdef __APPLE__
- info("Disabling OS X caching for file \"%s\".", path);
- fcntl(fd, F_NOCACHE, 1);
-#endif
+ datafile->file = file;
+ ++ctx->stats.datafile_creations;
ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
if (unlikely(ret)) {
@@ -125,19 +146,21 @@ int create_data_file(struct rrdengine_datafile *datafile)
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
- fatal("uv_fs_write: %s", uv_strerror(ret));
- }
- if (req.result < 0) {
- fatal("uv_fs_write: %s", uv_strerror((int)req.result));
+ assert(req.result < 0);
+ error("uv_fs_write: %s", uv_strerror(ret));
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
}
uv_fs_req_cleanup(&req);
free(superblock);
+ if (ret < 0) {
+ destroy_data_file(datafile);
+ return ret;
+ }
- datafile->file = file;
datafile->pos = sizeof(*superblock);
ctx->stats.io_write_bytes += sizeof(*superblock);
++ctx->stats.io_write_requests;
- ++ctx->stats.datafile_creations;
return 0;
}
@@ -182,25 +205,17 @@ static int load_data_file(struct rrdengine_datafile *datafile)
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
uv_file file;
- int ret, fd;
+ int ret, fd, error;
uint64_t file_size;
- char path[1024];
+ char path[RRDENG_PATH_MAX];
generate_datafilepath(datafile, path, sizeof(path));
- fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL);
+ fd = open_file_direct_io(path, O_RDWR, &file);
if (fd < 0) {
- /* if (UV_ENOENT != fd) */
- error("uv_fs_fsopen: %s", uv_strerror(fd));
- uv_fs_req_cleanup(&req);
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
return fd;
}
- assert(req.result >= 0);
- file = req.result;
- uv_fs_req_cleanup(&req);
-#ifdef __APPLE__
- info("Disabling OS X caching for file \"%s\".", path);
- fcntl(fd, F_NOCACHE, 1);
-#endif
info("Initializing data file \"%s\".", path);
ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
@@ -221,15 +236,21 @@ static int load_data_file(struct rrdengine_datafile *datafile)
return 0;
error:
- (void) uv_fs_close(NULL, &req, file, NULL);
+ error = ret;
+ ret = uv_fs_close(NULL, &req, file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
uv_fs_req_cleanup(&req);
- return ret;
+ return error;
}
static int scan_data_files_cmp(const void *a, const void *b)
{
struct rrdengine_datafile *file1, *file2;
- char path1[1024], path2[1024];
+ char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX];
file1 = *(struct rrdengine_datafile **)a;
file2 = *(struct rrdengine_datafile **)b;
@@ -238,7 +259,7 @@ static int scan_data_files_cmp(const void *a, const void *b)
return strcmp(path1, path2);
}
-/* Returns number of datafiles that were loaded */
+/* Returns number of datafiles that were loaded or < 0 on error */
static int scan_data_files(struct rrdengine_instance *ctx)
{
int ret;
@@ -249,16 +270,22 @@ static int scan_data_files(struct rrdengine_instance *ctx)
struct rrdengine_journalfile *journalfile;
ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL);
- assert(ret >= 0);
- assert(req.result >= 0);
+ if (ret < 0) {
+ assert(req.result < 0);
+ uv_fs_req_cleanup(&req);
+ error("uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return ret;
+ }
info("Found %d files in path %s", ret, ctx->dbfiles_path);
datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
- info("Scanning file \"%s\"", dent.name);
+ info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name);
ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no);
if (2 == ret) {
- info("Matched file \"%s\"", dent.name);
+ info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name);
datafile = mallocz(sizeof(*datafile));
datafile_init(datafile, ctx, tier, no);
datafiles[matched_files++] = datafile;
@@ -266,70 +293,133 @@ static int scan_data_files(struct rrdengine_instance *ctx)
}
uv_fs_req_cleanup(&req);
+ if (0 == matched_files) {
+ freez(datafiles);
+ return 0;
+ }
if (matched_files == MAX_DATAFILES) {
error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
}
qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp);
+ /* TODO: change this when tiering is implemented */
+ ctx->last_fileno = datafiles[matched_files - 1]->fileno;
+
for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
datafile = datafiles[i];
ret = load_data_file(datafile);
if (0 != ret) {
- free(datafile);
+ freez(datafile);
++failed_to_load;
- continue;
+ break;
}
journalfile = mallocz(sizeof(*journalfile));
datafile->journalfile = journalfile;
journalfile_init(journalfile, datafile);
ret = load_journal_file(ctx, journalfile, datafile);
if (0 != ret) {
- free(datafile);
- free(journalfile);
+ close_data_file(datafile);
+ freez(datafile);
+ freez(journalfile);
++failed_to_load;
- continue;
+ break;
}
datafile_list_insert(ctx, datafile);
ctx->disk_space += datafile->pos + journalfile->pos;
}
+ freez(datafiles);
if (failed_to_load) {
- error("%u files failed to load.", failed_to_load);
+ error("%u datafiles failed to load.", failed_to_load);
+ finalize_data_files(ctx);
+ return UV_EIO;
}
- free(datafiles);
- return matched_files - failed_to_load;
+ return matched_files;
}
/* Creates a datafile and a journalfile pair */
-void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno)
+int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno)
{
struct rrdengine_datafile *datafile;
struct rrdengine_journalfile *journalfile;
int ret;
+ char path[RRDENG_PATH_MAX];
- info("Creating new data and journal files.");
+ info("Creating new data and journal files in path %s", ctx->dbfiles_path);
datafile = mallocz(sizeof(*datafile));
datafile_init(datafile, ctx, tier, fileno);
ret = create_data_file(datafile);
- assert(!ret);
+ if (!ret) {
+ generate_datafilepath(datafile, path, sizeof(path));
+ info("Created data file \"%s\".", path);
+ } else {
+ goto error_after_datafile;
+ }
journalfile = mallocz(sizeof(*journalfile));
datafile->journalfile = journalfile;
journalfile_init(journalfile, datafile);
ret = create_journal_file(journalfile, datafile);
- assert(!ret);
+ if (!ret) {
+ generate_journalfilepath(datafile, path, sizeof(path));
+ info("Created journal file \"%s\".", path);
+ } else {
+ goto error_after_journalfile;
+ }
datafile_list_insert(ctx, datafile);
ctx->disk_space += datafile->pos + journalfile->pos;
+
+ return 0;
+
+error_after_journalfile:
+ destroy_data_file(datafile);
+ freez(journalfile);
+error_after_datafile:
+ freez(datafile);
+ return ret;
}
-/* Page cache must already be initialized. */
+/* Page cache must already be initialized.
+ * Return 0 on success.
+ */
int init_data_files(struct rrdengine_instance *ctx)
{
int ret;
ret = scan_data_files(ctx);
- if (0 == ret) {
- info("Data files not found, creating.");
- create_new_datafile_pair(ctx, 1, 1);
+ if (ret < 0) {
+ error("Failed to scan path \"%s\".", ctx->dbfiles_path);
+ return ret;
+ } else if (0 == ret) {
+ info("Data files not found, creating in path \"%s\".", ctx->dbfiles_path);
+ ret = create_new_datafile_pair(ctx, 1, 1);
+ if (ret) {
+ error("Failed to create data and journal files in path \"%s\".", ctx->dbfiles_path);
+ return ret;
+ }
+ ctx->last_fileno = 1;
}
+
return 0;
+}
+
+void finalize_data_files(struct rrdengine_instance *ctx)
+{
+ struct rrdengine_datafile *datafile, *next_datafile;
+ struct rrdengine_journalfile *journalfile;
+ struct extent_info *extent, *next_extent;
+
+ for (datafile = ctx->datafiles.first ; datafile != NULL ; datafile = next_datafile) {
+ journalfile = datafile->journalfile;
+ next_datafile = datafile->next;
+
+ for (extent = datafile->extents.first ; extent != NULL ; extent = next_extent) {
+ next_extent = extent->next;
+ freez(extent);
+ }
+ close_journal_file(journalfile, datafile);
+ close_data_file(datafile);
+ freez(journalfile);
+ freez(datafile);
+
+ }
} \ No newline at end of file
diff --git a/database/engine/datafile.h b/database/engine/datafile.h
index c5c8f31f3..eeb11310b 100644
--- a/database/engine/datafile.h
+++ b/database/engine/datafile.h
@@ -26,7 +26,7 @@ struct extent_info {
uint8_t number_of_pages;
struct rrdengine_datafile *datafile;
struct extent_info *next;
- struct rrdeng_page_cache_descr *pages[];
+ struct rrdeng_page_descr *pages[];
};
struct rrdengine_df_extents {
@@ -55,9 +55,12 @@ struct rrdengine_datafile_list {
extern void df_extent_insert(struct extent_info *extent);
extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
+extern void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
+extern int close_data_file(struct rrdengine_datafile *datafile);
extern int destroy_data_file(struct rrdengine_datafile *datafile);
extern int create_data_file(struct rrdengine_datafile *datafile);
-extern void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
+extern int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
extern int init_data_files(struct rrdengine_instance *ctx);
+extern void finalize_data_files(struct rrdengine_instance *ctx);
#endif /* NETDATA_DATAFILE_H */ \ No newline at end of file
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 44d8461db..30eaa0ec6 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -13,7 +13,7 @@ static void flush_transaction_buffer_cb(uv_fs_t* req)
uv_fs_req_cleanup(req);
free(io_descr->buf);
- free(io_descr);
+ freez(io_descr);
}
/* Careful to always call this before creating a new journal file */
@@ -87,7 +87,7 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s
return ctx->commit_log.buf + buf_pos;
}
-static void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintf(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
@@ -100,39 +100,62 @@ void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengin
journalfile->datafile = datafile;
}
+int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ return ret;
+}
+
int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
- int ret, fd;
- char path[1024];
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
if (ret < 0) {
- fatal("uv_fs_ftruncate: %s", uv_strerror(ret));
+ error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
if (ret < 0) {
- fatal("uv_fs_close: %s", uv_strerror(ret));
- exit(ret);
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
- generate_journalfilepath(datafile, path, sizeof(path));
- fd = uv_fs_unlink(NULL, &req, path, NULL);
- if (fd < 0) {
- fatal("uv_fs_fsunlink: %s", uv_strerror(fd));
+ ret = uv_fs_unlink(NULL, &req, path, NULL);
+ if (ret < 0) {
+ error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
++ctx->stats.journalfile_deletions;
- return 0;
+ return ret;
}
int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
@@ -143,21 +166,17 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
int ret, fd;
struct rrdeng_jf_sb *superblock;
uv_buf_t iov;
- char path[1024];
+ char path[RRDENG_PATH_MAX];
generate_journalfilepath(datafile, path, sizeof(path));
- fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC,
- S_IRUSR | S_IWUSR, NULL);
+ fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
if (fd < 0) {
- fatal("uv_fs_fsopen: %s", uv_strerror(fd));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return fd;
}
- assert(req.result >= 0);
- file = req.result;
- uv_fs_req_cleanup(&req);
-#ifdef __APPLE__
- info("Disabling OS X caching for file \"%s\".", path);
- fcntl(fd, F_NOCACHE, 1);
-#endif
+ journalfile->file = file;
+ ++ctx->stats.journalfile_creations;
ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
if (unlikely(ret)) {
@@ -170,19 +189,21 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
- fatal("uv_fs_write: %s", uv_strerror(ret));
- }
- if (req.result < 0) {
- fatal("uv_fs_write: %s", uv_strerror((int)req.result));
+ assert(req.result < 0);
+ error("uv_fs_write: %s", uv_strerror(ret));
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
}
uv_fs_req_cleanup(&req);
free(superblock);
+ if (ret < 0) {
+ destroy_journal_file(journalfile, datafile);
+ return ret;
+ }
- journalfile->file = file;
journalfile->pos = sizeof(*superblock);
ctx->stats.io_write_bytes += sizeof(*superblock);
++ctx->stats.io_write_requests;
- ++ctx->stats.journalfile_creations;
return 0;
}
@@ -226,7 +247,7 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
{
struct page_cache *pg_cache = &ctx->pg_cache;
unsigned i, count, payload_length, descr_size, valid_pages;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
struct extent_info *extent;
/* persistent structures */
struct rrdeng_jf_store_data *jf_metric_data;
@@ -271,6 +292,8 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
assert(NULL == *PValue); /* TODO: figure out concurrency model */
*PValue = page_index = create_page_index(temp_id);
+ page_index->prev = pg_cache->metrics_index.last_page_index;
+ pg_cache->metrics_index.last_page_index = page_index;
uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
}
@@ -406,25 +429,17 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi
{
uv_fs_t req;
uv_file file;
- int ret, fd;
+ int ret, fd, error;
uint64_t file_size, max_id;
- char path[1024];
+ char path[RRDENG_PATH_MAX];
generate_journalfilepath(datafile, path, sizeof(path));
- fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL);
+ fd = open_file_direct_io(path, O_RDWR, &file);
if (fd < 0) {
- /* if (UV_ENOENT != fd) */
- error("uv_fs_fsopen: %s", uv_strerror(fd));
- uv_fs_req_cleanup(&req);
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
return fd;
}
- assert(req.result >= 0);
- file = req.result;
- uv_fs_req_cleanup(&req);
-#ifdef __APPLE__
- info("Disabling OS X caching for file \"%s\".", path);
- fcntl(fd, F_NOCACHE, 1);
-#endif
info("Loading journal file \"%s\".", path);
ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
@@ -449,9 +464,15 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi
return 0;
error:
- (void) uv_fs_close(NULL, &req, file, NULL);
+ error = ret;
+ ret = uv_fs_close(NULL, &req, file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
uv_fs_req_cleanup(&req);
- return ret;
+ return error;
}
void init_commit_log(struct rrdengine_instance *ctx)
diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h
index 50489aeee..0df66304d 100644
--- a/database/engine/journalfile.h
+++ b/database/engine/journalfile.h
@@ -33,9 +33,11 @@ struct transaction_commit_log {
unsigned buf_size;
};
+extern void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
extern void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
extern void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size);
extern void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc);
+extern int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
extern int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
extern int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
extern int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index c90947a67..124f2448b 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -8,28 +8,29 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx);
/* always inserts into tail */
static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
+ struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
if (likely(NULL != pg_cache->replaceQ.tail)) {
- descr->prev = pg_cache->replaceQ.tail;
- pg_cache->replaceQ.tail->next = descr;
+ pg_cache_descr->prev = pg_cache->replaceQ.tail;
+ pg_cache->replaceQ.tail->next = pg_cache_descr;
}
if (unlikely(NULL == pg_cache->replaceQ.head)) {
- pg_cache->replaceQ.head = descr;
+ pg_cache->replaceQ.head = pg_cache_descr;
}
- pg_cache->replaceQ.tail = descr;
+ pg_cache->replaceQ.tail = pg_cache_descr;
}
static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
+ struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_cache_descr *prev, *next;
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr, *prev, *next;
- prev = descr->prev;
- next = descr->next;
+ prev = pg_cache_descr->prev;
+ next = pg_cache_descr->next;
if (likely(NULL != prev)) {
prev->next = next;
@@ -37,17 +38,17 @@ static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ct
if (likely(NULL != next)) {
next->prev = prev;
}
- if (unlikely(descr == pg_cache->replaceQ.head)) {
+ if (unlikely(pg_cache_descr == pg_cache->replaceQ.head)) {
pg_cache->replaceQ.head = next;
}
- if (unlikely(descr == pg_cache->replaceQ.tail)) {
+ if (unlikely(pg_cache_descr == pg_cache->replaceQ.tail)) {
pg_cache->replaceQ.tail = prev;
}
- descr->prev = descr->next = NULL;
+ pg_cache_descr->prev = pg_cache_descr->next = NULL;
}
void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
+ struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -57,7 +58,7 @@ void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
}
void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
+ struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -66,7 +67,7 @@ void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}
void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
+ struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -76,40 +77,28 @@ void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}
-struct rrdeng_page_cache_descr *pg_cache_create_descr(void)
+struct rrdeng_page_descr *pg_cache_create_descr(void)
{
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
descr = mallocz(sizeof(*descr));
- descr->page = NULL;
descr->page_length = 0;
descr->start_time = INVALID_TIME;
descr->end_time = INVALID_TIME;
descr->id = NULL;
descr->extent = NULL;
- descr->flags = 0;
- descr->prev = descr->next = descr->private = NULL;
- descr->refcnt = 0;
- descr->waiters = 0;
- descr->handle = NULL;
- assert(0 == uv_cond_init(&descr->cond));
- assert(0 == uv_mutex_init(&descr->mutex));
+ descr->pg_cache_descr_state = 0;
+ descr->pg_cache_descr = NULL;
return descr;
}
-void pg_cache_destroy_descr(struct rrdeng_page_cache_descr *descr)
-{
- uv_cond_destroy(&descr->cond);
- uv_mutex_destroy(&descr->mutex);
- free(descr);
-}
-
/* The caller must hold page descriptor lock. */
-void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr)
+void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr)
{
- if (descr->waiters)
- uv_cond_broadcast(&descr->cond);
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+ if (pg_cache_descr->waiters)
+ uv_cond_broadcast(&pg_cache_descr->cond);
}
/*
@@ -117,11 +106,13 @@ void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr)
* The lock will be released and re-acquired. The descriptor is not guaranteed
* to exist after this function returns.
*/
-void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr)
+void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr)
{
- ++descr->waiters;
- uv_cond_wait(&descr->cond, &descr->mutex);
- --descr->waiters;
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ ++pg_cache_descr->waiters;
+ uv_cond_wait(&pg_cache_descr->cond, &pg_cache_descr->mutex);
+ --pg_cache_descr->waiters;
}
/*
@@ -129,14 +120,15 @@ void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr)
* The lock will be released and re-acquired. The descriptor is not guaranteed
* to exist after this function returns.
*/
-unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr)
+unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
unsigned long flags;
- uv_mutex_lock(&descr->mutex);
+ rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_wait_event_unsafe(descr);
- flags = descr->flags;
- uv_mutex_unlock(&descr->mutex);
+ flags = pg_cache_descr->flags;
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
return flags;
}
@@ -146,15 +138,17 @@ unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr)
* Gets a reference to the page descriptor.
* Returns 1 on success and 0 on failure.
*/
-int pg_cache_try_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive_access)
+int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
- if ((descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
- (exclusive_access && descr->refcnt)) {
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
+ (exclusive_access && pg_cache_descr->refcnt)) {
return 0;
}
if (exclusive_access)
- descr->flags |= RRD_PAGE_LOCKED;
- ++descr->refcnt;
+ pg_cache_descr->flags |= RRD_PAGE_LOCKED;
+ ++pg_cache_descr->refcnt;
return 1;
}
@@ -163,10 +157,12 @@ int pg_cache_try_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive
* The caller must hold page descriptor lock.
* Same return values as pg_cache_try_get_unsafe() without doing anything.
*/
-int pg_cache_can_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive_access)
+int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
- if ((descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
- (exclusive_access && descr->refcnt)) {
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
+ (exclusive_access && pg_cache_descr->refcnt)) {
return 0;
}
@@ -177,23 +173,24 @@ int pg_cache_can_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive
* The caller must hold the page descriptor lock.
* This function may block doing cleanup.
*/
-void pg_cache_put_unsafe(struct rrdeng_page_cache_descr *descr)
+void pg_cache_put_unsafe(struct rrdeng_page_descr *descr)
{
- descr->flags &= ~RRD_PAGE_LOCKED;
- if (0 == --descr->refcnt) {
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
+ if (0 == --pg_cache_descr->refcnt) {
pg_cache_wake_up_waiters_unsafe(descr);
}
- /* TODO: perform cleanup */
}
/*
* This function may block doing cleanup.
*/
-void pg_cache_put(struct rrdeng_page_cache_descr *descr)
+void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
- uv_mutex_lock(&descr->mutex);
+ rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_put_unsafe(descr);
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
}
/* The caller must hold the page cache lock */
@@ -224,7 +221,7 @@ static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned numb
uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
if (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1)
- debug(D_RRDENGINE, "=================================\nPage cache full. Reserving %u pages.\n=================================",
+ debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==",
number);
while (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1) {
if (!pg_cache_try_evict_one_page_unsafe(ctx)) {
@@ -266,7 +263,7 @@ static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned n
uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
if (pg_cache->populated_pages + number >= ctx->cache_pages_low_watermark + 1) {
debug(D_RRDENGINE,
- "=================================\nPage cache full. Trying to reserve %u pages.\n=================================",
+ "==Page cache full. Trying to reserve %u pages.==",
number);
do {
if (!pg_cache_try_evict_one_page_unsafe(ctx))
@@ -286,18 +283,20 @@ static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned n
}
/* The caller must hold the page cache and the page descriptor locks in that order */
-static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr)
+static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
- free(descr->page);
- descr->page = NULL;
- descr->flags &= ~RRD_PAGE_POPULATED;
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ freez(pg_cache_descr->page);
+ pg_cache_descr->page = NULL;
+ pg_cache_descr->flags &= ~RRD_PAGE_POPULATED;
pg_cache_release_pages_unsafe(ctx, 1);
++ctx->stats.pg_cache_evictions;
}
/*
* The caller must hold the page cache lock.
- * Lock order: page cache -> replaceQ -> descriptor
+ * Lock order: page cache -> replaceQ -> page descriptor
* This function iterates all pages and tries to evict one.
* If it fails it sets in_flight_descr to the oldest descriptor that has write-back in progress,
* or it sets it to NULL if no write-back is in progress.
@@ -308,36 +307,40 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
{
struct page_cache *pg_cache = &ctx->pg_cache;
unsigned long old_flags;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr = NULL;
uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
- for (descr = pg_cache->replaceQ.head ; NULL != descr ; descr = descr->next) {
- uv_mutex_lock(&descr->mutex);
- old_flags = descr->flags;
+ for (pg_cache_descr = pg_cache->replaceQ.head ; NULL != pg_cache_descr ; pg_cache_descr = pg_cache_descr->next) {
+ descr = pg_cache_descr->descr;
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ old_flags = pg_cache_descr->flags;
if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) {
/* must evict */
pg_cache_evict_unsafe(ctx, descr);
pg_cache_put_unsafe(descr);
- uv_mutex_unlock(&descr->mutex);
pg_cache_replaceQ_delete_unsafe(ctx, descr);
+
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+ rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
+
return 1;
}
- uv_mutex_unlock(&descr->mutex);
- };
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ }
uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
/* failed to evict */
return 0;
}
-/*
- * TODO: last waiter frees descriptor ?
- */
-void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr)
+void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty)
{
struct page_cache *pg_cache = &ctx->pg_cache;
+ struct page_cache_descr *pg_cache_descr = NULL;
Pvoid_t *PValue;
struct pg_cache_page_index *page_index;
int ret;
@@ -353,8 +356,9 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cach
uv_rwlock_wrunlock(&page_index->lock);
if (unlikely(0 == ret)) {
error("Page under deletion was not in index.");
- if (unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
+ if (unlikely(debug_flags & D_RRDENGINE)) {
+ print_page_descr(descr);
+ }
goto destroy;
}
assert(1 == ret);
@@ -364,23 +368,26 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cach
--pg_cache->page_descriptors;
uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
- uv_mutex_lock(&descr->mutex);
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
while (!pg_cache_try_get_unsafe(descr, 1)) {
debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
- if(unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
- pg_cache_wait_event_unsafe(descr);
- }
- /* even a locked page could be dirty */
- while (unlikely(descr->flags & RRD_PAGE_DIRTY)) {
- debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
if (unlikely(debug_flags & D_RRDENGINE))
print_page_cache_descr(descr);
pg_cache_wait_event_unsafe(descr);
}
- uv_mutex_unlock(&descr->mutex);
+ if (!remove_dirty) {
+ /* even a locked page could be dirty */
+ while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) {
+ debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_wait_event_unsafe(descr);
+ }
+ }
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
- if (descr->flags & RRD_PAGE_POPULATED) {
+ if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
/* only after locking can it be safely deleted from LRU */
pg_cache_replaceQ_delete(ctx, descr);
@@ -388,13 +395,15 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cach
pg_cache_evict_unsafe(ctx, descr);
uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}
- pg_cache_put(descr);
+ pg_cache_put(ctx, descr);
+
+ rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
destroy:
- pg_cache_destroy_descr(descr);
+ freez(descr);
pg_cache_update_metric_times(page_index);
}
-static inline int is_page_in_time_range(struct rrdeng_page_cache_descr *descr, usec_t start_time, usec_t end_time)
+static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time)
{
usec_t pg_start, pg_end;
@@ -405,13 +414,13 @@ static inline int is_page_in_time_range(struct rrdeng_page_cache_descr *descr, u
(pg_start >= start_time && pg_start <= end_time);
}
-static inline int is_point_in_time_in_page(struct rrdeng_page_cache_descr *descr, usec_t point_in_time)
+static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time)
{
return (point_in_time >= descr->start_time && point_in_time <= descr->end_time);
}
/* Update metric oldest and latest timestamps efficiently when adding new values */
-void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_cache_descr *descr)
+void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr)
{
usec_t oldest_time = page_index->oldest_time;
usec_t latest_time = page_index->latest_time;
@@ -429,7 +438,7 @@ void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
{
Pvoid_t *firstPValue, *lastPValue;
Word_t firstIndex, lastIndex;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
usec_t oldest_time = INVALID_TIME;
usec_t latest_time = INVALID_TIME;
@@ -460,16 +469,23 @@ void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
/* If index is NULL lookup by UUID (descr->id) */
void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
- struct rrdeng_page_cache_descr *descr)
+ struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
Pvoid_t *PValue;
struct pg_cache_page_index *page_index;
+ unsigned long pg_cache_descr_state = descr->pg_cache_descr_state;
+
+ if (0 != pg_cache_descr_state) {
+ /* there is page cache descriptor pre-allocated state */
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
- if (descr->flags & RRD_PAGE_POPULATED) {
- pg_cache_reserve_pages(ctx, 1);
- if (!(descr->flags & RRD_PAGE_DIRTY))
- pg_cache_replaceQ_insert(ctx, descr);
+ assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED);
+ if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
+ pg_cache_reserve_pages(ctx, 1);
+ if (!(pg_cache_descr->flags & RRD_PAGE_DIRTY))
+ pg_cache_replaceQ_insert(ctx, descr);
+ }
}
if (unlikely(NULL == index)) {
@@ -503,7 +519,8 @@ struct pg_cache_page_index *
pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
{
struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_cache_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
+ struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
+ struct page_cache_descr *pg_cache_descr = NULL;
int i, j, k, count, found;
unsigned long flags;
Pvoid_t *PValue;
@@ -557,12 +574,13 @@ struct pg_cache_page_index *
if (unlikely(0 == descr->page_length))
continue;
- uv_mutex_lock(&descr->mutex);
- flags = descr->flags;
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ flags = pg_cache_descr->flags;
if (pg_cache_can_get_unsafe(descr, 0)) {
if (flags & RRD_PAGE_POPULATED) {
/* success */
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
continue;
}
@@ -570,19 +588,19 @@ struct pg_cache_page_index *
if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
preload_array[count++] = descr;
if (PAGE_CACHE_MAX_PRELOAD_PAGES == count) {
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
break;
}
}
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
- };
+ }
uv_rwlock_rdunlock(&page_index->lock);
failed_to_reserve = 0;
for (i = 0 ; i < count && !failed_to_reserve ; ++i) {
struct rrdeng_cmd cmd;
- struct rrdeng_page_cache_descr *next;
+ struct rrdeng_page_descr *next;
descr = preload_array[i];
if (NULL == descr) {
@@ -622,7 +640,7 @@ struct pg_cache_page_index *
if (NULL == descr) {
continue;
}
- pg_cache_put(descr);
+ pg_cache_put(ctx, descr);
}
}
if (!count) {
@@ -637,12 +655,13 @@ struct pg_cache_page_index *
* When point_in_time is INVALID_TIME get any page.
* If index is NULL lookup by UUID (id).
*/
-struct rrdeng_page_cache_descr *
+struct rrdeng_page_descr *
pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
usec_t point_in_time)
{
struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_cache_descr *descr = NULL;
+ struct rrdeng_page_descr *descr = NULL;
+ struct page_cache_descr *pg_cache_descr = NULL;
unsigned long flags;
Pvoid_t *PValue;
struct pg_cache_page_index *page_index;
@@ -682,11 +701,12 @@ struct rrdeng_page_cache_descr *
pg_cache_release_pages(ctx, 1);
return NULL;
}
- uv_mutex_lock(&descr->mutex);
- flags = descr->flags;
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ flags = pg_cache_descr->flags;
if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
/* success */
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
break;
}
@@ -702,14 +722,14 @@ struct rrdeng_page_cache_descr *
debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
if(unlikely(debug_flags & D_RRDENGINE))
print_page_cache_descr(descr);
- while (!(descr->flags & RRD_PAGE_POPULATED)) {
+ while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
pg_cache_wait_event_unsafe(descr);
}
/* success */
/* Downgrade exclusive reference to allow other readers */
- descr->flags &= ~RRD_PAGE_LOCKED;
+ pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
pg_cache_wake_up_waiters_unsafe(descr);
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
return descr;
}
@@ -720,7 +740,7 @@ struct rrdeng_page_cache_descr *
if (!(flags & RRD_PAGE_POPULATED))
page_not_in_cache = 1;
pg_cache_wait_event_unsafe(descr);
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
/* reset scan to find again */
uv_rwlock_rdlock(&page_index->lock);
@@ -747,6 +767,7 @@ struct pg_cache_page_index *create_page_index(uuid_t *id)
assert(0 == uv_rwlock_init(&page_index->lock));
page_index->oldest_time = INVALID_TIME;
page_index->latest_time = INVALID_TIME;
+ page_index->prev = NULL;
return page_index;
}
@@ -756,6 +777,7 @@ static void init_metrics_index(struct rrdengine_instance *ctx)
struct page_cache *pg_cache = &ctx->pg_cache;
pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL;
+ pg_cache->metrics_index.last_page_index = NULL;
assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
}
@@ -789,4 +811,65 @@ void init_page_cache(struct rrdengine_instance *ctx)
init_metrics_index(ctx);
init_replaceQ(ctx);
init_commited_page_index(ctx);
+}
+
+void free_page_cache(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ Word_t ret_Judy, bytes_freed = 0;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index, *prev_page_index;
+ Word_t Index;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+
+ /* Free commited page index */
+ ret_Judy = JudyLFreeArray(&pg_cache->commited_page_index.JudyL_array, PJE0);
+ assert(NULL == pg_cache->commited_page_index.JudyL_array);
+ bytes_freed += ret_Judy;
+
+ for (page_index = pg_cache->metrics_index.last_page_index ;
+ page_index != NULL ;
+ page_index = prev_page_index) {
+ prev_page_index = page_index->prev;
+
+ /* Find first page in range */
+ Index = (Word_t) 0;
+ PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ }
+ while (descr != NULL) {
+ /* Iterate all page descriptors of this metric */
+
+ if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
+ /* Check rrdenglocking.c */
+ pg_cache_descr = descr->pg_cache_descr;
+ if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
+ freez(pg_cache_descr->page);
+ bytes_freed += RRDENG_BLOCK_SIZE;
+ }
+ rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+ bytes_freed += sizeof(*pg_cache_descr);
+ }
+ freez(descr);
+ bytes_freed += sizeof(*descr);
+
+ PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0);
+ descr = unlikely(NULL == PValue) ? NULL : *PValue;
+ }
+
+ /* Free page index */
+ ret_Judy = JudyLFreeArray(&page_index->JudyL_array, PJE0);
+ assert(NULL == page_index->JudyL_array);
+ bytes_freed += ret_Judy;
+ freez(page_index);
+ bytes_freed += sizeof(*page_index);
+ }
+ /* Free metrics index */
+ ret_Judy = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0);
+ assert(NULL == pg_cache->metrics_index.JudyHS_array);
+ bytes_freed += ret_Judy;
+
+ info("Freed %lu bytes of memory from page cache.", bytes_freed);
} \ No newline at end of file
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
index d1e29aaab..b5670f82a 100644
--- a/database/engine/pagecache.h
+++ b/database/engine/pagecache.h
@@ -5,9 +5,10 @@
#include "rrdengine.h"
-/* Forward declerations */
+/* Forward declarations */
struct rrdengine_instance;
struct extent_info;
+struct rrdeng_page_descr;
#define INVALID_TIME (0)
@@ -18,24 +19,46 @@ struct extent_info;
#define RRD_PAGE_WRITE_PENDING (1LU << 3)
#define RRD_PAGE_POPULATED (1LU << 4)
-struct rrdeng_page_cache_descr {
+struct page_cache_descr {
+ struct rrdeng_page_descr *descr; /* parent descriptor */
void *page;
- uint32_t page_length;
- usec_t start_time;
- usec_t end_time;
- uuid_t *id; /* never changes */
- struct extent_info *extent;
unsigned long flags;
- void *private;
- struct rrdeng_page_cache_descr *prev;
- struct rrdeng_page_cache_descr *next;
+ struct page_cache_descr *prev; /* LRU */
+ struct page_cache_descr *next; /* LRU */
- /* TODO: move waiter logic to concurrency table */
unsigned refcnt;
uv_mutex_t mutex; /* always take it after the page cache lock or after the commit lock */
uv_cond_t cond;
unsigned waiters;
- struct rrdeng_collect_handle *handle; /* API user */
+};
+
+/* Page cache descriptor flags, state = 0 means no descriptor */
+#define PG_CACHE_DESCR_ALLOCATED (1LU << 0)
+#define PG_CACHE_DESCR_DESTROY (1LU << 1)
+#define PG_CACHE_DESCR_LOCKED (1LU << 2)
+#define PG_CACHE_DESCR_SHIFT (3)
+#define PG_CACHE_DESCR_USERS_MASK (((unsigned long)-1) << PG_CACHE_DESCR_SHIFT)
+#define PG_CACHE_DESCR_FLAGS_MASK (((unsigned long)-1) >> (BITS_PER_ULONG - PG_CACHE_DESCR_SHIFT))
+
+/*
+ * Page cache descriptor state bits (works for both 32-bit and 64-bit architectures):
+ *
+ * 63 ... 31 ... 3 | 2 | 1 | 0|
+ * -----------------------------+------------+------------+-----------|
+ * number of descriptor users | DESTROY | LOCKED | ALLOCATED |
+ */
+struct rrdeng_page_descr {
+ uint32_t page_length;
+ usec_t start_time;
+ usec_t end_time;
+ uuid_t *id; /* never changes */
+ struct extent_info *extent;
+
+ /* points to ephemeral page cache descriptor if the page resides in the cache */
+ struct page_cache_descr *pg_cache_descr;
+
+ /* Compare-And-Swap target for page cache descriptor allocation algorithm */
+ volatile unsigned long pg_cache_descr_state;
};
#define PAGE_CACHE_MAX_PRELOAD_PAGES (256)
@@ -61,12 +84,15 @@ struct pg_cache_page_index {
* It's also written by the data deletion workqueue when data collection is disabled for this metric.
*/
usec_t latest_time;
+
+ struct pg_cache_page_index *prev;
};
/* maps UUIDs to page indices */
struct pg_cache_metrics_index {
uv_rwlock_t lock;
Pvoid_t JudyHS_array;
+ struct pg_cache_page_index *last_page_index;
};
/* gathers dirty pages to be written on disk */
@@ -85,12 +111,15 @@ struct pg_cache_commited_page_index {
unsigned nr_commited_pages;
};
-/* gathers populated pages to be evicted */
+/*
+ * Gathers populated pages to be evicted.
+ * Relies on page cache descriptors being there as it uses their memory.
+ */
struct pg_cache_replaceQ {
uv_rwlock_t lock; /* LRU lock */
- struct rrdeng_page_cache_descr *head; /* LRU */
- struct rrdeng_page_cache_descr *tail; /* MRU */
+ struct page_cache_descr *head; /* LRU */
+ struct page_cache_descr *tail; /* MRU */
};
struct page_cache { /* TODO: add statistics */
@@ -104,29 +133,31 @@ struct page_cache { /* TODO: add statistics */
unsigned populated_pages;
};
-extern void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr);
-extern void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr);
-extern unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr);
+extern void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr);
+extern void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr);
+extern unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
extern void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr);
+ struct rrdeng_page_descr *descr);
extern void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr);
+ struct rrdeng_page_descr *descr);
extern void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr);
-extern struct rrdeng_page_cache_descr *pg_cache_create_descr(void);
-extern void pg_cache_put_unsafe(struct rrdeng_page_cache_descr *descr);
-extern void pg_cache_put(struct rrdeng_page_cache_descr *descr);
+ struct rrdeng_page_descr *descr);
+extern struct rrdeng_page_descr *pg_cache_create_descr(void);
+extern int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access);
+extern void pg_cache_put_unsafe(struct rrdeng_page_descr *descr);
+extern void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
extern void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
- struct rrdeng_page_cache_descr *descr);
-extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr);
+ struct rrdeng_page_descr *descr);
+extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty);
extern struct pg_cache_page_index *
pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time);
-extern struct rrdeng_page_cache_descr *
+extern struct rrdeng_page_descr *
pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
usec_t point_in_time);
extern struct pg_cache_page_index *create_page_index(uuid_t *id);
extern void init_page_cache(struct rrdengine_instance *ctx);
-extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_cache_descr *descr);
+extern void free_page_cache(struct rrdengine_instance *ctx);
+extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr);
extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index);
-#endif /* NETDATA_PAGECACHE_H */ \ No newline at end of file
+#endif /* NETDATA_PAGECACHE_H */
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index b8e4eba01..0f2dceaa4 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -3,6 +3,10 @@
#include "rrdengine.h"
+rrdeng_stats_t global_io_errors = 0;
+rrdeng_stats_t global_fs_errors = 0;
+rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
+
void sanity_check(void)
{
/* Magic numbers must fit in the super-blocks */
@@ -27,12 +31,12 @@ void read_extent_cb(uv_fs_t* req)
struct rrdengine_worker_config* wc = req->loop->data;
struct rrdengine_instance *ctx = wc->ctx;
struct extent_io_descriptor *xt_io_descr;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
int ret;
unsigned i, j, count;
void *page, *uncompressed_buf = NULL;
uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length;
- struct rrdengine_datafile *datafile;
/* persistent structures */
struct rrdeng_df_extent_header *header;
struct rrdeng_df_extent_trailer *trailer;
@@ -54,9 +58,13 @@ void read_extent_cb(uv_fs_t* req)
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer));
ret = crc32cmp(trailer->checksum, crc);
- datafile = xt_io_descr->descr_array[0]->extent->datafile;
- debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__,
- xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED");
+#ifdef NETDATA_INTERNAL_CHECKS
+ {
+ struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
+ debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__,
+ xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED");
+ }
+#endif
if (unlikely(ret)) {
/* TODO: handle errors */
exit(UV_EIO);
@@ -97,36 +105,38 @@ void read_extent_cb(uv_fs_t* req)
(void) memcpy(page, uncompressed_buf + page_offset, descr->page_length);
}
pg_cache_replaceQ_insert(ctx, descr);
- uv_mutex_lock(&descr->mutex);
- descr->page = page;
- descr->flags |= RRD_PAGE_POPULATED;
- descr->flags &= ~RRD_PAGE_READ_PENDING;
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ pg_cache_descr->page = page;
+ pg_cache_descr->flags |= RRD_PAGE_POPULATED;
+ pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING;
debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
if (xt_io_descr->release_descr) {
pg_cache_put_unsafe(descr);
} else {
pg_cache_wake_up_waiters_unsafe(descr);
}
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
}
if (RRD_NO_COMPRESSION != header->compression_algorithm) {
- free(uncompressed_buf);
+ freez(uncompressed_buf);
}
if (xt_io_descr->completion)
complete(xt_io_descr->completion);
cleanup:
uv_fs_req_cleanup(req);
free(xt_io_descr->buf);
- free(xt_io_descr);
+ freez(xt_io_descr);
}
static void do_read_extent(struct rrdengine_worker_config* wc,
- struct rrdeng_page_cache_descr **descr,
+ struct rrdeng_page_descr **descr,
unsigned count,
uint8_t release_descr)
{
struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache_descr *pg_cache_descr;
int ret;
unsigned i, size_bytes, pos, real_io_size;
// uint32_t payload_length;
@@ -141,14 +151,15 @@ static void do_read_extent(struct rrdengine_worker_config* wc,
ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
if (unlikely(ret)) {
fatal("posix_memalign:%s", strerror(ret));
- /* free(xt_io_descr);
+ /* freez(xt_io_descr);
return;*/
}
for (i = 0 ; i < count; ++i) {
- uv_mutex_lock(&descr[i]->mutex);
- descr[i]->flags |= RRD_PAGE_READ_PENDING;
+ rrdeng_page_descr_mutex_lock(ctx, descr[i]);
+ pg_cache_descr = descr[i]->pg_cache_descr;
+ pg_cache_descr->flags |= RRD_PAGE_READ_PENDING;
// payload_length = descr[i]->page_length;
- uv_mutex_unlock(&descr[i]->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr[i]);
xt_io_descr->descr_array[i] = descr[i];
}
@@ -227,8 +238,8 @@ void flush_pages_cb(uv_fs_t* req)
struct rrdengine_instance *ctx = wc->ctx;
struct page_cache *pg_cache = &ctx->pg_cache;
struct extent_io_descriptor *xt_io_descr;
- struct rrdeng_page_cache_descr *descr;
- struct rrdengine_datafile *datafile;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
int ret;
unsigned i, count;
Word_t commit_id;
@@ -238,10 +249,13 @@ void flush_pages_cb(uv_fs_t* req)
error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
goto cleanup;
}
- datafile = xt_io_descr->descr_array[0]->extent->datafile;
- debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.",
- __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
-
+#ifdef NETDATA_INTERNAL_CHECKS
+ {
+ struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
+ debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.",
+ __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
+ }
+#endif
count = xt_io_descr->descr_count;
for (i = 0 ; i < count ; ++i) {
/* care, we don't hold the descriptor mutex */
@@ -256,18 +270,19 @@ void flush_pages_cb(uv_fs_t* req)
pg_cache_replaceQ_insert(ctx, descr);
- uv_mutex_lock(&descr->mutex);
- descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING);
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ pg_cache_descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING);
/* wake up waiters, care no reference being held */
pg_cache_wake_up_waiters_unsafe(descr);
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
}
if (xt_io_descr->completion)
complete(xt_io_descr->completion);
cleanup:
uv_fs_req_cleanup(req);
free(xt_io_descr->buf);
- free(xt_io_descr);
+ freez(xt_io_descr);
}
/*
@@ -283,7 +298,8 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
int compressed_size, max_compressed_size = 0;
unsigned i, count, size_bytes, pos, real_io_size;
uint32_t uncompressed_payload_length, payload_offset;
- struct rrdeng_page_cache_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT];
+ struct rrdeng_page_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT];
+ struct page_cache_descr *pg_cache_descr;
struct extent_io_descriptor *xt_io_descr;
void *compressed_buf = NULL;
Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT];
@@ -311,15 +327,16 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
descr = unlikely(NULL == PValue) ? NULL : *PValue) {
assert(0 != descr->page_length);
- uv_mutex_lock(&descr->mutex);
- if (!(descr->flags & RRD_PAGE_WRITE_PENDING)) {
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING)) {
/* care, no reference being held */
- descr->flags |= RRD_PAGE_WRITE_PENDING;
+ pg_cache_descr->flags |= RRD_PAGE_WRITE_PENDING;
uncompressed_payload_length += descr->page_length;
descr_commit_idx_array[count] = Index;
eligible_pages[count++] = descr;
}
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
}
uv_rwlock_rdunlock(&pg_cache->commited_page_index.lock);
@@ -345,9 +362,9 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
if (unlikely(ret)) {
fatal("posix_memalign:%s", strerror(ret));
- /* free(xt_io_descr);*/
+ /* freez(xt_io_descr);*/
}
- (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_cache_descr *) * count);
+ (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_descr *) * count);
xt_io_descr->descr_count = count;
pos = 0;
@@ -378,7 +395,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
for (i = 0 ; i < count ; ++i) {
descr = xt_io_descr->descr_array[i];
/* care, we don't hold the descriptor mutex */
- (void) memcpy(xt_io_descr->buf + pos, descr->page, descr->page_length);
+ (void) memcpy(xt_io_descr->buf + pos, descr->pg_cache_descr->page, descr->page_length);
descr->extent = extent;
extent->pages[i] = descr;
@@ -397,7 +414,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
ctx->stats.after_compress_bytes += compressed_size;
debug(D_RRDENGINE, "LZ4 compressed %"PRIu32" bytes to %d bytes.", uncompressed_payload_length, compressed_size);
(void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size);
- free(compressed_buf);
+ freez(compressed_buf);
size_bytes = payload_offset + compressed_size + sizeof(*trailer);
header->payload_length = compressed_size;
break;
@@ -435,23 +452,36 @@ static void after_delete_old_data(uv_work_t *req, int status)
struct rrdengine_worker_config* wc = &ctx->worker_config;
struct rrdengine_datafile *datafile;
struct rrdengine_journalfile *journalfile;
- unsigned bytes;
+ unsigned deleted_bytes, journalfile_bytes, datafile_bytes;
+ int ret;
+ char path[RRDENG_PATH_MAX];
(void)status;
datafile = ctx->datafiles.first;
journalfile = datafile->journalfile;
- bytes = datafile->pos + journalfile->pos;
+ datafile_bytes = datafile->pos;
+ journalfile_bytes = journalfile->pos;
+ deleted_bytes = 0;
+ info("Deleting data and journal file pair.");
datafile_list_delete(ctx, datafile);
- destroy_journal_file(journalfile, datafile);
- destroy_data_file(datafile);
- info("Deleted data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
- datafile->tier, datafile->fileno);
- free(journalfile);
- free(datafile);
+ ret = destroy_journal_file(journalfile, datafile);
+ if (!ret) {
+ generate_journalfilepath(datafile, path, sizeof(path));
+ info("Deleted journal file \"%s\".", path);
+ deleted_bytes += journalfile_bytes;
+ }
+ ret = destroy_data_file(datafile);
+ if (!ret) {
+ generate_datafilepath(datafile, path, sizeof(path));
+ info("Deleted data file \"%s\".", path);
+ deleted_bytes += datafile_bytes;
+ }
+ freez(journalfile);
+ freez(datafile);
- ctx->disk_space -= bytes;
- info("Reclaimed %u bytes of disk space.", bytes);
+ ctx->disk_space -= deleted_bytes;
+ info("Reclaimed %u bytes of disk space.", deleted_bytes);
/* unfreeze command processing */
wc->now_deleting.data = NULL;
@@ -464,7 +494,7 @@ static void delete_old_data(uv_work_t *req)
struct rrdengine_instance *ctx = req->data;
struct rrdengine_datafile *datafile;
struct extent_info *extent, *next;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
unsigned count, i;
/* Safe to use since it will be deleted after we are done */
@@ -474,10 +504,10 @@ static void delete_old_data(uv_work_t *req)
count = extent->number_of_pages;
for (i = 0 ; i < count ; ++i) {
descr = extent->pages[i];
- pg_cache_punch_hole(ctx, descr);
+ pg_cache_punch_hole(ctx, descr, 0);
}
next = extent->next;
- free(extent);
+ freez(extent);
}
}
@@ -487,6 +517,7 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
struct rrdengine_datafile *datafile;
unsigned current_size, target_size;
uint8_t out_of_space, only_one_datafile;
+ int ret;
out_of_space = 0;
if (unlikely(ctx->disk_space > ctx->max_disk_space)) {
@@ -501,7 +532,10 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
if (unlikely(current_size >= target_size || (out_of_space && only_one_datafile))) {
/* Finalize data and journal file and create a new pair */
wal_flush_transaction_buffer(wc);
- create_new_datafile_pair(ctx, 1, datafile->fileno + 1);
+ ret = create_new_datafile_pair(ctx, 1, ctx->last_fileno + 1);
+ if (likely(!ret)) {
+ ++ctx->last_fileno;
+ }
}
if (unlikely(out_of_space)) {
/* delete old data */
@@ -509,18 +543,30 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
/* already deleting data */
return;
}
- info("Deleting data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
- ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
+ if (NULL == ctx->datafiles.first->next) {
+ error("Cannot delete data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\""
+ " to reclaim space, there are no other file pairs left.",
+ ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
+ return;
+ }
+ info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
+ ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
wc->now_deleting.data = ctx;
- uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data);
+ assert(0 == uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data));
}
}
+/* return 0 on success */
int init_rrd_files(struct rrdengine_instance *ctx)
{
return init_data_files(ctx);
}
+void finalize_rrd_files(struct rrdengine_instance *ctx)
+{
+ return finalize_data_files(ctx);
+}
+
void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc)
{
wc->cmd_queue.head = wc->cmd_queue.tail = 0;
@@ -588,7 +634,6 @@ void async_cb(uv_async_t *handle)
void timer_cb(uv_timer_t* handle)
{
struct rrdengine_worker_config* wc = handle->data;
- struct rrdengine_instance *ctx = wc->ctx;
uv_stop(handle->loop);
uv_update_time(handle->loop);
@@ -608,7 +653,7 @@ void timer_cb(uv_timer_t* handle)
#ifdef NETDATA_INTERNAL_CHECKS
{
char buf[4096];
- debug(D_RRDENGINE, "%s", get_rrdeng_statistics(ctx, buf, sizeof(buf)));
+ debug(D_RRDENGINE, "%s", get_rrdeng_statistics(wc->ctx, buf, sizeof(buf)));
}
#endif
}
@@ -623,7 +668,7 @@ void rrdeng_worker(void* arg)
struct rrdengine_worker_config* wc = arg;
struct rrdengine_instance *ctx = wc->ctx;
uv_loop_t* loop;
- int shutdown;
+ int shutdown, ret;
enum rrdeng_opcode opcode;
uv_timer_t timer_req;
struct rrdeng_cmd cmd;
@@ -631,22 +676,35 @@ void rrdeng_worker(void* arg)
rrdeng_init_cmd_queue(wc);
loop = wc->loop = mallocz(sizeof(uv_loop_t));
- uv_loop_init(loop);
+ ret = uv_loop_init(loop);
+ if (ret) {
+ error("uv_loop_init(): %s", uv_strerror(ret));
+ goto error_after_loop_init;
+ }
loop->data = wc;
- uv_async_init(wc->loop, &wc->async, async_cb);
+ ret = uv_async_init(wc->loop, &wc->async, async_cb);
+ if (ret) {
+ error("uv_async_init(): %s", uv_strerror(ret));
+ goto error_after_async_init;
+ }
wc->async.data = wc;
wc->now_deleting.data = NULL;
/* dirty page flushing timer */
- uv_timer_init(loop, &timer_req);
+ ret = uv_timer_init(loop, &timer_req);
+ if (ret) {
+ error("uv_timer_init(): %s", uv_strerror(ret));
+ goto error_after_timer_init;
+ }
timer_req.data = wc;
+ wc->error = 0;
/* wake up initialization thread */
complete(&ctx->rrdengine_completion);
- uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS);
+ assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
shutdown = 0;
while (shutdown == 0 || uv_loop_alive(loop)) {
uv_run(loop, UV_RUN_DEFAULT);
@@ -661,12 +719,6 @@ void rrdeng_worker(void* arg)
break;
case RRDENG_SHUTDOWN:
shutdown = 1;
- if (unlikely(wc->now_deleting.data)) {
- /* postpone shutdown until after deletion */
- info("Postponing shutting RRD engine event loop down until after datafile deletion is finished.");
- rrdeng_enq_cmd(wc, &cmd);
- break;
- }
/*
* uv_async_send after uv_close does not seem to crash in linux at the moment,
* it is however undocumented behaviour and we need to be aware if this becomes
@@ -675,10 +727,6 @@ void rrdeng_worker(void* arg)
uv_close((uv_handle_t *)&wc->async, NULL);
assert(0 == uv_timer_stop(&timer_req));
uv_close((uv_handle_t *)&timer_req, NULL);
- info("Shutting down RRD engine event loop.");
- while (do_flush_pages(wc, 1, NULL)) {
- ; /* Force flushing of all commited pages. */
- }
break;
case RRDENG_READ_PAGE:
do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0);
@@ -690,14 +738,14 @@ void rrdeng_worker(void* arg)
do_commit_transaction(wc, STORE_DATA, NULL);
break;
case RRDENG_FLUSH_PAGES: {
- unsigned total_bytes, bytes_written;
+ unsigned bytes_written;
/* First I/O should be enough to call completion */
bytes_written = do_flush_pages(wc, 1, cmd.completion);
- for (total_bytes = bytes_written ;
- bytes_written && (total_bytes < DATAFILE_IDEAL_IO_SIZE) ;
- total_bytes += bytes_written) {
- bytes_written = do_flush_pages(wc, 1, NULL);
+ if (bytes_written) {
+ while (do_flush_pages(wc, 1, NULL)) {
+ ; /* Force flushing of all commited pages. */
+ }
}
break;
}
@@ -708,6 +756,13 @@ void rrdeng_worker(void* arg)
} while (opcode != RRDENG_NOOP);
}
/* cleanup operations of the event loop */
+ if (unlikely(wc->now_deleting.data)) {
+ info("Postponing shutting RRD engine event loop down until after datafile deletion is finished.");
+ }
+ info("Shutting down RRD engine event loop.");
+ while (do_flush_pages(wc, 1, NULL)) {
+ ; /* Force flushing of all commited pages. */
+ }
wal_flush_transaction_buffer(wc);
uv_run(loop, UV_RUN_DEFAULT);
@@ -716,7 +771,20 @@ void rrdeng_worker(void* arg)
uv_cond_destroy(&wc->cmd_cond);
/* uv_mutex_destroy(&wc->cmd_mutex); */
assert(0 == uv_loop_close(loop));
- free(loop);
+ freez(loop);
+
+ return;
+
+error_after_timer_init:
+ uv_close((uv_handle_t *)&wc->async, NULL);
+error_after_async_init:
+ assert(0 == uv_loop_close(loop));
+error_after_loop_init:
+ freez(loop);
+
+ wc->error = UV_EAGAIN;
+ /* wake up initialization thread */
+ complete(&ctx->rrdengine_completion);
}
@@ -726,19 +794,19 @@ static void basic_functional_test(struct rrdengine_instance *ctx)
int i, j, failed_validations;
uuid_t uuid[NR_PAGES];
void *buf;
- struct rrdeng_page_cache_descr *handle[NR_PAGES];
- char uuid_str[37];
- char backup[NR_PAGES][37 * 100]; /* backup storage for page data verification */
+ struct rrdeng_page_descr *handle[NR_PAGES];
+ char uuid_str[UUID_STR_LEN];
+ char backup[NR_PAGES][UUID_STR_LEN * 100]; /* backup storage for page data verification */
for (i = 0 ; i < NR_PAGES ; ++i) {
uuid_generate(uuid[i]);
uuid_unparse_lower(uuid[i], uuid_str);
// fprintf(stderr, "Generated uuid[%d]=%s\n", i, uuid_str);
- buf = rrdeng_create_page(&uuid[i], &handle[i]);
+ buf = rrdeng_create_page(ctx, &uuid[i], &handle[i]);
/* Each page contains 10 times its own UUID stringified */
for (j = 0 ; j < 100 ; ++j) {
- strcpy(buf + 37 * j, uuid_str);
- strcpy(backup[i] + 37 * j, uuid_str);
+ strcpy(buf + UUID_STR_LEN * j, uuid_str);
+ strcpy(backup[i] + UUID_STR_LEN * j, uuid_str);
}
rrdeng_commit_page(ctx, handle[i], (Word_t)i);
}
@@ -750,7 +818,7 @@ static void basic_functional_test(struct rrdengine_instance *ctx)
++failed_validations;
fprintf(stderr, "Page %d was LOST.\n", i);
}
- if (memcmp(backup[i], buf, 37 * 100)) {
+ if (memcmp(backup[i], buf, UUID_STR_LEN * 100)) {
++failed_validations;
fprintf(stderr, "Page %d data comparison with backup FAILED validation.\n", i);
}
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
index 141bb9c63..6f6a6f8ff 100644
--- a/database/engine/rrdengine.h
+++ b/database/engine/rrdengine.h
@@ -22,6 +22,7 @@
#include "journalfile.h"
#include "rrdengineapi.h"
#include "pagecache.h"
+#include "rrdenglocking.h"
#ifdef NETDATA_RRD_INTERNALS
@@ -59,10 +60,10 @@ struct rrdeng_cmd {
enum rrdeng_opcode opcode;
union {
struct rrdeng_read_page {
- struct rrdeng_page_cache_descr *page_cache_descr;
+ struct rrdeng_page_descr *page_cache_descr;
} read_page;
struct rrdeng_read_extent {
- struct rrdeng_page_cache_descr *page_cache_descr[MAX_PAGES_PER_EXTENT];
+ struct rrdeng_page_descr *page_cache_descr[MAX_PAGES_PER_EXTENT];
int page_count;
} read_extent;
struct completion *completion;
@@ -85,7 +86,7 @@ struct extent_io_descriptor {
struct completion *completion;
unsigned descr_count;
int release_descr;
- struct rrdeng_page_cache_descr *descr_array[MAX_PAGES_PER_EXTENT];
+ struct rrdeng_page_descr *descr_array[MAX_PAGES_PER_EXTENT];
Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT];
};
@@ -111,6 +112,8 @@ struct rrdengine_worker_config {
uv_cond_t cmd_cond;
volatile unsigned queue_size;
struct rrdeng_cmdqueue cmd_queue;
+
+ int error;
};
/*
@@ -142,10 +145,19 @@ struct rrdengine_statistics {
rrdeng_stats_t datafile_deletions;
rrdeng_stats_t journalfile_creations;
rrdeng_stats_t journalfile_deletions;
+ rrdeng_stats_t page_cache_descriptors;
+ rrdeng_stats_t io_errors;
+ rrdeng_stats_t fs_errors;
};
+/* I/O errors global counter */
+extern rrdeng_stats_t global_io_errors;
+/* File-System errors global counter */
+extern rrdeng_stats_t global_fs_errors;
+/* number of File-Descriptors that have been reserved by dbengine */
+extern rrdeng_stats_t rrdeng_reserved_file_descriptors;
+
struct rrdengine_instance {
- rrdengine_state_t rrdengine_state;
struct rrdengine_worker_config worker_config;
struct completion rrdengine_completion;
struct page_cache pg_cache;
@@ -155,6 +167,7 @@ struct rrdengine_instance {
char dbfiles_path[FILENAME_MAX+1];
uint64_t disk_space;
uint64_t max_disk_space;
+ unsigned last_fileno; /* newest index of datafile and journalfile */
unsigned long max_cache_pages;
unsigned long cache_pages_low_watermark;
@@ -163,6 +176,7 @@ struct rrdengine_instance {
extern void sanity_check(void);
extern int init_rrd_files(struct rrdengine_instance *ctx);
+extern void finalize_rrd_files(struct rrdengine_instance *ctx);
extern void rrdeng_test_quota(struct rrdengine_worker_config* wc);
extern void rrdeng_worker(void* arg);
extern void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd);
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index a4e711554..a87ce6d64 100644
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -41,6 +41,7 @@ void rrdeng_store_metric_init(RRDDIM *rd)
handle->descr = NULL;
handle->prev_descr = NULL;
+ handle->unaligned_page = 0;
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t));
@@ -54,59 +55,140 @@ void rrdeng_store_metric_init(RRDDIM *rd)
PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t), PJE0);
assert(NULL == *PValue); /* TODO: figure out concurrency model */
*PValue = page_index = create_page_index(&temp_id);
+ page_index->prev = pg_cache->metrics_index.last_page_index;
+ pg_cache->metrics_index.last_page_index = page_index;
uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
}
rd->state->rrdeng_uuid = &page_index->id;
handle->page_index = page_index;
}
+/* The page must be populated and referenced */
+static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr)
+{
+ unsigned i;
+ uint8_t has_only_empty_metrics = 1;
+ storage_number *page;
+
+ page = descr->pg_cache_descr->page;
+ for (i = 0 ; i < descr->page_length / sizeof(storage_number); ++i) {
+ if (SN_EMPTY_SLOT != page[i]) {
+ has_only_empty_metrics = 0;
+ break;
+ }
+ }
+ return has_only_empty_metrics;
+}
+
+void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
+{
+ struct rrdeng_collect_handle *handle;
+ struct rrdengine_instance *ctx;
+ struct rrdeng_page_descr *descr;
+
+ handle = &rd->state->handle.rrdeng;
+ ctx = handle->ctx;
+ descr = handle->descr;
+ if (unlikely(NULL == descr)) {
+ return;
+ }
+ if (likely(descr->page_length)) {
+ int ret, page_is_empty;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
+#endif
+ if (handle->prev_descr) {
+ /* unpin old second page */
+ pg_cache_put(ctx, handle->prev_descr);
+ }
+ page_is_empty = page_has_only_empty_metrics(descr);
+ if (page_is_empty) {
+ debug(D_RRDENGINE, "Page has empty metrics only, deleting:");
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_put(ctx, descr);
+ pg_cache_punch_hole(ctx, descr, 1);
+ handle->prev_descr = NULL;
+ } else {
+ /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ ret = pg_cache_try_get_unsafe(descr, 0);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ assert (1 == ret);
+
+ rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
+ handle->prev_descr = descr;
+ }
+ } else {
+ freez(descr->pg_cache_descr->page);
+ rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);
+ freez(descr);
+ }
+ handle->descr = NULL;
+}
+
void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number)
{
struct rrdeng_collect_handle *handle;
struct rrdengine_instance *ctx;
struct page_cache *pg_cache;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
storage_number *page;
+ uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0;
handle = &rd->state->handle.rrdeng;
ctx = handle->ctx;
pg_cache = &ctx->pg_cache;
descr = handle->descr;
- if (unlikely(NULL == descr || descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE)) {
- if (descr) {
- descr->handle = NULL;
- if (descr->page_length) {
-#ifdef NETDATA_INTERNAL_CHECKS
- rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
-#endif
- /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */
- ++descr->refcnt;
- rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
- if (handle->prev_descr) {
- /* unpin old second page */
- pg_cache_put(handle->prev_descr);
- }
- handle->prev_descr = descr;
- } else {
- free(descr->page);
- free(descr);
- handle->descr = NULL;
- }
+
+ if (descr) {
+ /* Make alignment decisions */
+
+ if (descr->page_length == rd->rrdset->rrddim_page_alignment) {
+ /* this is the leading dimension that defines chart alignment */
+ perfect_page_alignment = 1;
+ }
+ /* is the metric far enough out of alignment with the others? */
+ if (unlikely(descr->page_length + sizeof(number) < rd->rrdset->rrddim_page_alignment)) {
+ handle->unaligned_page = 1;
+ debug(D_RRDENGINE, "Metric page is not aligned with chart:");
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
}
- page = rrdeng_create_page(&handle->page_index->id, &descr);
+ if (unlikely(handle->unaligned_page &&
+ /* did the other metrics change page? */
+ rd->rrdset->rrddim_page_alignment <= sizeof(number))) {
+ debug(D_RRDENGINE, "Flushing unaligned metric page.");
+ must_flush_unaligned_page = 1;
+ handle->unaligned_page = 0;
+ }
+ }
+ if (unlikely(NULL == descr ||
+ descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE ||
+ must_flush_unaligned_page)) {
+ rrdeng_store_metric_flush_current_page(rd);
+
+ page = rrdeng_create_page(ctx, &handle->page_index->id, &descr);
assert(page);
- handle->prev_descr = handle->descr;
+
handle->descr = descr;
- descr->handle = handle;
+
uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
handle->page_correlation_id = pg_cache->commited_page_index.latest_corr_id++;
uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
- }
- page = descr->page;
+ if (0 == rd->rrdset->rrddim_page_alignment) {
+ /* this is the leading dimension that defines chart alignment */
+ perfect_page_alignment = 1;
+ }
+ }
+ page = descr->pg_cache_descr->page;
page[descr->page_length / sizeof(number)] = number;
descr->end_time = point_in_time;
descr->page_length += sizeof(number);
+ if (perfect_page_alignment)
+ rd->rrdset->rrddim_page_alignment = descr->page_length;
if (unlikely(INVALID_TIME == descr->start_time)) {
descr->start_time = point_in_time;
@@ -126,26 +208,13 @@ void rrdeng_store_metric_finalize(RRDDIM *rd)
{
struct rrdeng_collect_handle *handle;
struct rrdengine_instance *ctx;
- struct rrdeng_page_cache_descr *descr;
handle = &rd->state->handle.rrdeng;
ctx = handle->ctx;
- descr = handle->descr;
- if (descr) {
- descr->handle = NULL;
- if (descr->page_length) {
-#ifdef NETDATA_INTERNAL_CHECKS
- rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
-#endif
- rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
- if (handle->prev_descr) {
- /* unpin old second page */
- pg_cache_put(handle->prev_descr);
- }
- } else {
- free(descr->page);
- free(descr);
- }
+ rrdeng_store_metric_flush_current_page(rd);
+ if (handle->prev_descr) {
+ /* unpin old second page */
+ pg_cache_put(ctx, handle->prev_descr);
}
}
@@ -174,7 +243,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
{
struct rrdeng_query_handle *handle;
struct rrdengine_instance *ctx;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
storage_number *page, ret;
unsigned position;
usec_t point_in_time;
@@ -198,7 +267,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
- pg_cache_put(descr);
+ pg_cache_put(ctx, descr);
handle->descr = NULL;
}
descr = pg_cache_lookup(ctx, handle->page_index, &handle->page_index->id, point_in_time);
@@ -216,7 +285,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
ret = SN_EMPTY_SLOT;
goto out;
}
- page = descr->page;
+ page = descr->pg_cache_descr->page;
if (unlikely(descr->start_time == descr->end_time)) {
ret = page[0];
goto out;
@@ -248,7 +317,7 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
{
struct rrdeng_query_handle *handle;
struct rrdengine_instance *ctx;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
handle = &rrdimm_handle->rrdeng;
ctx = handle->ctx;
@@ -257,7 +326,7 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
- pg_cache_put(descr);
+ pg_cache_put(ctx, descr);
}
}
@@ -283,30 +352,32 @@ time_t rrdeng_metric_oldest_time(RRDDIM *rd)
}
/* Also gets a reference for the page */
-void *rrdeng_create_page(uuid_t *id, struct rrdeng_page_cache_descr **ret_descr)
+void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr)
{
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
void *page;
- int ret;
-
/* TODO: check maximum number of pages in page cache limit */
- page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */
descr = pg_cache_create_descr();
- descr->page = page;
descr->id = id; /* TODO: add page type: metric, log, something? */
- descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */;
- descr->refcnt = 1;
-
- debug(D_RRDENGINE, "-----------------\nCreated new page:\n-----------------");
- if(unlikely(debug_flags & D_RRDENGINE))
+ page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ pg_cache_descr->page = page;
+ pg_cache_descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */;
+ pg_cache_descr->refcnt = 1;
+
+ debug(D_RRDENGINE, "Created new page:");
+ if (unlikely(debug_flags & D_RRDENGINE))
print_page_cache_descr(descr);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
*ret_descr = descr;
return page;
}
/* The page must not be empty */
-void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr,
+void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
Word_t page_correlation_id)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -324,15 +395,16 @@ void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache
++pg_cache->commited_page_index.nr_commited_pages;
uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
- pg_cache_put(descr);
+ pg_cache_put(ctx, descr);
}
/* Gets a reference for the page */
void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle)
{
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
- debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------");
+ debug(D_RRDENGINE, "Reading existing page:");
descr = pg_cache_lookup(ctx, NULL, id, INVALID_TIME);
if (NULL == descr) {
*handle = NULL;
@@ -340,16 +412,18 @@ void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **
return NULL;
}
*handle = descr;
+ pg_cache_descr = descr->pg_cache_descr;
- return descr->page;
+ return pg_cache_descr->page;
}
/* Gets a reference for the page */
void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle)
{
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
- debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------");
+ debug(D_RRDENGINE, "Reading existing page:");
descr = pg_cache_lookup(ctx, NULL, id, point_in_time);
if (NULL == descr) {
*handle = NULL;
@@ -357,11 +431,18 @@ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_i
return NULL;
}
*handle = descr;
+ pg_cache_descr = descr->pg_cache_descr;
- return descr->page;
+ return pg_cache_descr->page;
}
-void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
+/*
+ * Gathers Database Engine statistics.
+ * Careful when modifying this function.
+ * You must not change the indices of the statistics or user code will break.
+ * You must not exceed RRDENG_NR_STATS or it will crash.
+ */
+void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -392,24 +473,46 @@ void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long
array[24] = (uint64_t)ctx->stats.datafile_deletions;
array[25] = (uint64_t)ctx->stats.journalfile_creations;
array[26] = (uint64_t)ctx->stats.journalfile_deletions;
+ array[27] = (uint64_t)ctx->stats.page_cache_descriptors;
+ array[28] = (uint64_t)ctx->stats.io_errors;
+ array[29] = (uint64_t)ctx->stats.fs_errors;
+ array[30] = (uint64_t)global_io_errors;
+ array[31] = (uint64_t)global_fs_errors;
+ array[32] = (uint64_t)rrdeng_reserved_file_descriptors;
+ assert(RRDENG_NR_STATS == 33);
}
/* Releases reference to page */
void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle)
{
(void)ctx;
- pg_cache_put((struct rrdeng_page_cache_descr *)handle);
+ pg_cache_put(ctx, (struct rrdeng_page_descr *)handle);
}
/*
- * Returns 0 on success, 1 on error
+ * Returns 0 on success, negative on error
*/
int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, unsigned disk_space_mb)
{
struct rrdengine_instance *ctx;
int error;
+ uint32_t max_open_files;
sanity_check();
+
+ max_open_files = rlimit_nofile.rlim_cur / 4;
+
+ /* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE);
+ if (rrdeng_reserved_file_descriptors > max_open_files) {
+ error("Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.",
+ (unsigned)rrdeng_reserved_file_descriptors, (unsigned)max_open_files);
+
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
+ return UV_EMFILE;
+ }
+
if (NULL == ctxp) {
/* for testing */
ctx = &default_global_ctx;
@@ -417,10 +520,6 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
} else {
*ctxp = ctx = callocz(1, sizeof(*ctx));
}
- if (ctx->rrdengine_state != RRDENGINE_STATUS_UNINITIALIZED) {
- return 1;
- }
- ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZING;
ctx->global_compress_alg = RRD_LZ4;
if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB)
page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
@@ -439,11 +538,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
init_commit_log(ctx);
error = init_rrd_files(ctx);
if (error) {
- ctx->rrdengine_state = RRDENGINE_STATUS_UNINITIALIZED;
- if (ctx != &default_global_ctx) {
- freez(ctx);
- }
- return 1;
+ goto error_after_init_rrd_files;
}
init_completion(&ctx->rrdengine_completion);
@@ -451,9 +546,21 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
/* wait for worker thread to initialize */
wait_for_completion(&ctx->rrdengine_completion);
destroy_completion(&ctx->rrdengine_completion);
-
- ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZED;
+ if (ctx->worker_config.error) {
+ goto error_after_rrdeng_worker;
+ }
return 0;
+
+error_after_rrdeng_worker:
+ finalize_rrd_files(ctx);
+error_after_init_rrd_files:
+ free_page_cache(ctx);
+ if (ctx != &default_global_ctx) {
+ freez(ctx);
+ *ctxp = NULL;
+ }
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
+ return UV_EIO;
}
/*
@@ -464,10 +571,6 @@ int rrdeng_exit(struct rrdengine_instance *ctx)
struct rrdeng_cmd cmd;
if (NULL == ctx) {
- /* TODO: move to per host basis */
- ctx = &default_global_ctx;
- }
- if (ctx->rrdengine_state != RRDENGINE_STATUS_INITIALIZED) {
return 1;
}
@@ -477,8 +580,12 @@ int rrdeng_exit(struct rrdengine_instance *ctx)
assert(0 == uv_thread_join(&ctx->worker_config.thread));
+ finalize_rrd_files(ctx);
+ free_page_cache(ctx);
+
if (ctx != &default_global_ctx) {
freez(ctx);
}
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
return 0;
} \ No newline at end of file
diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h
index e76629a4b..e52aabcbe 100644
--- a/database/engine/rrdengineapi.h
+++ b/database/engine/rrdengineapi.h
@@ -7,16 +7,22 @@
#define RRDENG_MIN_PAGE_CACHE_SIZE_MB (32)
#define RRDENG_MIN_DISK_SPACE_MB (256)
+
+#define RRDENG_NR_STATS (33)
+
+#define RRDENG_FD_BUDGET_PER_INSTANCE (50)
+
extern int default_rrdeng_page_cache_mb;
extern int default_rrdeng_disk_quota_mb;
-extern void *rrdeng_create_page(uuid_t *id, struct rrdeng_page_cache_descr **ret_descr);
-extern void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr,
+extern void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr);
+extern void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
Word_t page_correlation_id);
extern void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle);
extern void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle);
extern void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle);
extern void rrdeng_store_metric_init(RRDDIM *rd);
+extern void rrdeng_store_metric_flush_current_page(RRDDIM *rd);
extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number);
extern void rrdeng_store_metric_finalize(RRDDIM *rd);
extern void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle,
@@ -26,7 +32,7 @@ extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_han
extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle);
extern time_t rrdeng_metric_latest_time(RRDDIM *rd);
extern time_t rrdeng_metric_oldest_time(RRDDIM *rd);
-extern void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
+extern void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
/* must call once before using anything */
extern int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c
index 25f57ba1b..96504b275 100644
--- a/database/engine/rrdenginelib.c
+++ b/database/engine/rrdenginelib.c
@@ -1,25 +1,52 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
-void print_page_cache_descr(struct rrdeng_page_cache_descr *page_cache_descr)
+#define BUFSIZE (512)
+
+/* Caller must hold descriptor lock */
+void print_page_cache_descr(struct rrdeng_page_descr *descr)
{
- char uuid_str[37];
- char str[512];
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+ char uuid_str[UUID_STR_LEN];
+ char str[BUFSIZE];
int pos = 0;
- uuid_unparse_lower(*page_cache_descr->id, uuid_str);
- pos += snprintfz(str, 512 - pos, "page(%p) id=%s\n"
+ uuid_unparse_lower(*descr->id, uuid_str);
+ pos += snprintfz(str, BUFSIZE - pos, "page(%p) id=%s\n"
"--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:",
- page_cache_descr->page, uuid_str,
- page_cache_descr->page_length,
- (uint64_t)page_cache_descr->start_time,
- (uint64_t)page_cache_descr->end_time);
- if (!page_cache_descr->extent) {
- pos += snprintfz(str + pos, 512 - pos, "N/A");
+ pg_cache_descr->page, uuid_str,
+ descr->page_length,
+ (uint64_t)descr->start_time,
+ (uint64_t)descr->end_time);
+ if (!descr->extent) {
+ pos += snprintfz(str + pos, BUFSIZE - pos, "N/A");
+ } else {
+ pos += snprintfz(str + pos, BUFSIZE - pos, "%"PRIu64, descr->extent->offset);
+ }
+
+ snprintfz(str + pos, BUFSIZE - pos, " flags:0x%2.2lX refcnt:%u\n\n", pg_cache_descr->flags, pg_cache_descr->refcnt);
+ debug(D_RRDENGINE, "%s", str);
+}
+
+void print_page_descr(struct rrdeng_page_descr *descr)
+{
+ char uuid_str[UUID_STR_LEN];
+ char str[BUFSIZE];
+ int pos = 0;
+
+ uuid_unparse_lower(*descr->id, uuid_str);
+ pos += snprintfz(str, BUFSIZE - pos, "id=%s\n"
+ "--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:",
+ uuid_str,
+ descr->page_length,
+ (uint64_t)descr->start_time,
+ (uint64_t)descr->end_time);
+ if (!descr->extent) {
+ pos += snprintfz(str + pos, BUFSIZE - pos, "N/A");
} else {
- pos += snprintfz(str + pos, 512 - pos, "%"PRIu64, page_cache_descr->extent->offset);
+ pos += snprintfz(str + pos, BUFSIZE - pos, "%"PRIu64, descr->extent->offset);
}
- snprintfz(str + pos, 512 - pos, " flags:0x%2.2lX refcnt:%u\n\n", page_cache_descr->flags, page_cache_descr->refcnt);
+ snprintfz(str + pos, BUFSIZE - pos, "\n\n");
fputs(str, stderr);
}
@@ -51,6 +78,48 @@ int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size)
return 0;
}
+/*
+ * Tries to open a file in direct I/O mode, falls back to buffered mode if not possible.
+ * Returns UV error number that is < 0 on failure.
+ * On success sets (*file) to be the uv_file that was opened.
+ */
+int open_file_direct_io(char *path, int flags, uv_file *file)
+{
+ uv_fs_t req;
+ int fd, current_flags, direct;
+
+ for (direct = 1 ; direct >= 0 ; --direct) {
+#ifdef __APPLE__
+ /* Apple OS does not support O_DIRECT */
+ direct = 0;
+#endif
+ current_flags = flags;
+ if (direct) {
+ current_flags |= O_DIRECT;
+ }
+ fd = uv_fs_open(NULL, &req, path, current_flags, S_IRUSR | S_IWUSR, NULL);
+ if (fd < 0) {
+ if ((direct) && (UV_EINVAL == fd)) {
+ error("File \"%s\" does not support direct I/O, falling back to buffered I/O.", path);
+ } else {
+ error("Failed to open file \"%s\".", path);
+ --direct; /* break the loop */
+ }
+ } else {
+ assert(req.result >= 0);
+ *file = req.result;
+#ifdef __APPLE__
+ info("Disabling OS X caching for file \"%s\".", path);
+ fcntl(fd, F_NOCACHE, 1);
+#endif
+ --direct; /* break the loop */
+ }
+ uv_fs_req_cleanup(&req);
+ }
+
+ return fd;
+}
+
char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size)
{
struct page_cache *pg_cache;
@@ -60,6 +129,7 @@ char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t si
"metric_API_producers: %ld\n"
"metric_API_consumers: %ld\n"
"page_cache_total_pages: %ld\n"
+ "page_cache_descriptors: %ld\n"
"page_cache_populated_pages: %ld\n"
"page_cache_commited_pages: %ld\n"
"page_cache_insertions: %ld\n"
@@ -87,6 +157,7 @@ char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t si
(long)ctx->stats.metric_API_producers,
(long)ctx->stats.metric_API_consumers,
(long)pg_cache->page_descriptors,
+ (long)ctx->stats.page_cache_descriptors,
(long)pg_cache->populated_pages,
(long)pg_cache->commited_page_index.nr_commited_pages,
(long)ctx->stats.pg_cache_insertions,
diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h
index bb6f072bf..36d414e89 100644
--- a/database/engine/rrdenginelib.h
+++ b/database/engine/rrdenginelib.h
@@ -6,11 +6,17 @@
#include "rrdengine.h"
/* Forward declarations */
-struct rrdeng_page_cache_descr;
+struct rrdeng_page_descr;
#define STR_HELPER(x) #x
#define STR(x) STR_HELPER(x)
+#define BITS_PER_ULONG (sizeof(unsigned long) * 8)
+
+#ifndef UUID_STR_LEN
+#define UUID_STR_LEN (37)
+#endif
+
/* Taken from linux kernel */
#define BUILD_BUG_ON(condition) ((void)sizeof(char[1 - 2*!!(condition)]))
@@ -25,6 +31,15 @@ typedef uintptr_t rrdeng_stats_t;
#define rrd_stat_atomic_add(p, n) do {(void) __sync_fetch_and_add(p, n);} while(0)
#endif
+#define RRDENG_PATH_MAX (4096)
+
+/* returns old *ptr value */
+static inline unsigned long ulong_compare_and_swap(volatile unsigned long *ptr,
+ unsigned long oldval, unsigned long newval)
+{
+ return __sync_val_compare_and_swap(ptr, oldval, newval);
+}
+
#ifndef O_DIRECT
/* Workaround for OS X */
#define O_DIRECT (0)
@@ -77,8 +92,10 @@ static inline void crc32set(void *crcp, uLong crc)
*(uint32_t *)crcp = crc;
}
-extern void print_page_cache_descr(struct rrdeng_page_cache_descr *page_cache_descr);
+extern void print_page_cache_descr(struct rrdeng_page_descr *page_cache_descr);
+extern void print_page_descr(struct rrdeng_page_descr *descr);
extern int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size);
+extern int open_file_direct_io(char *path, int flags, uv_file *file);
extern char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size);
#endif /* NETDATA_RRDENGINELIB_H */ \ No newline at end of file
diff --git a/database/engine/rrdenglocking.c b/database/engine/rrdenglocking.c
new file mode 100644
index 000000000..0eb9019b4
--- /dev/null
+++ b/database/engine/rrdenglocking.c
@@ -0,0 +1,233 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance *ctx)
+{
+ struct page_cache_descr *pg_cache_descr;
+
+ pg_cache_descr = mallocz(sizeof(*pg_cache_descr));
+ rrd_stat_atomic_add(&ctx->stats.page_cache_descriptors, 1);
+ pg_cache_descr->page = NULL;
+ pg_cache_descr->flags = 0;
+ pg_cache_descr->prev = pg_cache_descr->next = NULL;
+ pg_cache_descr->refcnt = 0;
+ pg_cache_descr->waiters = 0;
+ assert(0 == uv_cond_init(&pg_cache_descr->cond));
+ assert(0 == uv_mutex_init(&pg_cache_descr->mutex));
+
+ return pg_cache_descr;
+}
+
+void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr)
+{
+ uv_cond_destroy(&pg_cache_descr->cond);
+ uv_mutex_destroy(&pg_cache_descr->mutex);
+ freez(pg_cache_descr);
+ rrd_stat_atomic_add(&ctx->stats.page_cache_descriptors, -1);
+}
+
+/* also allocates page cache descriptor if missing */
+void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+ unsigned long old_state, old_users, new_state, ret_state;
+ struct page_cache_descr *pg_cache_descr = NULL;
+ uint8_t we_locked;
+
+ we_locked = 0;
+ while (1) { /* spin */
+ old_state = descr->pg_cache_descr_state;
+ old_users = old_state >> PG_CACHE_DESCR_SHIFT;
+
+ if (unlikely(we_locked)) {
+ assert(old_state & PG_CACHE_DESCR_LOCKED);
+ new_state = (1 << PG_CACHE_DESCR_SHIFT) | PG_CACHE_DESCR_ALLOCATED;
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ /* success */
+ break;
+ }
+ continue; /* spin */
+ }
+ if (old_state & PG_CACHE_DESCR_LOCKED) {
+ assert(0 == old_users);
+ continue; /* spin */
+ }
+ if (0 == old_state) {
+ /* no page cache descriptor has been allocated */
+
+ if (NULL == pg_cache_descr) {
+ pg_cache_descr = rrdeng_create_pg_cache_descr(ctx);
+ }
+ new_state = PG_CACHE_DESCR_LOCKED;
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, 0, new_state);
+ if (0 == ret_state) {
+ we_locked = 1;
+ descr->pg_cache_descr = pg_cache_descr;
+ pg_cache_descr->descr = descr;
+ pg_cache_descr = NULL; /* make sure we don't free pg_cache_descr */
+ /* retry */
+ continue;
+ }
+ continue; /* spin */
+ }
+ /* page cache descriptor is already allocated */
+ assert(old_state & PG_CACHE_DESCR_ALLOCATED);
+
+ new_state = (old_users + 1) << PG_CACHE_DESCR_SHIFT;
+ new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK;
+
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ /* success */
+ break;
+ }
+ /* spin */
+ }
+
+ if (pg_cache_descr) {
+ rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+ }
+ pg_cache_descr = descr->pg_cache_descr;
+ uv_mutex_lock(&pg_cache_descr->mutex);
+}
+
+void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+ unsigned long old_state, new_state, ret_state, old_users;
+ struct page_cache_descr *pg_cache_descr;
+ uint8_t we_locked;
+
+ uv_mutex_unlock(&descr->pg_cache_descr->mutex);
+
+ we_locked = 0;
+ while (1) { /* spin */
+ old_state = descr->pg_cache_descr_state;
+ old_users = old_state >> PG_CACHE_DESCR_SHIFT;
+
+ if (unlikely(we_locked)) {
+ assert(0 == old_users);
+
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, 0);
+ if (old_state == ret_state) {
+ /* success */
+ break;
+ }
+ continue; /* spin */
+ }
+ if (old_state & PG_CACHE_DESCR_LOCKED) {
+ assert(0 == old_users);
+ continue; /* spin */
+ }
+ assert(old_state & PG_CACHE_DESCR_ALLOCATED);
+ pg_cache_descr = descr->pg_cache_descr;
+ /* caller is the only page cache descriptor user and there are no pending references on the page */
+ if ((old_state & PG_CACHE_DESCR_DESTROY) && (1 == old_users) &&
+ !pg_cache_descr->flags && !pg_cache_descr->refcnt) {
+ new_state = PG_CACHE_DESCR_LOCKED;
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ we_locked = 1;
+ rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+ /* retry */
+ continue;
+ }
+ continue; /* spin */
+ }
+ assert(old_users > 0);
+ new_state = (old_users - 1) << PG_CACHE_DESCR_SHIFT;
+ new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK;
+
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ /* success */
+ break;
+ }
+ /* spin */
+ }
+
+}
+
+/*
+ * Tries to deallocate page cache descriptor. If it fails, it postpones deallocation by setting the
+ * PG_CACHE_DESCR_DESTROY flag which will be eventually cleared by a different context after doing
+ * the deallocation.
+ */
+void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+ unsigned long old_state, new_state, ret_state, old_users;
+ struct page_cache_descr *pg_cache_descr;
+ uint8_t just_locked, we_freed, must_unlock;
+
+ just_locked = 0;
+ we_freed = 0;
+ must_unlock = 0;
+ while (1) { /* spin */
+ old_state = descr->pg_cache_descr_state;
+ old_users = old_state >> PG_CACHE_DESCR_SHIFT;
+
+ if (unlikely(just_locked)) {
+ assert(0 == old_users);
+
+ must_unlock = 1;
+ just_locked = 0;
+ /* Try deallocate if there are no pending references on the page */
+ if (!pg_cache_descr->flags && !pg_cache_descr->refcnt) {
+ rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+ we_freed = 1;
+ /* success */
+ continue;
+ }
+ continue; /* spin */
+ }
+ if (unlikely(must_unlock)) {
+ assert(0 == old_users);
+
+ if (we_freed) {
+ /* success */
+ new_state = 0;
+ } else {
+ new_state = old_state | PG_CACHE_DESCR_DESTROY;
+ new_state &= ~PG_CACHE_DESCR_LOCKED;
+ }
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ /* unlocked */
+ return;
+ }
+ continue; /* spin */
+ }
+ if (!(old_state & PG_CACHE_DESCR_ALLOCATED)) {
+ /* don't do anything */
+ return;
+ }
+ if (old_state & PG_CACHE_DESCR_LOCKED) {
+ assert(0 == old_users);
+ continue; /* spin */
+ }
+ pg_cache_descr = descr->pg_cache_descr;
+ /* caller is the only page cache descriptor user */
+ if (0 == old_users) {
+ new_state = old_state | PG_CACHE_DESCR_LOCKED;
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ just_locked = 1;
+ /* retry */
+ continue;
+ }
+ continue; /* spin */
+ }
+ if (old_state & PG_CACHE_DESCR_DESTROY) {
+ /* don't do anything */
+ return;
+ }
+ /* plant PG_CACHE_DESCR_DESTROY so that other contexts eventually free the page cache descriptor */
+ new_state = old_state | PG_CACHE_DESCR_DESTROY;
+
+ ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
+ if (old_state == ret_state) {
+ /* success */
+ return;
+ }
+ /* spin */
+ }
+} \ No newline at end of file
diff --git a/database/engine/rrdenglocking.h b/database/engine/rrdenglocking.h
new file mode 100644
index 000000000..127ddc90c
--- /dev/null
+++ b/database/engine/rrdenglocking.h
@@ -0,0 +1,17 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDENGLOCKING_H
+#define NETDATA_RRDENGLOCKING_H
+
+#include "rrdengine.h"
+
+/* Forward declarations */
+struct page_cache_descr;
+
+extern struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance *ctx);
+extern void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr);
+extern void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+extern void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+extern void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+
+#endif /* NETDATA_RRDENGLOCKING_H */ \ No newline at end of file
diff --git a/database/rrd.c b/database/rrd.c
index 2457cac01..31ad3f07e 100644
--- a/database/rrd.c
+++ b/database/rrd.c
@@ -132,7 +132,6 @@ const char *rrdset_type_name(RRDSET_TYPE chart_type) {
}
}
-
// ----------------------------------------------------------------------------
// RRD - cache directory
@@ -146,8 +145,7 @@ char *rrdset_cache_dir(RRDHOST *host, const char *id, const char *config_section
snprintfz(n, FILENAME_MAX, "%s/%s", host->cache_dir, b);
ret = config_get(config_section, "cache directory", n);
- if(host->rrd_memory_mode == RRD_MEMORY_MODE_MAP || host->rrd_memory_mode == RRD_MEMORY_MODE_SAVE ||
- host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
+ if(host->rrd_memory_mode == RRD_MEMORY_MODE_MAP || host->rrd_memory_mode == RRD_MEMORY_MODE_SAVE) {
int r = mkdir(ret, 0775);
if(r != 0 && errno != EEXIST)
error("Cannot create directory '%s'", ret);
@@ -155,3 +153,4 @@ char *rrdset_cache_dir(RRDHOST *host, const char *id, const char *config_section
return ret;
}
+
diff --git a/database/rrd.h b/database/rrd.h
index 3f57b9037..5b09c2dda 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -17,7 +17,7 @@ typedef struct alarm_entry ALARM_ENTRY;
// forward declarations
struct rrddim_volatile;
#ifdef ENABLE_DBENGINE
-struct rrdeng_page_cache_descr;
+struct rrdeng_page_descr;
struct rrdengine_instance;
struct pg_cache_page_index;
#endif
@@ -246,10 +246,12 @@ union rrddim_collect_handle {
} slotted; // state the legacy code uses
#ifdef ENABLE_DBENGINE
struct rrdeng_collect_handle {
- struct rrdeng_page_cache_descr *descr, *prev_descr;
+ struct rrdeng_page_descr *descr, *prev_descr;
unsigned long page_correlation_id;
struct rrdengine_instance *ctx;
struct pg_cache_page_index *page_index;
+ // set to 1 when this dimension is not page aligned with the other dimensions in the chart
+ uint8_t unaligned_page;
} rrdeng; // state the database engine uses
#endif
};
@@ -268,7 +270,7 @@ struct rrddim_query_handle {
} slotted; // state the legacy code uses
#ifdef ENABLE_DBENGINE
struct rrdeng_query_handle {
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
struct rrdengine_instance *ctx;
struct pg_cache_page_index *page_index;
time_t now; //TODO: remove now to implement next point iteration
@@ -351,7 +353,7 @@ typedef enum rrdset_flags {
RRDSET_FLAG_UPSTREAM_EXPOSED = 1 << 8, // if set, we have sent this chart definition to netdata master (streaming)
RRDSET_FLAG_STORE_FIRST = 1 << 9, // if set, do not eliminate the first collection during interpolation
RRDSET_FLAG_HETEROGENEOUS = 1 << 10, // if set, the chart is not homogeneous (dimensions in it have multiple algorithms, multipliers or dividers)
- RRDSET_FLAG_HOMEGENEOUS_CHECK = 1 << 11, // if set, the chart should be checked to determine if the dimensions as homogeneous
+ RRDSET_FLAG_HOMOGENEOUS_CHECK = 1 << 11, // if set, the chart should be checked to determine if the dimensions are homogeneous
RRDSET_FLAG_HIDDEN = 1 << 12, // if set, do not show this chart on the dashboard, but use it for backends
RRDSET_FLAG_SYNC_CLOCK = 1 << 13, // if set, microseconds on next data collection will be ignored (the chart will be synced to now)
RRDSET_FLAG_OBSOLETE_DIMENSIONS = 1 << 14 // this is marked by the collector/module when a chart has obsolete dimensions
@@ -431,7 +433,9 @@ struct rrdset {
char *plugin_name; // the name of the plugin that generated this
char *module_name; // the name of the plugin module that generated this
- size_t unused[6];
+ size_t unused[5];
+
+ size_t rrddim_page_alignment; // keeps metric pages in alignment when using dbengine
uint32_t hash; // a simple hash on the id, to speed up searching
// we first compare hashes, and only if the hashes are equal we do string comparisons
@@ -568,6 +572,8 @@ struct alarm_entry {
uint32_t updated_by_id;
uint32_t updates_id;
+ time_t last_repeat;
+
struct alarm_entry *next;
};
@@ -682,11 +688,16 @@ struct rrdhost {
char *health_log_filename; // the alarms event log filename
size_t health_log_entries_written; // the number of alarm events writtern to the alarms event log
FILE *health_log_fp; // the FILE pointer to the open alarms event log file
+ uint32_t health_default_warn_repeat_every; // the default value for the interval between repeating warning notifications
+ uint32_t health_default_crit_repeat_every; // the default value for the interval between repeating critical notifications
+
// all RRDCALCs are primarily allocated and linked here
// RRDCALCs may be linked to charts at any point
// (charts may or may not exist when these are loaded)
RRDCALC *alarms;
+ avl_tree_lock alarms_idx_health_log;
+ avl_tree_lock alarms_idx_name;
ALARM_LOG health_log; // alarms historical events (event log)
uint32_t health_last_processed_id; // the last processed health id from the log
@@ -723,6 +734,10 @@ struct rrdhost {
struct rrdengine_instance *rrdeng_ctx; // DB engine instance for this host
#endif
+#ifdef ENABLE_HTTPS
+ struct netdata_ssl ssl; //Structure used to encrypt the connection
+#endif
+
struct rrdhost *next;
};
extern RRDHOST *localhost;
@@ -781,7 +796,6 @@ extern RRDHOST *rrdhost_find_or_create(
);
extern int rrdhost_set_system_info_variable(struct rrdhost_system_info *system_info, char *name, char *value);
-extern struct rrdhost_system_info *rrdhost_system_info_dup(struct rrdhost_system_info *system_info);
#if defined(NETDATA_INTERNAL_CHECKS) && defined(NETDATA_VERIFY_LOCKS)
extern void __rrdhost_check_wrlock(RRDHOST *host, const char *file, const char *function, const unsigned long line);
@@ -1015,6 +1029,12 @@ extern collected_number rrddim_set(RRDSET *st, const char *id, collected_number
extern long align_entries_to_pagesize(RRD_MEMORY_MODE mode, long entries);
// ----------------------------------------------------------------------------
+// Miscellaneous functions
+
+extern int alarm_compare_id(void *a, void *b);
+extern int alarm_compare_name(void *a, void *b);
+
+// ----------------------------------------------------------------------------
// RRD internal functions
#ifdef NETDATA_RRD_INTERNALS
diff --git a/database/rrdcalc.c b/database/rrdcalc.c
index 7f6a896b6..908fc2ebf 100644
--- a/database/rrdcalc.c
+++ b/database/rrdcalc.c
@@ -81,9 +81,9 @@ static void rrdsetcalc_link(RRDSET *st, RRDCALC *rc) {
if(!rc->units) rc->units = strdupz(st->units);
- {
+ if(!rrdcalc_isrepeating(rc)) {
time_t now = now_realtime_sec();
- health_alarm_log(
+ ALARM_ENTRY *ae = health_create_alarm_entry(
host,
rc->id,
rc->next_event_id++,
@@ -104,6 +104,7 @@ static void rrdsetcalc_link(RRDSET *st, RRDCALC *rc) {
0,
0
);
+ health_alarm_log(host, ae);
}
}
@@ -142,9 +143,9 @@ inline void rrdsetcalc_unlink(RRDCALC *rc) {
RRDHOST *host = st->rrdhost;
- {
+ if(!rrdcalc_isrepeating(rc)) {
time_t now = now_realtime_sec();
- health_alarm_log(
+ ALARM_ENTRY *ae = health_create_alarm_entry(
host,
rc->id,
rc->next_event_id++,
@@ -165,6 +166,7 @@ inline void rrdsetcalc_unlink(RRDCALC *rc) {
0,
0
);
+ health_alarm_log(host, ae);
}
debug(D_HEALTH, "Health unlinking alarm '%s.%s' from chart '%s' of host '%s'", rc->chart?rc->chart:"NOCHART", rc->name, st->id, host->hostname);
@@ -253,7 +255,7 @@ inline uint32_t rrdcalc_get_unique_id(RRDHOST *host, const char *chart, const ch
return host->health_log.next_alarm_id++;
}
-inline void rrdcalc_create_part2(RRDHOST *host, RRDCALC *rc) {
+inline void rrdcalc_add_to_host(RRDHOST *host, RRDCALC *rc) {
rrdhost_check_rdlock(host);
if(rc->calculation) {
@@ -301,8 +303,7 @@ inline void rrdcalc_create_part2(RRDHOST *host, RRDCALC *rc) {
}
}
-inline RRDCALC *rrdcalc_create(RRDHOST *host, RRDCALCTEMPLATE *rt, const char *chart) {
-
+inline RRDCALC *rrdcalc_create_from_template(RRDHOST *host, RRDCALCTEMPLATE *rt, const char *chart) {
debug(D_HEALTH, "Health creating dynamic alarm (from template) '%s.%s'", chart, rt->name);
if(rrdcalc_exists(host, chart, rt->name, 0, 0))
@@ -328,6 +329,10 @@ inline RRDCALC *rrdcalc_create(RRDHOST *host, RRDCALCTEMPLATE *rt, const char *c
rc->delay_max_duration = rt->delay_max_duration;
rc->delay_multiplier = rt->delay_multiplier;
+ rc->last_repeat = 0;
+ rc->warn_repeat_every = rt->warn_repeat_every;
+ rc->crit_repeat_every = rt->crit_repeat_every;
+
rc->group = rt->group;
rc->after = rt->after;
rc->before = rt->before;
@@ -356,7 +361,7 @@ inline RRDCALC *rrdcalc_create(RRDHOST *host, RRDCALCTEMPLATE *rt, const char *c
error("Health alarm '%s.%s': failed to re-parse critical expression '%s'", chart, rt->name, rt->critical->source);
}
- debug(D_HEALTH, "Health runtime added alarm '%s.%s': exec '%s', recipient '%s', green " CALCULATED_NUMBER_FORMAT_AUTO ", red " CALCULATED_NUMBER_FORMAT_AUTO ", lookup: group %d, after %d, before %d, options %u, dimensions '%s', update every %d, calculation '%s', warning '%s', critical '%s', source '%s', delay up %d, delay down %d, delay max %d, delay_multiplier %f",
+ debug(D_HEALTH, "Health runtime added alarm '%s.%s': exec '%s', recipient '%s', green " CALCULATED_NUMBER_FORMAT_AUTO ", red " CALCULATED_NUMBER_FORMAT_AUTO ", lookup: group %d, after %d, before %d, options %u, dimensions '%s', update every %d, calculation '%s', warning '%s', critical '%s', source '%s', delay up %d, delay down %d, delay max %d, delay_multiplier %f, warn_repeat_every %u, crit_repeat_every %u",
(rc->chart)?rc->chart:"NOCHART",
rc->name,
(rc->exec)?rc->exec:"DEFAULT",
@@ -376,16 +381,24 @@ inline RRDCALC *rrdcalc_create(RRDHOST *host, RRDCALCTEMPLATE *rt, const char *c
rc->delay_up_duration,
rc->delay_down_duration,
rc->delay_max_duration,
- rc->delay_multiplier
+ rc->delay_multiplier,
+ rc->warn_repeat_every,
+ rc->crit_repeat_every
);
- rrdcalc_create_part2(host, rc);
+ rrdcalc_add_to_host(host, rc);
+ RRDCALC *rdcmp = (RRDCALC *) avl_insert_lock(&(host)->alarms_idx_health_log,(avl *)rc);
+ if (rdcmp != rc) {
+ error("Cannot insert the alarm index ID %s",rc->name);
+ }
+
return rc;
}
void rrdcalc_free(RRDCALC *rc) {
if(unlikely(!rc)) return;
+
expression_free(rc->calculation);
expression_free(rc->warning);
expression_free(rc->critical);
@@ -413,7 +426,6 @@ void rrdcalc_unlink_and_free(RRDHOST *host, RRDCALC *rc) {
// unlink it from RRDHOST
if(unlikely(rc == host->alarms))
host->alarms = rc->next;
-
else {
RRDCALC *t;
for(t = host->alarms; t && t->next != rc; t = t->next) ;
@@ -425,5 +437,79 @@ void rrdcalc_unlink_and_free(RRDHOST *host, RRDCALC *rc) {
error("Cannot unlink alarm '%s.%s' from host '%s': not found", rc->chart?rc->chart:"NOCHART", rc->name, host->hostname);
}
+ if (rc) {
+ RRDCALC *rdcmp = (RRDCALC *) avl_search_lock(&(host)->alarms_idx_health_log, (avl *)rc);
+ if (rdcmp) {
+ rdcmp = (RRDCALC *) avl_remove_lock(&(host)->alarms_idx_health_log, (avl *)rc);
+ if (!rdcmp) {
+ error("Cannot remove the health alarm index from health_log");
+ }
+ }
+
+ rdcmp = (RRDCALC *) avl_search_lock(&(host)->alarms_idx_name, (avl *)rc);
+ if (rdcmp) {
+ rdcmp = (RRDCALC *) avl_remove_lock(&(host)->alarms_idx_name, (avl *)rc);
+ if (!rdcmp) {
+ error("Cannot remove the health alarm index from idx_name");
+ }
+ }
+ }
+
rrdcalc_free(rc);
}
+
+// ----------------------------------------------------------------------------
+// Alarm
+
+
+/**
+ * Alarm is repeating
+ *
+ * Is this alarm repeating ?
+ *
+ * @param host The structure that has the binary tree
+ * @param alarm_id the id of the alarm to search
+ *
+ * @return It returns 1 case it is repeating and 0 otherwise
+ */
+int alarm_isrepeating(RRDHOST *host, uint32_t alarm_id) {
+ RRDCALC findme;
+ findme.id = alarm_id;
+ RRDCALC *rc = (RRDCALC *)avl_search_lock(&host->alarms_idx_health_log, (avl *)&findme);
+ if (!rc) {
+ return 0;
+ }
+ return rrdcalc_isrepeating(rc);
+}
+
+/**
+ * Entry is repeating
+ *
+ * Check whether the id of alarm entry is yet present in the host structure
+ *
+ * @param host The structure that has the binary tree
+ * @param ae the alarm entry
+ *
+ * @return It returns 1 case it is repeating and 0 otherwise
+ */
+int alarm_entry_isrepeating(RRDHOST *host, ALARM_ENTRY *ae) {
+ return alarm_isrepeating(host, ae->alarm_id);
+}
+
+/**
+ * Max last repeat
+ *
+ * Check the maximum last_repeat for the alarms associated a host
+ *
+ * @param host The structure that has the binary tree
+ *
+ * @return It returns 1 case it is repeating and 0 otherwise
+ */
+RRDCALC *alarm_max_last_repeat(RRDHOST *host, char *alarm_name,uint32_t hash) {
+ RRDCALC findme;
+ findme.name = alarm_name;
+ findme.hash = hash;
+ RRDCALC *rc = (RRDCALC *)avl_search_lock(&host->alarms_idx_name, (avl *)&findme);
+
+ return rc;
+}
diff --git a/database/rrdcalc.h b/database/rrdcalc.h
index 4df4381ae..3400f711c 100644
--- a/database/rrdcalc.h
+++ b/database/rrdcalc.h
@@ -29,7 +29,9 @@
#define RRDCALC_FLAG_SILENCED 0x00000100
#define RRDCALC_FLAG_NO_CLEAR_NOTIFICATION 0x80000000
+
struct rrdcalc {
+ avl avl; // the index, with key the id - this has to be first!
uint32_t id; // the unique id of this alarm
uint32_t next_event_id; // the next event id that will be used for this alarm
@@ -78,8 +80,15 @@ struct rrdcalc {
// while now < delay_up_to
// ------------------------------------------------------------------------
+ // notification repeat settings
+
+ uint32_t warn_repeat_every; // interval between repeating warning notifications
+ uint32_t crit_repeat_every; // interval between repeating critical notifications
+
+ // ------------------------------------------------------------------------
// runtime information
+ RRDCALC_STATUS old_status; // the old status of the alarm
RRDCALC_STATUS status; // the current status of the alarm
calculated_number value; // the current value of the alarm
@@ -90,6 +99,7 @@ struct rrdcalc {
time_t last_updated; // the last update timestamp of the alarm
time_t next_update; // the next update timestamp of the alarm
time_t last_status_change; // the timestamp of the last time this alarm changed status
+ time_t last_repeat; // the last time the alarm got repeated
time_t db_after; // the first timestamp evaluated by the db lookup
time_t db_before; // the last timestamp evaluated by the db lookup
@@ -119,6 +129,10 @@ struct rrdcalc {
struct rrdcalc *next;
};
+extern int alarm_isrepeating(RRDHOST *host, uint32_t alarm_id);
+extern int alarm_entry_isrepeating(RRDHOST *host, ALARM_ENTRY *ae);
+extern RRDCALC *alarm_max_last_repeat(RRDHOST *host, char *alarm_name, uint32_t hash);
+
#define RRDCALC_HAS_DB_LOOKUP(rc) ((rc)->after)
extern void rrdsetcalc_link_matching(RRDSET *st);
@@ -132,7 +146,14 @@ extern void rrdcalc_unlink_and_free(RRDHOST *host, RRDCALC *rc);
extern int rrdcalc_exists(RRDHOST *host, const char *chart, const char *name, uint32_t hash_chart, uint32_t hash_name);
extern uint32_t rrdcalc_get_unique_id(RRDHOST *host, const char *chart, const char *name, uint32_t *next_event_id);
-extern RRDCALC *rrdcalc_create(RRDHOST *host, RRDCALCTEMPLATE *rt, const char *chart);
-extern void rrdcalc_create_part2(RRDHOST *host, RRDCALC *rc);
+extern RRDCALC *rrdcalc_create_from_template(RRDHOST *host, RRDCALCTEMPLATE *rt, const char *chart);
+extern void rrdcalc_add_to_host(RRDHOST *host, RRDCALC *rc);
+
+static inline int rrdcalc_isrepeating(RRDCALC *rc) {
+ if (unlikely(rc->warn_repeat_every > 0 || rc->crit_repeat_every > 0)) {
+ return 1;
+ }
+ return 0;
+}
#endif //NETDATA_RRDCALC_H
diff --git a/database/rrdcalctemplate.c b/database/rrdcalctemplate.c
index ba7e7ec94..f2b9767c6 100644
--- a/database/rrdcalctemplate.c
+++ b/database/rrdcalctemplate.c
@@ -13,7 +13,7 @@ void rrdcalctemplate_link_matching(RRDSET *st) {
for(rt = host->templates; rt ; rt = rt->next) {
if(rt->hash_context == st->hash_context && !strcmp(rt->context, st->context)
&& (!rt->family_pattern || simple_pattern_matches(rt->family_pattern, st->family))) {
- RRDCALC *rc = rrdcalc_create(host, rt, st->id);
+ RRDCALC *rc = rrdcalc_create_from_template(host, rt, st->id);
if(unlikely(!rc))
info("Health tried to create alarm from template '%s' on chart '%s' of host '%s', but it failed", rt->name, st->id, host->hostname);
diff --git a/database/rrdcalctemplate.h b/database/rrdcalctemplate.h
index b8996bc14..92bb4138e 100644
--- a/database/rrdcalctemplate.h
+++ b/database/rrdcalctemplate.h
@@ -49,6 +49,12 @@ struct rrdcalctemplate {
float delay_multiplier; // multiplier for all delays when alarms switch status
// ------------------------------------------------------------------------
+ // notification repeat settings
+
+ uint32_t warn_repeat_every; // interval between repeating warning notifications
+ uint32_t crit_repeat_every; // interval between repeating critical notifications
+
+ // ------------------------------------------------------------------------
// expressions related to the alarm
EVAL_EXPRESSION *calculation;
diff --git a/database/rrddim.c b/database/rrddim.c
index 0cf6734a6..088c80d0b 100644
--- a/database/rrddim.c
+++ b/database/rrddim.c
@@ -60,7 +60,7 @@ inline int rrddim_set_algorithm(RRDSET *st, RRDDIM *rd, RRD_ALGORITHM algorithm)
debug(D_RRD_CALLS, "Updating algorithm of dimension '%s/%s' from %s to %s", st->id, rd->name, rrd_algorithm_name(rd->algorithm), rrd_algorithm_name(algorithm));
rd->algorithm = algorithm;
rd->exposed = 0;
- rrdset_flag_set(st, RRDSET_FLAG_HOMEGENEOUS_CHECK);
+ rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
return 1;
}
@@ -72,7 +72,7 @@ inline int rrddim_set_multiplier(RRDSET *st, RRDDIM *rd, collected_number multip
debug(D_RRD_CALLS, "Updating multiplier of dimension '%s/%s' from " COLLECTED_NUMBER_FORMAT " to " COLLECTED_NUMBER_FORMAT, st->id, rd->name, rd->multiplier, multiplier);
rd->multiplier = multiplier;
rd->exposed = 0;
- rrdset_flag_set(st, RRDSET_FLAG_HOMEGENEOUS_CHECK);
+ rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
return 1;
}
@@ -84,7 +84,7 @@ inline int rrddim_set_divisor(RRDSET *st, RRDDIM *rd, collected_number divisor)
debug(D_RRD_CALLS, "Updating divisor of dimension '%s/%s' from " COLLECTED_NUMBER_FORMAT " to " COLLECTED_NUMBER_FORMAT, st->id, rd->name, rd->divisor, divisor);
rd->divisor = divisor;
rd->exposed = 0;
- rrdset_flag_set(st, RRDSET_FLAG_HOMEGENEOUS_CHECK);
+ rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
return 1;
}
diff --git a/database/rrdhost.c b/database/rrdhost.c
index c552c6c39..d6252d206 100644
--- a/database/rrdhost.c
+++ b/database/rrdhost.c
@@ -147,6 +147,10 @@ RRDHOST *rrdhost_create(const char *hostname,
host->rrdpush_sender_pipe[0] = -1;
host->rrdpush_sender_pipe[1] = -1;
host->rrdpush_sender_socket = -1;
+#ifdef ENABLE_HTTPS
+ host->ssl.conn = NULL;
+ host->ssl.flags = NETDATA_SSL_START;
+#endif
netdata_mutex_init(&host->rrdpush_sender_buffer_mutex);
netdata_rwlock_init(&host->rrdhost_rwlock);
@@ -162,7 +166,7 @@ RRDHOST *rrdhost_create(const char *hostname,
host->program_version = strdupz((program_version && *program_version)?program_version:"unknown");
host->registry_hostname = strdupz((registry_hostname && *registry_hostname)?registry_hostname:hostname);
- host->system_info = rrdhost_system_info_dup(system_info);
+ host->system_info = system_info;
avl_init_lock(&(host->rrdset_root_index), rrdset_compare);
avl_init_lock(&(host->rrdset_root_index_name), rrdset_compare_name);
@@ -175,6 +179,10 @@ RRDHOST *rrdhost_create(const char *hostname,
if(config_get_boolean(CONFIG_SECTION_GLOBAL, "delete orphan hosts files", 1) && !is_localhost)
rrdhost_flag_set(host, RRDHOST_FLAG_DELETE_ORPHAN_HOST);
+ host->health_default_warn_repeat_every = config_get_duration(CONFIG_SECTION_HEALTH, "default repeat warning", "never");
+ host->health_default_crit_repeat_every = config_get_duration(CONFIG_SECTION_HEALTH, "default repeat critical", "never");
+ avl_init_lock(&(host->alarms_idx_health_log), alarm_compare_id);
+ avl_init_lock(&(host->alarms_idx_name), alarm_compare_name);
// ------------------------------------------------------------------------
// initialize health variables
@@ -270,12 +278,12 @@ RRDHOST *rrdhost_create(const char *hostname,
// load health configuration
if(host->health_enabled) {
- health_alarm_log_load(host);
- health_alarm_log_open(host);
-
rrdhost_wrlock(host);
health_readdir(host, health_user_config_dir(), health_stock_config_dir(), NULL);
rrdhost_unlock(host);
+
+ health_alarm_log_load(host);
+ health_alarm_log_open(host);
}
@@ -812,81 +820,103 @@ restart_after_removal:
// RRDHOST - set system info from environment variables
int rrdhost_set_system_info_variable(struct rrdhost_system_info *system_info, char *name, char *value) {
+ int res = 0;
+
if(!strcmp(name, "NETDATA_SYSTEM_OS_NAME")){
+ freez(system_info->os_name);
system_info->os_name = strdupz(value);
}
else if(!strcmp(name, "NETDATA_SYSTEM_OS_ID")){
+ freez(system_info->os_id);
system_info->os_id = strdupz(value);
}
else if(!strcmp(name, "NETDATA_SYSTEM_OS_ID_LIKE")){
+ freez(system_info->os_id_like);
system_info->os_id_like = strdupz(value);
}
else if(!strcmp(name, "NETDATA_SYSTEM_OS_VERSION")){
+ freez(system_info->os_version);
system_info->os_version = strdupz(value);
}
else if(!strcmp(name, "NETDATA_SYSTEM_OS_VERSION_ID")){
+ freez(system_info->os_version_id);
system_info->os_version_id = strdupz(value);
}
else if(!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION")){
+ freez(system_info->os_detection);
system_info->os_detection = strdupz(value);
}
else if(!strcmp(name, "NETDATA_SYSTEM_KERNEL_NAME")){
+ freez(system_info->kernel_name);
system_info->kernel_name = strdupz(value);
}
else if(!strcmp(name, "NETDATA_SYSTEM_KERNEL_VERSION")){
+ freez(system_info->kernel_version);
system_info->kernel_version = strdupz(value);
}
else if(!strcmp(name, "NETDATA_SYSTEM_ARCHITECTURE")){
+ freez(system_info->architecture);
system_info->architecture = strdupz(value);
}
else if(!strcmp(name, "NETDATA_SYSTEM_VIRTUALIZATION")){
+ freez(system_info->virtualization);
system_info->virtualization = strdupz(value);
}
else if(!strcmp(name, "NETDATA_SYSTEM_VIRT_DETECTION")){
+ freez(system_info->virt_detection);
system_info->virt_detection = strdupz(value);
}
else if(!strcmp(name, "NETDATA_SYSTEM_CONTAINER")){
+ freez(system_info->container);
system_info->container = strdupz(value);
}
else if(!strcmp(name, "NETDATA_SYSTEM_CONTAINER_DETECTION")){
+ freez(system_info->container_detection);
system_info->container_detection = strdupz(value);
}
- else return 1;
+ else {
+ res = 1;
+ }
- return 0;
+ return res;
}
-struct rrdhost_system_info *rrdhost_system_info_dup(struct rrdhost_system_info *system_info) {
- struct rrdhost_system_info *ret = callocz(1, sizeof(struct rrdhost_system_info));
+/**
+ * Alarm Compare ID
+ *
+ * Callback function used with the binary trees to compare the id of RRDCALC
+ *
+ * @param a a pointer to the RRDCAL item to insert,compare or update the binary tree
+ * @param b the pointer to the binary tree.
+ *
+ * @return It returns 0 case the values are equal, 1 case a is bigger than b and -1 case a is smaller than b.
+ */
+int alarm_compare_id(void *a, void *b) {
+ register uint32_t hash1 = ((RRDCALC *)a)->id;
+ register uint32_t hash2 = ((RRDCALC *)b)->id;
+
+ if(hash1 < hash2) return -1;
+ else if(hash1 > hash2) return 1;
- if(likely(system_info)) {
- if(system_info->os_name)
- ret->os_name = strdupz(system_info->os_name);
- if(system_info->os_id)
- ret->os_id = strdupz(system_info->os_id);
- if(system_info->os_id_like)
- ret->os_id_like = strdupz(system_info->os_id_like);
- if(system_info->os_version)
- ret->os_version = strdupz(system_info->os_version);
- if(system_info->os_version_id)
- ret->os_version_id = strdupz(system_info->os_version_id);
- if(system_info->os_detection)
- ret->os_detection = strdupz(system_info->os_detection);
- if(system_info->kernel_name)
- ret->kernel_name = strdupz(system_info->kernel_name);
- if(system_info->kernel_version)
- ret->kernel_version = strdupz(system_info->kernel_version);
- if(system_info->architecture)
- ret->architecture = strdupz(system_info->architecture);
- if(system_info->virtualization)
- ret->virtualization = strdupz(system_info->virtualization);
- if(system_info->virt_detection)
- ret->virt_detection = strdupz(system_info->virt_detection);
- if(system_info->container)
- ret->container = strdupz(system_info->container);
- if(system_info->container_detection)
- ret->container_detection = strdupz(system_info->container_detection);
- }
-
- return ret;
+ return 0;
+}
+
+/**
+ * Alarm Compare NAME
+ *
+ * Callback function used with the binary trees to compare the name of RRDCALC
+ *
+ * @param a a pointer to the RRDCAL item to insert,compare or update the binary tree
+ * @param b the pointer to the binary tree.
+ *
+ * @return It returns 0 case the values are equal, 1 case a is bigger than b and -1 case a is smaller than b.
+ */
+int alarm_compare_name(void *a, void *b) {
+ RRDCALC *in1 = (RRDCALC *)a;
+ RRDCALC *in2 = (RRDCALC *)b;
+
+ if(in1->hash < in2->hash) return -1;
+ else if(in1->hash > in2->hash) return 1;
+
+ return strcmp(in1->name,in2->name);
}
diff --git a/database/rrdset.c b/database/rrdset.c
index 689591468..f8962b2fb 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -210,7 +210,7 @@ inline void rrdset_update_heterogeneous_flag(RRDSET *st) {
RRDDIM *rd;
- rrdset_flag_clear(st, RRDSET_FLAG_HOMEGENEOUS_CHECK);
+ rrdset_flag_clear(st, RRDSET_FLAG_HOMOGENEOUS_CHECK);
RRD_ALGORITHM algorithm = st->dimensions->algorithm;
collected_number multiplier = abs(st->dimensions->multiplier);
@@ -251,6 +251,7 @@ void rrdset_reset(RRDSET *st) {
st->current_entry = 0;
st->counter = 0;
st->counter_done = 0;
+ st->rrddim_page_alignment = 0;
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
@@ -258,6 +259,11 @@ void rrdset_reset(RRDSET *st) {
rd->last_collected_time.tv_usec = 0;
rd->collections_counter = 0;
// memset(rd->values, 0, rd->entries * sizeof(storage_number));
+#ifdef ENABLE_DBENGINE
+ if (RRD_MEMORY_MODE_DBENGINE == st->rrd_memory_mode) {
+ rrdeng_store_metric_flush_current_page(rd);
+ }
+#endif
}
}
@@ -505,6 +511,12 @@ RRDSET *rrdset_create_custom(
if(st) {
rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+
+ if(unlikely(name))
+ rrdset_set_name(st, name);
+ else
+ rrdset_set_name(st, id);
+
return st;
}
@@ -613,7 +625,7 @@ RRDSET *rrdset_create_custom(
memset(st, 0, size);
}
else if((now - st->last_updated.tv_sec) > update_every * entries) {
- error("File %s is too old. Clearing it.", fullfilename);
+ info("File %s is too old. Clearing it.", fullfilename);
memset(st, 0, size);
}
else if(st->last_updated.tv_sec > now + update_every) {
@@ -702,6 +714,7 @@ RRDSET *rrdset_create_custom(
st->last_collected_time.tv_sec = 0;
st->last_collected_time.tv_usec = 0;
st->counter_done = 0;
+ st->rrddim_page_alignment = 0;
st->gap_when_lost_iterations_above = (int) (gap_when_lost_iterations_above + 2);
@@ -1273,6 +1286,22 @@ void rrdset_done(RRDSET *st) {
first_entry = 1;
}
+#ifdef ENABLE_DBENGINE
+ // check if we will re-write the entire page
+ if(unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE &&
+ dt_usec(&st->last_collected_time, &st->last_updated) > (RRDENG_BLOCK_SIZE / sizeof(storage_number)) * update_every_ut)) {
+ info("%s: too old data (last updated at %ld.%ld, last collected at %ld.%ld). Resetting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec);
+ rrdset_reset(st);
+ rrdset_init_last_updated_time(st);
+
+ st->usec_since_last_update = update_every_ut;
+
+ // the first entry should not be stored
+ store_this_entry = 0;
+ first_entry = 1;
+ }
+#endif
+
// these are the 3 variables that will help us in interpolation
// last_stored_ut = the last time we added a value to the storage
// now_collect_ut = the time the current value has been collected