summaryrefslogtreecommitdiffstats
path: root/database
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2019-05-21 18:56:05 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2019-05-21 18:56:05 +0000
commit54deae27eed83a162ee438ef6bad4a23767757dd (patch)
treeda5333377dfacf22177375aef822a8e696f007eb /database
parentReleasing debian version 1.14.0-1. (diff)
downloadnetdata-54deae27eed83a162ee438ef6bad4a23767757dd.tar.xz
netdata-54deae27eed83a162ee438ef6bad4a23767757dd.zip
Merging upstream version 1.15.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database')
-rw-r--r--database/Makefile.am4
-rw-r--r--database/Makefile.in472
-rw-r--r--database/README.md34
-rw-r--r--database/engine/Makefile.am8
-rw-r--r--database/engine/README.md109
-rw-r--r--database/engine/datafile.c335
-rw-r--r--database/engine/datafile.h63
-rw-r--r--database/engine/journalfile.c462
-rw-r--r--database/engine/journalfile.h46
-rw-r--r--database/engine/pagecache.c792
-rw-r--r--database/engine/pagecache.h132
-rw-r--r--database/engine/rrddiskprotocol.h119
-rw-r--r--database/engine/rrdengine.c780
-rw-r--r--database/engine/rrdengine.h171
-rw-r--r--database/engine/rrdengineapi.c484
-rw-r--r--database/engine/rrdengineapi.h37
-rw-r--r--database/engine/rrdenginelib.c116
-rw-r--r--database/engine/rrdenginelib.h84
-rw-r--r--database/rrd.c9
-rw-r--r--database/rrd.h173
-rw-r--r--database/rrddim.c101
-rw-r--r--database/rrdhost.c150
-rw-r--r--database/rrdset.c49
23 files changed, 4227 insertions, 503 deletions
diff --git a/database/Makefile.am b/database/Makefile.am
index 19554bed8..2f5dbc297 100644
--- a/database/Makefile.am
+++ b/database/Makefile.am
@@ -3,6 +3,10 @@
AUTOMAKE_OPTIONS = subdir-objects
MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+SUBDIRS = \
+ engine \
+ $(NULL)
+
dist_noinst_DATA = \
README.md \
$(NULL)
diff --git a/database/Makefile.in b/database/Makefile.in
deleted file mode 100644
index f35cb3a07..000000000
--- a/database/Makefile.in
+++ /dev/null
@@ -1,472 +0,0 @@
-# Makefile.in generated by automake 1.14.1 from Makefile.am.
-# @configure_input@
-
-# Copyright (C) 1994-2013 Free Software Foundation, Inc.
-
-# This Makefile.in is free software; the Free Software Foundation
-# gives unlimited permission to copy and/or distribute it,
-# with or without modifications, as long as this notice is preserved.
-
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY, to the extent permitted by law; without
-# even the implied warranty of MERCHANTABILITY or FITNESS FOR A
-# PARTICULAR PURPOSE.
-
-@SET_MAKE@
-
-# SPDX-License-Identifier: GPL-3.0-or-later
-
-VPATH = @srcdir@
-am__is_gnu_make = test -n '$(MAKEFILE_LIST)' && test -n '$(MAKELEVEL)'
-am__make_running_with_option = \
- case $${target_option-} in \
- ?) ;; \
- *) echo "am__make_running_with_option: internal error: invalid" \
- "target option '$${target_option-}' specified" >&2; \
- exit 1;; \
- esac; \
- has_opt=no; \
- sane_makeflags=$$MAKEFLAGS; \
- if $(am__is_gnu_make); then \
- sane_makeflags=$$MFLAGS; \
- else \
- case $$MAKEFLAGS in \
- *\\[\ \ ]*) \
- bs=\\; \
- sane_makeflags=`printf '%s\n' "$$MAKEFLAGS" \
- | sed "s/$$bs$$bs[$$bs $$bs ]*//g"`;; \
- esac; \
- fi; \
- skip_next=no; \
- strip_trailopt () \
- { \
- flg=`printf '%s\n' "$$flg" | sed "s/$$1.*$$//"`; \
- }; \
- for flg in $$sane_makeflags; do \
- test $$skip_next = yes && { skip_next=no; continue; }; \
- case $$flg in \
- *=*|--*) continue;; \
- -*I) strip_trailopt 'I'; skip_next=yes;; \
- -*I?*) strip_trailopt 'I';; \
- -*O) strip_trailopt 'O'; skip_next=yes;; \
- -*O?*) strip_trailopt 'O';; \
- -*l) strip_trailopt 'l'; skip_next=yes;; \
- -*l?*) strip_trailopt 'l';; \
- -[dEDm]) skip_next=yes;; \
- -[JT]) skip_next=yes;; \
- esac; \
- case $$flg in \
- *$$target_option*) has_opt=yes; break;; \
- esac; \
- done; \
- test $$has_opt = yes
-am__make_dryrun = (target_option=n; $(am__make_running_with_option))
-am__make_keepgoing = (target_option=k; $(am__make_running_with_option))
-pkgdatadir = $(datadir)/@PACKAGE@
-pkgincludedir = $(includedir)/@PACKAGE@
-pkglibdir = $(libdir)/@PACKAGE@
-pkglibexecdir = $(libexecdir)/@PACKAGE@
-am__cd = CDPATH="$${ZSH_VERSION+.}$(PATH_SEPARATOR)" && cd
-install_sh_DATA = $(install_sh) -c -m 644
-install_sh_PROGRAM = $(install_sh) -c
-install_sh_SCRIPT = $(install_sh) -c
-INSTALL_HEADER = $(INSTALL_DATA)
-transform = $(program_transform_name)
-NORMAL_INSTALL = :
-PRE_INSTALL = :
-POST_INSTALL = :
-NORMAL_UNINSTALL = :
-PRE_UNINSTALL = :
-POST_UNINSTALL = :
-build_triplet = @build@
-host_triplet = @host@
-subdir = database
-DIST_COMMON = $(srcdir)/Makefile.in $(srcdir)/Makefile.am \
- $(dist_noinst_DATA)
-ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
-am__aclocal_m4_deps = $(top_srcdir)/build/m4/ax_c___atomic.m4 \
- $(top_srcdir)/build/m4/ax_c__generic.m4 \
- $(top_srcdir)/build/m4/ax_c_lto.m4 \
- $(top_srcdir)/build/m4/ax_c_mallinfo.m4 \
- $(top_srcdir)/build/m4/ax_c_mallopt.m4 \
- $(top_srcdir)/build/m4/ax_check_compile_flag.m4 \
- $(top_srcdir)/build/m4/ax_gcc_func_attribute.m4 \
- $(top_srcdir)/build/m4/ax_pthread.m4 \
- $(top_srcdir)/build/m4/jemalloc.m4 \
- $(top_srcdir)/build/m4/tcmalloc.m4 $(top_srcdir)/configure.ac
-am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \
- $(ACLOCAL_M4)
-mkinstalldirs = $(install_sh) -d
-CONFIG_HEADER = $(top_builddir)/config.h
-CONFIG_CLEAN_FILES =
-CONFIG_CLEAN_VPATH_FILES =
-AM_V_P = $(am__v_P_@AM_V@)
-am__v_P_ = $(am__v_P_@AM_DEFAULT_V@)
-am__v_P_0 = false
-am__v_P_1 = :
-AM_V_GEN = $(am__v_GEN_@AM_V@)
-am__v_GEN_ = $(am__v_GEN_@AM_DEFAULT_V@)
-am__v_GEN_0 = @echo " GEN " $@;
-am__v_GEN_1 =
-AM_V_at = $(am__v_at_@AM_V@)
-am__v_at_ = $(am__v_at_@AM_DEFAULT_V@)
-am__v_at_0 = @
-am__v_at_1 =
-SOURCES =
-DIST_SOURCES =
-am__can_run_installinfo = \
- case $$AM_UPDATE_INFO_DIR in \
- n|no|NO) false;; \
- *) (install-info --version) >/dev/null 2>&1;; \
- esac
-DATA = $(dist_noinst_DATA)
-am__tagged_files = $(HEADERS) $(SOURCES) $(TAGS_FILES) $(LISP)
-DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST)
-ACLOCAL = @ACLOCAL@
-AMTAR = @AMTAR@
-AM_DEFAULT_VERBOSITY = @AM_DEFAULT_VERBOSITY@
-AUTOCONF = @AUTOCONF@
-AUTOHEADER = @AUTOHEADER@
-AUTOMAKE = @AUTOMAKE@
-AWK = @AWK@
-CC = @CC@
-CCDEPMODE = @CCDEPMODE@
-CFLAGS = @CFLAGS@
-CPP = @CPP@
-CPPFLAGS = @CPPFLAGS@
-CUPSCONFIG = @CUPSCONFIG@
-CYGPATH_W = @CYGPATH_W@
-DEFS = @DEFS@
-DEPDIR = @DEPDIR@
-ECHO_C = @ECHO_C@
-ECHO_N = @ECHO_N@
-ECHO_T = @ECHO_T@
-EGREP = @EGREP@
-EXEEXT = @EXEEXT@
-GREP = @GREP@
-INSTALL = @INSTALL@
-INSTALL_DATA = @INSTALL_DATA@
-INSTALL_PROGRAM = @INSTALL_PROGRAM@
-INSTALL_SCRIPT = @INSTALL_SCRIPT@
-INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@
-IPMIMONITORING_CFLAGS = @IPMIMONITORING_CFLAGS@
-IPMIMONITORING_LIBS = @IPMIMONITORING_LIBS@
-LDFLAGS = @LDFLAGS@
-LIBCAP_CFLAGS = @LIBCAP_CFLAGS@
-LIBCAP_LIBS = @LIBCAP_LIBS@
-LIBMNL_CFLAGS = @LIBMNL_CFLAGS@
-LIBMNL_LIBS = @LIBMNL_LIBS@
-LIBOBJS = @LIBOBJS@
-LIBS = @LIBS@
-LTLIBOBJS = @LTLIBOBJS@
-MAINT = @MAINT@
-MAKEINFO = @MAKEINFO@
-MATH_CFLAGS = @MATH_CFLAGS@
-MATH_LIBS = @MATH_LIBS@
-MKDIR_P = @MKDIR_P@
-NFACCT_CFLAGS = @NFACCT_CFLAGS@
-NFACCT_LIBS = @NFACCT_LIBS@
-OBJEXT = @OBJEXT@
-OPTIONAL_CUPS_CFLAGS = @OPTIONAL_CUPS_CFLAGS@
-OPTIONAL_CUPS_LIBS = @OPTIONAL_CUPS_LIBS@
-OPTIONAL_IPMIMONITORING_CFLAGS = @OPTIONAL_IPMIMONITORING_CFLAGS@
-OPTIONAL_IPMIMONITORING_LIBS = @OPTIONAL_IPMIMONITORING_LIBS@
-OPTIONAL_LIBCAP_CFLAGS = @OPTIONAL_LIBCAP_CFLAGS@
-OPTIONAL_LIBCAP_LIBS = @OPTIONAL_LIBCAP_LIBS@
-OPTIONAL_MATH_CLFAGS = @OPTIONAL_MATH_CLFAGS@
-OPTIONAL_MATH_LIBS = @OPTIONAL_MATH_LIBS@
-OPTIONAL_NFACCT_CLFAGS = @OPTIONAL_NFACCT_CLFAGS@
-OPTIONAL_NFACCT_LIBS = @OPTIONAL_NFACCT_LIBS@
-OPTIONAL_UUID_CLFAGS = @OPTIONAL_UUID_CLFAGS@
-OPTIONAL_UUID_LIBS = @OPTIONAL_UUID_LIBS@
-OPTIONAL_XENSTAT_CFLAGS = @OPTIONAL_XENSTAT_CFLAGS@
-OPTIONAL_XENSTAT_LIBS = @OPTIONAL_XENSTAT_LIBS@
-OPTIONAL_ZLIB_CLFAGS = @OPTIONAL_ZLIB_CLFAGS@
-OPTIONAL_ZLIB_LIBS = @OPTIONAL_ZLIB_LIBS@
-PACKAGE = @PACKAGE@
-PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@
-PACKAGE_NAME = @PACKAGE_NAME@
-PACKAGE_RPM_VERSION = @PACKAGE_RPM_VERSION@
-PACKAGE_STRING = @PACKAGE_STRING@
-PACKAGE_TARNAME = @PACKAGE_TARNAME@
-PACKAGE_URL = @PACKAGE_URL@
-PACKAGE_VERSION = @PACKAGE_VERSION@
-PATH_SEPARATOR = @PATH_SEPARATOR@
-PKG_CONFIG = @PKG_CONFIG@
-PKG_CONFIG_LIBDIR = @PKG_CONFIG_LIBDIR@
-PKG_CONFIG_PATH = @PKG_CONFIG_PATH@
-PTHREAD_CC = @PTHREAD_CC@
-PTHREAD_CFLAGS = @PTHREAD_CFLAGS@
-PTHREAD_LIBS = @PTHREAD_LIBS@
-SET_MAKE = @SET_MAKE@
-SHELL = @SHELL@
-SSE_CANDIDATE = @SSE_CANDIDATE@
-STRIP = @STRIP@
-UUID_CFLAGS = @UUID_CFLAGS@
-UUID_LIBS = @UUID_LIBS@
-VERSION = @VERSION@
-XENLIGHT_CFLAGS = @XENLIGHT_CFLAGS@
-XENLIGHT_LIBS = @XENLIGHT_LIBS@
-YAJL_CFLAGS = @YAJL_CFLAGS@
-YAJL_LIBS = @YAJL_LIBS@
-ZLIB_CFLAGS = @ZLIB_CFLAGS@
-ZLIB_LIBS = @ZLIB_LIBS@
-abs_builddir = @abs_builddir@
-abs_srcdir = @abs_srcdir@
-abs_top_builddir = @abs_top_builddir@
-abs_top_srcdir = @abs_top_srcdir@
-ac_ct_CC = @ac_ct_CC@
-am__include = @am__include@
-am__leading_dot = @am__leading_dot@
-am__quote = @am__quote@
-am__tar = @am__tar@
-am__untar = @am__untar@
-ax_pthread_config = @ax_pthread_config@
-bindir = @bindir@
-build = @build@
-build_alias = @build_alias@
-build_cpu = @build_cpu@
-build_os = @build_os@
-build_target = @build_target@
-build_vendor = @build_vendor@
-builddir = @builddir@
-cachedir = @cachedir@
-chartsdir = @chartsdir@
-configdir = @configdir@
-datadir = @datadir@
-datarootdir = @datarootdir@
-docdir = @docdir@
-dvidir = @dvidir@
-exec_prefix = @exec_prefix@
-has_jemalloc = @has_jemalloc@
-has_tcmalloc = @has_tcmalloc@
-host = @host@
-host_alias = @host_alias@
-host_cpu = @host_cpu@
-host_os = @host_os@
-host_vendor = @host_vendor@
-htmldir = @htmldir@
-includedir = @includedir@
-infodir = @infodir@
-install_sh = @install_sh@
-libconfigdir = @libconfigdir@
-libdir = @libdir@
-libexecdir = @libexecdir@
-localedir = @localedir@
-localstatedir = @localstatedir@
-logdir = @logdir@
-mandir = @mandir@
-mkdir_p = @mkdir_p@
-nodedir = @nodedir@
-oldincludedir = @oldincludedir@
-pdfdir = @pdfdir@
-pluginsdir = @pluginsdir@
-prefix = @prefix@
-program_transform_name = @program_transform_name@
-psdir = @psdir@
-pythondir = @pythondir@
-registrydir = @registrydir@
-sbindir = @sbindir@
-sharedstatedir = @sharedstatedir@
-srcdir = @srcdir@
-sysconfdir = @sysconfdir@
-target_alias = @target_alias@
-top_build_prefix = @top_build_prefix@
-top_builddir = @top_builddir@
-top_srcdir = @top_srcdir@
-varlibdir = @varlibdir@
-webdir = @webdir@
-AUTOMAKE_OPTIONS = subdir-objects
-MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
-dist_noinst_DATA = \
- README.md \
- $(NULL)
-
-all: all-am
-
-.SUFFIXES:
-$(srcdir)/Makefile.in: @MAINTAINER_MODE_TRUE@ $(srcdir)/Makefile.am $(am__configure_deps)
- @for dep in $?; do \
- case '$(am__configure_deps)' in \
- *$$dep*) \
- ( cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh ) \
- && { if test -f $@; then exit 0; else break; fi; }; \
- exit 1;; \
- esac; \
- done; \
- echo ' cd $(top_srcdir) && $(AUTOMAKE) --gnu database/Makefile'; \
- $(am__cd) $(top_srcdir) && \
- $(AUTOMAKE) --gnu database/Makefile
-.PRECIOUS: Makefile
-Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status
- @case '$?' in \
- *config.status*) \
- cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh;; \
- *) \
- echo ' cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__depfiles_maybe)'; \
- cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__depfiles_maybe);; \
- esac;
-
-$(top_builddir)/config.status: $(top_srcdir)/configure $(CONFIG_STATUS_DEPENDENCIES)
- cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
-
-$(top_srcdir)/configure: @MAINTAINER_MODE_TRUE@ $(am__configure_deps)
- cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
-$(ACLOCAL_M4): @MAINTAINER_MODE_TRUE@ $(am__aclocal_m4_deps)
- cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
-$(am__aclocal_m4_deps):
-tags TAGS:
-
-ctags CTAGS:
-
-cscope cscopelist:
-
-
-distdir: $(DISTFILES)
- @srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \
- topsrcdirstrip=`echo "$(top_srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \
- list='$(DISTFILES)'; \
- dist_files=`for file in $$list; do echo $$file; done | \
- sed -e "s|^$$srcdirstrip/||;t" \
- -e "s|^$$topsrcdirstrip/|$(top_builddir)/|;t"`; \
- case $$dist_files in \
- */*) $(MKDIR_P) `echo "$$dist_files" | \
- sed '/\//!d;s|^|$(distdir)/|;s,/[^/]*$$,,' | \
- sort -u` ;; \
- esac; \
- for file in $$dist_files; do \
- if test -f $$file || test -d $$file; then d=.; else d=$(srcdir); fi; \
- if test -d $$d/$$file; then \
- dir=`echo "/$$file" | sed -e 's,/[^/]*$$,,'`; \
- if test -d "$(distdir)/$$file"; then \
- find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \
- fi; \
- if test -d $(srcdir)/$$file && test $$d != $(srcdir); then \
- cp -fpR $(srcdir)/$$file "$(distdir)$$dir" || exit 1; \
- find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \
- fi; \
- cp -fpR $$d/$$file "$(distdir)$$dir" || exit 1; \
- else \
- test -f "$(distdir)/$$file" \
- || cp -p $$d/$$file "$(distdir)/$$file" \
- || exit 1; \
- fi; \
- done
-check-am: all-am
-check: check-am
-all-am: Makefile $(DATA)
-installdirs:
-install: install-am
-install-exec: install-exec-am
-install-data: install-data-am
-uninstall: uninstall-am
-
-install-am: all-am
- @$(MAKE) $(AM_MAKEFLAGS) install-exec-am install-data-am
-
-installcheck: installcheck-am
-install-strip:
- if test -z '$(STRIP)'; then \
- $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
- install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
- install; \
- else \
- $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
- install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
- "INSTALL_PROGRAM_ENV=STRIPPROG='$(STRIP)'" install; \
- fi
-mostlyclean-generic:
-
-clean-generic:
-
-distclean-generic:
- -test -z "$(CONFIG_CLEAN_FILES)" || rm -f $(CONFIG_CLEAN_FILES)
- -test . = "$(srcdir)" || test -z "$(CONFIG_CLEAN_VPATH_FILES)" || rm -f $(CONFIG_CLEAN_VPATH_FILES)
-
-maintainer-clean-generic:
- @echo "This command is intended for maintainers to use"
- @echo "it deletes files that may require special tools to rebuild."
- -test -z "$(MAINTAINERCLEANFILES)" || rm -f $(MAINTAINERCLEANFILES)
-clean: clean-am
-
-clean-am: clean-generic mostlyclean-am
-
-distclean: distclean-am
- -rm -f Makefile
-distclean-am: clean-am distclean-generic
-
-dvi: dvi-am
-
-dvi-am:
-
-html: html-am
-
-html-am:
-
-info: info-am
-
-info-am:
-
-install-data-am:
-
-install-dvi: install-dvi-am
-
-install-dvi-am:
-
-install-exec-am:
-
-install-html: install-html-am
-
-install-html-am:
-
-install-info: install-info-am
-
-install-info-am:
-
-install-man:
-
-install-pdf: install-pdf-am
-
-install-pdf-am:
-
-install-ps: install-ps-am
-
-install-ps-am:
-
-installcheck-am:
-
-maintainer-clean: maintainer-clean-am
- -rm -f Makefile
-maintainer-clean-am: distclean-am maintainer-clean-generic
-
-mostlyclean: mostlyclean-am
-
-mostlyclean-am: mostlyclean-generic
-
-pdf: pdf-am
-
-pdf-am:
-
-ps: ps-am
-
-ps-am:
-
-uninstall-am:
-
-.MAKE: install-am install-strip
-
-.PHONY: all all-am check check-am clean clean-generic cscopelist-am \
- ctags-am distclean distclean-generic distdir dvi dvi-am html \
- html-am info info-am install install-am install-data \
- install-data-am install-dvi install-dvi-am install-exec \
- install-exec-am install-html install-html-am install-info \
- install-info-am install-man install-pdf install-pdf-am \
- install-ps install-ps-am install-strip installcheck \
- installcheck-am installdirs maintainer-clean \
- maintainer-clean-generic mostlyclean mostlyclean-generic pdf \
- pdf-am ps ps-am tags-am uninstall uninstall-am
-
-
-# Tell versions [3.59,3.63) of GNU make to not export all variables.
-# Otherwise a system limit (for SysV at least) may be exceeded.
-.NOEXPORT:
diff --git a/database/README.md b/database/README.md
index aedf4d520..dc40a3e40 100644
--- a/database/README.md
+++ b/database/README.md
@@ -17,12 +17,13 @@ to 1 second. You will have just one hour of data.
For a day of data and 1.000 dimensions, you will need: 86.400 seconds * 4 bytes * 1.000
dimensions = 345MB of RAM.
-Currently the only option you have to lower this number is to use
-**[Memory Deduplication - Kernel Same Page Merging - KSM](#ksm)**.
+One option you have to lower this number is to use
+**[Memory Deduplication - Kernel Same Page Merging - KSM](#ksm)**. Another possibility is to
+use the **[Database Engine](engine/)**.
## Memory modes
-Currently netdata supports 5 memory modes:
+Currently netdata supports 6 memory modes:
1. `ram`, data are purely in memory. Data are never saved on disk. This mode uses `mmap()` and
supports [KSM](#ksm).
@@ -42,6 +43,12 @@ Currently netdata supports 5 memory modes:
5. `alloc`, like `ram` but it uses `calloc()` and does not support [KSM](#ksm). This mode is the
fallback for all others except `none`.
+6. `dbengine`, data are in database files. The [Database Engine](engine/) works like a traditional
+ database. There is some amount of RAM dedicated to data caching and indexing and the rest of
+ the data reside compressed on disk. The number of history entries is not fixed in this case,
+ but depends on the configured disk space and the effective compression ratio of the data stored.
+ For more details see [here](engine/).
+
You can select the memory mode by editing netdata.conf and setting:
```
@@ -80,7 +87,7 @@ server that will maintain the entire database for all nodes, and will also run h
for all nodes.
For this central netdata, memory size can be a problem. Fortunately, netdata supports several
-memory modes. What is interesting for this setup is `memory mode = map`.
+memory modes. One interesting option for this setup is `memory mode = map`.
In this mode, the database of netdata is stored in memory mapped files. netdata continues to read
and write the database in memory, but the kernel automatically loads and saves memory pages from/to
@@ -88,7 +95,7 @@ disk.
**We suggest _not_ to use this mode on nodes that run other applications.** There will always be
dirty memory to be synced and this syncing process may influence the way other applications work.
-This mode however is ideal when we need a central netdata server that would normally need huge
+This mode however is useful when we need a central netdata server that would normally need huge
amounts of memory. Using memory mode `map` we can overcome all memory restrictions.
There are a few kernel options that provide finer control on the way this syncing works. But before
@@ -155,9 +162,24 @@ vm.dirty_ratio = 90
vm.dirty_writeback_centisecs = 0
```
+There is another memory mode to help overcome the memory size problem. What is most interesting
+for this setup is `memory mode = dbengine`.
+
+In this mode, the database of netdata is stored in database files. The [Database Engine](engine/)
+works like a traditional database. There is some amount of RAM dedicated to data caching and
+indexing and the rest of the data reside compressed on disk. The number of history entries is not
+fixed in this case, but depends on the configured disk space and the effective compression ratio
+of the data stored.
+
+We suggest to use **this** mode on nodes that also run other applications. The Database Engine uses
+direct I/O to avoid polluting the OS filesystem caches and does not generate excessive I/O traffic
+so as to create the minimum possible interference with other applications. Using memory mode
+`dbengine` we can overcome most memory restrictions. For more details see [here](engine/).
+
## KSM
-Netdata offers all its round robin database to kernel for deduplication.
+Netdata offers all its round robin database to kernel for deduplication
+(except for `memory mode = dbengine`).
In the past KSM has been criticized for consuming a lot of CPU resources.
Although this is true when KSM is used for deduplicating certain applications, it is not true with
diff --git a/database/engine/Makefile.am b/database/engine/Makefile.am
new file mode 100644
index 000000000..19554bed8
--- /dev/null
+++ b/database/engine/Makefile.am
@@ -0,0 +1,8 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/database/engine/README.md b/database/engine/README.md
new file mode 100644
index 000000000..28a2528cb
--- /dev/null
+++ b/database/engine/README.md
@@ -0,0 +1,109 @@
+# Database engine
+
+The Database Engine works like a traditional
+database. There is some amount of RAM dedicated to data caching and indexing and the rest of
+the data reside compressed on disk. The number of history entries is not fixed in this case,
+but depends on the configured disk space and the effective compression ratio of the data stored.
+
+## Files
+
+With the DB engine memory mode the metric data are stored in database files. These files are
+organized in pairs, the datafiles and their corresponding journalfiles, e.g.:
+
+```
+datafile-1-0000000001.ndf
+journalfile-1-0000000001.njf
+datafile-1-0000000002.ndf
+journalfile-1-0000000002.njf
+datafile-1-0000000003.ndf
+journalfile-1-0000000003.njf
+...
+```
+
+They are located under their host's cache directory in the directory `./dbengine`
+(e.g. for localhost the default location is `/var/cache/netdata/dbengine/*`). The higher
+numbered filenames contain more recent metric data. The user can safely delete some pairs
+of files when netdata is stopped to manually free up some space.
+
+*Users should* **back up** *their `./dbengine` folders if they consider this data to be important.*
+
+## Configuration
+
+There is one DB engine instance per netdata host/node. That is, there is one `./dbengine` folder
+per node, and all charts of `dbengine` memory mode in such a host share the same storage space
+and DB engine instance memory state. You can select the memory mode for localhost by editing
+netdata.conf and setting:
+
+```
+[global]
+ memory mode = dbengine
+```
+
+For setting the memory mode for the rest of the nodes you should look at
+[streaming](../../streaming/).
+
+The `history` configuration option is meaningless for `memory mode = dbengine` and is ignored
+for any metrics being stored in the DB engine.
+
+All DB engine instances, for localhost and all other streaming recipient nodes inherit their
+configuration from `netdata.conf`:
+
+```
+[global]
+ page cache size = 32
+ dbengine disk space = 256
+```
+
+The above values are the default and minimum values for Page Cache size and DB engine disk space
+quota. Both numbers are in **MiB**. All DB engine instances will allocate the configured resources
+separately.
+
+The `page cache size` option determines the amount of RAM in **MiB** that is dedicated to caching
+netdata metric values themselves.
+
+The `dbengine disk space` option determines the amount of disk space in **MiB** that is dedicated
+to storing netdata metric values and all related metadata describing them.
+
+## Operation
+
+The DB engine stores chart metric values in 4096-byte pages in memory. Each chart dimension gets
+its own page to store consecutive values generated from the data collectors. Those pages comprise
+the **Page Cache**.
+
+When those pages fill up they are slowly compressed and flushed to disk.
+It can take `4096 / 4 = 1024 seconds = 17 minutes`, for a chart dimension that is being collected
+every 1 second, to fill a page. Pages can be cut short when we stop netdata or the DB engine
+instance so as to not lose the data. When we query the DB engine for data we trigger disk read
+I/O requests that fill the Page Cache with the requested pages and potentially evict cold
+(not recently used) pages.
+
+When the disk quota is exceeded the oldest values are removed from the DB engine at real time, by
+automatically deleting the oldest datafile and journalfile pair. Any corresponding pages residing
+in the Page Cache will also be invalidated and removed. The DB engine logic will try to maintain
+between 10 and 20 file pairs at any point in time.
+
+The Database Engine uses direct I/O to avoid polluting the OS filesystem caches and does not
+generate excessive I/O traffic so as to create the minimum possible interference with other
+applications.
+
+## Memory requirements
+
+Using memory mode `dbengine` we can overcome most memory restrictions and store a dataset that
+is much larger than the available memory.
+
+There are explicit memory requirements **per** DB engine **instance**, meaning **per** netdata
+**node** (e.g. localhost and streaming recipient nodes):
+
+- `page cache size` must be at least `#dimensions-being-collected x 4096 x 2` bytes.
+
+- an additional `#pages-on-disk x 4096 x 0.06` bytes of RAM are allocated for metadata.
+
+ - roughly speaking this is 6% of the uncompressed disk space taken by the DB files.
+
+ - for very highly compressible data (compression ratio > 90%) this RAM overhead
+ is comparable to the disk space footprint.
+
+An important observation is that RAM usage depends on both the `page cache size` and the
+`dbengine disk space` options.
+
+[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fdatabase%2Fengine%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)]()
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
new file mode 100644
index 000000000..2d17d05e4
--- /dev/null
+++ b/database/engine/datafile.c
@@ -0,0 +1,335 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+void df_extent_insert(struct extent_info *extent)
+{
+ struct rrdengine_datafile *datafile = extent->datafile;
+
+ if (likely(NULL != datafile->extents.last)) {
+ datafile->extents.last->next = extent;
+ }
+ if (unlikely(NULL == datafile->extents.first)) {
+ datafile->extents.first = extent;
+ }
+ datafile->extents.last = extent;
+}
+
+void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
+{
+ if (likely(NULL != ctx->datafiles.last)) {
+ ctx->datafiles.last->next = datafile;
+ }
+ if (unlikely(NULL == ctx->datafiles.first)) {
+ ctx->datafiles.first = datafile;
+ }
+ ctx->datafiles.last = datafile;
+}
+
+void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_datafile *next;
+
+ next = datafile->next;
+ assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile));
+ ctx->datafiles.first = next;
+}
+
+
+static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_instance *ctx,
+ unsigned tier, unsigned fileno)
+{
+ assert(tier == 1);
+ datafile->tier = tier;
+ datafile->fileno = fileno;
+ datafile->file = (uv_file)0;
+ datafile->pos = 0;
+ datafile->extents.first = datafile->extents.last = NULL; /* will be populated by journalfile */
+ datafile->journalfile = NULL;
+ datafile->next = NULL;
+ datafile->ctx = ctx;
+}
+
+static void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+{
+ (void) snprintf(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION,
+ datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
+}
+
+int destroy_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret, fd;
+ char path[1024];
+
+ ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_ftruncate: %s", uv_strerror(ret));
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ ret = uv_fs_close(NULL, &req, datafile->file, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_close: %s", uv_strerror(ret));
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ generate_datafilepath(datafile, path, sizeof(path));
+ fd = uv_fs_unlink(NULL, &req, path, NULL);
+ if (fd < 0) {
+ fatal("uv_fs_fsunlink: %s", uv_strerror(fd));
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ ++ctx->stats.datafile_deletions;
+
+ return 0;
+}
+
+int create_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd;
+ struct rrdeng_df_sb *superblock;
+ uv_buf_t iov;
+ char path[1024];
+
+ generate_datafilepath(datafile, path, sizeof(path));
+ fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC,
+ S_IRUSR | S_IWUSR, NULL);
+ if (fd < 0) {
+ fatal("uv_fs_fsopen: %s", uv_strerror(fd));
+ }
+ assert(req.result >= 0);
+ file = req.result;
+ uv_fs_req_cleanup(&req);
+#ifdef __APPLE__
+ info("Disabling OS X caching for file \"%s\".", path);
+ fcntl(fd, F_NOCACHE, 1);
+#endif
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ (void) strncpy(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ);
+ (void) strncpy(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ);
+ superblock->tier = 1;
+
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_write: %s", uv_strerror(ret));
+ }
+ if (req.result < 0) {
+ fatal("uv_fs_write: %s", uv_strerror((int)req.result));
+ }
+ uv_fs_req_cleanup(&req);
+ free(superblock);
+
+ datafile->file = file;
+ datafile->pos = sizeof(*superblock);
+ ctx->stats.io_write_bytes += sizeof(*superblock);
+ ++ctx->stats.io_write_requests;
+ ++ctx->stats.datafile_creations;
+
+ return 0;
+}
+
+static int check_data_file_superblock(uv_file file)
+{
+ int ret;
+ struct rrdeng_df_sb *superblock;
+ uv_buf_t iov;
+ uv_fs_t req;
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ error("uv_fs_read: %s", uv_strerror(ret));
+ uv_fs_req_cleanup(&req);
+ goto error;
+ }
+ assert(req.result >= 0);
+ uv_fs_req_cleanup(&req);
+
+ if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) ||
+ strncmp(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ) ||
+ superblock->tier != 1) {
+ error("File has invalid superblock.");
+ ret = UV_EINVAL;
+ } else {
+ ret = 0;
+ }
+ error:
+ free(superblock);
+ return ret;
+}
+
+static int load_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd;
+ uint64_t file_size;
+ char path[1024];
+
+ generate_datafilepath(datafile, path, sizeof(path));
+ fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL);
+ if (fd < 0) {
+ /* if (UV_ENOENT != fd) */
+ error("uv_fs_fsopen: %s", uv_strerror(fd));
+ uv_fs_req_cleanup(&req);
+ return fd;
+ }
+ assert(req.result >= 0);
+ file = req.result;
+ uv_fs_req_cleanup(&req);
+#ifdef __APPLE__
+ info("Disabling OS X caching for file \"%s\".", path);
+ fcntl(fd, F_NOCACHE, 1);
+#endif
+ info("Initializing data file \"%s\".", path);
+
+ ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
+ if (ret)
+ goto error;
+ file_size = ALIGN_BYTES_CEILING(file_size);
+
+ ret = check_data_file_superblock(file);
+ if (ret)
+ goto error;
+ ctx->stats.io_read_bytes += sizeof(struct rrdeng_df_sb);
+ ++ctx->stats.io_read_requests;
+
+ datafile->file = file;
+ datafile->pos = file_size;
+
+ info("Data file \"%s\" initialized (size:%"PRIu64").", path, file_size);
+ return 0;
+
+ error:
+ (void) uv_fs_close(NULL, &req, file, NULL);
+ uv_fs_req_cleanup(&req);
+ return ret;
+}
+
+static int scan_data_files_cmp(const void *a, const void *b)
+{
+ struct rrdengine_datafile *file1, *file2;
+ char path1[1024], path2[1024];
+
+ file1 = *(struct rrdengine_datafile **)a;
+ file2 = *(struct rrdengine_datafile **)b;
+ generate_datafilepath(file1, path1, sizeof(path1));
+ generate_datafilepath(file2, path2, sizeof(path2));
+ return strcmp(path1, path2);
+}
+
+/* Returns number of datafiles that were loaded */
+static int scan_data_files(struct rrdengine_instance *ctx)
+{
+ int ret;
+ unsigned tier, no, matched_files, i,failed_to_load;
+ static uv_fs_t req;
+ uv_dirent_t dent;
+ struct rrdengine_datafile **datafiles, *datafile;
+ struct rrdengine_journalfile *journalfile;
+
+ ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL);
+ assert(ret >= 0);
+ assert(req.result >= 0);
+ info("Found %d files in path %s", ret, ctx->dbfiles_path);
+
+ datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
+ for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
+ info("Scanning file \"%s\"", dent.name);
+ ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no);
+ if (2 == ret) {
+ info("Matched file \"%s\"", dent.name);
+ datafile = mallocz(sizeof(*datafile));
+ datafile_init(datafile, ctx, tier, no);
+ datafiles[matched_files++] = datafile;
+ }
+ }
+ uv_fs_req_cleanup(&req);
+
+ if (matched_files == MAX_DATAFILES) {
+ error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
+ }
+ qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp);
+ for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
+ datafile = datafiles[i];
+ ret = load_data_file(datafile);
+ if (0 != ret) {
+ free(datafile);
+ ++failed_to_load;
+ continue;
+ }
+ journalfile = mallocz(sizeof(*journalfile));
+ datafile->journalfile = journalfile;
+ journalfile_init(journalfile, datafile);
+ ret = load_journal_file(ctx, journalfile, datafile);
+ if (0 != ret) {
+ free(datafile);
+ free(journalfile);
+ ++failed_to_load;
+ continue;
+ }
+ datafile_list_insert(ctx, datafile);
+ ctx->disk_space += datafile->pos + journalfile->pos;
+ }
+ if (failed_to_load) {
+ error("%u files failed to load.", failed_to_load);
+ }
+ free(datafiles);
+
+ return matched_files - failed_to_load;
+}
+
+/* Creates a datafile and a journalfile pair */
+void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno)
+{
+ struct rrdengine_datafile *datafile;
+ struct rrdengine_journalfile *journalfile;
+ int ret;
+
+ info("Creating new data and journal files.");
+ datafile = mallocz(sizeof(*datafile));
+ datafile_init(datafile, ctx, tier, fileno);
+ ret = create_data_file(datafile);
+ assert(!ret);
+
+ journalfile = mallocz(sizeof(*journalfile));
+ datafile->journalfile = journalfile;
+ journalfile_init(journalfile, datafile);
+ ret = create_journal_file(journalfile, datafile);
+ assert(!ret);
+ datafile_list_insert(ctx, datafile);
+ ctx->disk_space += datafile->pos + journalfile->pos;
+}
+
+/* Page cache must already be initialized. */
+int init_data_files(struct rrdengine_instance *ctx)
+{
+ int ret;
+
+ ret = scan_data_files(ctx);
+ if (0 == ret) {
+ info("Data files not found, creating.");
+ create_new_datafile_pair(ctx, 1, 1);
+ }
+ return 0;
+} \ No newline at end of file
diff --git a/database/engine/datafile.h b/database/engine/datafile.h
new file mode 100644
index 000000000..c5c8f31f3
--- /dev/null
+++ b/database/engine/datafile.h
@@ -0,0 +1,63 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_DATAFILE_H
+#define NETDATA_DATAFILE_H
+
+#include "rrdengine.h"
+
+/* Forward declarations */
+struct rrdengine_datafile;
+struct rrdengine_journalfile;
+struct rrdengine_instance;
+
+#define DATAFILE_PREFIX "datafile-"
+#define DATAFILE_EXTENSION ".ndf"
+
+#define MAX_DATAFILE_SIZE (1073741824LU)
+#define MIN_DATAFILE_SIZE (16777216LU)
+#define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */
+#define TARGET_DATAFILES (20)
+
+#define DATAFILE_IDEAL_IO_SIZE (1048576U)
+
+struct extent_info {
+ uint64_t offset;
+ uint32_t size;
+ uint8_t number_of_pages;
+ struct rrdengine_datafile *datafile;
+ struct extent_info *next;
+ struct rrdeng_page_cache_descr *pages[];
+};
+
+struct rrdengine_df_extents {
+ /* the extent list is sorted based on disk offset */
+ struct extent_info *first;
+ struct extent_info *last;
+};
+
+/* only one event loop is supported for now */
+struct rrdengine_datafile {
+ unsigned tier;
+ unsigned fileno;
+ uv_file file;
+ uint64_t pos;
+ struct rrdengine_instance *ctx;
+ struct rrdengine_df_extents extents;
+ struct rrdengine_journalfile *journalfile;
+ struct rrdengine_datafile *next;
+};
+
+struct rrdengine_datafile_list {
+ struct rrdengine_datafile *first; /* oldest */
+ struct rrdengine_datafile *last; /* newest */
+};
+
+extern void df_extent_insert(struct extent_info *extent);
+extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
+extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
+extern int destroy_data_file(struct rrdengine_datafile *datafile);
+extern int create_data_file(struct rrdengine_datafile *datafile);
+extern void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
+extern int init_data_files(struct rrdengine_instance *ctx);
+
+#endif /* NETDATA_DATAFILE_H */ \ No newline at end of file
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
new file mode 100644
index 000000000..44d8461db
--- /dev/null
+++ b/database/engine/journalfile.c
@@ -0,0 +1,462 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+static void flush_transaction_buffer_cb(uv_fs_t* req)
+{
+ struct generic_io_descriptor *io_descr;
+
+ debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
+ if (req->result < 0) {
+ fatal("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
+ }
+ io_descr = req->data;
+
+ uv_fs_req_cleanup(req);
+ free(io_descr->buf);
+ free(io_descr);
+}
+
+/* Careful to always call this before creating a new journal file */
+void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ int ret;
+ struct generic_io_descriptor *io_descr;
+ unsigned pos, size;
+ struct rrdengine_journalfile *journalfile;
+
+ if (unlikely(NULL == ctx->commit_log.buf || 0 == ctx->commit_log.buf_pos)) {
+ return;
+ }
+ /* care with outstanding transactions when switching journal files */
+ journalfile = ctx->datafiles.last->journalfile;
+
+ io_descr = mallocz(sizeof(*io_descr));
+ pos = ctx->commit_log.buf_pos;
+ size = ctx->commit_log.buf_size;
+ if (pos < size) {
+ /* simulate an empty transaction to skip the rest of the block */
+ *(uint8_t *) (ctx->commit_log.buf + pos) = STORE_PADDING;
+ }
+ io_descr->buf = ctx->commit_log.buf;
+ io_descr->bytes = size;
+ io_descr->pos = journalfile->pos;
+ io_descr->req.data = io_descr;
+ io_descr->completion = NULL;
+
+ io_descr->iov = uv_buf_init((void *)io_descr->buf, size);
+ ret = uv_fs_write(wc->loop, &io_descr->req, journalfile->file, &io_descr->iov, 1,
+ journalfile->pos, flush_transaction_buffer_cb);
+ assert (-1 != ret);
+ journalfile->pos += RRDENG_BLOCK_SIZE;
+ ctx->disk_space += RRDENG_BLOCK_SIZE;
+ ctx->commit_log.buf = NULL;
+ ctx->stats.io_write_bytes += RRDENG_BLOCK_SIZE;
+ ++ctx->stats.io_write_requests;
+}
+
+void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ int ret;
+ unsigned buf_pos, buf_size;
+
+ assert(size);
+ if (ctx->commit_log.buf) {
+ unsigned remaining;
+
+ buf_pos = ctx->commit_log.buf_pos;
+ buf_size = ctx->commit_log.buf_size;
+ remaining = buf_size - buf_pos;
+ if (size > remaining) {
+ /* we need a new buffer */
+ wal_flush_transaction_buffer(wc);
+ }
+ }
+ if (NULL == ctx->commit_log.buf) {
+ buf_size = ALIGN_BYTES_CEILING(size);
+ ret = posix_memalign((void *)&ctx->commit_log.buf, RRDFILE_ALIGNMENT, buf_size);
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ buf_pos = ctx->commit_log.buf_pos = 0;
+ ctx->commit_log.buf_size = buf_size;
+ }
+ ctx->commit_log.buf_pos += size;
+
+ return ctx->commit_log.buf + buf_pos;
+}
+
+static void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+{
+ (void) snprintf(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
+ datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
+}
+
+void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ journalfile->file = (uv_file)0;
+ journalfile->pos = 0;
+ journalfile->datafile = datafile;
+}
+
+int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret, fd;
+ char path[1024];
+
+ ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_ftruncate: %s", uv_strerror(ret));
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_close: %s", uv_strerror(ret));
+ exit(ret);
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+ fd = uv_fs_unlink(NULL, &req, path, NULL);
+ if (fd < 0) {
+ fatal("uv_fs_fsunlink: %s", uv_strerror(fd));
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ ++ctx->stats.journalfile_deletions;
+
+ return 0;
+}
+
+int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd;
+ struct rrdeng_jf_sb *superblock;
+ uv_buf_t iov;
+ char path[1024];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+ fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC,
+ S_IRUSR | S_IWUSR, NULL);
+ if (fd < 0) {
+ fatal("uv_fs_fsopen: %s", uv_strerror(fd));
+ }
+ assert(req.result >= 0);
+ file = req.result;
+ uv_fs_req_cleanup(&req);
+#ifdef __APPLE__
+ info("Disabling OS X caching for file \"%s\".", path);
+ fcntl(fd, F_NOCACHE, 1);
+#endif
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ (void) strncpy(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ);
+ (void) strncpy(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ);
+
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_write: %s", uv_strerror(ret));
+ }
+ if (req.result < 0) {
+ fatal("uv_fs_write: %s", uv_strerror((int)req.result));
+ }
+ uv_fs_req_cleanup(&req);
+ free(superblock);
+
+ journalfile->file = file;
+ journalfile->pos = sizeof(*superblock);
+ ctx->stats.io_write_bytes += sizeof(*superblock);
+ ++ctx->stats.io_write_requests;
+ ++ctx->stats.journalfile_creations;
+
+ return 0;
+}
+
+static int check_journal_file_superblock(uv_file file)
+{
+ int ret;
+ struct rrdeng_jf_sb *superblock;
+ uv_buf_t iov;
+ uv_fs_t req;
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ error("uv_fs_read: %s", uv_strerror(ret));
+ uv_fs_req_cleanup(&req);
+ goto error;
+ }
+ assert(req.result >= 0);
+ uv_fs_req_cleanup(&req);
+
+ if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) ||
+ strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) {
+ error("File has invalid superblock.");
+ ret = UV_EINVAL;
+ } else {
+ ret = 0;
+ }
+ error:
+ free(superblock);
+ return ret;
+}
+
+static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ void *buf, unsigned max_size)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned i, count, payload_length, descr_size, valid_pages;
+ struct rrdeng_page_cache_descr *descr;
+ struct extent_info *extent;
+ /* persistent structures */
+ struct rrdeng_jf_store_data *jf_metric_data;
+
+ jf_metric_data = buf;
+ count = jf_metric_data->number_of_pages;
+ descr_size = sizeof(*jf_metric_data->descr) * count;
+ payload_length = sizeof(*jf_metric_data) + descr_size;
+ if (payload_length > max_size) {
+ error("Corrupted transaction payload.");
+ return;
+ }
+
+ extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0]));
+ extent->offset = jf_metric_data->extent_offset;
+ extent->size = jf_metric_data->extent_size;
+ extent->number_of_pages = count;
+ extent->datafile = journalfile->datafile;
+ extent->next = NULL;
+
+ for (i = 0, valid_pages = 0 ; i < count ; ++i) {
+ uuid_t *temp_id;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index;
+
+ if (PAGE_METRICS != jf_metric_data->descr[i].type) {
+ error("Unknown page type encountered.");
+ continue;
+ }
+ ++valid_pages;
+ temp_id = (uuid_t *)jf_metric_data->descr[i].uuid;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ /* First time we see the UUID */
+ uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
+ assert(NULL == *PValue); /* TODO: figure out concurrency model */
+ *PValue = page_index = create_page_index(temp_id);
+ uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
+ }
+
+ descr = pg_cache_create_descr();
+ descr->page_length = jf_metric_data->descr[i].page_length;
+ descr->start_time = jf_metric_data->descr[i].start_time;
+ descr->end_time = jf_metric_data->descr[i].end_time;
+ descr->id = &page_index->id;
+ descr->extent = extent;
+ extent->pages[i] = descr;
+ pg_cache_insert(ctx, page_index, descr);
+ }
+ if (likely(valid_pages))
+ df_extent_insert(extent);
+}
+
+/*
+ * Replays transaction by interpreting up to max_size bytes from buf.
+ * Sets id to the current transaction id or to 0 if unknown.
+ * Returns size of transaction record or 0 for unknown size.
+ */
+static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ void *buf, uint64_t *id, unsigned max_size)
+{
+ unsigned payload_length, size_bytes;
+ int ret;
+ /* persistent structures */
+ struct rrdeng_jf_transaction_header *jf_header;
+ struct rrdeng_jf_transaction_trailer *jf_trailer;
+ uLong crc;
+
+ *id = 0;
+ jf_header = buf;
+ if (STORE_PADDING == jf_header->type) {
+ debug(D_RRDENGINE, "Skipping padding.");
+ return 0;
+ }
+ if (sizeof(*jf_header) > max_size) {
+ error("Corrupted transaction record, skipping.");
+ return 0;
+ }
+ *id = jf_header->id;
+ payload_length = jf_header->payload_length;
+ size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer);
+ if (size_bytes > max_size) {
+ error("Corrupted transaction record, skipping.");
+ return 0;
+ }
+ jf_trailer = buf + sizeof(*jf_header) + payload_length;
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, buf, sizeof(*jf_header) + payload_length);
+ ret = crc32cmp(jf_trailer->checksum, crc);
+ debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED");
+ if (unlikely(ret)) {
+ return size_bytes;
+ }
+ switch (jf_header->type) {
+ case STORE_DATA:
+ debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
+ restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
+ break;
+ default:
+ error("Unknown transaction type. Skipping record.");
+ break;
+ }
+
+ return size_bytes;
+}
+
+
+#define READAHEAD_BYTES (RRDENG_BLOCK_SIZE * 256)
+/*
+ * Iterates journal file transactions and populates the page cache.
+ * Page cache must already be initialized.
+ * Returns the maximum transaction id it discovered.
+ */
+static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile)
+{
+ uv_file file;
+ uint64_t file_size;//, data_file_size;
+ int ret;
+ uint64_t pos, pos_i, max_id, id;
+ unsigned size_bytes;
+ void *buf;
+ uv_buf_t iov;
+ uv_fs_t req;
+
+ file = journalfile->file;
+ file_size = journalfile->pos;
+ //data_file_size = journalfile->datafile->pos; TODO: utilize this?
+
+ max_id = 1;
+ ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES);
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+
+ for (pos = sizeof(struct rrdeng_jf_sb) ; pos < file_size ; pos += READAHEAD_BYTES) {
+ size_bytes = MIN(READAHEAD_BYTES, file_size - pos);
+ iov = uv_buf_init(buf, size_bytes);
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_read: %s", uv_strerror(ret));
+ /*uv_fs_req_cleanup(&req);*/
+ }
+ assert(req.result >= 0);
+ uv_fs_req_cleanup(&req);
+ ctx->stats.io_read_bytes += size_bytes;
+ ++ctx->stats.io_read_requests;
+
+ //pos_i = pos;
+ //while (pos_i < pos + size_bytes) {
+ for (pos_i = 0 ; pos_i < size_bytes ; ) {
+ unsigned max_size;
+
+ max_size = pos + size_bytes - pos_i;
+ ret = replay_transaction(ctx, journalfile, buf + pos_i, &id, max_size);
+ if (!ret) /* TODO: support transactions bigger than 4K */
+ /* unknown transaction size, move on to the next block */
+ pos_i = ALIGN_BYTES_FLOOR(pos_i + RRDENG_BLOCK_SIZE);
+ else
+ pos_i += ret;
+ max_id = MAX(max_id, id);
+ }
+ }
+
+ free(buf);
+ return max_id;
+}
+
+int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ struct rrdengine_datafile *datafile)
+{
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd;
+ uint64_t file_size, max_id;
+ char path[1024];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+ fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL);
+ if (fd < 0) {
+ /* if (UV_ENOENT != fd) */
+ error("uv_fs_fsopen: %s", uv_strerror(fd));
+ uv_fs_req_cleanup(&req);
+ return fd;
+ }
+ assert(req.result >= 0);
+ file = req.result;
+ uv_fs_req_cleanup(&req);
+#ifdef __APPLE__
+ info("Disabling OS X caching for file \"%s\".", path);
+ fcntl(fd, F_NOCACHE, 1);
+#endif
+ info("Loading journal file \"%s\".", path);
+
+ ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
+ if (ret)
+ goto error;
+ file_size = ALIGN_BYTES_FLOOR(file_size);
+
+ ret = check_journal_file_superblock(file);
+ if (ret)
+ goto error;
+ ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb);
+ ++ctx->stats.io_read_requests;
+
+ journalfile->file = file;
+ journalfile->pos = file_size;
+
+ max_id = iterate_transactions(ctx, journalfile);
+
+ ctx->commit_log.transaction_id = MAX(ctx->commit_log.transaction_id, max_id + 1);
+
+ info("Journal file \"%s\" loaded (size:%"PRIu64").", path, file_size);
+ return 0;
+
+ error:
+ (void) uv_fs_close(NULL, &req, file, NULL);
+ uv_fs_req_cleanup(&req);
+ return ret;
+}
+
+void init_commit_log(struct rrdengine_instance *ctx)
+{
+ ctx->commit_log.buf = NULL;
+ ctx->commit_log.buf_pos = 0;
+ ctx->commit_log.transaction_id = 1;
+} \ No newline at end of file
diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h
new file mode 100644
index 000000000..50489aeee
--- /dev/null
+++ b/database/engine/journalfile.h
@@ -0,0 +1,46 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_JOURNALFILE_H
+#define NETDATA_JOURNALFILE_H
+
+#include "rrdengine.h"
+
+/* Forward declarations */
+struct rrdengine_instance;
+struct rrdengine_worker_config;
+struct rrdengine_datafile;
+struct rrdengine_journalfile;
+
+#define WALFILE_PREFIX "journalfile-"
+#define WALFILE_EXTENSION ".njf"
+
+
+/* only one event loop is supported for now */
+struct rrdengine_journalfile {
+ uv_file file;
+ uint64_t pos;
+
+ struct rrdengine_datafile *datafile;
+};
+
+/* only one event loop is supported for now */
+struct transaction_commit_log {
+ uint64_t transaction_id;
+
+ /* outstanding transaction buffer */
+ void *buf;
+ unsigned buf_pos;
+ unsigned buf_size;
+};
+
+extern void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+extern void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size);
+extern void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc);
+extern int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+extern int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+extern int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ struct rrdengine_datafile *datafile);
+extern void init_commit_log(struct rrdengine_instance *ctx);
+
+
+#endif /* NETDATA_JOURNALFILE_H */ \ No newline at end of file
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
new file mode 100644
index 000000000..c90947a67
--- /dev/null
+++ b/database/engine/pagecache.c
@@ -0,0 +1,792 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#define NETDATA_RRD_INTERNALS
+
+#include "rrdengine.h"
+
+/* Forward declerations */
+static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx);
+
+/* always inserts into tail */
+static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx,
+ struct rrdeng_page_cache_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ if (likely(NULL != pg_cache->replaceQ.tail)) {
+ descr->prev = pg_cache->replaceQ.tail;
+ pg_cache->replaceQ.tail->next = descr;
+ }
+ if (unlikely(NULL == pg_cache->replaceQ.head)) {
+ pg_cache->replaceQ.head = descr;
+ }
+ pg_cache->replaceQ.tail = descr;
+}
+
+static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx,
+ struct rrdeng_page_cache_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_cache_descr *prev, *next;
+
+ prev = descr->prev;
+ next = descr->next;
+
+ if (likely(NULL != prev)) {
+ prev->next = next;
+ }
+ if (likely(NULL != next)) {
+ next->prev = prev;
+ }
+ if (unlikely(descr == pg_cache->replaceQ.head)) {
+ pg_cache->replaceQ.head = next;
+ }
+ if (unlikely(descr == pg_cache->replaceQ.tail)) {
+ pg_cache->replaceQ.tail = prev;
+ }
+ descr->prev = descr->next = NULL;
+}
+
+void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
+ struct rrdeng_page_cache_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
+ pg_cache_replaceQ_insert_unsafe(ctx, descr);
+ uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+}
+
+void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
+ struct rrdeng_page_cache_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
+ pg_cache_replaceQ_delete_unsafe(ctx, descr);
+ uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+}
+void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
+ struct rrdeng_page_cache_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
+ pg_cache_replaceQ_delete_unsafe(ctx, descr);
+ pg_cache_replaceQ_insert_unsafe(ctx, descr);
+ uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+}
+
+struct rrdeng_page_cache_descr *pg_cache_create_descr(void)
+{
+ struct rrdeng_page_cache_descr *descr;
+
+ descr = mallocz(sizeof(*descr));
+ descr->page = NULL;
+ descr->page_length = 0;
+ descr->start_time = INVALID_TIME;
+ descr->end_time = INVALID_TIME;
+ descr->id = NULL;
+ descr->extent = NULL;
+ descr->flags = 0;
+ descr->prev = descr->next = descr->private = NULL;
+ descr->refcnt = 0;
+ descr->waiters = 0;
+ descr->handle = NULL;
+ assert(0 == uv_cond_init(&descr->cond));
+ assert(0 == uv_mutex_init(&descr->mutex));
+
+ return descr;
+}
+
+void pg_cache_destroy_descr(struct rrdeng_page_cache_descr *descr)
+{
+ uv_cond_destroy(&descr->cond);
+ uv_mutex_destroy(&descr->mutex);
+ free(descr);
+}
+
+/* The caller must hold page descriptor lock. */
+void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr)
+{
+ if (descr->waiters)
+ uv_cond_broadcast(&descr->cond);
+}
+
+/*
+ * The caller must hold page descriptor lock.
+ * The lock will be released and re-acquired. The descriptor is not guaranteed
+ * to exist after this function returns.
+ */
+void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr)
+{
+ ++descr->waiters;
+ uv_cond_wait(&descr->cond, &descr->mutex);
+ --descr->waiters;
+}
+
+/*
+ * Returns page flags.
+ * The lock will be released and re-acquired. The descriptor is not guaranteed
+ * to exist after this function returns.
+ */
+unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr)
+{
+ unsigned long flags;
+
+ uv_mutex_lock(&descr->mutex);
+ pg_cache_wait_event_unsafe(descr);
+ flags = descr->flags;
+ uv_mutex_unlock(&descr->mutex);
+
+ return flags;
+}
+
+/*
+ * The caller must hold page descriptor lock.
+ * Gets a reference to the page descriptor.
+ * Returns 1 on success and 0 on failure.
+ */
+int pg_cache_try_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive_access)
+{
+ if ((descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
+ (exclusive_access && descr->refcnt)) {
+ return 0;
+ }
+ if (exclusive_access)
+ descr->flags |= RRD_PAGE_LOCKED;
+ ++descr->refcnt;
+
+ return 1;
+}
+
+/*
+ * The caller must hold page descriptor lock.
+ * Same return values as pg_cache_try_get_unsafe() without doing anything.
+ */
+int pg_cache_can_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive_access)
+{
+ if ((descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
+ (exclusive_access && descr->refcnt)) {
+ return 0;
+ }
+
+ return 1;
+}
+
+/*
+ * The caller must hold the page descriptor lock.
+ * This function may block doing cleanup.
+ */
+void pg_cache_put_unsafe(struct rrdeng_page_cache_descr *descr)
+{
+ descr->flags &= ~RRD_PAGE_LOCKED;
+ if (0 == --descr->refcnt) {
+ pg_cache_wake_up_waiters_unsafe(descr);
+ }
+ /* TODO: perform cleanup */
+}
+
+/*
+ * This function may block doing cleanup.
+ */
+void pg_cache_put(struct rrdeng_page_cache_descr *descr)
+{
+ uv_mutex_lock(&descr->mutex);
+ pg_cache_put_unsafe(descr);
+ uv_mutex_unlock(&descr->mutex);
+}
+
+/* The caller must hold the page cache lock */
+static void pg_cache_release_pages_unsafe(struct rrdengine_instance *ctx, unsigned number)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ pg_cache->populated_pages -= number;
+}
+
+static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned number)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ pg_cache_release_pages_unsafe(ctx, number);
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+}
+/*
+ * This function will block until it reserves #number populated pages.
+ * It will trigger evictions or dirty page flushing if the ctx->max_cache_pages limit is hit.
+ */
+static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ assert(number < ctx->max_cache_pages);
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ if (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1)
+ debug(D_RRDENGINE, "=================================\nPage cache full. Reserving %u pages.\n=================================",
+ number);
+ while (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1) {
+ if (!pg_cache_try_evict_one_page_unsafe(ctx)) {
+ /* failed to evict */
+ struct completion compl;
+ struct rrdeng_cmd cmd;
+
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+
+ init_completion(&compl);
+ cmd.opcode = RRDENG_FLUSH_PAGES;
+ cmd.completion = &compl;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+ /* wait for some pages to be flushed */
+ debug(D_RRDENGINE, "%s: waiting for pages to be written to disk before evicting.", __func__);
+ wait_for_completion(&compl);
+ destroy_completion(&compl);
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ }
+ }
+ pg_cache->populated_pages += number;
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+}
+
+/*
+ * This function will attempt to reserve #number populated pages.
+ * It may trigger evictions if the ctx->cache_pages_low_watermark limit is hit.
+ * Returns 0 on failure and 1 on success.
+ */
+static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned count = 0;
+ int ret = 0;
+
+ assert(number < ctx->max_cache_pages);
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ if (pg_cache->populated_pages + number >= ctx->cache_pages_low_watermark + 1) {
+ debug(D_RRDENGINE,
+ "=================================\nPage cache full. Trying to reserve %u pages.\n=================================",
+ number);
+ do {
+ if (!pg_cache_try_evict_one_page_unsafe(ctx))
+ break;
+ ++count;
+ } while (pg_cache->populated_pages + number >= ctx->cache_pages_low_watermark + 1);
+ debug(D_RRDENGINE, "Evicted %u pages.", count);
+ }
+
+ if (pg_cache->populated_pages + number < ctx->max_cache_pages + 1) {
+ pg_cache->populated_pages += number;
+ ret = 1; /* success */
+ }
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+
+ return ret;
+}
+
+/* The caller must hold the page cache and the page descriptor locks in that order */
+static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr)
+{
+ free(descr->page);
+ descr->page = NULL;
+ descr->flags &= ~RRD_PAGE_POPULATED;
+ pg_cache_release_pages_unsafe(ctx, 1);
+ ++ctx->stats.pg_cache_evictions;
+}
+
+/*
+ * The caller must hold the page cache lock.
+ * Lock order: page cache -> replaceQ -> descriptor
+ * This function iterates all pages and tries to evict one.
+ * If it fails it sets in_flight_descr to the oldest descriptor that has write-back in progress,
+ * or it sets it to NULL if no write-back is in progress.
+ *
+ * Returns 1 on success and 0 on failure.
+ */
+static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned long old_flags;
+ struct rrdeng_page_cache_descr *descr;
+
+ uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
+ for (descr = pg_cache->replaceQ.head ; NULL != descr ; descr = descr->next) {
+ uv_mutex_lock(&descr->mutex);
+ old_flags = descr->flags;
+ if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) {
+ /* must evict */
+ pg_cache_evict_unsafe(ctx, descr);
+ pg_cache_put_unsafe(descr);
+ uv_mutex_unlock(&descr->mutex);
+ pg_cache_replaceQ_delete_unsafe(ctx, descr);
+ uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+
+ return 1;
+ }
+ uv_mutex_unlock(&descr->mutex);
+ };
+ uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+
+ /* failed to evict */
+ return 0;
+}
+
+/*
+ * TODO: last waiter frees descriptor ?
+ */
+void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index;
+ int ret;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
+ assert(NULL != PValue);
+ page_index = *PValue;
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+
+ uv_rwlock_wrlock(&page_index->lock);
+ ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
+ uv_rwlock_wrunlock(&page_index->lock);
+ if (unlikely(0 == ret)) {
+ error("Page under deletion was not in index.");
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ goto destroy;
+ }
+ assert(1 == ret);
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ ++ctx->stats.pg_cache_deletions;
+ --pg_cache->page_descriptors;
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+
+ uv_mutex_lock(&descr->mutex);
+ while (!pg_cache_try_get_unsafe(descr, 1)) {
+ debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_wait_event_unsafe(descr);
+ }
+ /* even a locked page could be dirty */
+ while (unlikely(descr->flags & RRD_PAGE_DIRTY)) {
+ debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_wait_event_unsafe(descr);
+ }
+ uv_mutex_unlock(&descr->mutex);
+
+ if (descr->flags & RRD_PAGE_POPULATED) {
+ /* only after locking can it be safely deleted from LRU */
+ pg_cache_replaceQ_delete(ctx, descr);
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ pg_cache_evict_unsafe(ctx, descr);
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+ }
+ pg_cache_put(descr);
+destroy:
+ pg_cache_destroy_descr(descr);
+ pg_cache_update_metric_times(page_index);
+}
+
+static inline int is_page_in_time_range(struct rrdeng_page_cache_descr *descr, usec_t start_time, usec_t end_time)
+{
+ usec_t pg_start, pg_end;
+
+ pg_start = descr->start_time;
+ pg_end = descr->end_time;
+
+ return (pg_start < start_time && pg_end >= start_time) ||
+ (pg_start >= start_time && pg_start <= end_time);
+}
+
+static inline int is_point_in_time_in_page(struct rrdeng_page_cache_descr *descr, usec_t point_in_time)
+{
+ return (point_in_time >= descr->start_time && point_in_time <= descr->end_time);
+}
+
+/* Update metric oldest and latest timestamps efficiently when adding new values */
+void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_cache_descr *descr)
+{
+ usec_t oldest_time = page_index->oldest_time;
+ usec_t latest_time = page_index->latest_time;
+
+ if (unlikely(oldest_time == INVALID_TIME || descr->start_time < oldest_time)) {
+ page_index->oldest_time = descr->start_time;
+ }
+ if (likely(descr->end_time > latest_time || latest_time == INVALID_TIME)) {
+ page_index->latest_time = descr->end_time;
+ }
+}
+
+/* Update metric oldest and latest timestamps when removing old values */
+void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
+{
+ Pvoid_t *firstPValue, *lastPValue;
+ Word_t firstIndex, lastIndex;
+ struct rrdeng_page_cache_descr *descr;
+ usec_t oldest_time = INVALID_TIME;
+ usec_t latest_time = INVALID_TIME;
+
+ uv_rwlock_rdlock(&page_index->lock);
+ /* Find first page in range */
+ firstIndex = (Word_t)0;
+ firstPValue = JudyLFirst(page_index->JudyL_array, &firstIndex, PJE0);
+ if (likely(NULL != firstPValue)) {
+ descr = *firstPValue;
+ oldest_time = descr->start_time;
+ }
+ lastIndex = (Word_t)-1;
+ lastPValue = JudyLLast(page_index->JudyL_array, &lastIndex, PJE0);
+ if (likely(NULL != lastPValue)) {
+ descr = *lastPValue;
+ latest_time = descr->end_time;
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ if (unlikely(NULL == firstPValue)) {
+ assert(NULL == lastPValue);
+ page_index->oldest_time = page_index->latest_time = INVALID_TIME;
+ return;
+ }
+ page_index->oldest_time = oldest_time;
+ page_index->latest_time = latest_time;
+}
+
+/* If index is NULL lookup by UUID (descr->id) */
+void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
+ struct rrdeng_page_cache_descr *descr)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index;
+
+ if (descr->flags & RRD_PAGE_POPULATED) {
+ pg_cache_reserve_pages(ctx, 1);
+ if (!(descr->flags & RRD_PAGE_DIRTY))
+ pg_cache_replaceQ_insert(ctx, descr);
+ }
+
+ if (unlikely(NULL == index)) {
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
+ assert(NULL != PValue);
+ page_index = *PValue;
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ } else {
+ page_index = index;
+ }
+
+ uv_rwlock_wrlock(&page_index->lock);
+ PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
+ *PValue = descr;
+ pg_cache_add_new_metric_time(page_index, descr);
+ uv_rwlock_wrunlock(&page_index->lock);
+
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ ++ctx->stats.pg_cache_insertions;
+ ++pg_cache->page_descriptors;
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
+}
+
+/*
+ * Searches for a page and triggers disk I/O if necessary and possible.
+ * Does not get a reference.
+ * Returns page index pointer for given metric UUID.
+ */
+struct pg_cache_page_index *
+ pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_cache_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
+ int i, j, k, count, found;
+ unsigned long flags;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index;
+ Word_t Index;
+ uint8_t failed_to_reserve;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
+ return NULL;
+ }
+
+ uv_rwlock_rdlock(&page_index->lock);
+ /* Find first page in range */
+ found = 0;
+ Index = (Word_t)(start_time / USEC_PER_SEC);
+ PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ if (is_page_in_time_range(descr, start_time, end_time)) {
+ found = 1;
+ }
+ }
+ if (!found) {
+ Index = (Word_t)(start_time / USEC_PER_SEC);
+ PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ if (is_page_in_time_range(descr, start_time, end_time)) {
+ found = 1;
+ }
+ }
+ }
+ if (!found) {
+ uv_rwlock_rdunlock(&page_index->lock);
+ debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
+ return page_index;
+ }
+
+ for (count = 0 ;
+ descr != NULL && is_page_in_time_range(descr, start_time, end_time);
+ PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue) {
+ /* Iterate all pages in range */
+
+ if (unlikely(0 == descr->page_length))
+ continue;
+ uv_mutex_lock(&descr->mutex);
+ flags = descr->flags;
+ if (pg_cache_can_get_unsafe(descr, 0)) {
+ if (flags & RRD_PAGE_POPULATED) {
+ /* success */
+ uv_mutex_unlock(&descr->mutex);
+ debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
+ continue;
+ }
+ }
+ if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
+ preload_array[count++] = descr;
+ if (PAGE_CACHE_MAX_PRELOAD_PAGES == count) {
+ uv_mutex_unlock(&descr->mutex);
+ break;
+ }
+ }
+ uv_mutex_unlock(&descr->mutex);
+
+ };
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ failed_to_reserve = 0;
+ for (i = 0 ; i < count && !failed_to_reserve ; ++i) {
+ struct rrdeng_cmd cmd;
+ struct rrdeng_page_cache_descr *next;
+
+ descr = preload_array[i];
+ if (NULL == descr) {
+ continue;
+ }
+ if (!pg_cache_try_reserve_pages(ctx, 1)) {
+ failed_to_reserve = 1;
+ break;
+ }
+ cmd.opcode = RRDENG_READ_EXTENT;
+ cmd.read_extent.page_cache_descr[0] = descr;
+ /* don't use this page again */
+ preload_array[i] = NULL;
+ for (j = 0, k = 1 ; j < count ; ++j) {
+ next = preload_array[j];
+ if (NULL == next) {
+ continue;
+ }
+ if (descr->extent == next->extent) {
+ /* same extent, consolidate */
+ if (!pg_cache_try_reserve_pages(ctx, 1)) {
+ failed_to_reserve = 1;
+ break;
+ }
+ cmd.read_extent.page_cache_descr[k++] = next;
+ /* don't use this page again */
+ preload_array[j] = NULL;
+ }
+ }
+ cmd.read_extent.page_count = k;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+ }
+ if (failed_to_reserve) {
+ debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__);
+ for (i = 0 ; i < count ; ++i) {
+ descr = preload_array[i];
+ if (NULL == descr) {
+ continue;
+ }
+ pg_cache_put(descr);
+ }
+ }
+ if (!count) {
+ /* no such page */
+ debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__);
+ }
+ return page_index;
+}
+
+/*
+ * Searches for a page and gets a reference.
+ * When point_in_time is INVALID_TIME get any page.
+ * If index is NULL lookup by UUID (id).
+ */
+struct rrdeng_page_cache_descr *
+ pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
+ usec_t point_in_time)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_cache_descr *descr = NULL;
+ unsigned long flags;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index;
+ Word_t Index;
+ uint8_t page_not_in_cache;
+
+ if (unlikely(NULL == index)) {
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ return NULL;
+ }
+ } else {
+ page_index = index;
+ }
+ pg_cache_reserve_pages(ctx, 1);
+
+ page_not_in_cache = 0;
+ uv_rwlock_rdlock(&page_index->lock);
+ while (1) {
+ Index = (Word_t)(point_in_time / USEC_PER_SEC);
+ PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ }
+ if (NULL == PValue ||
+ 0 == descr->page_length ||
+ (INVALID_TIME != point_in_time &&
+ !is_point_in_time_in_page(descr, point_in_time))) {
+ /* non-empty page not found */
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ pg_cache_release_pages(ctx, 1);
+ return NULL;
+ }
+ uv_mutex_lock(&descr->mutex);
+ flags = descr->flags;
+ if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
+ /* success */
+ uv_mutex_unlock(&descr->mutex);
+ debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
+ break;
+ }
+ if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
+ struct rrdeng_cmd cmd;
+
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ cmd.opcode = RRDENG_READ_PAGE;
+ cmd.read_page.page_cache_descr = descr;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+
+ debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ while (!(descr->flags & RRD_PAGE_POPULATED)) {
+ pg_cache_wait_event_unsafe(descr);
+ }
+ /* success */
+ /* Downgrade exclusive reference to allow other readers */
+ descr->flags &= ~RRD_PAGE_LOCKED;
+ pg_cache_wake_up_waiters_unsafe(descr);
+ uv_mutex_unlock(&descr->mutex);
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
+ return descr;
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+ debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ if (!(flags & RRD_PAGE_POPULATED))
+ page_not_in_cache = 1;
+ pg_cache_wait_event_unsafe(descr);
+ uv_mutex_unlock(&descr->mutex);
+
+ /* reset scan to find again */
+ uv_rwlock_rdlock(&page_index->lock);
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ if (!(flags & RRD_PAGE_DIRTY))
+ pg_cache_replaceQ_set_hot(ctx, descr);
+ pg_cache_release_pages(ctx, 1);
+ if (page_not_in_cache)
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
+ else
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
+ return descr;
+}
+
+struct pg_cache_page_index *create_page_index(uuid_t *id)
+{
+ struct pg_cache_page_index *page_index;
+
+ page_index = mallocz(sizeof(*page_index));
+ page_index->JudyL_array = (Pvoid_t) NULL;
+ uuid_copy(page_index->id, *id);
+ assert(0 == uv_rwlock_init(&page_index->lock));
+ page_index->oldest_time = INVALID_TIME;
+ page_index->latest_time = INVALID_TIME;
+
+ return page_index;
+}
+
+static void init_metrics_index(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL;
+ assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
+}
+
+static void init_replaceQ(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ pg_cache->replaceQ.head = NULL;
+ pg_cache->replaceQ.tail = NULL;
+ assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock));
+}
+
+static void init_commited_page_index(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ pg_cache->commited_page_index.JudyL_array = (Pvoid_t) NULL;
+ assert(0 == uv_rwlock_init(&pg_cache->commited_page_index.lock));
+ pg_cache->commited_page_index.latest_corr_id = 0;
+ pg_cache->commited_page_index.nr_commited_pages = 0;
+}
+
+void init_page_cache(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ pg_cache->page_descriptors = 0;
+ pg_cache->populated_pages = 0;
+ assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock));
+
+ init_metrics_index(ctx);
+ init_replaceQ(ctx);
+ init_commited_page_index(ctx);
+} \ No newline at end of file
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
new file mode 100644
index 000000000..d1e29aaab
--- /dev/null
+++ b/database/engine/pagecache.h
@@ -0,0 +1,132 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_PAGECACHE_H
+#define NETDATA_PAGECACHE_H
+
+#include "rrdengine.h"
+
+/* Forward declerations */
+struct rrdengine_instance;
+struct extent_info;
+
+#define INVALID_TIME (0)
+
+/* Page flags */
+#define RRD_PAGE_DIRTY (1LU << 0)
+#define RRD_PAGE_LOCKED (1LU << 1)
+#define RRD_PAGE_READ_PENDING (1LU << 2)
+#define RRD_PAGE_WRITE_PENDING (1LU << 3)
+#define RRD_PAGE_POPULATED (1LU << 4)
+
+struct rrdeng_page_cache_descr {
+ void *page;
+ uint32_t page_length;
+ usec_t start_time;
+ usec_t end_time;
+ uuid_t *id; /* never changes */
+ struct extent_info *extent;
+ unsigned long flags;
+ void *private;
+ struct rrdeng_page_cache_descr *prev;
+ struct rrdeng_page_cache_descr *next;
+
+ /* TODO: move waiter logic to concurrency table */
+ unsigned refcnt;
+ uv_mutex_t mutex; /* always take it after the page cache lock or after the commit lock */
+ uv_cond_t cond;
+ unsigned waiters;
+ struct rrdeng_collect_handle *handle; /* API user */
+};
+
+#define PAGE_CACHE_MAX_PRELOAD_PAGES (256)
+
+/* maps time ranges to pages */
+struct pg_cache_page_index {
+ uuid_t id;
+ /*
+ * care: JudyL_array indices are converted from useconds to seconds to fit in one word in 32-bit architectures
+ * TODO: examine if we want to support better granularity than seconds
+ */
+ Pvoid_t JudyL_array;
+ uv_rwlock_t lock;
+
+ /*
+ * Only one effective writer, data deletion workqueue.
+ * It's also written during the DB loading phase.
+ */
+ usec_t oldest_time;
+
+ /*
+ * Only one effective writer, data collection thread.
+ * It's also written by the data deletion workqueue when data collection is disabled for this metric.
+ */
+ usec_t latest_time;
+};
+
+/* maps UUIDs to page indices */
+struct pg_cache_metrics_index {
+ uv_rwlock_t lock;
+ Pvoid_t JudyHS_array;
+};
+
+/* gathers dirty pages to be written on disk */
+struct pg_cache_commited_page_index {
+ uv_rwlock_t lock;
+
+ Pvoid_t JudyL_array;
+
+ /*
+ * Dirty page correlation ID is a hint. Dirty pages that are correlated should have
+ * a small correlation ID difference. Dirty pages in memory should never have the
+ * same ID at the same time for correctness.
+ */
+ Word_t latest_corr_id;
+
+ unsigned nr_commited_pages;
+};
+
+/* gathers populated pages to be evicted */
+struct pg_cache_replaceQ {
+ uv_rwlock_t lock; /* LRU lock */
+
+ struct rrdeng_page_cache_descr *head; /* LRU */
+ struct rrdeng_page_cache_descr *tail; /* MRU */
+};
+
+struct page_cache { /* TODO: add statistics */
+ uv_rwlock_t pg_cache_rwlock; /* page cache lock */
+
+ struct pg_cache_metrics_index metrics_index;
+ struct pg_cache_commited_page_index commited_page_index;
+ struct pg_cache_replaceQ replaceQ;
+
+ unsigned page_descriptors;
+ unsigned populated_pages;
+};
+
+extern void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr);
+extern void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr);
+extern unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr);
+extern void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
+ struct rrdeng_page_cache_descr *descr);
+extern void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
+ struct rrdeng_page_cache_descr *descr);
+extern void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
+ struct rrdeng_page_cache_descr *descr);
+extern struct rrdeng_page_cache_descr *pg_cache_create_descr(void);
+extern void pg_cache_put_unsafe(struct rrdeng_page_cache_descr *descr);
+extern void pg_cache_put(struct rrdeng_page_cache_descr *descr);
+extern void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
+ struct rrdeng_page_cache_descr *descr);
+extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr);
+extern struct pg_cache_page_index *
+ pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time);
+extern struct rrdeng_page_cache_descr *
+ pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
+ usec_t point_in_time);
+extern struct pg_cache_page_index *create_page_index(uuid_t *id);
+extern void init_page_cache(struct rrdengine_instance *ctx);
+extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_cache_descr *descr);
+extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index);
+
+#endif /* NETDATA_PAGECACHE_H */ \ No newline at end of file
diff --git a/database/engine/rrddiskprotocol.h b/database/engine/rrddiskprotocol.h
new file mode 100644
index 000000000..db47af531
--- /dev/null
+++ b/database/engine/rrddiskprotocol.h
@@ -0,0 +1,119 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDDISKPROTOCOL_H
+#define NETDATA_RRDDISKPROTOCOL_H
+
+#define RRDENG_BLOCK_SIZE (4096)
+#define RRDFILE_ALIGNMENT RRDENG_BLOCK_SIZE
+
+#define RRDENG_MAGIC_SZ (32)
+#define RRDENG_DF_MAGIC "netdata-data-file"
+#define RRDENG_JF_MAGIC "netdata-journal-file"
+
+#define RRDENG_VER_SZ (16)
+#define RRDENG_DF_VER "1.0"
+#define RRDENG_JF_VER "1.0"
+
+#define UUID_SZ (16)
+#define CHECKSUM_SZ (4) /* CRC32 */
+
+#define RRD_NO_COMPRESSION (0)
+#define RRD_LZ4 (1)
+
+#define RRDENG_DF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ + sizeof(uint8_t)))
+/*
+ * Data file persistent super-block
+ */
+struct rrdeng_df_sb {
+ char magic_number[RRDENG_MAGIC_SZ];
+ char version[RRDENG_VER_SZ];
+ uint8_t tier;
+ uint8_t padding[RRDENG_DF_SB_PADDING_SZ];
+} __attribute__ ((packed));
+
+/*
+ * Page types
+ */
+#define PAGE_METRICS (0)
+#define PAGE_LOGS (1) /* reserved */
+
+/*
+ * Data file page descriptor
+ */
+struct rrdeng_extent_page_descr {
+ uint8_t type;
+
+ uint8_t uuid[UUID_SZ];
+ uint32_t page_length;
+ uint64_t start_time;
+ uint64_t end_time;
+} __attribute__ ((packed));
+
+/*
+ * Data file extent header
+ */
+struct rrdeng_df_extent_header {
+ uint32_t payload_length;
+ uint8_t compression_algorithm;
+ uint8_t number_of_pages;
+ /* #number_of_pages page descriptors follow */
+ struct rrdeng_extent_page_descr descr[];
+} __attribute__ ((packed));
+
+/*
+ * Data file extent trailer
+ */
+struct rrdeng_df_extent_trailer {
+ uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */
+} __attribute__ ((packed));
+
+#define RRDENG_JF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ))
+/*
+ * Journal file super-block
+ */
+struct rrdeng_jf_sb {
+ char magic_number[RRDENG_MAGIC_SZ];
+ char version[RRDENG_VER_SZ];
+ uint8_t padding[RRDENG_JF_SB_PADDING_SZ];
+} __attribute__ ((packed));
+
+/*
+ * Transaction record types
+ */
+#define STORE_PADDING (0)
+#define STORE_DATA (1)
+#define STORE_LOGS (2) /* reserved */
+
+/*
+ * Journal file transaction record header
+ */
+struct rrdeng_jf_transaction_header {
+ /* when set to STORE_PADDING jump to start of next block */
+ uint8_t type;
+
+ uint32_t reserved; /* reserved for future use */
+ uint64_t id;
+ uint16_t payload_length;
+} __attribute__ ((packed));
+
+/*
+ * Journal file transaction record trailer
+ */
+struct rrdeng_jf_transaction_trailer {
+ uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */
+} __attribute__ ((packed));
+
+/*
+ * Journal file STORE_DATA action
+ */
+struct rrdeng_jf_store_data {
+ /* data file extent information */
+ uint64_t extent_offset;
+ uint32_t extent_size;
+
+ uint8_t number_of_pages;
+ /* #number_of_pages page descriptors follow */
+ struct rrdeng_extent_page_descr descr[];
+} __attribute__ ((packed));
+
+#endif /* NETDATA_RRDDISKPROTOCOL_H */ \ No newline at end of file
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
new file mode 100644
index 000000000..b8e4eba01
--- /dev/null
+++ b/database/engine/rrdengine.c
@@ -0,0 +1,780 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#define NETDATA_RRD_INTERNALS
+
+#include "rrdengine.h"
+
+void sanity_check(void)
+{
+ /* Magic numbers must fit in the super-blocks */
+ BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ);
+ BUILD_BUG_ON(strlen(RRDENG_JF_MAGIC) > RRDENG_MAGIC_SZ);
+
+ /* Version strings must fit in the super-blocks */
+ BUILD_BUG_ON(strlen(RRDENG_DF_VER) > RRDENG_VER_SZ);
+ BUILD_BUG_ON(strlen(RRDENG_JF_VER) > RRDENG_VER_SZ);
+
+ /* Data file super-block cannot be larger than RRDENG_BLOCK_SIZE */
+ BUILD_BUG_ON(RRDENG_DF_SB_PADDING_SZ < 0);
+
+ BUILD_BUG_ON(sizeof(uuid_t) != UUID_SZ); /* check UUID size */
+
+ /* page count must fit in 8 bits */
+ BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255);
+}
+
+void read_extent_cb(uv_fs_t* req)
+{
+ struct rrdengine_worker_config* wc = req->loop->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct extent_io_descriptor *xt_io_descr;
+ struct rrdeng_page_cache_descr *descr;
+ int ret;
+ unsigned i, j, count;
+ void *page, *uncompressed_buf = NULL;
+ uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length;
+ struct rrdengine_datafile *datafile;
+ /* persistent structures */
+ struct rrdeng_df_extent_header *header;
+ struct rrdeng_df_extent_trailer *trailer;
+ uLong crc;
+
+ xt_io_descr = req->data;
+ if (req->result < 0) {
+ error("%s: uv_fs_read: %s", __func__, uv_strerror((int)req->result));
+ goto cleanup;
+ }
+
+ header = xt_io_descr->buf;
+ payload_length = header->payload_length;
+ count = header->number_of_pages;
+
+ payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count;
+
+ trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer);
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer));
+ ret = crc32cmp(trailer->checksum, crc);
+ datafile = xt_io_descr->descr_array[0]->extent->datafile;
+ debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__,
+ xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED");
+ if (unlikely(ret)) {
+ /* TODO: handle errors */
+ exit(UV_EIO);
+ goto cleanup;
+ }
+
+ if (RRD_NO_COMPRESSION != header->compression_algorithm) {
+ uncompressed_payload_length = 0;
+ for (i = 0 ; i < count ; ++i) {
+ uncompressed_payload_length += header->descr[i].page_length;
+ }
+ uncompressed_buf = mallocz(uncompressed_payload_length);
+ ret = LZ4_decompress_safe(xt_io_descr->buf + payload_offset, uncompressed_buf,
+ payload_length, uncompressed_payload_length);
+ ctx->stats.before_decompress_bytes += payload_length;
+ ctx->stats.after_decompress_bytes += ret;
+ debug(D_RRDENGINE, "LZ4 decompressed %u bytes to %d bytes.", payload_length, ret);
+ /* care, we don't hold the descriptor mutex */
+ }
+
+ for (i = 0 ; i < xt_io_descr->descr_count; ++i) {
+ page = mallocz(RRDENG_BLOCK_SIZE);
+ descr = xt_io_descr->descr_array[i];
+ for (j = 0, page_offset = 0; j < count; ++j) {
+ /* care, we don't hold the descriptor mutex */
+ if (!uuid_compare(*(uuid_t *) header->descr[j].uuid, *descr->id) &&
+ header->descr[j].page_length == descr->page_length &&
+ header->descr[j].start_time == descr->start_time &&
+ header->descr[j].end_time == descr->end_time) {
+ break;
+ }
+ page_offset += header->descr[j].page_length;
+ }
+ /* care, we don't hold the descriptor mutex */
+ if (RRD_NO_COMPRESSION == header->compression_algorithm) {
+ (void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length);
+ } else {
+ (void) memcpy(page, uncompressed_buf + page_offset, descr->page_length);
+ }
+ pg_cache_replaceQ_insert(ctx, descr);
+ uv_mutex_lock(&descr->mutex);
+ descr->page = page;
+ descr->flags |= RRD_PAGE_POPULATED;
+ descr->flags &= ~RRD_PAGE_READ_PENDING;
+ debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
+ if (xt_io_descr->release_descr) {
+ pg_cache_put_unsafe(descr);
+ } else {
+ pg_cache_wake_up_waiters_unsafe(descr);
+ }
+ uv_mutex_unlock(&descr->mutex);
+ }
+ if (RRD_NO_COMPRESSION != header->compression_algorithm) {
+ free(uncompressed_buf);
+ }
+ if (xt_io_descr->completion)
+ complete(xt_io_descr->completion);
+cleanup:
+ uv_fs_req_cleanup(req);
+ free(xt_io_descr->buf);
+ free(xt_io_descr);
+}
+
+
+static void do_read_extent(struct rrdengine_worker_config* wc,
+ struct rrdeng_page_cache_descr **descr,
+ unsigned count,
+ uint8_t release_descr)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ int ret;
+ unsigned i, size_bytes, pos, real_io_size;
+// uint32_t payload_length;
+ struct extent_io_descriptor *xt_io_descr;
+ struct rrdengine_datafile *datafile;
+
+ datafile = descr[0]->extent->datafile;
+ pos = descr[0]->extent->offset;
+ size_bytes = descr[0]->extent->size;
+
+ xt_io_descr = mallocz(sizeof(*xt_io_descr));
+ ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ /* free(xt_io_descr);
+ return;*/
+ }
+ for (i = 0 ; i < count; ++i) {
+ uv_mutex_lock(&descr[i]->mutex);
+ descr[i]->flags |= RRD_PAGE_READ_PENDING;
+// payload_length = descr[i]->page_length;
+ uv_mutex_unlock(&descr[i]->mutex);
+
+ xt_io_descr->descr_array[i] = descr[i];
+ }
+ xt_io_descr->descr_count = count;
+ xt_io_descr->bytes = size_bytes;
+ xt_io_descr->pos = pos;
+ xt_io_descr->req.data = xt_io_descr;
+ xt_io_descr->completion = NULL;
+ /* xt_io_descr->descr_commit_idx_array[0] */
+ xt_io_descr->release_descr = release_descr;
+
+ real_io_size = ALIGN_BYTES_CEILING(size_bytes);
+ xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
+ ret = uv_fs_read(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, pos, read_extent_cb);
+ assert (-1 != ret);
+ ctx->stats.io_read_bytes += real_io_size;
+ ++ctx->stats.io_read_requests;
+ ctx->stats.io_read_extent_bytes += real_io_size;
+ ++ctx->stats.io_read_extents;
+ ctx->stats.pg_cache_backfills += count;
+}
+
+static void commit_data_extent(struct rrdengine_worker_config* wc, struct extent_io_descriptor *xt_io_descr)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ unsigned count, payload_length, descr_size, size_bytes;
+ void *buf;
+ /* persistent structures */
+ struct rrdeng_df_extent_header *df_header;
+ struct rrdeng_jf_transaction_header *jf_header;
+ struct rrdeng_jf_store_data *jf_metric_data;
+ struct rrdeng_jf_transaction_trailer *jf_trailer;
+ uLong crc;
+
+ df_header = xt_io_descr->buf;
+ count = df_header->number_of_pages;
+ descr_size = sizeof(*jf_metric_data->descr) * count;
+ payload_length = sizeof(*jf_metric_data) + descr_size;
+ size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer);
+
+ buf = wal_get_transaction_buffer(wc, size_bytes);
+
+ jf_header = buf;
+ jf_header->type = STORE_DATA;
+ jf_header->reserved = 0;
+ jf_header->id = ctx->commit_log.transaction_id++;
+ jf_header->payload_length = payload_length;
+
+ jf_metric_data = buf + sizeof(*jf_header);
+ jf_metric_data->extent_offset = xt_io_descr->pos;
+ jf_metric_data->extent_size = xt_io_descr->bytes;
+ jf_metric_data->number_of_pages = count;
+ memcpy(jf_metric_data->descr, df_header->descr, descr_size);
+
+ jf_trailer = buf + sizeof(*jf_header) + payload_length;
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, buf, sizeof(*jf_header) + payload_length);
+ crc32set(jf_trailer->checksum, crc);
+}
+
+static void do_commit_transaction(struct rrdengine_worker_config* wc, uint8_t type, void *data)
+{
+ switch (type) {
+ case STORE_DATA:
+ commit_data_extent(wc, (struct extent_io_descriptor *)data);
+ break;
+ default:
+ assert(type == STORE_DATA);
+ break;
+ }
+}
+
+void flush_pages_cb(uv_fs_t* req)
+{
+ struct rrdengine_worker_config* wc = req->loop->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct extent_io_descriptor *xt_io_descr;
+ struct rrdeng_page_cache_descr *descr;
+ struct rrdengine_datafile *datafile;
+ int ret;
+ unsigned i, count;
+ Word_t commit_id;
+
+ xt_io_descr = req->data;
+ if (req->result < 0) {
+ error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
+ goto cleanup;
+ }
+ datafile = xt_io_descr->descr_array[0]->extent->datafile;
+ debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.",
+ __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
+
+ count = xt_io_descr->descr_count;
+ for (i = 0 ; i < count ; ++i) {
+ /* care, we don't hold the descriptor mutex */
+ descr = xt_io_descr->descr_array[i];
+
+ uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
+ commit_id = xt_io_descr->descr_commit_idx_array[i];
+ ret = JudyLDel(&pg_cache->commited_page_index.JudyL_array, commit_id, PJE0);
+ assert(1 == ret);
+ --pg_cache->commited_page_index.nr_commited_pages;
+ uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
+
+ pg_cache_replaceQ_insert(ctx, descr);
+
+ uv_mutex_lock(&descr->mutex);
+ descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING);
+ /* wake up waiters, care no reference being held */
+ pg_cache_wake_up_waiters_unsafe(descr);
+ uv_mutex_unlock(&descr->mutex);
+ }
+ if (xt_io_descr->completion)
+ complete(xt_io_descr->completion);
+cleanup:
+ uv_fs_req_cleanup(req);
+ free(xt_io_descr->buf);
+ free(xt_io_descr);
+}
+
+/*
+ * completion must be NULL or valid.
+ * Returns 0 when no flushing can take place.
+ * Returns datafile bytes to be written on successful flushing initiation.
+ */
+static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct completion *completion)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ int ret;
+ int compressed_size, max_compressed_size = 0;
+ unsigned i, count, size_bytes, pos, real_io_size;
+ uint32_t uncompressed_payload_length, payload_offset;
+ struct rrdeng_page_cache_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT];
+ struct extent_io_descriptor *xt_io_descr;
+ void *compressed_buf = NULL;
+ Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT];
+ Pvoid_t *PValue;
+ Word_t Index;
+ uint8_t compression_algorithm = ctx->global_compress_alg;
+ struct extent_info *extent;
+ struct rrdengine_datafile *datafile;
+ /* persistent structures */
+ struct rrdeng_df_extent_header *header;
+ struct rrdeng_df_extent_trailer *trailer;
+ uLong crc;
+
+ if (force) {
+ debug(D_RRDENGINE, "Asynchronous flushing of extent has been forced by page pressure.");
+ }
+ uv_rwlock_rdlock(&pg_cache->commited_page_index.lock);
+ for (Index = 0, count = 0, uncompressed_payload_length = 0,
+ PValue = JudyLFirst(pg_cache->commited_page_index.JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue ;
+
+ descr != NULL && count != MAX_PAGES_PER_EXTENT ;
+
+ PValue = JudyLNext(pg_cache->commited_page_index.JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue) {
+ assert(0 != descr->page_length);
+
+ uv_mutex_lock(&descr->mutex);
+ if (!(descr->flags & RRD_PAGE_WRITE_PENDING)) {
+ /* care, no reference being held */
+ descr->flags |= RRD_PAGE_WRITE_PENDING;
+ uncompressed_payload_length += descr->page_length;
+ descr_commit_idx_array[count] = Index;
+ eligible_pages[count++] = descr;
+ }
+ uv_mutex_unlock(&descr->mutex);
+ }
+ uv_rwlock_rdunlock(&pg_cache->commited_page_index.lock);
+
+ if (!count) {
+ debug(D_RRDENGINE, "%s: no pages eligible for flushing.", __func__);
+ if (completion)
+ complete(completion);
+ return 0;
+ }
+ xt_io_descr = mallocz(sizeof(*xt_io_descr));
+ payload_offset = sizeof(*header) + count * sizeof(header->descr[0]);
+ switch (compression_algorithm) {
+ case RRD_NO_COMPRESSION:
+ size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer);
+ break;
+ default: /* Compress */
+ assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE);
+ max_compressed_size = LZ4_compressBound(uncompressed_payload_length);
+ compressed_buf = mallocz(max_compressed_size);
+ size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer);
+ break;
+ }
+ ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ /* free(xt_io_descr);*/
+ }
+ (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_cache_descr *) * count);
+ xt_io_descr->descr_count = count;
+
+ pos = 0;
+ header = xt_io_descr->buf;
+ header->compression_algorithm = compression_algorithm;
+ header->number_of_pages = count;
+ pos += sizeof(*header);
+
+ extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0]));
+ datafile = ctx->datafiles.last; /* TODO: check for exceeded size quota */
+ extent->offset = datafile->pos;
+ extent->number_of_pages = count;
+ extent->datafile = datafile;
+ extent->next = NULL;
+
+ for (i = 0 ; i < count ; ++i) {
+ /* This is here for performance reasons */
+ xt_io_descr->descr_commit_idx_array[i] = descr_commit_idx_array[i];
+
+ descr = xt_io_descr->descr_array[i];
+ header->descr[i].type = PAGE_METRICS;
+ uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id);
+ header->descr[i].page_length = descr->page_length;
+ header->descr[i].start_time = descr->start_time;
+ header->descr[i].end_time = descr->end_time;
+ pos += sizeof(header->descr[i]);
+ }
+ for (i = 0 ; i < count ; ++i) {
+ descr = xt_io_descr->descr_array[i];
+ /* care, we don't hold the descriptor mutex */
+ (void) memcpy(xt_io_descr->buf + pos, descr->page, descr->page_length);
+ descr->extent = extent;
+ extent->pages[i] = descr;
+
+ pos += descr->page_length;
+ }
+ df_extent_insert(extent);
+
+ switch (compression_algorithm) {
+ case RRD_NO_COMPRESSION:
+ header->payload_length = uncompressed_payload_length;
+ break;
+ default: /* Compress */
+ compressed_size = LZ4_compress_default(xt_io_descr->buf + payload_offset, compressed_buf,
+ uncompressed_payload_length, max_compressed_size);
+ ctx->stats.before_compress_bytes += uncompressed_payload_length;
+ ctx->stats.after_compress_bytes += compressed_size;
+ debug(D_RRDENGINE, "LZ4 compressed %"PRIu32" bytes to %d bytes.", uncompressed_payload_length, compressed_size);
+ (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size);
+ free(compressed_buf);
+ size_bytes = payload_offset + compressed_size + sizeof(*trailer);
+ header->payload_length = compressed_size;
+ break;
+ }
+ extent->size = size_bytes;
+ xt_io_descr->bytes = size_bytes;
+ xt_io_descr->pos = datafile->pos;
+ xt_io_descr->req.data = xt_io_descr;
+ xt_io_descr->completion = completion;
+
+ trailer = xt_io_descr->buf + size_bytes - sizeof(*trailer);
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, xt_io_descr->buf, size_bytes - sizeof(*trailer));
+ crc32set(trailer->checksum, crc);
+
+ real_io_size = ALIGN_BYTES_CEILING(size_bytes);
+ xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
+ ret = uv_fs_write(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, datafile->pos, flush_pages_cb);
+ assert (-1 != ret);
+ ctx->stats.io_write_bytes += real_io_size;
+ ++ctx->stats.io_write_requests;
+ ctx->stats.io_write_extent_bytes += real_io_size;
+ ++ctx->stats.io_write_extents;
+ do_commit_transaction(wc, STORE_DATA, xt_io_descr);
+ datafile->pos += ALIGN_BYTES_CEILING(size_bytes);
+ ctx->disk_space += ALIGN_BYTES_CEILING(size_bytes);
+ rrdeng_test_quota(wc);
+
+ return ALIGN_BYTES_CEILING(size_bytes);
+}
+
+static void after_delete_old_data(uv_work_t *req, int status)
+{
+ struct rrdengine_instance *ctx = req->data;
+ struct rrdengine_worker_config* wc = &ctx->worker_config;
+ struct rrdengine_datafile *datafile;
+ struct rrdengine_journalfile *journalfile;
+ unsigned bytes;
+
+ (void)status;
+ datafile = ctx->datafiles.first;
+ journalfile = datafile->journalfile;
+ bytes = datafile->pos + journalfile->pos;
+
+ datafile_list_delete(ctx, datafile);
+ destroy_journal_file(journalfile, datafile);
+ destroy_data_file(datafile);
+ info("Deleted data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
+ datafile->tier, datafile->fileno);
+ free(journalfile);
+ free(datafile);
+
+ ctx->disk_space -= bytes;
+ info("Reclaimed %u bytes of disk space.", bytes);
+
+ /* unfreeze command processing */
+ wc->now_deleting.data = NULL;
+ /* wake up event loop */
+ assert(0 == uv_async_send(&wc->async));
+}
+
+static void delete_old_data(uv_work_t *req)
+{
+ struct rrdengine_instance *ctx = req->data;
+ struct rrdengine_datafile *datafile;
+ struct extent_info *extent, *next;
+ struct rrdeng_page_cache_descr *descr;
+ unsigned count, i;
+
+ /* Safe to use since it will be deleted after we are done */
+ datafile = ctx->datafiles.first;
+
+ for (extent = datafile->extents.first ; extent != NULL ; extent = next) {
+ count = extent->number_of_pages;
+ for (i = 0 ; i < count ; ++i) {
+ descr = extent->pages[i];
+ pg_cache_punch_hole(ctx, descr);
+ }
+ next = extent->next;
+ free(extent);
+ }
+}
+
+void rrdeng_test_quota(struct rrdengine_worker_config* wc)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct rrdengine_datafile *datafile;
+ unsigned current_size, target_size;
+ uint8_t out_of_space, only_one_datafile;
+
+ out_of_space = 0;
+ if (unlikely(ctx->disk_space > ctx->max_disk_space)) {
+ out_of_space = 1;
+ }
+ datafile = ctx->datafiles.last;
+ current_size = datafile->pos;
+ target_size = ctx->max_disk_space / TARGET_DATAFILES;
+ target_size = MIN(target_size, MAX_DATAFILE_SIZE);
+ target_size = MAX(target_size, MIN_DATAFILE_SIZE);
+ only_one_datafile = (datafile == ctx->datafiles.first) ? 1 : 0;
+ if (unlikely(current_size >= target_size || (out_of_space && only_one_datafile))) {
+ /* Finalize data and journal file and create a new pair */
+ wal_flush_transaction_buffer(wc);
+ create_new_datafile_pair(ctx, 1, datafile->fileno + 1);
+ }
+ if (unlikely(out_of_space)) {
+ /* delete old data */
+ if (wc->now_deleting.data) {
+ /* already deleting data */
+ return;
+ }
+ info("Deleting data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
+ ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
+ wc->now_deleting.data = ctx;
+ uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data);
+ }
+}
+
+int init_rrd_files(struct rrdengine_instance *ctx)
+{
+ return init_data_files(ctx);
+}
+
+void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc)
+{
+ wc->cmd_queue.head = wc->cmd_queue.tail = 0;
+ wc->queue_size = 0;
+ assert(0 == uv_cond_init(&wc->cmd_cond));
+ assert(0 == uv_mutex_init(&wc->cmd_mutex));
+}
+
+void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd)
+{
+ unsigned queue_size;
+
+ /* wait for free space in queue */
+ uv_mutex_lock(&wc->cmd_mutex);
+ while ((queue_size = wc->queue_size) == RRDENG_CMD_Q_MAX_SIZE) {
+ uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
+ }
+ assert(queue_size < RRDENG_CMD_Q_MAX_SIZE);
+ /* enqueue command */
+ wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
+ wc->cmd_queue.tail = wc->cmd_queue.tail != RRDENG_CMD_Q_MAX_SIZE - 1 ?
+ wc->cmd_queue.tail + 1 : 0;
+ wc->queue_size = queue_size + 1;
+ uv_mutex_unlock(&wc->cmd_mutex);
+
+ /* wake up event loop */
+ assert(0 == uv_async_send(&wc->async));
+}
+
+struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc)
+{
+ struct rrdeng_cmd ret;
+ unsigned queue_size;
+
+ uv_mutex_lock(&wc->cmd_mutex);
+ queue_size = wc->queue_size;
+ if (queue_size == 0) {
+ ret.opcode = RRDENG_NOOP;
+ } else {
+ /* dequeue command */
+ ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head];
+ if (queue_size == 1) {
+ wc->cmd_queue.head = wc->cmd_queue.tail = 0;
+ } else {
+ wc->cmd_queue.head = wc->cmd_queue.head != RRDENG_CMD_Q_MAX_SIZE - 1 ?
+ wc->cmd_queue.head + 1 : 0;
+ }
+ wc->queue_size = queue_size - 1;
+
+ /* wake up producers */
+ uv_cond_signal(&wc->cmd_cond);
+ }
+ uv_mutex_unlock(&wc->cmd_mutex);
+
+ return ret;
+}
+
+void async_cb(uv_async_t *handle)
+{
+ uv_stop(handle->loop);
+ uv_update_time(handle->loop);
+ debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle));
+}
+
+void timer_cb(uv_timer_t* handle)
+{
+ struct rrdengine_worker_config* wc = handle->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+
+ uv_stop(handle->loop);
+ uv_update_time(handle->loop);
+ rrdeng_test_quota(wc);
+ debug(D_RRDENGINE, "%s: timeout reached.", __func__);
+ if (likely(!wc->now_deleting.data)) {
+ unsigned total_bytes, bytes_written;
+
+ /* There is free space so we can write to disk */
+ debug(D_RRDENGINE, "Flushing pages to disk.");
+ for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL) ;
+ bytes_written && (total_bytes < DATAFILE_IDEAL_IO_SIZE) ;
+ total_bytes += bytes_written) {
+ bytes_written = do_flush_pages(wc, 0, NULL);
+ }
+ }
+#ifdef NETDATA_INTERNAL_CHECKS
+ {
+ char buf[4096];
+ debug(D_RRDENGINE, "%s", get_rrdeng_statistics(ctx, buf, sizeof(buf)));
+ }
+#endif
+}
+
+/* Flushes dirty pages when timer expires */
+#define TIMER_PERIOD_MS (1000)
+
+#define CMD_BATCH_SIZE (256)
+
+void rrdeng_worker(void* arg)
+{
+ struct rrdengine_worker_config* wc = arg;
+ struct rrdengine_instance *ctx = wc->ctx;
+ uv_loop_t* loop;
+ int shutdown;
+ enum rrdeng_opcode opcode;
+ uv_timer_t timer_req;
+ struct rrdeng_cmd cmd;
+
+ rrdeng_init_cmd_queue(wc);
+
+ loop = wc->loop = mallocz(sizeof(uv_loop_t));
+ uv_loop_init(loop);
+ loop->data = wc;
+
+ uv_async_init(wc->loop, &wc->async, async_cb);
+ wc->async.data = wc;
+
+ wc->now_deleting.data = NULL;
+
+ /* dirty page flushing timer */
+ uv_timer_init(loop, &timer_req);
+ timer_req.data = wc;
+
+ /* wake up initialization thread */
+ complete(&ctx->rrdengine_completion);
+
+ uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS);
+ shutdown = 0;
+ while (shutdown == 0 || uv_loop_alive(loop)) {
+ uv_run(loop, UV_RUN_DEFAULT);
+ /* wait for commands */
+ do {
+ cmd = rrdeng_deq_cmd(wc);
+ opcode = cmd.opcode;
+
+ switch (opcode) {
+ case RRDENG_NOOP:
+ /* the command queue was empty, do nothing */
+ break;
+ case RRDENG_SHUTDOWN:
+ shutdown = 1;
+ if (unlikely(wc->now_deleting.data)) {
+ /* postpone shutdown until after deletion */
+ info("Postponing shutting RRD engine event loop down until after datafile deletion is finished.");
+ rrdeng_enq_cmd(wc, &cmd);
+ break;
+ }
+ /*
+ * uv_async_send after uv_close does not seem to crash in linux at the moment,
+ * it is however undocumented behaviour and we need to be aware if this becomes
+ * an issue in the future.
+ */
+ uv_close((uv_handle_t *)&wc->async, NULL);
+ assert(0 == uv_timer_stop(&timer_req));
+ uv_close((uv_handle_t *)&timer_req, NULL);
+ info("Shutting down RRD engine event loop.");
+ while (do_flush_pages(wc, 1, NULL)) {
+ ; /* Force flushing of all commited pages. */
+ }
+ break;
+ case RRDENG_READ_PAGE:
+ do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0);
+ break;
+ case RRDENG_READ_EXTENT:
+ do_read_extent(wc, cmd.read_extent.page_cache_descr, cmd.read_extent.page_count, 1);
+ break;
+ case RRDENG_COMMIT_PAGE:
+ do_commit_transaction(wc, STORE_DATA, NULL);
+ break;
+ case RRDENG_FLUSH_PAGES: {
+ unsigned total_bytes, bytes_written;
+
+ /* First I/O should be enough to call completion */
+ bytes_written = do_flush_pages(wc, 1, cmd.completion);
+ for (total_bytes = bytes_written ;
+ bytes_written && (total_bytes < DATAFILE_IDEAL_IO_SIZE) ;
+ total_bytes += bytes_written) {
+ bytes_written = do_flush_pages(wc, 1, NULL);
+ }
+ break;
+ }
+ default:
+ debug(D_RRDENGINE, "%s: default.", __func__);
+ break;
+ }
+ } while (opcode != RRDENG_NOOP);
+ }
+ /* cleanup operations of the event loop */
+ wal_flush_transaction_buffer(wc);
+ uv_run(loop, UV_RUN_DEFAULT);
+
+ info("Shutting down RRD engine event loop complete.");
+ /* TODO: don't let the API block by waiting to enqueue commands */
+ uv_cond_destroy(&wc->cmd_cond);
+/* uv_mutex_destroy(&wc->cmd_mutex); */
+ assert(0 == uv_loop_close(loop));
+ free(loop);
+}
+
+
+#define NR_PAGES (256)
+static void basic_functional_test(struct rrdengine_instance *ctx)
+{
+ int i, j, failed_validations;
+ uuid_t uuid[NR_PAGES];
+ void *buf;
+ struct rrdeng_page_cache_descr *handle[NR_PAGES];
+ char uuid_str[37];
+ char backup[NR_PAGES][37 * 100]; /* backup storage for page data verification */
+
+ for (i = 0 ; i < NR_PAGES ; ++i) {
+ uuid_generate(uuid[i]);
+ uuid_unparse_lower(uuid[i], uuid_str);
+// fprintf(stderr, "Generated uuid[%d]=%s\n", i, uuid_str);
+ buf = rrdeng_create_page(&uuid[i], &handle[i]);
+ /* Each page contains 10 times its own UUID stringified */
+ for (j = 0 ; j < 100 ; ++j) {
+ strcpy(buf + 37 * j, uuid_str);
+ strcpy(backup[i] + 37 * j, uuid_str);
+ }
+ rrdeng_commit_page(ctx, handle[i], (Word_t)i);
+ }
+ fprintf(stderr, "\n********** CREATED %d METRIC PAGES ***********\n\n", NR_PAGES);
+ failed_validations = 0;
+ for (i = 0 ; i < NR_PAGES ; ++i) {
+ buf = rrdeng_get_latest_page(ctx, &uuid[i], (void **)&handle[i]);
+ if (NULL == buf) {
+ ++failed_validations;
+ fprintf(stderr, "Page %d was LOST.\n", i);
+ }
+ if (memcmp(backup[i], buf, 37 * 100)) {
+ ++failed_validations;
+ fprintf(stderr, "Page %d data comparison with backup FAILED validation.\n", i);
+ }
+ rrdeng_put_page(ctx, handle[i]);
+ }
+ fprintf(stderr, "\n********** CORRECTLY VALIDATED %d/%d METRIC PAGES ***********\n\n",
+ NR_PAGES - failed_validations, NR_PAGES);
+
+}
+/* C entry point for development purposes
+ * make "LDFLAGS=-errdengine_main"
+ */
+void rrdengine_main(void)
+{
+ int ret;
+ struct rrdengine_instance *ctx;
+
+ ret = rrdeng_init(&ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB);
+ if (ret) {
+ exit(ret);
+ }
+ basic_functional_test(ctx);
+
+ rrdeng_exit(ctx);
+ fprintf(stderr, "Hello world!");
+ exit(0);
+} \ No newline at end of file
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
new file mode 100644
index 000000000..141bb9c63
--- /dev/null
+++ b/database/engine/rrdengine.h
@@ -0,0 +1,171 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDENGINE_H
+#define NETDATA_RRDENGINE_H
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include <fcntl.h>
+#include <aio.h>
+#include <uv.h>
+#include <assert.h>
+#include <lz4.h>
+#include <Judy.h>
+#include <openssl/sha.h>
+#include <openssl/evp.h>
+#include <stdint.h>
+#include "../rrd.h"
+#include "rrddiskprotocol.h"
+#include "rrdenginelib.h"
+#include "datafile.h"
+#include "journalfile.h"
+#include "rrdengineapi.h"
+#include "pagecache.h"
+
+#ifdef NETDATA_RRD_INTERNALS
+
+#endif /* NETDATA_RRD_INTERNALS */
+
+/* Forward declerations */
+struct rrdengine_instance;
+
+#define MAX_PAGES_PER_EXTENT (64) /* TODO: can go higher only when journal supports bigger than 4KiB transactions */
+
+#define RRDENG_FILE_NUMBER_SCAN_TMPL "%1u-%10u"
+#define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u"
+
+
+typedef enum {
+ RRDENGINE_STATUS_UNINITIALIZED = 0,
+ RRDENGINE_STATUS_INITIALIZING,
+ RRDENGINE_STATUS_INITIALIZED
+} rrdengine_state_t;
+
+enum rrdeng_opcode {
+ /* can be used to return empty status or flush the command queue */
+ RRDENG_NOOP = 0,
+
+ RRDENG_READ_PAGE,
+ RRDENG_READ_EXTENT,
+ RRDENG_COMMIT_PAGE,
+ RRDENG_FLUSH_PAGES,
+ RRDENG_SHUTDOWN,
+
+ RRDENG_MAX_OPCODE
+};
+
+struct rrdeng_cmd {
+ enum rrdeng_opcode opcode;
+ union {
+ struct rrdeng_read_page {
+ struct rrdeng_page_cache_descr *page_cache_descr;
+ } read_page;
+ struct rrdeng_read_extent {
+ struct rrdeng_page_cache_descr *page_cache_descr[MAX_PAGES_PER_EXTENT];
+ int page_count;
+ } read_extent;
+ struct completion *completion;
+ };
+};
+
+#define RRDENG_CMD_Q_MAX_SIZE (2048)
+
+struct rrdeng_cmdqueue {
+ unsigned head, tail;
+ struct rrdeng_cmd cmd_array[RRDENG_CMD_Q_MAX_SIZE];
+};
+
+struct extent_io_descriptor {
+ uv_fs_t req;
+ uv_buf_t iov;
+ void *buf;
+ uint64_t pos;
+ unsigned bytes;
+ struct completion *completion;
+ unsigned descr_count;
+ int release_descr;
+ struct rrdeng_page_cache_descr *descr_array[MAX_PAGES_PER_EXTENT];
+ Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT];
+};
+
+struct generic_io_descriptor {
+ uv_fs_t req;
+ uv_buf_t iov;
+ void *buf;
+ uint64_t pos;
+ unsigned bytes;
+ struct completion *completion;
+};
+
+struct rrdengine_worker_config {
+ struct rrdengine_instance *ctx;
+
+ uv_thread_t thread;
+ uv_loop_t* loop;
+ uv_async_t async;
+ uv_work_t now_deleting;
+
+ /* FIFO command queue */
+ uv_mutex_t cmd_mutex;
+ uv_cond_t cmd_cond;
+ volatile unsigned queue_size;
+ struct rrdeng_cmdqueue cmd_queue;
+};
+
+/*
+ * Debug statistics not used by code logic.
+ * They only describe operations since DB engine instance load time.
+ */
+struct rrdengine_statistics {
+ rrdeng_stats_t metric_API_producers;
+ rrdeng_stats_t metric_API_consumers;
+ rrdeng_stats_t pg_cache_insertions;
+ rrdeng_stats_t pg_cache_deletions;
+ rrdeng_stats_t pg_cache_hits;
+ rrdeng_stats_t pg_cache_misses;
+ rrdeng_stats_t pg_cache_backfills;
+ rrdeng_stats_t pg_cache_evictions;
+ rrdeng_stats_t before_decompress_bytes;
+ rrdeng_stats_t after_decompress_bytes;
+ rrdeng_stats_t before_compress_bytes;
+ rrdeng_stats_t after_compress_bytes;
+ rrdeng_stats_t io_write_bytes;
+ rrdeng_stats_t io_write_requests;
+ rrdeng_stats_t io_read_bytes;
+ rrdeng_stats_t io_read_requests;
+ rrdeng_stats_t io_write_extent_bytes;
+ rrdeng_stats_t io_write_extents;
+ rrdeng_stats_t io_read_extent_bytes;
+ rrdeng_stats_t io_read_extents;
+ rrdeng_stats_t datafile_creations;
+ rrdeng_stats_t datafile_deletions;
+ rrdeng_stats_t journalfile_creations;
+ rrdeng_stats_t journalfile_deletions;
+};
+
+struct rrdengine_instance {
+ rrdengine_state_t rrdengine_state;
+ struct rrdengine_worker_config worker_config;
+ struct completion rrdengine_completion;
+ struct page_cache pg_cache;
+ uint8_t global_compress_alg;
+ struct transaction_commit_log commit_log;
+ struct rrdengine_datafile_list datafiles;
+ char dbfiles_path[FILENAME_MAX+1];
+ uint64_t disk_space;
+ uint64_t max_disk_space;
+ unsigned long max_cache_pages;
+ unsigned long cache_pages_low_watermark;
+
+ struct rrdengine_statistics stats;
+};
+
+extern void sanity_check(void);
+extern int init_rrd_files(struct rrdengine_instance *ctx);
+extern void rrdeng_test_quota(struct rrdengine_worker_config* wc);
+extern void rrdeng_worker(void* arg);
+extern void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd);
+extern struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc);
+
+#endif /* NETDATA_RRDENGINE_H */ \ No newline at end of file
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
new file mode 100644
index 000000000..a4e711554
--- /dev/null
+++ b/database/engine/rrdengineapi.c
@@ -0,0 +1,484 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+/* Default global database instance */
+static struct rrdengine_instance default_global_ctx;
+
+int default_rrdeng_page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
+int default_rrdeng_disk_quota_mb = RRDENG_MIN_DISK_SPACE_MB;
+
+/*
+ * Gets a handle for storing metrics to the database.
+ * The handle must be released with rrdeng_store_metric_final().
+ */
+void rrdeng_store_metric_init(RRDDIM *rd)
+{
+ struct rrdeng_collect_handle *handle;
+ struct page_cache *pg_cache;
+ struct rrdengine_instance *ctx;
+ uuid_t temp_id;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index;
+ EVP_MD_CTX *evpctx;
+ unsigned char hash_value[EVP_MAX_MD_SIZE];
+ unsigned int hash_len;
+
+ //&default_global_ctx; TODO: test this use case or remove it?
+
+ ctx = rd->rrdset->rrdhost->rrdeng_ctx;
+ pg_cache = &ctx->pg_cache;
+ handle = &rd->state->handle.rrdeng;
+ handle->ctx = ctx;
+
+ evpctx = EVP_MD_CTX_create();
+ EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL);
+ EVP_DigestUpdate(evpctx, rd->id, strlen(rd->id));
+ EVP_DigestUpdate(evpctx, rd->rrdset->id, strlen(rd->rrdset->id));
+ EVP_DigestFinal_ex(evpctx, hash_value, &hash_len);
+ EVP_MD_CTX_destroy(evpctx);
+ assert(hash_len > sizeof(temp_id));
+ memcpy(&temp_id, hash_value, sizeof(temp_id));
+
+ handle->descr = NULL;
+ handle->prev_descr = NULL;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ /* First time we see the UUID */
+ uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t), PJE0);
+ assert(NULL == *PValue); /* TODO: figure out concurrency model */
+ *PValue = page_index = create_page_index(&temp_id);
+ uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
+ }
+ rd->state->rrdeng_uuid = &page_index->id;
+ handle->page_index = page_index;
+}
+
+void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number)
+{
+ struct rrdeng_collect_handle *handle;
+ struct rrdengine_instance *ctx;
+ struct page_cache *pg_cache;
+ struct rrdeng_page_cache_descr *descr;
+ storage_number *page;
+
+ handle = &rd->state->handle.rrdeng;
+ ctx = handle->ctx;
+ pg_cache = &ctx->pg_cache;
+ descr = handle->descr;
+ if (unlikely(NULL == descr || descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE)) {
+ if (descr) {
+ descr->handle = NULL;
+ if (descr->page_length) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
+#endif
+ /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */
+ ++descr->refcnt;
+ rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
+ if (handle->prev_descr) {
+ /* unpin old second page */
+ pg_cache_put(handle->prev_descr);
+ }
+ handle->prev_descr = descr;
+ } else {
+ free(descr->page);
+ free(descr);
+ handle->descr = NULL;
+ }
+ }
+ page = rrdeng_create_page(&handle->page_index->id, &descr);
+ assert(page);
+ handle->prev_descr = handle->descr;
+ handle->descr = descr;
+ descr->handle = handle;
+ uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
+ handle->page_correlation_id = pg_cache->commited_page_index.latest_corr_id++;
+ uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
+ }
+ page = descr->page;
+
+ page[descr->page_length / sizeof(number)] = number;
+ descr->end_time = point_in_time;
+ descr->page_length += sizeof(number);
+ if (unlikely(INVALID_TIME == descr->start_time)) {
+ descr->start_time = point_in_time;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_producers, 1);
+#endif
+ pg_cache_insert(ctx, handle->page_index, descr);
+ } else {
+ pg_cache_add_new_metric_time(handle->page_index, descr);
+ }
+}
+
+/*
+ * Releases the database reference from the handle for storing metrics.
+ */
+void rrdeng_store_metric_finalize(RRDDIM *rd)
+{
+ struct rrdeng_collect_handle *handle;
+ struct rrdengine_instance *ctx;
+ struct rrdeng_page_cache_descr *descr;
+
+ handle = &rd->state->handle.rrdeng;
+ ctx = handle->ctx;
+ descr = handle->descr;
+ if (descr) {
+ descr->handle = NULL;
+ if (descr->page_length) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
+#endif
+ rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
+ if (handle->prev_descr) {
+ /* unpin old second page */
+ pg_cache_put(handle->prev_descr);
+ }
+ } else {
+ free(descr->page);
+ free(descr);
+ }
+ }
+}
+
+/*
+ * Gets a handle for loading metrics from the database.
+ * The handle must be released with rrdeng_load_metric_final().
+ */
+void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle, time_t start_time, time_t end_time)
+{
+ struct rrdeng_query_handle *handle;
+ struct rrdengine_instance *ctx;
+
+ ctx = rd->rrdset->rrdhost->rrdeng_ctx;
+ rrdimm_handle->start_time = start_time;
+ rrdimm_handle->end_time = end_time;
+ handle = &rrdimm_handle->rrdeng;
+ handle->now = start_time;
+ handle->dt = rd->rrdset->update_every;
+ handle->ctx = ctx;
+ handle->descr = NULL;
+ handle->page_index = pg_cache_preload(ctx, rd->state->rrdeng_uuid,
+ start_time * USEC_PER_SEC, end_time * USEC_PER_SEC);
+}
+
+storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle)
+{
+ struct rrdeng_query_handle *handle;
+ struct rrdengine_instance *ctx;
+ struct rrdeng_page_cache_descr *descr;
+ storage_number *page, ret;
+ unsigned position;
+ usec_t point_in_time;
+
+ handle = &rrdimm_handle->rrdeng;
+ if (unlikely(INVALID_TIME == handle->now)) {
+ return SN_EMPTY_SLOT;
+ }
+ ctx = handle->ctx;
+ point_in_time = handle->now * USEC_PER_SEC;
+ descr = handle->descr;
+
+ if (unlikely(NULL == handle->page_index)) {
+ ret = SN_EMPTY_SLOT;
+ goto out;
+ }
+ if (unlikely(NULL == descr ||
+ point_in_time < descr->start_time ||
+ point_in_time > descr->end_time)) {
+ if (descr) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
+#endif
+ pg_cache_put(descr);
+ handle->descr = NULL;
+ }
+ descr = pg_cache_lookup(ctx, handle->page_index, &handle->page_index->id, point_in_time);
+ if (NULL == descr) {
+ ret = SN_EMPTY_SLOT;
+ goto out;
+ }
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1);
+#endif
+ handle->descr = descr;
+ }
+ if (unlikely(INVALID_TIME == descr->start_time ||
+ INVALID_TIME == descr->end_time)) {
+ ret = SN_EMPTY_SLOT;
+ goto out;
+ }
+ page = descr->page;
+ if (unlikely(descr->start_time == descr->end_time)) {
+ ret = page[0];
+ goto out;
+ }
+ position = ((uint64_t)(point_in_time - descr->start_time)) * (descr->page_length / sizeof(storage_number)) /
+ (descr->end_time - descr->start_time + 1);
+ ret = page[position];
+
+out:
+ handle->now += handle->dt;
+ if (unlikely(handle->now > rrdimm_handle->end_time)) {
+ handle->now = INVALID_TIME;
+ }
+ return ret;
+}
+
+int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle)
+{
+ struct rrdeng_query_handle *handle;
+
+ handle = &rrdimm_handle->rrdeng;
+ return (INVALID_TIME == handle->now);
+}
+
+/*
+ * Releases the database reference from the handle for loading metrics.
+ */
+void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
+{
+ struct rrdeng_query_handle *handle;
+ struct rrdengine_instance *ctx;
+ struct rrdeng_page_cache_descr *descr;
+
+ handle = &rrdimm_handle->rrdeng;
+ ctx = handle->ctx;
+ descr = handle->descr;
+ if (descr) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
+#endif
+ pg_cache_put(descr);
+ }
+}
+
+time_t rrdeng_metric_latest_time(RRDDIM *rd)
+{
+ struct rrdeng_collect_handle *handle;
+ struct pg_cache_page_index *page_index;
+
+ handle = &rd->state->handle.rrdeng;
+ page_index = handle->page_index;
+
+ return page_index->latest_time / USEC_PER_SEC;
+}
+time_t rrdeng_metric_oldest_time(RRDDIM *rd)
+{
+ struct rrdeng_collect_handle *handle;
+ struct pg_cache_page_index *page_index;
+
+ handle = &rd->state->handle.rrdeng;
+ page_index = handle->page_index;
+
+ return page_index->oldest_time / USEC_PER_SEC;
+}
+
+/* Also gets a reference for the page */
+void *rrdeng_create_page(uuid_t *id, struct rrdeng_page_cache_descr **ret_descr)
+{
+ struct rrdeng_page_cache_descr *descr;
+ void *page;
+ int ret;
+
+ /* TODO: check maximum number of pages in page cache limit */
+
+ page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */
+ descr = pg_cache_create_descr();
+ descr->page = page;
+ descr->id = id; /* TODO: add page type: metric, log, something? */
+ descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */;
+ descr->refcnt = 1;
+
+ debug(D_RRDENGINE, "-----------------\nCreated new page:\n-----------------");
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ *ret_descr = descr;
+ return page;
+}
+
+/* The page must not be empty */
+void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr,
+ Word_t page_correlation_id)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ Pvoid_t *PValue;
+
+ if (unlikely(NULL == descr)) {
+ debug(D_RRDENGINE, "%s: page descriptor is NULL, page has already been force-commited.", __func__);
+ return;
+ }
+ assert(descr->page_length);
+
+ uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
+ PValue = JudyLIns(&pg_cache->commited_page_index.JudyL_array, page_correlation_id, PJE0);
+ *PValue = descr;
+ ++pg_cache->commited_page_index.nr_commited_pages;
+ uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
+
+ pg_cache_put(descr);
+}
+
+/* Gets a reference for the page */
+void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle)
+{
+ struct rrdeng_page_cache_descr *descr;
+
+ debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------");
+ descr = pg_cache_lookup(ctx, NULL, id, INVALID_TIME);
+ if (NULL == descr) {
+ *handle = NULL;
+
+ return NULL;
+ }
+ *handle = descr;
+
+ return descr->page;
+}
+
+/* Gets a reference for the page */
+void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle)
+{
+ struct rrdeng_page_cache_descr *descr;
+
+ debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------");
+ descr = pg_cache_lookup(ctx, NULL, id, point_in_time);
+ if (NULL == descr) {
+ *handle = NULL;
+
+ return NULL;
+ }
+ *handle = descr;
+
+ return descr->page;
+}
+
+void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ array[0] = (uint64_t)ctx->stats.metric_API_producers;
+ array[1] = (uint64_t)ctx->stats.metric_API_consumers;
+ array[2] = (uint64_t)pg_cache->page_descriptors;
+ array[3] = (uint64_t)pg_cache->populated_pages;
+ array[4] = (uint64_t)pg_cache->commited_page_index.nr_commited_pages;
+ array[5] = (uint64_t)ctx->stats.pg_cache_insertions;
+ array[6] = (uint64_t)ctx->stats.pg_cache_deletions;
+ array[7] = (uint64_t)ctx->stats.pg_cache_hits;
+ array[8] = (uint64_t)ctx->stats.pg_cache_misses;
+ array[9] = (uint64_t)ctx->stats.pg_cache_backfills;
+ array[10] = (uint64_t)ctx->stats.pg_cache_evictions;
+ array[11] = (uint64_t)ctx->stats.before_compress_bytes;
+ array[12] = (uint64_t)ctx->stats.after_compress_bytes;
+ array[13] = (uint64_t)ctx->stats.before_decompress_bytes;
+ array[14] = (uint64_t)ctx->stats.after_decompress_bytes;
+ array[15] = (uint64_t)ctx->stats.io_write_bytes;
+ array[16] = (uint64_t)ctx->stats.io_write_requests;
+ array[17] = (uint64_t)ctx->stats.io_read_bytes;
+ array[18] = (uint64_t)ctx->stats.io_read_requests;
+ array[19] = (uint64_t)ctx->stats.io_write_extent_bytes;
+ array[20] = (uint64_t)ctx->stats.io_write_extents;
+ array[21] = (uint64_t)ctx->stats.io_read_extent_bytes;
+ array[22] = (uint64_t)ctx->stats.io_read_extents;
+ array[23] = (uint64_t)ctx->stats.datafile_creations;
+ array[24] = (uint64_t)ctx->stats.datafile_deletions;
+ array[25] = (uint64_t)ctx->stats.journalfile_creations;
+ array[26] = (uint64_t)ctx->stats.journalfile_deletions;
+}
+
+/* Releases reference to page */
+void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle)
+{
+ (void)ctx;
+ pg_cache_put((struct rrdeng_page_cache_descr *)handle);
+}
+
+/*
+ * Returns 0 on success, 1 on error
+ */
+int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, unsigned disk_space_mb)
+{
+ struct rrdengine_instance *ctx;
+ int error;
+
+ sanity_check();
+ if (NULL == ctxp) {
+ /* for testing */
+ ctx = &default_global_ctx;
+ memset(ctx, 0, sizeof(*ctx));
+ } else {
+ *ctxp = ctx = callocz(1, sizeof(*ctx));
+ }
+ if (ctx->rrdengine_state != RRDENGINE_STATUS_UNINITIALIZED) {
+ return 1;
+ }
+ ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZING;
+ ctx->global_compress_alg = RRD_LZ4;
+ if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB)
+ page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
+ ctx->max_cache_pages = page_cache_mb * (1048576LU / RRDENG_BLOCK_SIZE);
+ /* try to keep 5% of the page cache free */
+ ctx->cache_pages_low_watermark = (ctx->max_cache_pages * 95LLU) / 100;
+ if (disk_space_mb < RRDENG_MIN_DISK_SPACE_MB)
+ disk_space_mb = RRDENG_MIN_DISK_SPACE_MB;
+ ctx->max_disk_space = disk_space_mb * 1048576LLU;
+ strncpyz(ctx->dbfiles_path, dbfiles_path, sizeof(ctx->dbfiles_path) - 1);
+ ctx->dbfiles_path[sizeof(ctx->dbfiles_path) - 1] = '\0';
+
+ memset(&ctx->worker_config, 0, sizeof(ctx->worker_config));
+ ctx->worker_config.ctx = ctx;
+ init_page_cache(ctx);
+ init_commit_log(ctx);
+ error = init_rrd_files(ctx);
+ if (error) {
+ ctx->rrdengine_state = RRDENGINE_STATUS_UNINITIALIZED;
+ if (ctx != &default_global_ctx) {
+ freez(ctx);
+ }
+ return 1;
+ }
+
+ init_completion(&ctx->rrdengine_completion);
+ assert(0 == uv_thread_create(&ctx->worker_config.thread, rrdeng_worker, &ctx->worker_config));
+ /* wait for worker thread to initialize */
+ wait_for_completion(&ctx->rrdengine_completion);
+ destroy_completion(&ctx->rrdengine_completion);
+
+ ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZED;
+ return 0;
+}
+
+/*
+ * Returns 0 on success, 1 on error
+ */
+int rrdeng_exit(struct rrdengine_instance *ctx)
+{
+ struct rrdeng_cmd cmd;
+
+ if (NULL == ctx) {
+ /* TODO: move to per host basis */
+ ctx = &default_global_ctx;
+ }
+ if (ctx->rrdengine_state != RRDENGINE_STATUS_INITIALIZED) {
+ return 1;
+ }
+
+ /* TODO: add page to page cache */
+ cmd.opcode = RRDENG_SHUTDOWN;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+
+ assert(0 == uv_thread_join(&ctx->worker_config.thread));
+
+ if (ctx != &default_global_ctx) {
+ freez(ctx);
+ }
+ return 0;
+} \ No newline at end of file
diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h
new file mode 100644
index 000000000..e76629a4b
--- /dev/null
+++ b/database/engine/rrdengineapi.h
@@ -0,0 +1,37 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDENGINEAPI_H
+#define NETDATA_RRDENGINEAPI_H
+
+#include "rrdengine.h"
+
+#define RRDENG_MIN_PAGE_CACHE_SIZE_MB (32)
+#define RRDENG_MIN_DISK_SPACE_MB (256)
+extern int default_rrdeng_page_cache_mb;
+extern int default_rrdeng_disk_quota_mb;
+
+extern void *rrdeng_create_page(uuid_t *id, struct rrdeng_page_cache_descr **ret_descr);
+extern void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr,
+ Word_t page_correlation_id);
+extern void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle);
+extern void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle);
+extern void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle);
+extern void rrdeng_store_metric_init(RRDDIM *rd);
+extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number);
+extern void rrdeng_store_metric_finalize(RRDDIM *rd);
+extern void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle,
+ time_t start_time, time_t end_time);
+extern storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle);
+extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle);
+extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle);
+extern time_t rrdeng_metric_latest_time(RRDDIM *rd);
+extern time_t rrdeng_metric_oldest_time(RRDDIM *rd);
+extern void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
+
+/* must call once before using anything */
+extern int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
+ unsigned disk_space_mb);
+
+extern int rrdeng_exit(struct rrdengine_instance *ctx);
+
+#endif /* NETDATA_RRDENGINEAPI_H */ \ No newline at end of file
diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c
new file mode 100644
index 000000000..25f57ba1b
--- /dev/null
+++ b/database/engine/rrdenginelib.c
@@ -0,0 +1,116 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+void print_page_cache_descr(struct rrdeng_page_cache_descr *page_cache_descr)
+{
+ char uuid_str[37];
+ char str[512];
+ int pos = 0;
+
+ uuid_unparse_lower(*page_cache_descr->id, uuid_str);
+ pos += snprintfz(str, 512 - pos, "page(%p) id=%s\n"
+ "--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:",
+ page_cache_descr->page, uuid_str,
+ page_cache_descr->page_length,
+ (uint64_t)page_cache_descr->start_time,
+ (uint64_t)page_cache_descr->end_time);
+ if (!page_cache_descr->extent) {
+ pos += snprintfz(str + pos, 512 - pos, "N/A");
+ } else {
+ pos += snprintfz(str + pos, 512 - pos, "%"PRIu64, page_cache_descr->extent->offset);
+ }
+ snprintfz(str + pos, 512 - pos, " flags:0x%2.2lX refcnt:%u\n\n", page_cache_descr->flags, page_cache_descr->refcnt);
+ fputs(str, stderr);
+}
+
+int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size)
+{
+ int ret;
+ uv_fs_t req;
+ uv_stat_t* s;
+
+ ret = uv_fs_fstat(NULL, &req, file, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_fstat: %s\n", uv_strerror(ret));
+ }
+ assert(req.result == 0);
+ s = req.ptr;
+ if (!(s->st_mode & S_IFREG)) {
+ error("Not a regular file.\n");
+ uv_fs_req_cleanup(&req);
+ return UV_EINVAL;
+ }
+ if (s->st_size < min_size) {
+ error("File length is too short.\n");
+ uv_fs_req_cleanup(&req);
+ return UV_EINVAL;
+ }
+ *file_size = s->st_size;
+ uv_fs_req_cleanup(&req);
+
+ return 0;
+}
+
+char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size)
+{
+ struct page_cache *pg_cache;
+
+ pg_cache = &ctx->pg_cache;
+ snprintfz(str, size,
+ "metric_API_producers: %ld\n"
+ "metric_API_consumers: %ld\n"
+ "page_cache_total_pages: %ld\n"
+ "page_cache_populated_pages: %ld\n"
+ "page_cache_commited_pages: %ld\n"
+ "page_cache_insertions: %ld\n"
+ "page_cache_deletions: %ld\n"
+ "page_cache_hits: %ld\n"
+ "page_cache_misses: %ld\n"
+ "page_cache_backfills: %ld\n"
+ "page_cache_evictions: %ld\n"
+ "compress_before_bytes: %ld\n"
+ "compress_after_bytes: %ld\n"
+ "decompress_before_bytes: %ld\n"
+ "decompress_after_bytes: %ld\n"
+ "io_write_bytes: %ld\n"
+ "io_write_requests: %ld\n"
+ "io_read_bytes: %ld\n"
+ "io_read_requests: %ld\n"
+ "io_write_extent_bytes: %ld\n"
+ "io_write_extents: %ld\n"
+ "io_read_extent_bytes: %ld\n"
+ "io_read_extents: %ld\n"
+ "datafile_creations: %ld\n"
+ "datafile_deletions: %ld\n"
+ "journalfile_creations: %ld\n"
+ "journalfile_deletions: %ld\n",
+ (long)ctx->stats.metric_API_producers,
+ (long)ctx->stats.metric_API_consumers,
+ (long)pg_cache->page_descriptors,
+ (long)pg_cache->populated_pages,
+ (long)pg_cache->commited_page_index.nr_commited_pages,
+ (long)ctx->stats.pg_cache_insertions,
+ (long)ctx->stats.pg_cache_deletions,
+ (long)ctx->stats.pg_cache_hits,
+ (long)ctx->stats.pg_cache_misses,
+ (long)ctx->stats.pg_cache_backfills,
+ (long)ctx->stats.pg_cache_evictions,
+ (long)ctx->stats.before_compress_bytes,
+ (long)ctx->stats.after_compress_bytes,
+ (long)ctx->stats.before_decompress_bytes,
+ (long)ctx->stats.after_decompress_bytes,
+ (long)ctx->stats.io_write_bytes,
+ (long)ctx->stats.io_write_requests,
+ (long)ctx->stats.io_read_bytes,
+ (long)ctx->stats.io_read_requests,
+ (long)ctx->stats.io_write_extent_bytes,
+ (long)ctx->stats.io_write_extents,
+ (long)ctx->stats.io_read_extent_bytes,
+ (long)ctx->stats.io_read_extents,
+ (long)ctx->stats.datafile_creations,
+ (long)ctx->stats.datafile_deletions,
+ (long)ctx->stats.journalfile_creations,
+ (long)ctx->stats.journalfile_deletions
+ );
+ return str;
+}
diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h
new file mode 100644
index 000000000..bb6f072bf
--- /dev/null
+++ b/database/engine/rrdenginelib.h
@@ -0,0 +1,84 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDENGINELIB_H
+#define NETDATA_RRDENGINELIB_H
+
+#include "rrdengine.h"
+
+/* Forward declarations */
+struct rrdeng_page_cache_descr;
+
+#define STR_HELPER(x) #x
+#define STR(x) STR_HELPER(x)
+
+/* Taken from linux kernel */
+#define BUILD_BUG_ON(condition) ((void)sizeof(char[1 - 2*!!(condition)]))
+
+#define ALIGN_BYTES_FLOOR(x) (((x) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE)
+#define ALIGN_BYTES_CEILING(x) ((((x) + RRDENG_BLOCK_SIZE - 1) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE)
+
+typedef uintptr_t rrdeng_stats_t;
+
+#ifdef __ATOMIC_RELAXED
+#define rrd_stat_atomic_add(p, n) do {(void) __atomic_fetch_add(p, n, __ATOMIC_RELAXED);} while(0)
+#else
+#define rrd_stat_atomic_add(p, n) do {(void) __sync_fetch_and_add(p, n);} while(0)
+#endif
+
+#ifndef O_DIRECT
+/* Workaround for OS X */
+#define O_DIRECT (0)
+#endif
+
+struct completion {
+ uv_mutex_t mutex;
+ uv_cond_t cond;
+ volatile unsigned completed;
+};
+
+static inline void init_completion(struct completion *p)
+{
+ p->completed = 0;
+ assert(0 == uv_cond_init(&p->cond));
+ assert(0 == uv_mutex_init(&p->mutex));
+}
+
+static inline void destroy_completion(struct completion *p)
+{
+ uv_cond_destroy(&p->cond);
+ uv_mutex_destroy(&p->mutex);
+}
+
+static inline void wait_for_completion(struct completion *p)
+{
+ uv_mutex_lock(&p->mutex);
+ while (0 == p->completed) {
+ uv_cond_wait(&p->cond, &p->mutex);
+ }
+ assert(1 == p->completed);
+ uv_mutex_unlock(&p->mutex);
+}
+
+static inline void complete(struct completion *p)
+{
+ uv_mutex_lock(&p->mutex);
+ p->completed = 1;
+ uv_mutex_unlock(&p->mutex);
+ uv_cond_broadcast(&p->cond);
+}
+
+static inline int crc32cmp(void *crcp, uLong crc)
+{
+ return (*(uint32_t *)crcp != crc);
+}
+
+static inline void crc32set(void *crcp, uLong crc)
+{
+ *(uint32_t *)crcp = crc;
+}
+
+extern void print_page_cache_descr(struct rrdeng_page_cache_descr *page_cache_descr);
+extern int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size);
+extern char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size);
+
+#endif /* NETDATA_RRDENGINELIB_H */ \ No newline at end of file
diff --git a/database/rrd.c b/database/rrd.c
index 119efa62e..2457cac01 100644
--- a/database/rrd.c
+++ b/database/rrd.c
@@ -38,6 +38,9 @@ inline const char *rrd_memory_mode_name(RRD_MEMORY_MODE id) {
case RRD_MEMORY_MODE_ALLOC:
return RRD_MEMORY_MODE_ALLOC_NAME;
+
+ case RRD_MEMORY_MODE_DBENGINE:
+ return RRD_MEMORY_MODE_DBENGINE_NAME;
}
return RRD_MEMORY_MODE_SAVE_NAME;
@@ -56,6 +59,9 @@ RRD_MEMORY_MODE rrd_memory_mode_id(const char *name) {
else if(unlikely(!strcmp(name, RRD_MEMORY_MODE_ALLOC_NAME)))
return RRD_MEMORY_MODE_ALLOC;
+ else if(unlikely(!strcmp(name, RRD_MEMORY_MODE_DBENGINE_NAME)))
+ return RRD_MEMORY_MODE_DBENGINE;
+
return RRD_MEMORY_MODE_SAVE;
}
@@ -140,7 +146,8 @@ char *rrdset_cache_dir(RRDHOST *host, const char *id, const char *config_section
snprintfz(n, FILENAME_MAX, "%s/%s", host->cache_dir, b);
ret = config_get(config_section, "cache directory", n);
- if(host->rrd_memory_mode == RRD_MEMORY_MODE_MAP || host->rrd_memory_mode == RRD_MEMORY_MODE_SAVE) {
+ if(host->rrd_memory_mode == RRD_MEMORY_MODE_MAP || host->rrd_memory_mode == RRD_MEMORY_MODE_SAVE ||
+ host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
int r = mkdir(ret, 0775);
if(r != 0 && errno != EEXIST)
error("Cannot create directory '%s'", ret);
diff --git a/database/rrd.h b/database/rrd.h
index 1572e62ab..3f57b9037 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -14,6 +14,14 @@ typedef struct rrdcalc RRDCALC;
typedef struct rrdcalctemplate RRDCALCTEMPLATE;
typedef struct alarm_entry ALARM_ENTRY;
+// forward declarations
+struct rrddim_volatile;
+#ifdef ENABLE_DBENGINE
+struct rrdeng_page_cache_descr;
+struct rrdengine_instance;
+struct pg_cache_page_index;
+#endif
+
#include "../daemon/common.h"
#include "web/api/queries/query.h"
#include "rrdvar.h"
@@ -66,7 +74,8 @@ typedef enum rrd_memory_mode {
RRD_MEMORY_MODE_RAM = 1,
RRD_MEMORY_MODE_MAP = 2,
RRD_MEMORY_MODE_SAVE = 3,
- RRD_MEMORY_MODE_ALLOC = 4
+ RRD_MEMORY_MODE_ALLOC = 4,
+ RRD_MEMORY_MODE_DBENGINE = 5
} RRD_MEMORY_MODE;
#define RRD_MEMORY_MODE_NONE_NAME "none"
@@ -74,6 +83,7 @@ typedef enum rrd_memory_mode {
#define RRD_MEMORY_MODE_MAP_NAME "map"
#define RRD_MEMORY_MODE_SAVE_NAME "save"
#define RRD_MEMORY_MODE_ALLOC_NAME "alloc"
+#define RRD_MEMORY_MODE_DBENGINE_NAME "dbengine"
extern RRD_MEMORY_MODE default_rrd_memory_mode;
@@ -178,7 +188,8 @@ struct rrddim {
char *cache_filename; // the filename we load/save from/to this set
size_t collections_counter; // the number of times we added values to this rrdim
- size_t unused[9];
+ struct rrddim_volatile *state; // volatile state that is not persistently stored
+ size_t unused[8];
collected_number collected_value_max; // the absolute maximum of the collected value
@@ -227,6 +238,90 @@ struct rrddim {
};
// ----------------------------------------------------------------------------
+// iterator state for RRD dimension data collection
+union rrddim_collect_handle {
+ struct {
+ long slot;
+ long entries;
+ } slotted; // state the legacy code uses
+#ifdef ENABLE_DBENGINE
+ struct rrdeng_collect_handle {
+ struct rrdeng_page_cache_descr *descr, *prev_descr;
+ unsigned long page_correlation_id;
+ struct rrdengine_instance *ctx;
+ struct pg_cache_page_index *page_index;
+ } rrdeng; // state the database engine uses
+#endif
+};
+
+// ----------------------------------------------------------------------------
+// iterator state for RRD dimension data queries
+struct rrddim_query_handle {
+ RRDDIM *rd;
+ time_t start_time;
+ time_t end_time;
+ union {
+ struct {
+ long slot;
+ long last_slot;
+ uint8_t finished;
+ } slotted; // state the legacy code uses
+#ifdef ENABLE_DBENGINE
+ struct rrdeng_query_handle {
+ struct rrdeng_page_cache_descr *descr;
+ struct rrdengine_instance *ctx;
+ struct pg_cache_page_index *page_index;
+ time_t now; //TODO: remove now to implement next point iteration
+ time_t dt; //TODO: remove dt to implement next point iteration
+ } rrdeng; // state the database engine uses
+#endif
+ };
+};
+
+
+// ----------------------------------------------------------------------------
+// volatile state per RRD dimension
+struct rrddim_volatile {
+#ifdef ENABLE_DBENGINE
+ uuid_t *rrdeng_uuid; // database engine metric UUID
+#endif
+ union rrddim_collect_handle handle;
+ // ------------------------------------------------------------------------
+ // function pointers that handle data collection
+ struct rrddim_collect_ops {
+ // an initialization function to run before starting collection
+ void (*init)(RRDDIM *rd);
+
+ // run this to store each metric into the database
+ void (*store_metric)(RRDDIM *rd, usec_t point_in_time, storage_number number);
+
+ // an finalization function to run after collection is over
+ void (*finalize)(RRDDIM *rd);
+ } collect_ops;
+
+ // function pointers that handle database queries
+ struct rrddim_query_ops {
+ // run this before starting a series of next_metric() database queries
+ void (*init)(RRDDIM *rd, struct rrddim_query_handle *handle, time_t start_time, time_t end_time);
+
+ // run this to load each metric number from the database
+ storage_number (*next_metric)(struct rrddim_query_handle *handle);
+
+ // run this to test if the series of next_metric() database queries is finished
+ int (*is_finished)(struct rrddim_query_handle *handle);
+
+ // run this after finishing a series of load_metric() database queries
+ void (*finalize)(struct rrddim_query_handle *handle);
+
+ // get the timestamp of the last entry of this metric
+ time_t (*latest_time)(RRDDIM *rd);
+
+ // get the timestamp of the first entry of this metric
+ time_t (*oldest_time)(RRDDIM *rd);
+ } query_ops;
+};
+
+// ----------------------------------------------------------------------------
// these loop macros make sure the linked list is accessed with the right lock
#define rrddim_foreach_read(rd, st) \
@@ -490,6 +585,22 @@ typedef struct alarm_log {
// ----------------------------------------------------------------------------
// RRD HOST
+struct rrdhost_system_info {
+ char *os_name;
+ char *os_id;
+ char *os_id_like;
+ char *os_version;
+ char *os_version_id;
+ char *os_detection;
+ char *kernel_name;
+ char *kernel_version;
+ char *architecture;
+ char *virtualization;
+ char *virt_detection;
+ char *container;
+ char *container_detection;
+};
+
struct rrdhost {
avl avl; // the index of hosts
@@ -512,6 +623,10 @@ struct rrdhost {
int rrd_update_every; // the update frequency of the host
long rrd_history_entries; // the number of history entries for the host's charts
+#ifdef ENABLE_DBENGINE
+ unsigned page_cache_mb; // Database Engine page cache size in MiB
+ unsigned disk_space_mb; // Database Engine disk space quota in MiB
+#endif
RRD_MEMORY_MODE rrd_memory_mode; // the memory more for the charts of this host
char *cache_dir; // the directory to save RRD cache files
@@ -520,6 +635,8 @@ struct rrdhost {
char *program_name; // the program name that collects metrics for this host
char *program_version; // the program version that collects metrics for this host
+ struct rrdhost_system_info *system_info; // information collected from the host environment
+
// ------------------------------------------------------------------------
// streaming of data to remote hosts - rrdpush
@@ -602,6 +719,10 @@ struct rrdhost {
avl_tree_lock rrdfamily_root_index; // the host's chart families index
avl_tree_lock rrdvar_root_index; // the host's chart variables index
+#ifdef ENABLE_DBENGINE
+ struct rrdengine_instance *rrdeng_ctx; // DB engine instance for this host
+#endif
+
struct rrdhost *next;
};
extern RRDHOST *localhost;
@@ -634,7 +755,7 @@ extern netdata_rwlock_t rrd_rwlock;
extern size_t rrd_hosts_available;
extern time_t rrdhost_free_orphan_time;
-extern void rrd_init(char *hostname);
+extern void rrd_init(char *hostname, struct rrdhost_system_info *system_info);
extern RRDHOST *rrdhost_find_by_hostname(const char *hostname, uint32_t hash);
extern RRDHOST *rrdhost_find_by_guid(const char *guid, uint32_t hash);
@@ -656,8 +777,12 @@ extern RRDHOST *rrdhost_find_or_create(
, char *rrdpush_destination
, char *rrdpush_api_key
, char *rrdpush_send_charts_matching
+ , struct rrdhost_system_info *system_info
);
+extern int rrdhost_set_system_info_variable(struct rrdhost_system_info *system_info, char *name, char *value);
+extern struct rrdhost_system_info *rrdhost_system_info_dup(struct rrdhost_system_info *system_info);
+
#if defined(NETDATA_INTERNAL_CHECKS) && defined(NETDATA_VERIFY_LOCKS)
extern void __rrdhost_check_wrlock(RRDHOST *host, const char *file, const char *function, const unsigned long line);
extern void __rrdhost_check_rdlock(RRDHOST *host, const char *file, const char *function, const unsigned long line);
@@ -714,6 +839,7 @@ extern void rrdhost_save_all(void);
extern void rrdhost_cleanup_all(void);
extern void rrdhost_cleanup_orphan_hosts_nolock(RRDHOST *protected);
+extern void rrdhost_system_info_free(struct rrdhost_system_info *system_info);
extern void rrdhost_free(RRDHOST *host);
extern void rrdhost_save_charts(RRDHOST *host);
extern void rrdhost_delete_charts(RRDHOST *host);
@@ -748,10 +874,41 @@ extern void rrdset_isnot_obsolete(RRDSET *st);
#define rrdset_duration(st) ((time_t)( (((st)->counter >= ((unsigned long)(st)->entries))?(unsigned long)(st)->entries:(st)->counter) * (st)->update_every ))
// get the timestamp of the last entry in the round robin database
-#define rrdset_last_entry_t(st) ((time_t)(((st)->last_updated.tv_sec)))
+static inline time_t rrdset_last_entry_t(RRDSET *st) {
+ if (st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
+ RRDDIM *rd;
+ time_t last_entry_t = 0;
+
+ int ret = netdata_rwlock_tryrdlock(&st->rrdset_rwlock);
+ rrddim_foreach_read(rd, st) {
+ last_entry_t = MAX(last_entry_t, rd->state->query_ops.latest_time(rd));
+ }
+ if(0 == ret) netdata_rwlock_unlock(&st->rrdset_rwlock);
+
+ return last_entry_t;
+ } else {
+ return (time_t)st->last_updated.tv_sec;
+ }
+}
// get the timestamp of first entry in the round robin database
-#define rrdset_first_entry_t(st) ((time_t)(rrdset_last_entry_t(st) - rrdset_duration(st)))
+static inline time_t rrdset_first_entry_t(RRDSET *st) {
+ if (st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
+ RRDDIM *rd;
+ time_t first_entry_t = LONG_MAX;
+
+ int ret = netdata_rwlock_tryrdlock(&st->rrdset_rwlock);
+ rrddim_foreach_read(rd, st) {
+ first_entry_t = MIN(first_entry_t, rd->state->query_ops.oldest_time(rd));
+ }
+ if(0 == ret) netdata_rwlock_unlock(&st->rrdset_rwlock);
+
+ if (unlikely(LONG_MAX == first_entry_t)) return 0;
+ return first_entry_t;
+ } else {
+ return (time_t)(rrdset_last_entry_t(st) - rrdset_duration(st));
+ }
+}
// get the last slot updated in the round robin database
#define rrdset_last_slot(st) ((size_t)(((st)->current_entry == 0) ? (st)->entries - 1 : (st)->current_entry - 1))
@@ -891,5 +1048,11 @@ extern void rrdhost_cleanup_obsolete_charts(RRDHOST *host);
#endif /* NETDATA_RRD_INTERNALS */
+// ----------------------------------------------------------------------------
+// RRD DB engine declarations
+
+#ifdef ENABLE_DBENGINE
+#include "database/engine/rrdengineapi.h"
+#endif
#endif /* NETDATA_RRD_H */
diff --git a/database/rrddim.c b/database/rrddim.c
index 0a5549b98..0cf6734a6 100644
--- a/database/rrddim.c
+++ b/database/rrddim.c
@@ -90,6 +90,69 @@ inline int rrddim_set_divisor(RRDSET *st, RRDDIM *rd, collected_number divisor)
}
// ----------------------------------------------------------------------------
+// RRDDIM legacy data collection functions
+
+static void rrddim_collect_init(RRDDIM *rd) {
+ rd->values[rd->rrdset->current_entry] = SN_EMPTY_SLOT; // pack_storage_number(0, SN_NOT_EXISTS);
+}
+static void rrddim_collect_store_metric(RRDDIM *rd, usec_t point_in_time, storage_number number) {
+ (void)point_in_time;
+
+ rd->values[rd->rrdset->current_entry] = number;
+}
+static void rrddim_collect_finalize(RRDDIM *rd) {
+ (void)rd;
+
+ return;
+}
+
+// ----------------------------------------------------------------------------
+// RRDDIM legacy database query functions
+
+static void rrddim_query_init(RRDDIM *rd, struct rrddim_query_handle *handle, time_t start_time, time_t end_time) {
+ handle->rd = rd;
+ handle->start_time = start_time;
+ handle->end_time = end_time;
+ handle->slotted.slot = rrdset_time2slot(rd->rrdset, start_time);
+ handle->slotted.last_slot = rrdset_time2slot(rd->rrdset, end_time);
+ handle->slotted.finished = 0;
+}
+
+static storage_number rrddim_query_next_metric(struct rrddim_query_handle *handle) {
+ RRDDIM *rd = handle->rd;
+ long entries = rd->rrdset->entries;
+ long slot = handle->slotted.slot;
+
+ if (unlikely(handle->slotted.slot == handle->slotted.last_slot))
+ handle->slotted.finished = 1;
+ storage_number n = rd->values[slot++];
+
+ if(unlikely(slot >= entries)) slot = 0;
+ handle->slotted.slot = slot;
+
+ return n;
+}
+
+static int rrddim_query_is_finished(struct rrddim_query_handle *handle) {
+ return handle->slotted.finished;
+}
+
+static void rrddim_query_finalize(struct rrddim_query_handle *handle) {
+ (void)handle;
+
+ return;
+}
+
+static time_t rrddim_query_latest_time(RRDDIM *rd) {
+ return rrdset_last_entry_t(rd->rrdset);
+}
+
+static time_t rrddim_query_oldest_time(RRDDIM *rd) {
+ return rrdset_first_entry_t(rd->rrdset);
+}
+
+
+// ----------------------------------------------------------------------------
// RRDDIM create a dimension
RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collected_number multiplier, collected_number divisor, RRD_ALGORITHM algorithm, RRD_MEMORY_MODE memory_mode) {
@@ -123,9 +186,10 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
rrdset_strncpyz_name(filename, id, FILENAME_MAX);
snprintfz(fullfilename, FILENAME_MAX, "%s/%s.db", st->cache_dir, filename);
- if(memory_mode == RRD_MEMORY_MODE_SAVE || memory_mode == RRD_MEMORY_MODE_MAP || memory_mode == RRD_MEMORY_MODE_RAM) {
+ if(memory_mode == RRD_MEMORY_MODE_SAVE || memory_mode == RRD_MEMORY_MODE_MAP ||
+ memory_mode == RRD_MEMORY_MODE_RAM || memory_mode == RRD_MEMORY_MODE_DBENGINE) {
rd = (RRDDIM *)mymmap(
- (memory_mode == RRD_MEMORY_MODE_RAM)?NULL:fullfilename
+ (memory_mode == RRD_MEMORY_MODE_RAM || memory_mode == RRD_MEMORY_MODE_DBENGINE)?NULL:fullfilename
, size
, ((memory_mode == RRD_MEMORY_MODE_MAP) ? MAP_SHARED : MAP_PRIVATE)
, 1
@@ -146,7 +210,7 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
struct timeval now;
now_realtime_timeval(&now);
- if(memory_mode == RRD_MEMORY_MODE_RAM) {
+ if(memory_mode == RRD_MEMORY_MODE_RAM || memory_mode == RRD_MEMORY_MODE_DBENGINE) {
memset(rd, 0, size);
}
else {
@@ -243,11 +307,34 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
rd->collected_volume = 0;
rd->stored_volume = 0;
rd->last_stored_value = 0;
- rd->values[st->current_entry] = SN_EMPTY_SLOT; // pack_storage_number(0, SN_NOT_EXISTS);
rd->last_collected_time.tv_sec = 0;
rd->last_collected_time.tv_usec = 0;
rd->rrdset = st;
-
+ rd->state = mallocz(sizeof(*rd->state));
+ if(memory_mode == RRD_MEMORY_MODE_DBENGINE) {
+#ifdef ENABLE_DBENGINE
+ rd->state->collect_ops.init = rrdeng_store_metric_init;
+ rd->state->collect_ops.store_metric = rrdeng_store_metric_next;
+ rd->state->collect_ops.finalize = rrdeng_store_metric_finalize;
+ rd->state->query_ops.init = rrdeng_load_metric_init;
+ rd->state->query_ops.next_metric = rrdeng_load_metric_next;
+ rd->state->query_ops.is_finished = rrdeng_load_metric_is_finished;
+ rd->state->query_ops.finalize = rrdeng_load_metric_finalize;
+ rd->state->query_ops.latest_time = rrdeng_metric_latest_time;
+ rd->state->query_ops.oldest_time = rrdeng_metric_oldest_time;
+#endif
+ } else {
+ rd->state->collect_ops.init = rrddim_collect_init;
+ rd->state->collect_ops.store_metric = rrddim_collect_store_metric;
+ rd->state->collect_ops.finalize = rrddim_collect_finalize;
+ rd->state->query_ops.init = rrddim_query_init;
+ rd->state->query_ops.next_metric = rrddim_query_next_metric;
+ rd->state->query_ops.is_finished = rrddim_query_is_finished;
+ rd->state->query_ops.finalize = rrddim_query_finalize;
+ rd->state->query_ops.latest_time = rrddim_query_latest_time;
+ rd->state->query_ops.oldest_time = rrddim_query_oldest_time;
+ }
+ rd->state->collect_ops.init(rd);
// append this dimension
if(!st->dimensions)
st->dimensions = rd;
@@ -294,6 +381,9 @@ void rrddim_free(RRDSET *st, RRDDIM *rd)
{
debug(D_RRD_CALLS, "rrddim_free() %s.%s", st->name, rd->name);
+ rd->state->collect_ops.finalize(rd);
+ freez(rd->state);
+
if(rd == st->dimensions)
st->dimensions = rd->next;
else {
@@ -319,6 +409,7 @@ void rrddim_free(RRDSET *st, RRDDIM *rd)
case RRD_MEMORY_MODE_SAVE:
case RRD_MEMORY_MODE_MAP:
case RRD_MEMORY_MODE_RAM:
+ case RRD_MEMORY_MODE_DBENGINE:
debug(D_RRD_CALLS, "Unmapping dimension '%s'.", rd->name);
freez((void *)rd->id);
freez(rd->cache_filename);
diff --git a/database/rrdhost.c b/database/rrdhost.c
index 5f0f8ac64..c552c6c39 100644
--- a/database/rrdhost.c
+++ b/database/rrdhost.c
@@ -122,6 +122,7 @@ RRDHOST *rrdhost_create(const char *hostname,
char *rrdpush_destination,
char *rrdpush_api_key,
char *rrdpush_send_charts_matching,
+ struct rrdhost_system_info *system_info,
int is_localhost
) {
debug(D_RRDHOST, "Host '%s': adding with guid '%s'", hostname, guid);
@@ -133,6 +134,10 @@ RRDHOST *rrdhost_create(const char *hostname,
host->rrd_update_every = (update_every > 0)?update_every:1;
host->rrd_history_entries = align_entries_to_pagesize(memory_mode, entries);
host->rrd_memory_mode = memory_mode;
+#ifdef ENABLE_DBENGINE
+ host->page_cache_mb = default_rrdeng_page_cache_mb;
+ host->disk_space_mb = default_rrdeng_disk_quota_mb;
+#endif
host->health_enabled = (memory_mode == RRD_MEMORY_MODE_NONE)? 0 : health_enabled;
host->rrdpush_send_enabled = (rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key) ? 1 : 0;
host->rrdpush_send_destination = (host->rrdpush_send_enabled)?strdupz(rrdpush_destination):NULL;
@@ -157,6 +162,8 @@ RRDHOST *rrdhost_create(const char *hostname,
host->program_version = strdupz((program_version && *program_version)?program_version:"unknown");
host->registry_hostname = strdupz((registry_hostname && *registry_hostname)?registry_hostname:hostname);
+ host->system_info = rrdhost_system_info_dup(system_info);
+
avl_init_lock(&(host->rrdset_root_index), rrdset_compare);
avl_init_lock(&(host->rrdset_root_index_name), rrdset_compare_name);
avl_init_lock(&(host->rrdfamily_root_index), rrdfamily_compare);
@@ -202,7 +209,8 @@ RRDHOST *rrdhost_create(const char *hostname,
snprintfz(filename, FILENAME_MAX, "%s/%s", netdata_configured_cache_dir, host->machine_guid);
host->cache_dir = strdupz(filename);
- if(host->rrd_memory_mode == RRD_MEMORY_MODE_MAP || host->rrd_memory_mode == RRD_MEMORY_MODE_SAVE) {
+ if(host->rrd_memory_mode == RRD_MEMORY_MODE_MAP || host->rrd_memory_mode == RRD_MEMORY_MODE_SAVE ||
+ host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
int r = mkdir(host->cache_dir, 0775);
if(r != 0 && errno != EEXIST)
error("Host '%s': cannot create directory '%s'", host->hostname, host->cache_dir);
@@ -218,6 +226,30 @@ RRDHOST *rrdhost_create(const char *hostname,
}
}
+ if (host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
+#ifdef ENABLE_DBENGINE
+ char dbenginepath[FILENAME_MAX + 1];
+ int ret;
+
+ snprintfz(dbenginepath, FILENAME_MAX, "%s/dbengine", host->cache_dir);
+ ret = mkdir(dbenginepath, 0775);
+ if(ret != 0 && errno != EEXIST)
+ error("Host '%s': cannot create directory '%s'", host->hostname, dbenginepath);
+ else
+ ret = rrdeng_init(&host->rrdeng_ctx, dbenginepath, host->page_cache_mb, host->disk_space_mb);
+ if(ret) {
+ error("Host '%s': cannot initialize host with machine guid '%s'. Failed to initialize DB engine at '%s'.",
+ host->hostname, host->machine_guid, host->cache_dir);
+ rrdhost_free(host);
+ host = NULL;
+ //rrd_hosts_available++; //TODO: maybe we want this?
+
+ return host;
+ }
+#else
+ fatal("RRD_MEMORY_MODE_DBENGINE is not supported in this platform.");
+#endif
+ }
if(host->health_enabled) {
snprintfz(filename, FILENAME_MAX, "%s/health", host->varlib_dir);
@@ -332,6 +364,7 @@ RRDHOST *rrdhost_find_or_create(
, char *rrdpush_destination
, char *rrdpush_api_key
, char *rrdpush_send_charts_matching
+ , struct rrdhost_system_info *system_info
) {
debug(D_RRDHOST, "Searching for host '%s' with guid '%s'", hostname, guid);
@@ -355,6 +388,7 @@ RRDHOST *rrdhost_find_or_create(
, rrdpush_destination
, rrdpush_api_key
, rrdpush_send_charts_matching
+ , system_info
, 0
);
}
@@ -439,7 +473,7 @@ restart_after_removal:
// ----------------------------------------------------------------------------
// RRDHOST global / startup initialization
-void rrd_init(char *hostname) {
+void rrd_init(char *hostname, struct rrdhost_system_info *system_info) {
rrdset_free_obsolete_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup obsolete charts after seconds", rrdset_free_obsolete_time);
gap_when_lost_iterations_above = (int)config_get_number(CONFIG_SECTION_GLOBAL, "gap when lost iterations above", gap_when_lost_iterations_above);
if (gap_when_lost_iterations_above < 1)
@@ -468,6 +502,7 @@ void rrd_init(char *hostname) {
, default_rrdpush_destination
, default_rrdpush_api_key
, default_rrdpush_send_charts_matching
+ , system_info
, 1
);
rrd_unlock();
@@ -513,6 +548,27 @@ void __rrd_check_wrlock(const char *file, const char *function, const unsigned l
// ----------------------------------------------------------------------------
// RRDHOST - free
+void rrdhost_system_info_free(struct rrdhost_system_info *system_info) {
+ info("SYSTEM_INFO: free %p", system_info);
+
+ if(likely(system_info)) {
+ freez(system_info->os_name);
+ freez(system_info->os_id);
+ freez(system_info->os_id_like);
+ freez(system_info->os_version);
+ freez(system_info->os_version_id);
+ freez(system_info->os_detection);
+ freez(system_info->kernel_name);
+ freez(system_info->kernel_version);
+ freez(system_info->architecture);
+ freez(system_info->virtualization);
+ freez(system_info->virt_detection);
+ freez(system_info->container);
+ freez(system_info->container_detection);
+ freez(system_info);
+ }
+}
+
void rrdhost_free(RRDHOST *host) {
if(!host) return;
@@ -542,6 +598,12 @@ void rrdhost_free(RRDHOST *host) {
health_alarm_log_free(host);
+ if (host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
+#ifdef ENABLE_DBENGINE
+ rrdeng_exit(host->rrdeng_ctx);
+#endif
+ }
+
// ------------------------------------------------------------------------
// remove it from the indexes
@@ -573,6 +635,7 @@ void rrdhost_free(RRDHOST *host) {
freez((void *)host->timezone);
freez(host->program_version);
freez(host->program_name);
+ rrdhost_system_info_free(host->system_info);
freez(host->cache_dir);
freez(host->varlib_dir);
freez(host->rrdpush_send_api_key);
@@ -744,3 +807,86 @@ restart_after_removal:
}
}
}
+
+// ----------------------------------------------------------------------------
+// RRDHOST - set system info from environment variables
+
+int rrdhost_set_system_info_variable(struct rrdhost_system_info *system_info, char *name, char *value) {
+ if(!strcmp(name, "NETDATA_SYSTEM_OS_NAME")){
+ system_info->os_name = strdupz(value);
+ }
+ else if(!strcmp(name, "NETDATA_SYSTEM_OS_ID")){
+ system_info->os_id = strdupz(value);
+ }
+ else if(!strcmp(name, "NETDATA_SYSTEM_OS_ID_LIKE")){
+ system_info->os_id_like = strdupz(value);
+ }
+ else if(!strcmp(name, "NETDATA_SYSTEM_OS_VERSION")){
+ system_info->os_version = strdupz(value);
+ }
+ else if(!strcmp(name, "NETDATA_SYSTEM_OS_VERSION_ID")){
+ system_info->os_version_id = strdupz(value);
+ }
+ else if(!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION")){
+ system_info->os_detection = strdupz(value);
+ }
+ else if(!strcmp(name, "NETDATA_SYSTEM_KERNEL_NAME")){
+ system_info->kernel_name = strdupz(value);
+ }
+ else if(!strcmp(name, "NETDATA_SYSTEM_KERNEL_VERSION")){
+ system_info->kernel_version = strdupz(value);
+ }
+ else if(!strcmp(name, "NETDATA_SYSTEM_ARCHITECTURE")){
+ system_info->architecture = strdupz(value);
+ }
+ else if(!strcmp(name, "NETDATA_SYSTEM_VIRTUALIZATION")){
+ system_info->virtualization = strdupz(value);
+ }
+ else if(!strcmp(name, "NETDATA_SYSTEM_VIRT_DETECTION")){
+ system_info->virt_detection = strdupz(value);
+ }
+ else if(!strcmp(name, "NETDATA_SYSTEM_CONTAINER")){
+ system_info->container = strdupz(value);
+ }
+ else if(!strcmp(name, "NETDATA_SYSTEM_CONTAINER_DETECTION")){
+ system_info->container_detection = strdupz(value);
+ }
+ else return 1;
+
+ return 0;
+}
+
+struct rrdhost_system_info *rrdhost_system_info_dup(struct rrdhost_system_info *system_info) {
+ struct rrdhost_system_info *ret = callocz(1, sizeof(struct rrdhost_system_info));
+
+ if(likely(system_info)) {
+ if(system_info->os_name)
+ ret->os_name = strdupz(system_info->os_name);
+ if(system_info->os_id)
+ ret->os_id = strdupz(system_info->os_id);
+ if(system_info->os_id_like)
+ ret->os_id_like = strdupz(system_info->os_id_like);
+ if(system_info->os_version)
+ ret->os_version = strdupz(system_info->os_version);
+ if(system_info->os_version_id)
+ ret->os_version_id = strdupz(system_info->os_version_id);
+ if(system_info->os_detection)
+ ret->os_detection = strdupz(system_info->os_detection);
+ if(system_info->kernel_name)
+ ret->kernel_name = strdupz(system_info->kernel_name);
+ if(system_info->kernel_version)
+ ret->kernel_version = strdupz(system_info->kernel_version);
+ if(system_info->architecture)
+ ret->architecture = strdupz(system_info->architecture);
+ if(system_info->virtualization)
+ ret->virtualization = strdupz(system_info->virtualization);
+ if(system_info->virt_detection)
+ ret->virt_detection = strdupz(system_info->virt_detection);
+ if(system_info->container)
+ ret->container = strdupz(system_info->container);
+ if(system_info->container_detection)
+ ret->container_detection = strdupz(system_info->container_detection);
+ }
+
+ return ret;
+}
diff --git a/database/rrdset.c b/database/rrdset.c
index e04ffadcc..689591468 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -363,6 +363,7 @@ void rrdset_free(RRDSET *st) {
case RRD_MEMORY_MODE_SAVE:
case RRD_MEMORY_MODE_MAP:
case RRD_MEMORY_MODE_RAM:
+ case RRD_MEMORY_MODE_DBENGINE:
debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name);
munmap(st, st->memsize);
break;
@@ -541,6 +542,9 @@ RRDSET *rrdset_create_custom(
int enabled = config_get_boolean(config_section, "enabled", 1);
if(!enabled) entries = 5;
+ if(memory_mode == RRD_MEMORY_MODE_DBENGINE)
+ entries = config_set_number(config_section, "history", 5);
+
unsigned long size = sizeof(RRDSET);
char *cache_dir = rrdset_cache_dir(host, fullid, config_section);
@@ -552,9 +556,10 @@ RRDSET *rrdset_create_custom(
debug(D_RRD_CALLS, "Creating RRD_STATS for '%s.%s'.", type, id);
snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", cache_dir);
- if(memory_mode == RRD_MEMORY_MODE_SAVE || memory_mode == RRD_MEMORY_MODE_MAP || memory_mode == RRD_MEMORY_MODE_RAM) {
+ if(memory_mode == RRD_MEMORY_MODE_SAVE || memory_mode == RRD_MEMORY_MODE_MAP ||
+ memory_mode == RRD_MEMORY_MODE_RAM || memory_mode == RRD_MEMORY_MODE_DBENGINE) {
st = (RRDSET *) mymmap(
- (memory_mode == RRD_MEMORY_MODE_RAM)?NULL:fullfilename
+ (memory_mode == RRD_MEMORY_MODE_RAM || memory_mode == RRD_MEMORY_MODE_DBENGINE)?NULL:fullfilename
, size
, ((memory_mode == RRD_MEMORY_MODE_MAP) ? MAP_SHARED : MAP_PRIVATE)
, 0
@@ -585,7 +590,7 @@ RRDSET *rrdset_create_custom(
st->alarms = NULL;
st->flags = 0x00000000;
- if(memory_mode == RRD_MEMORY_MODE_RAM) {
+ if(memory_mode == RRD_MEMORY_MODE_RAM || memory_mode == RRD_MEMORY_MODE_DBENGINE) {
memset(st, 0, size);
}
else {
@@ -631,7 +636,10 @@ RRDSET *rrdset_create_custom(
if(unlikely(!st)) {
st = callocz(1, size);
- st->rrd_memory_mode = (memory_mode == RRD_MEMORY_MODE_NONE) ? RRD_MEMORY_MODE_NONE : RRD_MEMORY_MODE_ALLOC;
+ if (memory_mode == RRD_MEMORY_MODE_DBENGINE)
+ st->rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;
+ else
+ st->rrd_memory_mode = (memory_mode == RRD_MEMORY_MODE_NONE) ? RRD_MEMORY_MODE_NONE : RRD_MEMORY_MODE_ALLOC;
}
st->plugin_name = plugin?strdupz(plugin):NULL;
@@ -1052,12 +1060,14 @@ static inline size_t rrdset_done_interpolate(
}
if(unlikely(!store_this_entry)) {
- rd->values[current_entry] = SN_EMPTY_SLOT; //pack_storage_number(0, SN_NOT_EXISTS);
+ rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT); //pack_storage_number(0, SN_NOT_EXISTS)
+// rd->values[current_entry] = SN_EMPTY_SLOT; //pack_storage_number(0, SN_NOT_EXISTS);
continue;
}
if(likely(rd->updated && rd->collections_counter > 1 && iterations < st->gap_when_lost_iterations_above)) {
- rd->values[current_entry] = pack_storage_number(new_value, storage_flags );
+ rd->state->collect_ops.store_metric(rd, next_store_ut, pack_storage_number(new_value, storage_flags));
+// rd->values[current_entry] = pack_storage_number(new_value, storage_flags );
rd->last_stored_value = new_value;
#ifdef NETDATA_INTERNAL_CHECKS
@@ -1079,7 +1089,8 @@ static inline size_t rrdset_done_interpolate(
);
#endif
- rd->values[current_entry] = SN_EMPTY_SLOT; // pack_storage_number(0, SN_NOT_EXISTS);
+// rd->values[current_entry] = SN_EMPTY_SLOT; // pack_storage_number(0, SN_NOT_EXISTS);
+ rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT); //pack_storage_number(0, SN_NOT_EXISTS)
rd->last_stored_value = NAN;
}
@@ -1119,11 +1130,16 @@ static inline size_t rrdset_done_interpolate(
// reset the storage flags for the next point, if any;
storage_flags = SN_EXISTS;
- counter++;
- current_entry = ((current_entry + 1) >= st->entries) ? 0 : current_entry + 1;
+ st->counter = ++counter;
+ st->current_entry = current_entry = ((current_entry + 1) >= st->entries) ? 0 : current_entry + 1;
+
+ st->last_updated.tv_sec = (time_t) (last_ut / USEC_PER_SEC);
+ st->last_updated.tv_usec = 0;
+
last_stored_ut = next_store_ut;
}
+/*
st->counter = counter;
st->current_entry = current_entry;
@@ -1131,6 +1147,7 @@ static inline size_t rrdset_done_interpolate(
st->last_updated.tv_sec = (time_t) (last_ut / USEC_PER_SEC);
st->last_updated.tv_usec = 0;
}
+*/
return stored_entries;
}
@@ -1201,7 +1218,8 @@ void rrdset_done(RRDSET *st) {
}
// check if the chart has a long time to be updated
- if(unlikely(st->usec_since_last_update > st->entries * update_every_ut)) {
+ if(unlikely(st->usec_since_last_update > st->entries * update_every_ut &&
+ st->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)) {
info("host '%s', chart %s: took too long to be updated (counter #%zu, update #%zu, %0.3" LONG_DOUBLE_MODIFIER " secs). Resetting it.", st->rrdhost->hostname, st->name, st->counter, st->counter_done, (LONG_DOUBLE)st->usec_since_last_update / USEC_PER_SEC);
rrdset_reset(st);
st->usec_since_last_update = update_every_ut;
@@ -1242,7 +1260,8 @@ void rrdset_done(RRDSET *st) {
}
// check if we will re-write the entire data set
- if(unlikely(dt_usec(&st->last_collected_time, &st->last_updated) > st->entries * update_every_ut)) {
+ if(unlikely(dt_usec(&st->last_collected_time, &st->last_updated) > st->entries * update_every_ut &&
+ st->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)) {
info("%s: too old data (last updated at %ld.%ld, last collected at %ld.%ld). Resetting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec);
rrdset_reset(st);
rrdset_init_last_updated_time(st);
@@ -1266,11 +1285,17 @@ void rrdset_done(RRDSET *st) {
// if we have not collected metrics this session (st->counter_done == 0)
// and we have collected metrics for this chart in the past (st->counter != 0)
// fill the gap (the chart has been just loaded from disk)
- if(unlikely(st->counter)) {
+ if(unlikely(st->counter) && st->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) {
rrdset_done_fill_the_gap(st);
last_stored_ut = st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec;
next_store_ut = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC;
}
+ if (st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
+ // set a fake last_updated to jump to current time
+ rrdset_init_last_updated_time(st);
+ last_stored_ut = st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec;
+ next_store_ut = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC;
+ }
if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST))) {
store_this_entry = 1;