Diffstat (limited to 'database/engine')
-rw-r--r--  database/engine/Makefile.am                          |   4
-rw-r--r--  database/engine/Makefile.in                          | 519
-rw-r--r--  database/engine/README.md                            | 202
-rw-r--r--  database/engine/datafile.c                           |  67
-rw-r--r--  database/engine/datafile.h                           |   3
-rw-r--r--  database/engine/journalfile.c                        |  39
-rw-r--r--  database/engine/journalfile.h                        |   1
-rw-r--r--  database/engine/metadata_log/Makefile.am             |   8
-rw-r--r--  database/engine/metadata_log/README.md               |   0
-rw-r--r--  database/engine/metadata_log/compaction.c            |  86
-rw-r--r--  database/engine/metadata_log/compaction.h            |  14
-rw-r--r--  database/engine/metadata_log/logfile.c               | 453
-rw-r--r--  database/engine/metadata_log/logfile.h               |  39
-rw-r--r--  database/engine/metadata_log/metadatalog.h           |  28
-rwxr-xr-x  database/engine/metadata_log/metadatalogapi.c        |  39
-rw-r--r--  database/engine/metadata_log/metadatalogapi.h        |  12
-rw-r--r--  database/engine/metadata_log/metadatalogprotocol.h   |  53
-rwxr-xr-x  database/engine/metadata_log/metalogpluginsd.c       | 140
-rw-r--r--  database/engine/metadata_log/metalogpluginsd.h       |  33
-rw-r--r--  database/engine/pagecache.c                          | 155
-rw-r--r--  database/engine/pagecache.h                          |   9
-rw-r--r--  database/engine/rrdengine.c                          | 587
-rw-r--r--  database/engine/rrdengine.h                          |  65
-rwxr-xr-x [-rw-r--r--]  database/engine/rrdengineapi.c          | 353
-rw-r--r--  database/engine/rrdengineapi.h                       |  25
-rw-r--r--  database/engine/rrdenginelib.c                       | 119
-rw-r--r--  database/engine/rrdenginelib.h                       |  57
-rw-r--r--  database/engine/rrdenglocking.c                      |  56
28 files changed, 2259 insertions, 907 deletions
diff --git a/database/engine/Makefile.am b/database/engine/Makefile.am
index 161784b8f..43405001d 100644
--- a/database/engine/Makefile.am
+++ b/database/engine/Makefile.am
@@ -3,6 +3,10 @@
AUTOMAKE_OPTIONS = subdir-objects
MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+SUBDIRS = \
+ metadata_log \
+ $(NULL)
+
dist_noinst_DATA = \
README.md \
$(NULL)
diff --git a/database/engine/Makefile.in b/database/engine/Makefile.in
deleted file mode 100644
index 3c9b33089..000000000
--- a/database/engine/Makefile.in
+++ /dev/null
@@ -1,519 +0,0 @@
-# Makefile.in generated by automake 1.15.1 from Makefile.am.
-# @configure_input@
-
-# Copyright (C) 1994-2017 Free Software Foundation, Inc.
-
-# This Makefile.in is free software; the Free Software Foundation
-# gives unlimited permission to copy and/or distribute it,
-# with or without modifications, as long as this notice is preserved.
-
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY, to the extent permitted by law; without
-# even the implied warranty of MERCHANTABILITY or FITNESS FOR A
-# PARTICULAR PURPOSE.
-
-@SET_MAKE@
-
-# SPDX-License-Identifier: GPL-3.0-or-later
-
-VPATH = @srcdir@
-am__is_gnu_make = { \
- if test -z '$(MAKELEVEL)'; then \
- false; \
- elif test -n '$(MAKE_HOST)'; then \
- true; \
- elif test -n '$(MAKE_VERSION)' && test -n '$(CURDIR)'; then \
- true; \
- else \
- false; \
- fi; \
-}
-am__make_running_with_option = \
- case $${target_option-} in \
- ?) ;; \
- *) echo "am__make_running_with_option: internal error: invalid" \
- "target option '$${target_option-}' specified" >&2; \
- exit 1;; \
- esac; \
- has_opt=no; \
- sane_makeflags=$$MAKEFLAGS; \
- if $(am__is_gnu_make); then \
- sane_makeflags=$$MFLAGS; \
- else \
- case $$MAKEFLAGS in \
- *\\[\ \ ]*) \
- bs=\\; \
- sane_makeflags=`printf '%s\n' "$$MAKEFLAGS" \
- | sed "s/$$bs$$bs[$$bs $$bs ]*//g"`;; \
- esac; \
- fi; \
- skip_next=no; \
- strip_trailopt () \
- { \
- flg=`printf '%s\n' "$$flg" | sed "s/$$1.*$$//"`; \
- }; \
- for flg in $$sane_makeflags; do \
- test $$skip_next = yes && { skip_next=no; continue; }; \
- case $$flg in \
- *=*|--*) continue;; \
- -*I) strip_trailopt 'I'; skip_next=yes;; \
- -*I?*) strip_trailopt 'I';; \
- -*O) strip_trailopt 'O'; skip_next=yes;; \
- -*O?*) strip_trailopt 'O';; \
- -*l) strip_trailopt 'l'; skip_next=yes;; \
- -*l?*) strip_trailopt 'l';; \
- -[dEDm]) skip_next=yes;; \
- -[JT]) skip_next=yes;; \
- esac; \
- case $$flg in \
- *$$target_option*) has_opt=yes; break;; \
- esac; \
- done; \
- test $$has_opt = yes
-am__make_dryrun = (target_option=n; $(am__make_running_with_option))
-am__make_keepgoing = (target_option=k; $(am__make_running_with_option))
-pkgdatadir = $(datadir)/@PACKAGE@
-pkgincludedir = $(includedir)/@PACKAGE@
-pkglibdir = $(libdir)/@PACKAGE@
-pkglibexecdir = $(libexecdir)/@PACKAGE@
-am__cd = CDPATH="$${ZSH_VERSION+.}$(PATH_SEPARATOR)" && cd
-install_sh_DATA = $(install_sh) -c -m 644
-install_sh_PROGRAM = $(install_sh) -c
-install_sh_SCRIPT = $(install_sh) -c
-INSTALL_HEADER = $(INSTALL_DATA)
-transform = $(program_transform_name)
-NORMAL_INSTALL = :
-PRE_INSTALL = :
-POST_INSTALL = :
-NORMAL_UNINSTALL = :
-PRE_UNINSTALL = :
-POST_UNINSTALL = :
-build_triplet = @build@
-host_triplet = @host@
-subdir = database/engine
-ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
-am__aclocal_m4_deps = $(top_srcdir)/build/m4/ax_c___atomic.m4 \
- $(top_srcdir)/build/m4/ax_c__generic.m4 \
- $(top_srcdir)/build/m4/ax_c_lto.m4 \
- $(top_srcdir)/build/m4/ax_c_mallinfo.m4 \
- $(top_srcdir)/build/m4/ax_c_mallopt.m4 \
- $(top_srcdir)/build/m4/ax_check_compile_flag.m4 \
- $(top_srcdir)/build/m4/ax_gcc_func_attribute.m4 \
- $(top_srcdir)/build/m4/ax_pthread.m4 \
- $(top_srcdir)/build/m4/jemalloc.m4 \
- $(top_srcdir)/build/m4/tcmalloc.m4 $(top_srcdir)/configure.ac
-am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \
- $(ACLOCAL_M4)
-DIST_COMMON = $(srcdir)/Makefile.am $(dist_noinst_DATA) \
- $(am__DIST_COMMON)
-mkinstalldirs = $(install_sh) -d
-CONFIG_HEADER = $(top_builddir)/config.h
-CONFIG_CLEAN_FILES =
-CONFIG_CLEAN_VPATH_FILES =
-AM_V_P = $(am__v_P_@AM_V@)
-am__v_P_ = $(am__v_P_@AM_DEFAULT_V@)
-am__v_P_0 = false
-am__v_P_1 = :
-AM_V_GEN = $(am__v_GEN_@AM_V@)
-am__v_GEN_ = $(am__v_GEN_@AM_DEFAULT_V@)
-am__v_GEN_0 = @echo " GEN " $@;
-am__v_GEN_1 =
-AM_V_at = $(am__v_at_@AM_V@)
-am__v_at_ = $(am__v_at_@AM_DEFAULT_V@)
-am__v_at_0 = @
-am__v_at_1 =
-SOURCES =
-DIST_SOURCES =
-am__can_run_installinfo = \
- case $$AM_UPDATE_INFO_DIR in \
- n|no|NO) false;; \
- *) (install-info --version) >/dev/null 2>&1;; \
- esac
-DATA = $(dist_noinst_DATA)
-am__tagged_files = $(HEADERS) $(SOURCES) $(TAGS_FILES) $(LISP)
-am__DIST_COMMON = $(srcdir)/Makefile.in
-DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST)
-ACLOCAL = @ACLOCAL@
-AMTAR = @AMTAR@
-AM_DEFAULT_VERBOSITY = @AM_DEFAULT_VERBOSITY@
-AUTOCONF = @AUTOCONF@
-AUTOHEADER = @AUTOHEADER@
-AUTOMAKE = @AUTOMAKE@
-AWK = @AWK@
-CC = @CC@
-CCDEPMODE = @CCDEPMODE@
-CFLAGS = @CFLAGS@
-CMOCKA_CFLAGS = @CMOCKA_CFLAGS@
-CMOCKA_LIBS = @CMOCKA_LIBS@
-CPP = @CPP@
-CPPFLAGS = @CPPFLAGS@
-CUPSCONFIG = @CUPSCONFIG@
-CXX = @CXX@
-CXXDEPMODE = @CXXDEPMODE@
-CXXFLAGS = @CXXFLAGS@
-CXX_BINARY = @CXX_BINARY@
-CYGPATH_W = @CYGPATH_W@
-DEFS = @DEFS@
-DEPDIR = @DEPDIR@
-ECHO_C = @ECHO_C@
-ECHO_N = @ECHO_N@
-ECHO_T = @ECHO_T@
-EGREP = @EGREP@
-ENABLE_UNITTESTS = @ENABLE_UNITTESTS@
-EXEEXT = @EXEEXT@
-GREP = @GREP@
-INSTALL = @INSTALL@
-INSTALL_DATA = @INSTALL_DATA@
-INSTALL_PROGRAM = @INSTALL_PROGRAM@
-INSTALL_SCRIPT = @INSTALL_SCRIPT@
-INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@
-IPMIMONITORING_CFLAGS = @IPMIMONITORING_CFLAGS@
-IPMIMONITORING_LIBS = @IPMIMONITORING_LIBS@
-JSON_CFLAGS = @JSON_CFLAGS@
-JSON_LIBS = @JSON_LIBS@
-LDFLAGS = @LDFLAGS@
-LIBCAP_CFLAGS = @LIBCAP_CFLAGS@
-LIBCAP_LIBS = @LIBCAP_LIBS@
-LIBCRYPTO_CFLAGS = @LIBCRYPTO_CFLAGS@
-LIBCRYPTO_LIBS = @LIBCRYPTO_LIBS@
-LIBCURL_CFLAGS = @LIBCURL_CFLAGS@
-LIBCURL_LIBS = @LIBCURL_LIBS@
-LIBMNL_CFLAGS = @LIBMNL_CFLAGS@
-LIBMNL_LIBS = @LIBMNL_LIBS@
-LIBMONGOC_CFLAGS = @LIBMONGOC_CFLAGS@
-LIBMONGOC_LIBS = @LIBMONGOC_LIBS@
-LIBOBJS = @LIBOBJS@
-LIBS = @LIBS@
-LIBSSL_CFLAGS = @LIBSSL_CFLAGS@
-LIBSSL_LIBS = @LIBSSL_LIBS@
-LTLIBOBJS = @LTLIBOBJS@
-MAINT = @MAINT@
-MAKEINFO = @MAKEINFO@
-MATH_CFLAGS = @MATH_CFLAGS@
-MATH_LIBS = @MATH_LIBS@
-MKDIR_P = @MKDIR_P@
-NFACCT_CFLAGS = @NFACCT_CFLAGS@
-NFACCT_LIBS = @NFACCT_LIBS@
-OBJEXT = @OBJEXT@
-OPTIONAL_CUPS_CFLAGS = @OPTIONAL_CUPS_CFLAGS@
-OPTIONAL_CUPS_LIBS = @OPTIONAL_CUPS_LIBS@
-OPTIONAL_IPMIMONITORING_CFLAGS = @OPTIONAL_IPMIMONITORING_CFLAGS@
-OPTIONAL_IPMIMONITORING_LIBS = @OPTIONAL_IPMIMONITORING_LIBS@
-OPTIONAL_JSONC_LIBS = @OPTIONAL_JSONC_LIBS@
-OPTIONAL_JUDY_LIBS = @OPTIONAL_JUDY_LIBS@
-OPTIONAL_KINESIS_CFLAGS = @OPTIONAL_KINESIS_CFLAGS@
-OPTIONAL_KINESIS_LIBS = @OPTIONAL_KINESIS_LIBS@
-OPTIONAL_LIBCAP_CFLAGS = @OPTIONAL_LIBCAP_CFLAGS@
-OPTIONAL_LIBCAP_LIBS = @OPTIONAL_LIBCAP_LIBS@
-OPTIONAL_LZ4_LIBS = @OPTIONAL_LZ4_LIBS@
-OPTIONAL_MATH_CFLAGS = @OPTIONAL_MATH_CFLAGS@
-OPTIONAL_MATH_LIBS = @OPTIONAL_MATH_LIBS@
-OPTIONAL_MONGOC_CFLAGS = @OPTIONAL_MONGOC_CFLAGS@
-OPTIONAL_MONGOC_LIBS = @OPTIONAL_MONGOC_LIBS@
-OPTIONAL_NFACCT_CFLAGS = @OPTIONAL_NFACCT_CFLAGS@
-OPTIONAL_NFACCT_LIBS = @OPTIONAL_NFACCT_LIBS@
-OPTIONAL_PROMETHEUS_REMOTE_WRITE_CFLAGS = @OPTIONAL_PROMETHEUS_REMOTE_WRITE_CFLAGS@
-OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS = @OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS@
-OPTIONAL_SSL_LIBS = @OPTIONAL_SSL_LIBS@
-OPTIONAL_UUID_CFLAGS = @OPTIONAL_UUID_CFLAGS@
-OPTIONAL_UUID_LIBS = @OPTIONAL_UUID_LIBS@
-OPTIONAL_UV_LIBS = @OPTIONAL_UV_LIBS@
-OPTIONAL_XENSTAT_CFLAGS = @OPTIONAL_XENSTAT_CFLAGS@
-OPTIONAL_XENSTAT_LIBS = @OPTIONAL_XENSTAT_LIBS@
-OPTIONAL_ZLIB_CFLAGS = @OPTIONAL_ZLIB_CFLAGS@
-OPTIONAL_ZLIB_LIBS = @OPTIONAL_ZLIB_LIBS@
-PACKAGE = @PACKAGE@
-PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@
-PACKAGE_NAME = @PACKAGE_NAME@
-PACKAGE_RPM_VERSION = @PACKAGE_RPM_VERSION@
-PACKAGE_STRING = @PACKAGE_STRING@
-PACKAGE_TARNAME = @PACKAGE_TARNAME@
-PACKAGE_URL = @PACKAGE_URL@
-PACKAGE_VERSION = @PACKAGE_VERSION@
-PATH_SEPARATOR = @PATH_SEPARATOR@
-PKG_CONFIG = @PKG_CONFIG@
-PKG_CONFIG_LIBDIR = @PKG_CONFIG_LIBDIR@
-PKG_CONFIG_PATH = @PKG_CONFIG_PATH@
-PROTOBUF_CFLAGS = @PROTOBUF_CFLAGS@
-PROTOBUF_LIBS = @PROTOBUF_LIBS@
-PROTOC = @PROTOC@
-PTHREAD_CC = @PTHREAD_CC@
-PTHREAD_CFLAGS = @PTHREAD_CFLAGS@
-PTHREAD_LIBS = @PTHREAD_LIBS@
-SET_MAKE = @SET_MAKE@
-SHELL = @SHELL@
-SSE_CANDIDATE = @SSE_CANDIDATE@
-STRIP = @STRIP@
-TEST_CFLAGS = @TEST_CFLAGS@
-TEST_LIBS = @TEST_LIBS@
-UUID_CFLAGS = @UUID_CFLAGS@
-UUID_LIBS = @UUID_LIBS@
-VERSION = @VERSION@
-XENLIGHT_CFLAGS = @XENLIGHT_CFLAGS@
-XENLIGHT_LIBS = @XENLIGHT_LIBS@
-YAJL_CFLAGS = @YAJL_CFLAGS@
-YAJL_LIBS = @YAJL_LIBS@
-ZLIB_CFLAGS = @ZLIB_CFLAGS@
-ZLIB_LIBS = @ZLIB_LIBS@
-abs_builddir = @abs_builddir@
-abs_srcdir = @abs_srcdir@
-abs_top_builddir = @abs_top_builddir@
-abs_top_srcdir = @abs_top_srcdir@
-ac_ct_CC = @ac_ct_CC@
-ac_ct_CXX = @ac_ct_CXX@
-am__include = @am__include@
-am__leading_dot = @am__leading_dot@
-am__quote = @am__quote@
-am__tar = @am__tar@
-am__untar = @am__untar@
-ax_pthread_config = @ax_pthread_config@
-bindir = @bindir@
-build = @build@
-build_alias = @build_alias@
-build_cpu = @build_cpu@
-build_os = @build_os@
-build_target = @build_target@
-build_vendor = @build_vendor@
-builddir = @builddir@
-cachedir = @cachedir@
-chartsdir = @chartsdir@
-configdir = @configdir@
-datadir = @datadir@
-datarootdir = @datarootdir@
-docdir = @docdir@
-dvidir = @dvidir@
-exec_prefix = @exec_prefix@
-has_jemalloc = @has_jemalloc@
-has_tcmalloc = @has_tcmalloc@
-host = @host@
-host_alias = @host_alias@
-host_cpu = @host_cpu@
-host_os = @host_os@
-host_vendor = @host_vendor@
-htmldir = @htmldir@
-includedir = @includedir@
-infodir = @infodir@
-install_sh = @install_sh@
-libconfigdir = @libconfigdir@
-libdir = @libdir@
-libexecdir = @libexecdir@
-localedir = @localedir@
-localstatedir = @localstatedir@
-logdir = @logdir@
-mandir = @mandir@
-mkdir_p = @mkdir_p@
-nodedir = @nodedir@
-oldincludedir = @oldincludedir@
-pdfdir = @pdfdir@
-pluginsdir = @pluginsdir@
-prefix = @prefix@
-program_transform_name = @program_transform_name@
-psdir = @psdir@
-pythondir = @pythondir@
-registrydir = @registrydir@
-runstatedir = @runstatedir@
-sbindir = @sbindir@
-sharedstatedir = @sharedstatedir@
-srcdir = @srcdir@
-sysconfdir = @sysconfdir@
-target_alias = @target_alias@
-top_build_prefix = @top_build_prefix@
-top_builddir = @top_builddir@
-top_srcdir = @top_srcdir@
-varlibdir = @varlibdir@
-webdir = @webdir@
-AUTOMAKE_OPTIONS = subdir-objects
-MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
-dist_noinst_DATA = \
- README.md \
- $(NULL)
-
-all: all-am
-
-.SUFFIXES:
-$(srcdir)/Makefile.in: @MAINTAINER_MODE_TRUE@ $(srcdir)/Makefile.am $(am__configure_deps)
- @for dep in $?; do \
- case '$(am__configure_deps)' in \
- *$$dep*) \
- ( cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh ) \
- && { if test -f $@; then exit 0; else break; fi; }; \
- exit 1;; \
- esac; \
- done; \
- echo ' cd $(top_srcdir) && $(AUTOMAKE) --gnu database/engine/Makefile'; \
- $(am__cd) $(top_srcdir) && \
- $(AUTOMAKE) --gnu database/engine/Makefile
-Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status
- @case '$?' in \
- *config.status*) \
- cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh;; \
- *) \
- echo ' cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__depfiles_maybe)'; \
- cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__depfiles_maybe);; \
- esac;
-
-$(top_builddir)/config.status: $(top_srcdir)/configure $(CONFIG_STATUS_DEPENDENCIES)
- cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
-
-$(top_srcdir)/configure: @MAINTAINER_MODE_TRUE@ $(am__configure_deps)
- cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
-$(ACLOCAL_M4): @MAINTAINER_MODE_TRUE@ $(am__aclocal_m4_deps)
- cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
-$(am__aclocal_m4_deps):
-tags TAGS:
-
-ctags CTAGS:
-
-cscope cscopelist:
-
-
-distdir: $(DISTFILES)
- @srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \
- topsrcdirstrip=`echo "$(top_srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \
- list='$(DISTFILES)'; \
- dist_files=`for file in $$list; do echo $$file; done | \
- sed -e "s|^$$srcdirstrip/||;t" \
- -e "s|^$$topsrcdirstrip/|$(top_builddir)/|;t"`; \
- case $$dist_files in \
- */*) $(MKDIR_P) `echo "$$dist_files" | \
- sed '/\//!d;s|^|$(distdir)/|;s,/[^/]*$$,,' | \
- sort -u` ;; \
- esac; \
- for file in $$dist_files; do \
- if test -f $$file || test -d $$file; then d=.; else d=$(srcdir); fi; \
- if test -d $$d/$$file; then \
- dir=`echo "/$$file" | sed -e 's,/[^/]*$$,,'`; \
- if test -d "$(distdir)/$$file"; then \
- find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \
- fi; \
- if test -d $(srcdir)/$$file && test $$d != $(srcdir); then \
- cp -fpR $(srcdir)/$$file "$(distdir)$$dir" || exit 1; \
- find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \
- fi; \
- cp -fpR $$d/$$file "$(distdir)$$dir" || exit 1; \
- else \
- test -f "$(distdir)/$$file" \
- || cp -p $$d/$$file "$(distdir)/$$file" \
- || exit 1; \
- fi; \
- done
-check-am: all-am
-check: check-am
-all-am: Makefile $(DATA)
-installdirs:
-install: install-am
-install-exec: install-exec-am
-install-data: install-data-am
-uninstall: uninstall-am
-
-install-am: all-am
- @$(MAKE) $(AM_MAKEFLAGS) install-exec-am install-data-am
-
-installcheck: installcheck-am
-install-strip:
- if test -z '$(STRIP)'; then \
- $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
- install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
- install; \
- else \
- $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
- install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
- "INSTALL_PROGRAM_ENV=STRIPPROG='$(STRIP)'" install; \
- fi
-mostlyclean-generic:
-
-clean-generic:
-
-distclean-generic:
- -test -z "$(CONFIG_CLEAN_FILES)" || rm -f $(CONFIG_CLEAN_FILES)
- -test . = "$(srcdir)" || test -z "$(CONFIG_CLEAN_VPATH_FILES)" || rm -f $(CONFIG_CLEAN_VPATH_FILES)
-
-maintainer-clean-generic:
- @echo "This command is intended for maintainers to use"
- @echo "it deletes files that may require special tools to rebuild."
- -test -z "$(MAINTAINERCLEANFILES)" || rm -f $(MAINTAINERCLEANFILES)
-clean: clean-am
-
-clean-am: clean-generic mostlyclean-am
-
-distclean: distclean-am
- -rm -f Makefile
-distclean-am: clean-am distclean-generic
-
-dvi: dvi-am
-
-dvi-am:
-
-html: html-am
-
-html-am:
-
-info: info-am
-
-info-am:
-
-install-data-am:
-
-install-dvi: install-dvi-am
-
-install-dvi-am:
-
-install-exec-am:
-
-install-html: install-html-am
-
-install-html-am:
-
-install-info: install-info-am
-
-install-info-am:
-
-install-man:
-
-install-pdf: install-pdf-am
-
-install-pdf-am:
-
-install-ps: install-ps-am
-
-install-ps-am:
-
-installcheck-am:
-
-maintainer-clean: maintainer-clean-am
- -rm -f Makefile
-maintainer-clean-am: distclean-am maintainer-clean-generic
-
-mostlyclean: mostlyclean-am
-
-mostlyclean-am: mostlyclean-generic
-
-pdf: pdf-am
-
-pdf-am:
-
-ps: ps-am
-
-ps-am:
-
-uninstall-am:
-
-.MAKE: install-am install-strip
-
-.PHONY: all all-am check check-am clean clean-generic cscopelist-am \
- ctags-am distclean distclean-generic distdir dvi dvi-am html \
- html-am info info-am install install-am install-data \
- install-data-am install-dvi install-dvi-am install-exec \
- install-exec-am install-html install-html-am install-info \
- install-info-am install-man install-pdf install-pdf-am \
- install-ps install-ps-am install-strip installcheck \
- installcheck-am installdirs maintainer-clean \
- maintainer-clean-generic mostlyclean mostlyclean-generic pdf \
- pdf-am ps ps-am tags-am uninstall uninstall-am
-
-.PRECIOUS: Makefile
-
-
-# Tell versions [3.59,3.63) of GNU make to not export all variables.
-# Otherwise a system limit (for SysV at least) may be exceeded.
-.NOEXPORT:
diff --git a/database/engine/README.md b/database/engine/README.md
index 78f3b15ec..191366a46 100644
--- a/database/engine/README.md
+++ b/database/engine/README.md
@@ -1,93 +1,102 @@
-# Database engine
-
-The Database Engine works like a traditional database. There is some amount of RAM dedicated to data caching and
-indexing and the rest of the data reside compressed on disk. The number of history entries is not fixed in this case,
-but depends on the configured disk space and the effective compression ratio of the data stored. This is the **only
-mode** that supports changing the data collection update frequency (`update_every`) **without losing** the previously
-stored metrics.
-
-## Files
+<!--
+title: "Database engine"
+description: "Netdata's highly efficient database engine uses both RAM and disk for distributed, long-term storage of per-second metrics."
+custom_edit_url: https://github.com/netdata/netdata/edit/master/database/engine/README.md
+-->
-With the DB engine memory mode the metric data are stored in database files. These files are organized in pairs, the
-datafiles and their corresponding journalfiles, e.g.:
+# Database engine
-```sh
-datafile-1-0000000001.ndf
-journalfile-1-0000000001.njf
-datafile-1-0000000002.ndf
-journalfile-1-0000000002.njf
-datafile-1-0000000003.ndf
-journalfile-1-0000000003.njf
-...
-```
+The Database Engine works like a traditional database. It dedicates a certain amount of RAM to data caching and
+indexing, while the rest of the data resides compressed on disk. Unlike other [memory modes](/database/README.md), the
+amount of historical metrics stored is based on the amount of disk space you allocate and the effective compression
+ratio, not a fixed number of metrics collected.
-They are located under their host's cache directory in the directory `./dbengine` (e.g. for localhost the default
-location is `/var/cache/netdata/dbengine/*`). The higher numbered filenames contain more recent metric data. The user
-can safely delete some pairs of files when Netdata is stopped to manually free up some space.
+By using both RAM and disk space, the database engine allows for long-term storage of per-second metrics inside of the
+Agent itself.
-_Users should_ **back up** _their `./dbengine` folders if they consider this data to be important._
+In addition, the database engine is the only memory mode that supports changing the data collection update frequency
+(`update_every`) without losing the metrics your Agent already gathered and stored.
## Configuration
-There is one DB engine instance per Netdata host/node. That is, there is one `./dbengine` folder per node, and all
-charts of `dbengine` memory mode in such a host share the same storage space and DB engine instance memory state. You
-can select the memory mode for localhost by editing netdata.conf and setting:
+To use the database engine, open `netdata.conf` and set `memory mode` to `dbengine`.
```conf
[global]
memory mode = dbengine
```
-For setting the memory mode for the rest of the nodes you should look at
-[streaming](../../streaming/).
+To configure the database engine, look for the `page cache size` and `dbengine multihost disk space` settings in the
+`[global]` section of your `netdata.conf`. The Agent ignores the `history` setting when using the database engine.
-The `history` configuration option is meaningless for `memory mode = dbengine` and is ignored for any metrics being
-stored in the DB engine.
+```conf
+[global]
+ page cache size = 32
+ dbengine multihost disk space = 256
+```
+
+The above values are the default values for Page Cache size and DB engine disk space quota. Both numbers are
+in **MiB**.
+
+The `page cache size` option determines the amount of RAM in **MiB** dedicated to caching Netdata metric values. The
+actual page cache size will be slightly larger than this figure—see the [memory requirements](#memory-requirements)
+section for details.
-All DB engine instances, for localhost and all other streaming recipient nodes inherit their configuration from
-`netdata.conf`:
+The `dbengine multihost disk space` option determines the amount of disk space in **MiB** that is dedicated to storing
+Netdata metric values and all related metadata describing them. You can use the [**database engine
+calculator**](/docs/store/change-metrics-storage.md#calculate-the-system-resources-RAM-disk-space-needed-to-store-metrics)
+to correctly set `dbengine multihost disk space` based on your metrics retention policy. The calculator gives an
+accurate estimate based on how many child nodes you have, how many metrics your Agent collects, and more.
+
+### Legacy configuration
+
+The deprecated `dbengine disk space` option determines the amount of disk space in **MiB** that is dedicated to storing
+Netdata metric values per legacy database engine instance (see [details on the legacy mode](#legacy-mode) below).
```conf
[global]
- page cache size = 32
dbengine disk space = 256
```
-The above values are the default and minimum values for Page Cache size and DB engine disk space quota. Both numbers are
-in **MiB**. All DB engine instances will allocate the configured resources separately.
+### Streaming metrics to the database engine
-The `page cache size` option determines the amount of RAM in **MiB** that is dedicated to caching Netdata metric values
-themselves as far as queries are concerned. The total page cache size will be greater since data collection itself will
-consume additional memory as is described in the [Memory requirements](#memory-requirements) section.
+When using the multihost database engine, all parent and child nodes share the same `page cache size` and `dbengine
+multihost disk space` in a single dbengine instance. The [**database engine
+calculator**](/docs/store/change-metrics-storage.md#calculate-the-system-resources-RAM-disk-space-needed-to-store-metrics)
+helps you properly set `page cache size` and `dbengine multihost disk space` on your parent node to allocate enough
+resources based on your metrics retention policy and how many child nodes you have.
-The `dbengine disk space` option determines the amount of disk space in **MiB** that is dedicated to storing Netdata
-metric values and all related metadata describing them.
+#### Legacy mode
-## Operation
+_For Netdata Agents earlier than v1.23.2_, the Agent on the parent node uses one dbengine instance for itself, and
+another instance for every child node it receives metrics from. If you had four streaming nodes, you would have five
+instances in total (`1 parent + 4 child nodes = 5 instances`).
-The DB engine stores chart metric values in 4096-byte pages in memory. Each chart dimension gets its own page to store
-consecutive values generated from the data collectors. Those pages comprise the **Page Cache**.
+The Agent allocates resources for each instance separately using the `dbengine disk space` (**deprecated**) setting. If
+`dbengine disk space` (**deprecated**) is set to the default `256`, each instance is given 256 MiB in disk space, which
+means the total disk space required to store all instances is, roughly, `256 MiB * (1 parent + 4 child nodes) = 1280 MiB`.
-When those pages fill up they are slowly compressed and flushed to disk. It can take `4096 / 4 = 1024 seconds = 17
-minutes`, for a chart dimension that is being collected every 1 second, to fill a page. Pages can be cut short when we
-stop Netdata or the DB engine instance so as to not lose the data. When we query the DB engine for data we trigger disk
-read I/O requests that fill the Page Cache with the requested pages and potentially evict cold (not recently used)
-pages.
+#### Backward compatibility
-When the disk quota is exceeded the oldest values are removed from the DB engine at real time, by automatically deleting
-the oldest datafile and journalfile pair. Any corresponding pages residing in the Page Cache will also be invalidated
-and removed. The DB engine logic will try to maintain between 10 and 20 file pairs at any point in time.
+All existing metrics belonging to child nodes are automatically converted to legacy dbengine instances and the localhost
+metrics are transferred to the multihost dbengine instance.
-The Database Engine uses direct I/O to avoid polluting the OS filesystem caches and does not generate excessive I/O
-traffic so as to create the minimum possible interference with other applications.
+All new child nodes are automatically transferred to the multihost dbengine instance and share its page cache and disk
+space. If you want to migrate a child node from its legacy dbengine instance to the multihost dbengine instance, you
+must delete the instance's directory, which is located in `/var/cache/netdata/MACHINE_GUID/dbengine`, after stopping the
+Agent.
+
+##### Information
+
+For more information about setting `memory mode` on your nodes, in addition to other streaming configurations, see
+[streaming](/streaming/README.md).
-## Memory requirements
+### Memory requirements
Using memory mode `dbengine` we can overcome most memory restrictions and store a dataset that is much larger than the
available memory.
-There are explicit memory requirements **per** DB engine **instance**, meaning **per** Netdata **node** (e.g. localhost
-and streaming recipient nodes):
+There are explicit memory requirements **per** DB engine **instance**:
- The total page cache memory footprint will be an additional `#dimensions-being-collected x 4096 x 2` bytes over what
the user configured with `page cache size`.
@@ -99,18 +108,30 @@ and streaming recipient nodes):
- for very highly compressible data (compression ratio > 90%) this RAM overhead is comparable to the disk space
footprint.
-An important observation is that RAM usage depends on both the `page cache size` and the `dbengine disk space` options.
+An important observation is that RAM usage depends on both the `page cache size` and the `dbengine multihost disk space`
+options.
-## File descriptor requirements
+You can use our [database engine
+calculator](/docs/store/change-metrics-storage.md#calculate-the-system-resources-RAM-disk-space-needed-to-store-metrics)
+to validate the memory requirements for your particular system(s) and configuration (**out-of-date**).
-The Database Engine may keep a **significant** amount of files open per instance (e.g. per streaming slave or master
-server). When configuring your system you should make sure there are at least 50 file descriptors available per
+### Disk space requirements
+
+There are explicit disk space requirements **per** DB engine **instance**:
+
+- The total disk space footprint will be the maximum between `#dimensions-being-collected x 4096 x 2` bytes or what
+ the user configured with `dbengine multihost disk space` or `dbengine disk space`.
+
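Putting the two formulas above together gives a quick back-of-the-envelope estimate. The C sketch below is illustrative only: the dimension count is a hypothetical example, while 32 MiB and 256 MiB are the default `page cache size` and `dbengine multihost disk space` values shown earlier.

```c
#include <stdio.h>
#include <stdint.h>

int main(void) {
    const uint64_t MiB = 1024ULL * 1024ULL;
    uint64_t dimensions      = 4000;       /* dimensions being collected (hypothetical) */
    uint64_t page_cache_size = 32 * MiB;   /* [global] page cache size */
    uint64_t disk_space      = 256 * MiB;  /* [global] dbengine multihost disk space */

    /* #dimensions-being-collected x 4096 x 2 */
    uint64_t per_dimension = dimensions * 4096ULL * 2ULL;

    /* RAM: additional footprint on top of the configured page cache size */
    uint64_t ram = page_cache_size + per_dimension;
    /* Disk: the maximum of the configured quota and the per-dimension term */
    uint64_t disk = disk_space > per_dimension ? disk_space : per_dimension;

    printf("page cache RAM footprint: ~%.1f MiB\n", (double)ram / MiB);
    printf("minimum disk footprint:   ~%.1f MiB\n", (double)disk / MiB);
    return 0;
}
```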
+### File descriptor requirements
+
+The Database Engine may keep a **significant** amount of files open per instance (e.g. per streaming child or
+parent server). When configuring your system you should make sure there are at least 50 file descriptors available per
`dbengine` instance.
Netdata allocates 25% of the available file descriptors to its Database Engine instances. This means that only 25% of
the file descriptors that are available to the Netdata service are accessible by dbengine instances. You should take
that into account when configuring your service or system-wide file descriptor limits. You can roughly estimate that the
-Netdata service needs 2048 file descriptors for every 10 streaming slave hosts when streaming is configured to use
+Netdata service needs 2048 file descriptors for every 10 streaming child hosts when streaming is configured to use
`memory mode = dbengine`.
If for example one wants to allocate 65536 file descriptors to the Netdata service on a systemd system one needs to
@@ -143,12 +164,53 @@ kern.maxfiles=65536
You can apply the settings by running `sysctl -p` or by rebooting.
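To turn these figures into a concrete service-level limit, remember that dbengine instances only receive 25% of the descriptors available to the Netdata service, so the limit must be roughly four times what the instances themselves need. The sketch below is illustrative only; the instance count is a hypothetical input.

```c
#include <stdio.h>

int main(void) {
    unsigned fds_per_instance = 50; /* minimum per dbengine instance */
    unsigned instances        = 10; /* hypothetical: e.g. 10 streaming child hosts */

    /* dbengine gets 25% of the service's descriptors -> multiply the need by 4 */
    unsigned service_limit = fds_per_instance * instances * 4;

    printf("suggested service-level limit: at least %u file descriptors\n", service_limit);
    return 0;
}
```

With 10 instances this yields 2000 descriptors, in line with the rough estimate of 2048 file descriptors per 10 streaming child hosts quoted above.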
+## Files
+
+With the DB engine memory mode the metric data are stored in database files. These files are organized in pairs, the
+datafiles and their corresponding journalfiles, e.g.:
+
+```sh
+datafile-1-0000000001.ndf
+journalfile-1-0000000001.njf
+datafile-1-0000000002.ndf
+journalfile-1-0000000002.njf
+datafile-1-0000000003.ndf
+journalfile-1-0000000003.njf
+...
+```
+
+They are located under their host's cache directory in the directory `./dbengine` (e.g. for localhost the default
+location is `/var/cache/netdata/dbengine/*`). The higher numbered filenames contain more recent metric data. The user
+can safely delete some pairs of files when Netdata is stopped to manually free up some space.
+
+_Users should_ **back up** _their `./dbengine` folders if they consider this data to be important._ You can also set up
+one or more [exporting connectors](/exporting/README.md) to send your Netdata metrics to other databases for long-term
+storage at lower granularity.
+
+## Operation
+
+The DB engine stores chart metric values in 4096-byte pages in memory. Each chart dimension gets its own page to store
+consecutive values generated from the data collectors. Those pages comprise the **Page Cache**.
+
+When those pages fill up they are slowly compressed and flushed to disk. It can take `4096 / 4 = 1024 seconds = 17
+minutes`, for a chart dimension that is being collected every 1 second, to fill a page. Pages can be cut short when we
+stop Netdata or the DB engine instance so as to not lose the data. When we query the DB engine for data we trigger disk
+read I/O requests that fill the Page Cache with the requested pages and potentially evict cold (not recently used)
+pages.
+
+When the disk quota is exceeded the oldest values are removed from the DB engine at real time, by automatically deleting
+the oldest datafile and journalfile pair. Any corresponding pages residing in the Page Cache will also be invalidated
+and removed. The DB engine logic will try to maintain between 10 and 20 file pairs at any point in time.
+
+The Database Engine uses direct I/O to avoid polluting the OS filesystem caches and does not generate excessive I/O
+traffic so as to create the minimum possible interference with other applications.
+
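A small worked example of the page-fill arithmetic above, assuming 4-byte values stored in 4096-byte pages:

```c
#include <stdio.h>

int main(void) {
    const unsigned page_size = 4096, bytes_per_value = 4;
    unsigned update_every[] = { 1, 10 }; /* example collection intervals in seconds */

    for (int i = 0; i < 2; i++) {
        unsigned seconds = (page_size / bytes_per_value) * update_every[i];
        printf("update_every = %us -> page fills in %u seconds (~%.1f minutes)\n",
               update_every[i], seconds, seconds / 60.0);
    }
    return 0;
}
```

At `update_every = 1` this reproduces the `4096 / 4 = 1024 seconds` (~17 minutes) figure above; slower collection intervals fill pages proportionally more slowly.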
## Evaluation
-We have evaluated the performance of the `dbengine` API that the netdata daemon uses internally. This is **not** the
-web API of netdata. Our benchmarks ran on a **single** `dbengine` instance, multiple of which can be running in a
-netdata master server. We used a server with an AMD Ryzen Threadripper 2950X 16-Core Processor and 2 disk drives, a
-Seagate Constellation ES.3 2TB magnetic HDD and a SAMSUNG MZQLB960HAJR-00007 960GB NAND Flash SSD.
+We have evaluated the performance of the `dbengine` API that the netdata daemon uses internally. This is **not** the web
+API of netdata. Our benchmarks ran on a **single** `dbengine` instance, multiple of which can be running in a Netdata
+parent node. We used a server with an AMD Ryzen Threadripper 2950X 16-Core Processor and 2 disk drives, a Seagate
+Constellation ES.3 2TB magnetic HDD and a SAMSUNG MZQLB960HAJR-00007 960GB NAND Flash SSD.
For our workload, we defined 32 charts with 128 metrics each, giving us a total of 4096 metrics. We defined 1 worker
thread per chart (32 threads) that generates new data points with a data generation interval of 1 second. The time axis
@@ -170,10 +232,10 @@ so as to avoid all disk bottlenecks.
The reported numbers are the following:
| device | page cache | dataset | reads/sec | writes/sec |
-| :---: | :---: | ---: | ---: | ---: |
-| HDD | 64 MiB | 4.1 GiB | 813K | 18.0M |
-| SSD | 64 MiB | 9.8 GiB | 1.7M | 43.0M |
-| N/A | 16 GiB | 6.8 GiB |118.2M | 30.2M |
+| :----: | :--------: | ------: | --------: | ---------: |
+| HDD | 64 MiB | 4.1 GiB | 813K | 18.0M |
+| SSD | 64 MiB | 9.8 GiB | 1.7M | 43.0M |
+| N/A | 16 GiB | 6.8 GiB | 118.2M | 30.2M |
where "reads/sec" is the number of metric data points being read from the database via its API per second and
"writes/sec" is the number of metric data points being written to the database per second.
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
index 8ef4ed599..7a052f963 100644
--- a/database/engine/datafile.c
+++ b/database/engine/datafile.c
@@ -30,7 +30,7 @@ void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_dataf
struct rrdengine_datafile *next;
next = datafile->next;
- assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile));
+ fatal_assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile));
ctx->datafiles.first = next;
}
@@ -38,7 +38,7 @@ void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_dataf
static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_instance *ctx,
unsigned tier, unsigned fileno)
{
- assert(tier == 1);
+ fatal_assert(tier == 1);
datafile->tier = tier;
datafile->fileno = fileno;
datafile->file = (uv_file)0;
@@ -75,6 +75,27 @@ int close_data_file(struct rrdengine_datafile *datafile)
return ret;
}
+int unlink_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_datafilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_unlink(NULL, &req, path, NULL);
+ if (ret < 0) {
+ error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ ++ctx->stats.datafile_deletions;
+
+ return ret;
+}
int destroy_data_file(struct rrdengine_datafile *datafile)
{
@@ -146,7 +167,7 @@ int create_data_file(struct rrdengine_datafile *datafile)
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
- assert(req.result < 0);
+ fatal_assert(req.result < 0);
error("uv_fs_write: %s", uv_strerror(ret));
++ctx->stats.io_errors;
rrd_stat_atomic_add(&global_io_errors, 1);
@@ -184,7 +205,7 @@ static int check_data_file_superblock(uv_file file)
uv_fs_req_cleanup(&req);
goto error;
}
- assert(req.result >= 0);
+ fatal_assert(req.result >= 0);
uv_fs_req_cleanup(&req);
if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) ||
@@ -271,7 +292,7 @@ static int scan_data_files(struct rrdengine_instance *ctx)
ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL);
if (ret < 0) {
- assert(req.result < 0);
+ fatal_assert(req.result < 0);
uv_fs_req_cleanup(&req);
error("uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret));
++ctx->stats.fs_errors;
@@ -305,33 +326,47 @@ static int scan_data_files(struct rrdengine_instance *ctx)
ctx->last_fileno = datafiles[matched_files - 1]->fileno;
for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
+ uint8_t must_delete_pair = 0;
+
datafile = datafiles[i];
ret = load_data_file(datafile);
if (0 != ret) {
- freez(datafile);
- ++failed_to_load;
- break;
+ must_delete_pair = 1;
}
journalfile = mallocz(sizeof(*journalfile));
datafile->journalfile = journalfile;
journalfile_init(journalfile, datafile);
ret = load_journal_file(ctx, journalfile, datafile);
if (0 != ret) {
- close_data_file(datafile);
- freez(datafile);
+ if (!must_delete_pair) /* If datafile is still open close it */
+ close_data_file(datafile);
+ must_delete_pair = 1;
+ }
+ if (must_delete_pair) {
+ char path[RRDENG_PATH_MAX];
+
+ error("Deleting invalid data and journal file pair.");
+ ret = unlink_journal_file(journalfile);
+ if (!ret) {
+ generate_journalfilepath(datafile, path, sizeof(path));
+ info("Deleted journal file \"%s\".", path);
+ }
+ ret = unlink_data_file(datafile);
+ if (!ret) {
+ generate_datafilepath(datafile, path, sizeof(path));
+ info("Deleted data file \"%s\".", path);
+ }
freez(journalfile);
+ freez(datafile);
++failed_to_load;
- break;
+ continue;
}
+
datafile_list_insert(ctx, datafile);
ctx->disk_space += datafile->pos + journalfile->pos;
}
+ matched_files -= failed_to_load;
freez(datafiles);
- if (failed_to_load) {
- error("%u datafiles failed to load.", failed_to_load);
- finalize_data_files(ctx);
- return UV_EIO;
- }
return matched_files;
}
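/*
 * Net effect of the scan_data_files() changes above: a datafile/journalfile
 * pair that fails to load is no longer fatal for the whole scan. The invalid
 * pair is unlinked through the new unlink_journal_file() and unlink_data_file()
 * helpers, the failure is counted, and the scan continues with the next pair.
 * The function now returns the number of pairs that actually loaded
 * (matched_files - failed_to_load) instead of aborting with UV_EIO.
 */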
diff --git a/database/engine/datafile.h b/database/engine/datafile.h
index eeb11310b..ae94bfdd0 100644
--- a/database/engine/datafile.h
+++ b/database/engine/datafile.h
@@ -14,7 +14,7 @@ struct rrdengine_instance;
#define DATAFILE_EXTENSION ".ndf"
#define MAX_DATAFILE_SIZE (1073741824LU)
-#define MIN_DATAFILE_SIZE (16777216LU)
+#define MIN_DATAFILE_SIZE (4194304LU)
#define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */
#define TARGET_DATAFILES (20)
@@ -57,6 +57,7 @@ extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengin
extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
extern void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
extern int close_data_file(struct rrdengine_datafile *datafile);
+extern int unlink_data_file(struct rrdengine_datafile *datafile);
extern int destroy_data_file(struct rrdengine_datafile *datafile);
extern int create_data_file(struct rrdengine_datafile *datafile);
extern int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
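/*
 * Quick check of the capacity comment above ("Supports up to 64TiB for now"):
 * MAX_DATAFILES datafiles of at most MAX_DATAFILE_SIZE bytes each. The constants
 * below are copied from datafile.h; this standalone check is illustrative only.
 */
#include <assert.h>
#include <stdint.h>

int main(void) {
    const uint64_t max_datafile_size = 1073741824ULL; /* MAX_DATAFILE_SIZE, 1 GiB */
    const uint64_t max_datafiles = 65536ULL;          /* MAX_DATAFILES */

    /* 2^30 bytes * 2^16 files = 2^46 bytes = 64 TiB */
    assert(max_datafile_size * max_datafiles == 64ULL * 1024 * 1024 * 1024 * 1024);
    return 0;
}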
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index fac680aa0..9fecc48ff 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -52,7 +52,7 @@ void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc)
io_descr->iov = uv_buf_init((void *)io_descr->buf, size);
ret = uv_fs_write(wc->loop, &io_descr->req, journalfile->file, &io_descr->iov, 1,
journalfile->pos, flush_transaction_buffer_cb);
- assert (-1 != ret);
+ fatal_assert(-1 != ret);
journalfile->pos += RRDENG_BLOCK_SIZE;
ctx->disk_space += RRDENG_BLOCK_SIZE;
ctx->commit_log.buf = NULL;
@@ -64,9 +64,9 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s
{
struct rrdengine_instance *ctx = wc->ctx;
int ret;
- unsigned buf_pos, buf_size;
+ unsigned buf_pos = 0, buf_size;
- assert(size);
+ fatal_assert(size);
if (ctx->commit_log.buf) {
unsigned remaining;
@@ -125,6 +125,29 @@ int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengi
return ret;
}
+int unlink_journal_file(struct rrdengine_journalfile *journalfile)
+{
+ struct rrdengine_datafile *datafile = journalfile->datafile;
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_unlink(NULL, &req, path, NULL);
+ if (ret < 0) {
+ error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ ++ctx->stats.journalfile_deletions;
+
+ return ret;
+}
+
int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
@@ -194,7 +217,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
- assert(req.result < 0);
+ fatal_assert(req.result < 0);
error("uv_fs_write: %s", uv_strerror(ret));
++ctx->stats.io_errors;
rrd_stat_atomic_add(&global_io_errors, 1);
@@ -232,7 +255,7 @@ static int check_journal_file_superblock(uv_file file)
uv_fs_req_cleanup(&req);
goto error;
}
- assert(req.result >= 0);
+ fatal_assert(req.result >= 0);
uv_fs_req_cleanup(&req);
if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) ||
@@ -275,7 +298,7 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
for (i = 0, valid_pages = 0 ; i < count ; ++i) {
uuid_t *temp_id;
Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
+ struct pg_cache_page_index *page_index = NULL;
if (PAGE_METRICS != jf_metric_data->descr[i].type) {
error("Unknown page type encountered.");
@@ -293,7 +316,7 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
/* First time we see the UUID */
uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
- assert(NULL == *PValue); /* TODO: figure out concurrency model */
+ fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
*PValue = page_index = create_page_index(temp_id);
page_index->prev = pg_cache->metrics_index.last_page_index;
pg_cache->metrics_index.last_page_index = page_index;
@@ -408,7 +431,7 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde
fatal("uv_fs_read: %s", uv_strerror(ret));
/*uv_fs_req_cleanup(&req);*/
}
- assert(req.result >= 0);
+ fatal_assert(req.result >= 0);
uv_fs_req_cleanup(&req);
ctx->stats.io_read_bytes += size_bytes;
++ctx->stats.io_read_requests;
diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h
index 0df66304d..f6c43cd16 100644
--- a/database/engine/journalfile.h
+++ b/database/engine/journalfile.h
@@ -38,6 +38,7 @@ extern void journalfile_init(struct rrdengine_journalfile *journalfile, struct r
extern void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size);
extern void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc);
extern int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+extern int unlink_journal_file(struct rrdengine_journalfile *journalfile);
extern int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
extern int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
extern int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
diff --git a/database/engine/metadata_log/Makefile.am b/database/engine/metadata_log/Makefile.am
new file mode 100644
index 000000000..161784b8f
--- /dev/null
+++ b/database/engine/metadata_log/Makefile.am
@@ -0,0 +1,8 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/database/engine/metadata_log/README.md b/database/engine/metadata_log/README.md
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/database/engine/metadata_log/README.md
diff --git a/database/engine/metadata_log/compaction.c b/database/engine/metadata_log/compaction.c
new file mode 100644
index 000000000..ba19e1edf
--- /dev/null
+++ b/database/engine/metadata_log/compaction.c
@@ -0,0 +1,86 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#define NETDATA_RRD_INTERNALS
+
+#include "metadatalog.h"
+
+/* Return 0 on success. */
+int compaction_failure_recovery(struct metalog_instance *ctx, struct metadata_logfile **metalogfiles,
+ unsigned *matched_files)
+{
+ int ret;
+ unsigned starting_fileno, fileno, i, j, recovered_files;
+ struct metadata_logfile *metalogfile = NULL, *compactionfile = NULL, **tmp_metalogfiles;
+ char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path;
+
+ for (i = 0 ; i < *matched_files ; ++i) {
+ metalogfile = metalogfiles[i];
+ if (0 == metalogfile->starting_fileno)
+ continue; /* skip standard metadata log files */
+ break; /* this is a compaction temporary file */
+ }
+ if (i == *matched_files) /* no recovery needed */
+ return 0;
+ info("Starting metadata log file failure recovery procedure in \"%s\".", dbfiles_path);
+
+ if (*matched_files - i > 1) { /* Can't have more than 1 temporary compaction files */
+ error("Metadata log files are in an invalid state. Cannot proceed.");
+ return 1;
+ }
+ compactionfile = metalogfile;
+ starting_fileno = compactionfile->starting_fileno;
+ fileno = compactionfile->fileno;
+ /* scratchpad space to move file pointers around */
+ tmp_metalogfiles = callocz(*matched_files, sizeof(*tmp_metalogfiles));
+
+ for (j = 0, recovered_files = 0 ; j < i ; ++j) {
+ metalogfile = metalogfiles[j];
+ fatal_assert(0 == metalogfile->starting_fileno);
+ if (metalogfile->fileno < starting_fileno) {
+ tmp_metalogfiles[recovered_files++] = metalogfile;
+ continue;
+ }
+ break; /* reached compaction file serial number */
+ }
+
+ if ((j == i) /* Shouldn't be possible, invalid compaction temporary file */ ||
+ (metalogfile->fileno == starting_fileno && metalogfile->fileno == fileno)) {
+ error("Deleting invalid compaction temporary file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL
+ METALOG_EXTENSION"\"", dbfiles_path, starting_fileno, fileno);
+ unlink_metadata_logfile(compactionfile);
+ freez(compactionfile);
+ freez(tmp_metalogfiles);
+ --*matched_files; /* delete the last one */
+
+ info("Finished metadata log file failure recovery procedure in \"%s\".", dbfiles_path);
+ return 0;
+ }
+
+ for ( ; j < i ; ++j) { /* continue iterating through normal metadata log files */
+ metalogfile = metalogfiles[j];
+ fatal_assert(0 == metalogfile->starting_fileno);
+ if (metalogfile->fileno < fileno) { /* It has already been compacted */
+ error("Deleting invalid metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL
+ METALOG_EXTENSION"\"", dbfiles_path, 0U, metalogfile->fileno);
+ unlink_metadata_logfile(metalogfile);
+ freez(metalogfile);
+ continue;
+ }
+ tmp_metalogfiles[recovered_files++] = metalogfile;
+ }
+
+ /* compaction temporary file is valid */
+ tmp_metalogfiles[recovered_files++] = compactionfile;
+ ret = rename_metadata_logfile(compactionfile, 0, starting_fileno);
+ if (ret < 0) {
+ error("Cannot rename temporary compaction files. Cannot proceed.");
+ freez(tmp_metalogfiles);
+ return 1;
+ }
+
+ memcpy(metalogfiles, tmp_metalogfiles, recovered_files * sizeof(*metalogfiles));
+ *matched_files = recovered_files;
+ freez(tmp_metalogfiles);
+
+ info("Finished metadata log file failure recovery procedure in \"%s\".", dbfiles_path);
+ return 0;
+}
diff --git a/database/engine/metadata_log/compaction.h b/database/engine/metadata_log/compaction.h
new file mode 100644
index 000000000..d04613440
--- /dev/null
+++ b/database/engine/metadata_log/compaction.h
@@ -0,0 +1,14 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_COMPACTION_H
+#define NETDATA_COMPACTION_H
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include "../rrdengine.h"
+
+extern int compaction_failure_recovery(struct metalog_instance *ctx, struct metadata_logfile **metalogfiles,
+ unsigned *matched_files);
+
+#endif /* NETDATA_COMPACTION_H */
diff --git a/database/engine/metadata_log/logfile.c b/database/engine/metadata_log/logfile.c
new file mode 100644
index 000000000..b7c5c0618
--- /dev/null
+++ b/database/engine/metadata_log/logfile.c
@@ -0,0 +1,453 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include <database/sqlite/sqlite_functions.h>
+#include "metadatalog.h"
+#include "metalogpluginsd.h"
+
+
+void generate_metadata_logfile_path(struct metadata_logfile *metalogfile, char *str, size_t maxlen)
+{
+ (void) snprintf(str, maxlen, "%s/" METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL METALOG_EXTENSION,
+ metalogfile->ctx->rrdeng_ctx->dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno);
+}
+
+void metadata_logfile_init(struct metadata_logfile *metalogfile, struct metalog_instance *ctx, unsigned starting_fileno,
+ unsigned fileno)
+{
+ metalogfile->starting_fileno = starting_fileno;
+ metalogfile->fileno = fileno;
+ metalogfile->file = (uv_file)0;
+ metalogfile->pos = 0;
+ metalogfile->next = NULL;
+ metalogfile->ctx = ctx;
+}
+
+int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno, unsigned new_fileno)
+{
+ //struct metalog_instance *ctx = metalogfile->ctx;
+ uv_fs_t req;
+ int ret;
+ char oldpath[RRDENG_PATH_MAX], newpath[RRDENG_PATH_MAX];
+ unsigned backup_starting_fileno, backup_fileno;
+
+ backup_starting_fileno = metalogfile->starting_fileno;
+ backup_fileno = metalogfile->fileno;
+ generate_metadata_logfile_path(metalogfile, oldpath, sizeof(oldpath));
+ metalogfile->starting_fileno = new_starting_fileno;
+ metalogfile->fileno = new_fileno;
+ generate_metadata_logfile_path(metalogfile, newpath, sizeof(newpath));
+
+ info("Renaming metadata log file \"%s\" to \"%s\".", oldpath, newpath);
+ ret = uv_fs_rename(NULL, &req, oldpath, newpath, NULL);
+ if (ret < 0) {
+ error("uv_fs_rename(%s): %s", oldpath, uv_strerror(ret));
+ //++ctx->stats.fs_errors; /* this is racy, may miss some errors */
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ /* restore previous values */
+ metalogfile->starting_fileno = backup_starting_fileno;
+ metalogfile->fileno = backup_fileno;
+ }
+ uv_fs_req_cleanup(&req);
+
+ return ret;
+}
+
+int unlink_metadata_logfile(struct metadata_logfile *metalogfile)
+{
+ //struct metalog_instance *ctx = metalogfile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_metadata_logfile_path(metalogfile, path, sizeof(path));
+
+ ret = uv_fs_unlink(NULL, &req, path, NULL);
+ if (ret < 0) {
+ error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+// ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ return ret;
+}
+
+static int check_metadata_logfile_superblock(uv_file file)
+{
+ int ret;
+ struct rrdeng_metalog_sb *superblock;
+ uv_buf_t iov;
+ uv_fs_t req;
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ error("uv_fs_read: %s", uv_strerror(ret));
+ uv_fs_req_cleanup(&req);
+ goto error;
+ }
+ fatal_assert(req.result >= 0);
+ uv_fs_req_cleanup(&req);
+
+ if (strncmp(superblock->magic_number, RRDENG_METALOG_MAGIC, RRDENG_MAGIC_SZ)) {
+ error("File has invalid superblock.");
+ ret = UV_EINVAL;
+ } else {
+ ret = 0;
+ }
+ if (superblock->version > RRDENG_METALOG_VER) {
+ error("File has unknown version %"PRIu16". Compatibility is not guaranteed.", superblock->version);
+ }
+error:
+ free(superblock);
+ return ret;
+}
+
+void replay_record(struct metadata_logfile *metalogfile, struct rrdeng_metalog_record_header *header, void *payload)
+{
+ struct metalog_instance *ctx = metalogfile->ctx;
+ char *line, *nextline, *record_end;
+ int ret;
+
+ debug(D_METADATALOG, "RECORD contents: %.*s", (int)header->payload_length, (char *)payload);
+ record_end = (char *)payload + header->payload_length - 1;
+ *record_end = '\0';
+
+ for (line = payload ; line ; line = nextline) {
+ nextline = strchr(line, '\n');
+ if (nextline) {
+ *nextline++ = '\0';
+ }
+ ret = parser_action(ctx->metalog_parser_object->parser, line);
+ debug(D_METADATALOG, "parser_action ret:%d", ret);
+ if (ret)
+ return; /* skip record due to error */
+ };
+}
+
+/* This function only works with buffered I/O */
+static inline int metalogfile_read(struct metadata_logfile *metalogfile, void *buf, size_t len, uint64_t offset)
+{
+// struct metalog_instance *ctx;
+ uv_file file;
+ uv_buf_t iov;
+ uv_fs_t req;
+ int ret;
+
+// ctx = metalogfile->ctx;
+ file = metalogfile->file;
+ iov = uv_buf_init(buf, len);
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, offset, NULL);
+ if (unlikely(ret < 0 && ret != req.result)) {
+ fatal("uv_fs_read: %s", uv_strerror(ret));
+ }
+ if (req.result < 0) {
+// ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
+ error("%s: uv_fs_read - %s - record at offset %"PRIu64"(%u) in metadata logfile %u-%u.", __func__,
+ uv_strerror((int)req.result), offset, (unsigned)len, metalogfile->starting_fileno, metalogfile->fileno);
+ }
+ uv_fs_req_cleanup(&req);
+// ctx->stats.io_read_bytes += len;
+// ++ctx->stats.io_read_requests;
+
+ return ret;
+}
+
+/* Return 0 on success */
+static int metadata_record_integrity_check(void *record)
+{
+ int ret;
+ uint32_t data_size;
+ struct rrdeng_metalog_record_header *header;
+ struct rrdeng_metalog_record_trailer *trailer;
+ uLong crc;
+
+ header = record;
+ data_size = header->header_length + header->payload_length;
+ trailer = record + data_size;
+
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, record, data_size);
+ ret = crc32cmp(trailer->checksum, crc);
+
+ return ret;
+}
+
+#define MAX_READ_BYTES (RRDENG_BLOCK_SIZE * 32) /* no record should be over 128KiB in this version */
+
+/*
+ * Iterates metadata log file records and creates database objects (host/chart/dimension)
+ */
+static void iterate_records(struct metadata_logfile *metalogfile)
+{
+ uint32_t file_size, pos, bytes_remaining, record_size;
+ void *buf;
+ struct rrdeng_metalog_record_header *header;
+ struct metalog_instance *ctx = metalogfile->ctx;
+ struct metalog_pluginsd_state *state = ctx->metalog_parser_object->private;
+ const size_t min_header_size = offsetof(struct rrdeng_metalog_record_header, header_length) +
+ sizeof(header->header_length);
+
+ file_size = metalogfile->pos;
+ state->metalogfile = metalogfile;
+
+ buf = mallocz(MAX_READ_BYTES);
+
+ for (pos = sizeof(struct rrdeng_metalog_sb) ; pos < file_size ; pos += record_size) {
+ bytes_remaining = file_size - pos;
+ if (bytes_remaining < min_header_size) {
+ error("%s: unexpected end of file in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno,
+ metalogfile->fileno);
+ break;
+ }
+ if (metalogfile_read(metalogfile, buf, min_header_size, pos) < 0)
+ break;
+ header = (struct rrdeng_metalog_record_header *)buf;
+ if (METALOG_STORE_PADDING == header->type) {
+ info("%s: Skipping padding in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno,
+ metalogfile->fileno);
+ record_size = ALIGN_BYTES_FLOOR(pos + RRDENG_BLOCK_SIZE) - pos;
+ continue;
+ }
+ if (metalogfile_read(metalogfile, buf + min_header_size, sizeof(*header) - min_header_size,
+ pos + min_header_size) < 0)
+ break;
+ record_size = header->header_length + header->payload_length + sizeof(struct rrdeng_metalog_record_trailer);
+ if (header->header_length < min_header_size || record_size > bytes_remaining) {
+ error("%s: Corrupted record in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno,
+ metalogfile->fileno);
+ break;
+ }
+ if (record_size > MAX_READ_BYTES) {
+ error("%s: Record is too long (%u bytes) in metadata logfile %u-%u.", __func__, record_size,
+ metalogfile->starting_fileno, metalogfile->fileno);
+ continue;
+ }
+ if (metalogfile_read(metalogfile, buf + sizeof(*header), record_size - sizeof(*header),
+ pos + sizeof(*header)) < 0)
+ break;
+ if (metadata_record_integrity_check(buf)) {
+ error("%s: Record at offset %"PRIu32" was read from disk. CRC32 check: FAILED", __func__, pos);
+ continue;
+ }
+ debug(D_METADATALOG, "%s: Record at offset %"PRIu32" was read from disk. CRC32 check: SUCCEEDED", __func__,
+ pos);
+
+ replay_record(metalogfile, header, buf + header->header_length);
+ }
+
+ freez(buf);
+}
+
+int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *metalogfile)
+{
+ UNUSED(ctx);
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd, error;
+ uint64_t file_size;
+ char path[RRDENG_PATH_MAX];
+
+ generate_metadata_logfile_path(metalogfile, path, sizeof(path));
+ if (file_is_migrated(path))
+ return 0;
+
+ fd = open_file_buffered_io(path, O_RDWR, &file);
+ if (fd < 0) {
+// ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return fd;
+ }
+ info("Loading metadata log \"%s\".", path);
+
+ ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_metalog_sb));
+ if (ret)
+ goto error;
+
+ ret = check_metadata_logfile_superblock(file);
+ if (ret)
+ goto error;
+// ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb);
+// ++ctx->stats.io_read_requests;
+
+ metalogfile->file = file;
+ metalogfile->pos = file_size;
+
+ iterate_records(metalogfile);
+
+ info("Metadata log \"%s\" migrated to the database (size:%"PRIu64").", path, file_size);
+ add_migrated_file(path, file_size);
+ return 0;
+
+error:
+ error = ret;
+ ret = uv_fs_close(NULL, &req, file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+// ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+ return error;
+}
+
+static int scan_metalog_files_cmp(const void *a, const void *b)
+{
+ struct metadata_logfile *file1, *file2;
+ char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX];
+
+ file1 = *(struct metadata_logfile **)a;
+ file2 = *(struct metadata_logfile **)b;
+ generate_metadata_logfile_path(file1, path1, sizeof(path1));
+ generate_metadata_logfile_path(file2, path2, sizeof(path2));
+ return strcmp(path1, path2);
+}
+
+/* Returns number of metadata logfiles that were loaded or < 0 on error */
+static int scan_metalog_files(struct metalog_instance *ctx)
+{
+ int ret;
+ unsigned starting_no, no, matched_files, i, failed_to_load;
+ static uv_fs_t req;
+ uv_dirent_t dent;
+ struct metadata_logfile **metalogfiles, *metalogfile;
+ char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path;
+
+ ret = uv_fs_scandir(NULL, &req, dbfiles_path, 0, NULL);
+ if (ret < 0) {
+ fatal_assert(req.result < 0);
+ uv_fs_req_cleanup(&req);
+ error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret));
+// ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return ret;
+ }
+ info("Found %d files in path %s", ret, dbfiles_path);
+
+ metalogfiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*metalogfiles));
+ for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
+ info("Scanning file \"%s/%s\"", dbfiles_path, dent.name);
+ ret = sscanf(dent.name, METALOG_PREFIX METALOG_FILE_NUMBER_SCAN_TMPL METALOG_EXTENSION, &starting_no, &no);
+ if (2 == ret) {
+ info("Matched file \"%s/%s\"", dbfiles_path, dent.name);
+ metalogfile = mallocz(sizeof(*metalogfile));
+ metadata_logfile_init(metalogfile, ctx, starting_no, no);
+ metalogfiles[matched_files++] = metalogfile;
+ }
+ }
+ uv_fs_req_cleanup(&req);
+
+ if (0 == matched_files) {
+ freez(metalogfiles);
+ return 0;
+ }
+ if (matched_files == MAX_DATAFILES) {
+ error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
+ }
+ qsort(metalogfiles, matched_files, sizeof(*metalogfiles), scan_metalog_files_cmp);
+ ret = compaction_failure_recovery(ctx, metalogfiles, &matched_files);
+ if (ret) { /* If the files are corrupted fail */
+ for (i = 0 ; i < matched_files ; ++i) {
+ freez(metalogfiles[i]);
+ }
+ freez(metalogfiles);
+ return UV_EINVAL;
+ }
+ //ctx->last_fileno = metalogfiles[matched_files - 1]->fileno;
+
+ struct plugind cd = {
+ .enabled = 1,
+ .update_every = 0,
+ .pid = 0,
+ .serial_failures = 0,
+ .successful_collections = 0,
+ .obsolete = 0,
+ .started_t = INVALID_TIME,
+ .next = NULL,
+ .version = 0,
+ };
+
+ struct metalog_pluginsd_state metalog_parser_state;
+ metalog_pluginsd_state_init(&metalog_parser_state, ctx);
+
+ PARSER_USER_OBJECT metalog_parser_object;
+ metalog_parser_object.enabled = cd.enabled;
+ metalog_parser_object.host = ctx->rrdeng_ctx->host;
+ metalog_parser_object.cd = &cd;
+ metalog_parser_object.trust_durations = 0;
+ metalog_parser_object.private = &metalog_parser_state;
+
+ PARSER *parser = parser_init(metalog_parser_object.host, &metalog_parser_object, NULL, PARSER_INPUT_SPLIT);
+ if (unlikely(!parser)) {
+ error("Failed to initialize metadata log parser.");
+ failed_to_load = matched_files;
+ goto after_failed_to_parse;
+ }
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_HOST, metalog_pluginsd_host);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_GUID, pluginsd_guid);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_CONTEXT, pluginsd_context);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_TOMBSTONE, pluginsd_tombstone);
+ parser->plugins_action->dimension_action = &metalog_pluginsd_dimension_action;
+ parser->plugins_action->chart_action = &metalog_pluginsd_chart_action;
+ parser->plugins_action->guid_action = &metalog_pluginsd_guid_action;
+ parser->plugins_action->context_action = &metalog_pluginsd_context_action;
+ parser->plugins_action->tombstone_action = &metalog_pluginsd_tombstone_action;
+ parser->plugins_action->host_action = &metalog_pluginsd_host_action;
+
+
+ metalog_parser_object.parser = parser;
+ ctx->metalog_parser_object = &metalog_parser_object;
+
+ for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
+ metalogfile = metalogfiles[i];
+ db_lock();
+ db_execute("BEGIN TRANSACTION;");
+ ret = load_metadata_logfile(ctx, metalogfile);
+ if (0 != ret) {
+ error("Deleting invalid metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL
+ METALOG_EXTENSION"\"", dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno);
+ unlink_metadata_logfile(metalogfile);
+ ++failed_to_load;
+ db_execute("ROLLBACK TRANSACTION;");
+ }
+ else
+ db_execute("COMMIT TRANSACTION;");
+ db_unlock();
+ freez(metalogfile);
+ }
+ matched_files -= failed_to_load;
+ debug(D_METADATALOG, "PARSER ended");
+
+ parser_destroy(parser);
+
+ size_t count __maybe_unused = metalog_parser_object.count;
+
+ debug(D_METADATALOG, "Parsing count=%u", (unsigned)count);
+after_failed_to_parse:
+
+ freez(metalogfiles);
+
+ return matched_files;
+}
+
+/* Return 0 on success. */
+int init_metalog_files(struct metalog_instance *ctx)
+{
+ int ret;
+ char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path;
+
+ ret = scan_metalog_files(ctx);
+ if (ret < 0) {
+ error("Failed to scan path \"%s\".", dbfiles_path);
+ return ret;
+ }/* else if (0 == ret) {
+ ctx->last_fileno = 1;
+ }*/
+
+ return 0;
+}
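
A note on the padding skip in iterate_records() above: when a METALOG_STORE_PADDING record is met, the cursor jumps to the start of the next block rather than advancing by a record length. A minimal standalone sketch of that arithmetic, assuming a 4 KiB block size and a floor-alignment macro equivalent to ALIGN_BYTES_FLOOR (both are assumptions of the sketch, not taken from this patch):

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define BLOCK_SIZE 4096u                              /* stand-in for RRDENG_BLOCK_SIZE */
#define ALIGN_FLOOR(x) ((x) - ((x) % BLOCK_SIZE))     /* stand-in for ALIGN_BYTES_FLOOR */

int main(void)
{
    uint32_t pos = 5000;                                 /* cursor somewhere inside block 1 */
    uint32_t skip = ALIGN_FLOOR(pos + BLOCK_SIZE) - pos; /* bytes until the next block starts */

    assert((pos + skip) % BLOCK_SIZE == 0);
    printf("pos=%u skip=%u next=%u\n", (unsigned)pos, (unsigned)skip, (unsigned)(pos + skip)); /* pos=5000 skip=3192 next=8192 */
    return 0;
}
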
diff --git a/database/engine/metadata_log/logfile.h b/database/engine/metadata_log/logfile.h
new file mode 100644
index 000000000..df12ac714
--- /dev/null
+++ b/database/engine/metadata_log/logfile.h
@@ -0,0 +1,39 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_LOGFILE_H
+#define NETDATA_LOGFILE_H
+
+#include "metadatalogprotocol.h"
+#include "../rrdengine.h"
+
+/* Forward declarations */
+struct metadata_logfile;
+struct metalog_worker_config;
+
+#define METALOG_PREFIX "metadatalog-"
+#define METALOG_EXTENSION ".mlf"
+
+/* only one event loop is supported for now */
+struct metadata_logfile {
+ unsigned fileno; /* Starts at 1 */
+ unsigned starting_fileno; /* 0 for normal files, starting number during compaction */
+ uv_file file;
+ uint64_t pos;
+ struct metalog_instance *ctx;
+ struct metadata_logfile *next;
+};
+
+struct metadata_logfile_list {
+ struct metadata_logfile *first; /* oldest */
+ struct metadata_logfile *last; /* newest */
+};
+
+extern void generate_metadata_logfile_path(struct metadata_logfile *metadatalog, char *str, size_t maxlen);
+extern int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno,
+ unsigned new_fileno);
+extern int unlink_metadata_logfile(struct metadata_logfile *metalogfile);
+extern int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *logfile);
+extern int init_metalog_files(struct metalog_instance *ctx);
+
+
+#endif /* NETDATA_LOGFILE_H */
diff --git a/database/engine/metadata_log/metadatalog.h b/database/engine/metadata_log/metadatalog.h
new file mode 100644
index 000000000..b484686de
--- /dev/null
+++ b/database/engine/metadata_log/metadatalog.h
@@ -0,0 +1,28 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_METADATALOG_H
+#define NETDATA_METADATALOG_H
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include "../rrdengine.h"
+#include "metadatalogprotocol.h"
+#include "logfile.h"
+#include "metadatalogapi.h"
+#include "compaction.h"
+
+/* Forward declarations */
+struct metalog_instance;
+struct parser_user_object;
+
+#define METALOG_FILE_NUMBER_SCAN_TMPL "%5u-%5u"
+#define METALOG_FILE_NUMBER_PRINT_TMPL "%5.5u-%5.5u"
+
+struct metalog_instance {
+ struct rrdengine_instance *rrdeng_ctx;
+ struct parser_user_object *metalog_parser_object;
+ uint8_t initialized; /* set to 1 to mark context initialized */
+};
+
+#endif /* NETDATA_METADATALOG_H */
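
Combining METALOG_PREFIX and METALOG_EXTENSION from logfile.h with the number templates above, a metadata log file is expected to be named like metadatalog-00000-00007.mlf. A small illustration of formatting a name with the print template and re-matching it with the scan template, as scan_metalog_files() does; the literals are spelled out here instead of using the macros:

#include <stdio.h>

int main(void)
{
    char name[64];
    unsigned starting_no = 0, no = 7;

    /* METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL METALOG_EXTENSION */
    snprintf(name, sizeof(name), "metadatalog-%5.5u-%5.5u.mlf", starting_no, no);
    printf("%s\n", name);                          /* metadatalog-00000-00007.mlf */

    /* scan_metalog_files() matches directory entries with the scan template */
    if (sscanf(name, "metadatalog-%5u-%5u.mlf", &starting_no, &no) == 2)
        printf("starting_fileno=%u fileno=%u\n", starting_no, no);
    return 0;
}
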
diff --git a/database/engine/metadata_log/metadatalogapi.c b/database/engine/metadata_log/metadatalogapi.c
new file mode 100755
index 000000000..b206cca05
--- /dev/null
+++ b/database/engine/metadata_log/metadatalogapi.c
@@ -0,0 +1,39 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#define NETDATA_RRD_INTERNALS
+
+#include "metadatalog.h"
+
+/*
+ * Returns 0 on success, negative on error
+ */
+int metalog_init(struct rrdengine_instance *rrdeng_parent_ctx)
+{
+ struct metalog_instance *ctx;
+ int error;
+
+ ctx = callocz(1, sizeof(*ctx));
+ ctx->initialized = 0;
+ rrdeng_parent_ctx->metalog_ctx = ctx;
+
+ ctx->rrdeng_ctx = rrdeng_parent_ctx;
+ error = init_metalog_files(ctx);
+ if (error) {
+ goto error_after_init_rrd_files;
+ }
+ ctx->initialized = 1; /* notify dbengine that the metadata log has finished initializing */
+ return 0;
+
+error_after_init_rrd_files:
+ freez(ctx);
+ return UV_EIO;
+}
+
+/* This function is called by dbengine rotation logic when the metric has no writers */
+void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid)
+{
+ uuid_t multihost_uuid;
+
+ delete_dimension_uuid(metric_uuid);
+ rrdeng_convert_legacy_uuid_to_multihost(ctx->rrdeng_ctx->machine_guid, metric_uuid, &multihost_uuid);
+ delete_dimension_uuid(&multihost_uuid);
+}
diff --git a/database/engine/metadata_log/metadatalogapi.h b/database/engine/metadata_log/metadatalogapi.h
new file mode 100644
index 000000000..d558b9317
--- /dev/null
+++ b/database/engine/metadata_log/metadatalogapi.h
@@ -0,0 +1,12 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_METADATALOGAPI_H
+#define NETDATA_METADATALOGAPI_H
+
+extern void metalog_commit_delete_chart(RRDSET *st);
+extern void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid);
+
+/* must call once before using anything */
+extern int metalog_init(struct rrdengine_instance *rrdeng_parent_ctx);
+
+#endif /* NETDATA_METADATALOGAPI_H */
diff --git a/database/engine/metadata_log/metadatalogprotocol.h b/database/engine/metadata_log/metadatalogprotocol.h
new file mode 100644
index 000000000..1017213ae
--- /dev/null
+++ b/database/engine/metadata_log/metadatalogprotocol.h
@@ -0,0 +1,53 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_METADATALOGPROTOCOL_H
+#define NETDATA_METADATALOGPROTOCOL_H
+
+#include "../rrddiskprotocol.h"
+
+#define RRDENG_METALOG_MAGIC "netdata-metadata-log"
+
+#define RRDENG_METALOG_VER (1)
+
+#define RRDENG_METALOG_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + sizeof(uint16_t)))
+/*
+ * Metadata log persistent super-block
+ */
+struct rrdeng_metalog_sb {
+ char magic_number[RRDENG_MAGIC_SZ];
+ uint16_t version;
+ uint8_t padding[RRDENG_METALOG_SB_PADDING_SZ];
+} __attribute__ ((packed));
+
+/*
+ * Metadata log record types
+ */
+#define METALOG_STORE_PADDING (0)
+#define METALOG_CREATE_OBJECT (1)
+#define METALOG_DELETE_OBJECT (2)
+#define METALOG_OTHER (3) /* reserved */
+
+/*
+ * Metadata log record header
+ */
+struct rrdeng_metalog_record_header {
+ /* when set to METALOG_STORE_PADDING jump to start of next block */
+ uint8_t type;
+
+ uint16_t header_length;
+ uint32_t payload_length;
+ /******************************************************
+ * No fields above this point can ever change. *
+ ******************************************************
+ * All fields below this point are subject to change. *
+ ******************************************************/
+} __attribute__ ((packed));
+
+/*
+ * Metadata log record trailer
+ */
+struct rrdeng_metalog_record_trailer {
+ uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */
+} __attribute__ ((packed));
+
+#endif /* NETDATA_METADATALOGPROTOCOL_H */
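
For orientation, a standalone sketch of the record framing these structures describe and that metadata_record_integrity_check() verifies: the CRC32 covers header plus payload, and the trailer that follows holds the checksum. A 4-byte checksum field and its in-memory encoding are assumptions of the sketch (CHECKSUM_SZ comes from rrddiskprotocol.h, which is not part of this hunk):

#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <zlib.h>        /* link with -lz */

/* mirrors the stable prefix of rrdeng_metalog_record_header */
struct hdr { uint8_t type; uint16_t header_length; uint32_t payload_length; } __attribute__((packed));

int main(void)
{
    unsigned char rec[64] = { 0 };
    const char payload[] = "CONTEXT ...";              /* pluginsd-style payload, truncated */
    struct hdr h = { .type = 1,                        /* METALOG_CREATE_OBJECT */
                     .header_length = sizeof(struct hdr),
                     .payload_length = sizeof(payload) };

    memcpy(rec, &h, sizeof(h));
    memcpy(rec + sizeof(h), payload, sizeof(payload));

    /* writer side: CRC32 over header + payload, stored in the trailer right after them */
    uint32_t data_size = h.header_length + h.payload_length;
    uint32_t stored = (uint32_t)crc32(crc32(0L, Z_NULL, 0), rec, data_size);
    memcpy(rec + data_size, &stored, sizeof(stored));

    /* reader side: recompute and compare, as metadata_record_integrity_check() does */
    uint32_t check = (uint32_t)crc32(crc32(0L, Z_NULL, 0), rec, data_size);
    printf("CRC32 check: %s\n", memcmp(rec + data_size, &check, sizeof(check)) ? "FAILED" : "SUCCEEDED");
    return 0;
}
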
diff --git a/database/engine/metadata_log/metalogpluginsd.c b/database/engine/metadata_log/metalogpluginsd.c
new file mode 100755
index 000000000..88c1453a9
--- /dev/null
+++ b/database/engine/metadata_log/metalogpluginsd.c
@@ -0,0 +1,140 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#define NETDATA_RRD_INTERNALS
+
+#include "metadatalog.h"
+#include "metalogpluginsd.h"
+
+extern struct config stream_config;
+
+PARSER_RC metalog_pluginsd_host_action(
+ void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os, char *timezone,
+ char *tags)
+{
+ struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
+
+ RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0);
+ if (host) {
+ if (unlikely(host->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)) {
+ error("Archived host '%s' has memory mode '%s', but the archived one is '%s'. Ignoring archived state.",
+ host->hostname, rrd_memory_mode_name(host->rrd_memory_mode),
+ rrd_memory_mode_name(RRD_MEMORY_MODE_DBENGINE));
+ ((PARSER_USER_OBJECT *) user)->host = NULL; /* Ignore objects if memory mode is not dbengine */
+ } else
+ ((PARSER_USER_OBJECT *) user)->host = host;
+ return PARSER_RC_OK;
+ }
+
+ if (strcmp(machine_guid, registry_get_this_machine_guid()) == 0) {
+ ((PARSER_USER_OBJECT *) user)->host = host;
+ return PARSER_RC_OK;
+ }
+
+ if (likely(!uuid_parse(machine_guid, state->host_uuid))) {
+ int rc = sql_store_host(&state->host_uuid, hostname, registry_hostname, update_every, os, timezone, tags);
+ if (unlikely(rc)) {
+ errno = 0;
+ error("Failed to store host %s with UUID %s in the database", hostname, machine_guid);
+ }
+ }
+ else {
+ errno = 0;
+ error("Host machine GUID %s is not valid", machine_guid);
+ }
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC metalog_pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family, char *context,
+ char *title, char *units, char *plugin, char *module, int priority,
+ int update_every, RRDSET_TYPE chart_type, char *options)
+{
+ UNUSED(options);
+
+ struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
+ RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+
+ if (unlikely(uuid_is_null(state->host_uuid))) {
+ debug(D_METADATALOG, "Ignoring chart belonging to missing or ignored host.");
+ return PARSER_RC_OK;
+ }
+ uuid_copy(state->chart_uuid, state->uuid);
+ uuid_clear(state->uuid); /* Consume UUID */
+ (void) sql_store_chart(&state->chart_uuid, &state->host_uuid,
+ type, id, name, family, context, title, units,
+ plugin, module, priority, update_every,
+ chart_type, RRD_MEMORY_MODE_DBENGINE, host ? host->rrd_history_entries : 1);
+ ((PARSER_USER_OBJECT *)user)->st_exists = 1;
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC metalog_pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm,
+ long multiplier, long divisor, char *options, RRD_ALGORITHM algorithm_type)
+{
+ struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
+ UNUSED(user);
+ UNUSED(options);
+ UNUSED(algorithm);
+ UNUSED(st);
+
+ if (unlikely(uuid_is_null(state->chart_uuid))) {
+ debug(D_METADATALOG, "Ignoring dimension belonging to missing or ignored chart.");
+ info("Ignoring dimension belonging to missing or ignored chart.");
+ return PARSER_RC_OK;
+ }
+
+ if (unlikely(uuid_is_null(state->uuid))) {
+ debug(D_METADATALOG, "Ignoring dimension without unknown UUID");
+ info("Ignoring dimension without unknown UUID");
+ return PARSER_RC_OK;
+ }
+
+ (void) sql_store_dimension(&state->uuid, &state->chart_uuid, id, name, multiplier, divisor, algorithm_type);
+ uuid_clear(state->uuid); /* Consume UUID */
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC metalog_pluginsd_guid_action(void *user, uuid_t *uuid)
+{
+ struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
+
+ uuid_copy(state->uuid, *uuid);
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC metalog_pluginsd_context_action(void *user, uuid_t *uuid)
+{
+ struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
+
+ int rc = find_uuid_type(uuid);
+
+ if (rc == 1) {
+ uuid_copy(state->host_uuid, *uuid);
+ ((PARSER_USER_OBJECT *)user)->st_exists = 0;
+ ((PARSER_USER_OBJECT *)user)->host_exists = 1;
+ } else if (rc == 2) {
+ uuid_copy(state->chart_uuid, *uuid);
+ ((PARSER_USER_OBJECT *)user)->st_exists = 1;
+ } else
+ uuid_copy(state->uuid, *uuid);
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC metalog_pluginsd_tombstone_action(void *user, uuid_t *uuid)
+{
+ UNUSED(user);
+ UNUSED(uuid);
+
+ return PARSER_RC_OK;
+}
+
+void metalog_pluginsd_state_init(struct metalog_pluginsd_state *state, struct metalog_instance *ctx)
+{
+ state->ctx = ctx;
+ state->skip_record = 0;
+ uuid_clear(state->uuid);
+ state->metalogfile = NULL;
+}
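
The GUID handling above follows a stash-and-consume pattern: metalog_pluginsd_guid_action() stores the UUID in the parser state, and the next chart or dimension action copies it out and clears it. A reduced standalone model of that handshake using libuuid; the struct and helper names here are illustrative, not netdata's:

#include <stdio.h>
#include <uuid/uuid.h>   /* libuuid; link with -luuid */

struct demo_state { uuid_t uuid; uuid_t chart_uuid; };

/* a GUID line stashes the UUID for the object that follows */
static void on_guid(struct demo_state *s, const uuid_t incoming) { uuid_copy(s->uuid, incoming); }

/* the next CHART line consumes it: copy, then clear */
static int on_chart(struct demo_state *s)
{
    if (uuid_is_null(s->uuid))
        return -1;                    /* no GUID preceded this chart: ignore it */
    uuid_copy(s->chart_uuid, s->uuid);
    uuid_clear(s->uuid);
    return 0;
}

int main(void)
{
    struct demo_state s;
    uuid_clear(s.uuid);
    uuid_clear(s.chart_uuid);

    uuid_t incoming;
    uuid_generate(incoming);
    on_guid(&s, incoming);

    char str[37];
    if (on_chart(&s) == 0) {
        uuid_unparse_lower(s.chart_uuid, str);
        printf("chart UUID consumed: %s\n", str);
    }
    printf("a second CHART without a GUID is ignored: %s\n", on_chart(&s) ? "yes" : "no");
    return 0;
}
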
diff --git a/database/engine/metadata_log/metalogpluginsd.h b/database/engine/metadata_log/metalogpluginsd.h
new file mode 100644
index 000000000..96808aaa2
--- /dev/null
+++ b/database/engine/metadata_log/metalogpluginsd.h
@@ -0,0 +1,33 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_METALOGPLUGINSD_H
+#define NETDATA_METALOGPLUGINSD_H
+
+#include "../../../collectors/plugins.d/pluginsd_parser.h"
+#include "../../../collectors/plugins.d/plugins_d.h"
+#include "../../../parser/parser.h"
+
+struct metalog_pluginsd_state {
+ struct metalog_instance *ctx;
+ uuid_t uuid;
+ uuid_t host_uuid;
+ uuid_t chart_uuid;
+ uint8_t skip_record; /* skip this record due to errors in parsing */
+ struct metadata_logfile *metalogfile; /* current metadata log file being replayed */
+};
+
+extern void metalog_pluginsd_state_init(struct metalog_pluginsd_state *state, struct metalog_instance *ctx);
+
+extern PARSER_RC metalog_pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family,
+ char *context, char *title, char *units, char *plugin, char *module,
+ int priority, int update_every, RRDSET_TYPE chart_type, char *options);
+extern PARSER_RC metalog_pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm,
+ long multiplier, long divisor, char *options,
+ RRD_ALGORITHM algorithm_type);
+extern PARSER_RC metalog_pluginsd_guid_action(void *user, uuid_t *uuid);
+extern PARSER_RC metalog_pluginsd_context_action(void *user, uuid_t *uuid);
+extern PARSER_RC metalog_pluginsd_tombstone_action(void *user, uuid_t *uuid);
+extern PARSER_RC metalog_pluginsd_host(char **words, void *user, PLUGINSD_ACTION *plugins_action);
+extern PARSER_RC metalog_pluginsd_host_action(void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os, char *timezone, char *tags);
+
+#endif /* NETDATA_METALOGPLUGINSD_H */
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index 449138b4d..a18207100 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -101,6 +101,13 @@ void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr)
uv_cond_broadcast(&pg_cache_descr->cond);
}
+void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_wake_up_waiters_unsafe(descr);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+}
+
/*
* The caller must hold page descriptor lock.
* The lock will be released and re-acquired. The descriptor is not guaranteed
@@ -116,6 +123,24 @@ void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr)
}
/*
+ * The caller must hold page descriptor lock.
+ * The lock will be released and re-acquired. The descriptor is not guaranteed
+ * to exist after this function returns.
+ * Returns UV_ETIMEDOUT if timeout_sec seconds pass.
+ */
+int pg_cache_timedwait_event_unsafe(struct rrdeng_page_descr *descr, uint64_t timeout_sec)
+{
+ int ret;
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ ++pg_cache_descr->waiters;
+ ret = uv_cond_timedwait(&pg_cache_descr->cond, &pg_cache_descr->mutex, timeout_sec * NSEC_PER_SEC);
+ --pg_cache_descr->waiters;
+
+ return ret;
+}
+
+/*
* Returns page flags.
* The lock will be released and re-acquired. The descriptor is not guaranteed
* to exist after this function returns.
@@ -135,10 +160,8 @@ unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_
/*
* The caller must hold page descriptor lock.
- * Gets a reference to the page descriptor.
- * Returns 1 on success and 0 on failure.
*/
-int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
+int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
@@ -146,25 +169,25 @@ int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_acces
(exclusive_access && pg_cache_descr->refcnt)) {
return 0;
}
- if (exclusive_access)
- pg_cache_descr->flags |= RRD_PAGE_LOCKED;
- ++pg_cache_descr->refcnt;
return 1;
}
/*
* The caller must hold page descriptor lock.
- * Same return values as pg_cache_try_get_unsafe() without doing anything.
+ * Gets a reference to the page descriptor.
+ * Returns 1 on success and 0 on failure.
*/
-int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
+int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
- if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
- (exclusive_access && pg_cache_descr->refcnt)) {
+ if (!pg_cache_can_get_unsafe(descr, exclusive_access))
return 0;
- }
+
+ if (exclusive_access)
+ pg_cache_descr->flags |= RRD_PAGE_LOCKED;
+ ++pg_cache_descr->refcnt;
return 1;
}
@@ -212,23 +235,31 @@ static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned numb
/*
* This function returns the maximum number of pages allowed in the page cache.
- * The caller must hold the page cache lock.
*/
unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
{
/* it's twice the number of producers since we pin 2 pages per producer */
- return ctx->max_cache_pages + 2 * (unsigned long)ctx->stats.metric_API_producers;
+ return ctx->max_cache_pages + 2 * (unsigned long)ctx->metric_API_max_producers;
}
/*
* This function returns the low watermark number of pages in the page cache. The page cache should strive to keep the
* number of pages below that number.
- * The caller must hold the page cache lock.
*/
unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
{
/* it's twice the number of producers since we pin 2 pages per producer */
- return ctx->cache_pages_low_watermark + 2 * (unsigned long)ctx->stats.metric_API_producers;
+ return ctx->cache_pages_low_watermark + 2 * (unsigned long)ctx->metric_API_max_producers;
+}
+
+/*
+ * This function returns the maximum number of committed pages (dirty pages queued to be written to disk) that are
+ * allowed in the page cache.
+ */
+unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx)
+{
+ /* We remove the active pages of the producers from the calculation and only allow the extra pinned pages */
+ return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers;
}
/*
@@ -370,31 +401,52 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
return 0;
}
-void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty)
+/**
+ * Deletes a page from the database.
+ * Callers of this function need to make sure they're not deleting the same descriptor concurrently.
+ * @param ctx is the database instance.
+ * @param descr is the page descriptor.
+ * @param remove_dirty must be non-zero if the page to be deleted is dirty.
+ * @param is_exclusive_holder must be non-zero if the caller holds an exclusive page reference.
+ * @param metric_id is set to the metric the page belongs to, if it's safe to delete the metric and metric_id is not
+ * NULL. Otherwise, metric_id is not set.
+ * @return 1 if it's safe to delete the metric, 0 otherwise.
+ */
+uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty,
+ uint8_t is_exclusive_holder, uuid_t *metric_id)
{
struct page_cache *pg_cache = &ctx->pg_cache;
struct page_cache_descr *pg_cache_descr = NULL;
Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
+ struct pg_cache_page_index *page_index = NULL;
int ret;
+ uint8_t can_delete_metric = 0;
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
- assert(NULL != PValue);
+ fatal_assert(NULL != PValue);
page_index = *PValue;
uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
uv_rwlock_wrlock(&page_index->lock);
ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
- uv_rwlock_wrunlock(&page_index->lock);
if (unlikely(0 == ret)) {
+ uv_rwlock_wrunlock(&page_index->lock);
error("Page under deletion was not in index.");
if (unlikely(debug_flags & D_RRDENGINE)) {
print_page_descr(descr);
}
goto destroy;
}
- assert(1 == ret);
+ --page_index->page_count;
+ if (!page_index->writers && !page_index->page_count) {
+ can_delete_metric = 1;
+ if (metric_id) {
+ memcpy(metric_id, page_index->id, sizeof(uuid_t));
+ }
+ }
+ uv_rwlock_wrunlock(&page_index->lock);
+ fatal_assert(1 == ret);
uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
++ctx->stats.pg_cache_deletions;
@@ -403,13 +455,18 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_desc
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_descr = descr->pg_cache_descr;
- while (!pg_cache_try_get_unsafe(descr, 1)) {
- debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
- if (unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
- pg_cache_wait_event_unsafe(descr);
+ if (!is_exclusive_holder) {
+ /* If we don't hold an exclusive page reference get one */
+ while (!pg_cache_try_get_unsafe(descr, 1)) {
+ debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_wait_event_unsafe(descr);
+ }
}
- if (!remove_dirty) {
+ if (remove_dirty) {
+ pg_cache_descr->flags &= ~RRD_PAGE_DIRTY;
+ } else {
/* even a locked page could be dirty */
while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) {
debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
@@ -429,11 +486,16 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_desc
uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}
pg_cache_put(ctx, descr);
-
- rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+ rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
+ while (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
+ rrdeng_try_deallocate_pg_cache_descr(ctx, descr); /* spin */
+ (void)sleep_usec(1000); /* 1 msec */
+ }
destroy:
freez(descr);
pg_cache_update_metric_times(page_index);
+
+ return can_delete_metric;
}
static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time)
@@ -521,7 +583,7 @@ void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
uv_rwlock_rdunlock(&page_index->lock);
if (unlikely(NULL == firstPValue)) {
- assert(NULL == lastPValue);
+ fatal_assert(NULL == lastPValue);
page_index->oldest_time = page_index->latest_time = INVALID_TIME;
return;
}
@@ -542,7 +604,7 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index
/* there is page cache descriptor pre-allocated state */
struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
- assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED);
+ fatal_assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED);
if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
pg_cache_reserve_pages(ctx, 1);
if (!(pg_cache_descr->flags & RRD_PAGE_DIRTY))
@@ -553,7 +615,7 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index
if (unlikely(NULL == index)) {
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
- assert(NULL != PValue);
+ fatal_assert(NULL != PValue);
page_index = *PValue;
uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
} else {
@@ -563,6 +625,7 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index
uv_rwlock_wrlock(&page_index->lock);
PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
*PValue = descr;
+ ++page_index->page_count;
pg_cache_add_new_metric_time(page_index, descr);
uv_rwlock_wrunlock(&page_index->lock);
@@ -577,7 +640,7 @@ usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id,
struct page_cache *pg_cache = &ctx->pg_cache;
struct rrdeng_page_descr *descr = NULL;
Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
+ struct pg_cache_page_index *page_index = NULL;
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
@@ -617,7 +680,7 @@ void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_c
Word_t Index;
(void)pg_cache;
- assert(NULL != page_index);
+ fatal_assert(NULL != page_index);
Index = (Word_t)(point_in_time / USEC_PER_SEC);
uv_rwlock_rdlock(&page_index->lock);
@@ -658,11 +721,11 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta
unsigned i, j, k, preload_count, count, page_info_array_max_size;
unsigned long flags;
Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
+ struct pg_cache_page_index *page_index = NULL;
Word_t Index;
uint8_t failed_to_reserve;
- assert(NULL != ret_page_indexp);
+ fatal_assert(NULL != ret_page_indexp);
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
@@ -804,7 +867,7 @@ struct rrdeng_page_descr *
struct page_cache_descr *pg_cache_descr = NULL;
unsigned long flags;
Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
+ struct pg_cache_page_index *page_index = NULL;
Word_t Index;
uint8_t page_not_in_cache;
@@ -911,7 +974,7 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index
struct page_cache_descr *pg_cache_descr = NULL;
unsigned long flags;
Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
+ struct pg_cache_page_index *page_index = NULL;
uint8_t page_not_in_cache;
if (unlikely(NULL == index)) {
@@ -1003,10 +1066,12 @@ struct pg_cache_page_index *create_page_index(uuid_t *id)
page_index = mallocz(sizeof(*page_index));
page_index->JudyL_array = (Pvoid_t) NULL;
uuid_copy(page_index->id, *id);
- assert(0 == uv_rwlock_init(&page_index->lock));
+ fatal_assert(0 == uv_rwlock_init(&page_index->lock));
page_index->oldest_time = INVALID_TIME;
page_index->latest_time = INVALID_TIME;
page_index->prev = NULL;
+ page_index->page_count = 0;
+ page_index->writers = 0;
return page_index;
}
@@ -1017,7 +1082,7 @@ static void init_metrics_index(struct rrdengine_instance *ctx)
pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL;
pg_cache->metrics_index.last_page_index = NULL;
- assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
+ fatal_assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
}
static void init_replaceQ(struct rrdengine_instance *ctx)
@@ -1026,7 +1091,7 @@ static void init_replaceQ(struct rrdengine_instance *ctx)
pg_cache->replaceQ.head = NULL;
pg_cache->replaceQ.tail = NULL;
- assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock));
+ fatal_assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock));
}
static void init_committed_page_index(struct rrdengine_instance *ctx)
@@ -1034,7 +1099,7 @@ static void init_committed_page_index(struct rrdengine_instance *ctx)
struct page_cache *pg_cache = &ctx->pg_cache;
pg_cache->committed_page_index.JudyL_array = (Pvoid_t) NULL;
- assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock));
+ fatal_assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock));
pg_cache->committed_page_index.latest_corr_id = 0;
pg_cache->committed_page_index.nr_committed_pages = 0;
}
@@ -1045,7 +1110,7 @@ void init_page_cache(struct rrdengine_instance *ctx)
pg_cache->page_descriptors = 0;
pg_cache->populated_pages = 0;
- assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock));
+ fatal_assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock));
init_metrics_index(ctx);
init_replaceQ(ctx);
@@ -1064,7 +1129,7 @@ void free_page_cache(struct rrdengine_instance *ctx)
/* Free committed page index */
ret_Judy = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0);
- assert(NULL == pg_cache->committed_page_index.JudyL_array);
+ fatal_assert(NULL == pg_cache->committed_page_index.JudyL_array);
bytes_freed += ret_Judy;
for (page_index = pg_cache->metrics_index.last_page_index ;
@@ -1099,14 +1164,14 @@ void free_page_cache(struct rrdengine_instance *ctx)
/* Free page index */
ret_Judy = JudyLFreeArray(&page_index->JudyL_array, PJE0);
- assert(NULL == page_index->JudyL_array);
+ fatal_assert(NULL == page_index->JudyL_array);
bytes_freed += ret_Judy;
freez(page_index);
bytes_freed += sizeof(*page_index);
}
/* Free metrics index */
ret_Judy = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0);
- assert(NULL == pg_cache->metrics_index.JudyHS_array);
+ fatal_assert(NULL == pg_cache->metrics_index.JudyHS_array);
bytes_freed += ret_Judy;
info("Freed %lu bytes of memory from page cache.", bytes_freed);
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
index 7d8fa2a1d..31e9739da 100644
--- a/database/engine/pagecache.h
+++ b/database/engine/pagecache.h
@@ -85,6 +85,8 @@ struct pg_cache_page_index {
* TODO: examine if we want to support better granularity than seconds
*/
Pvoid_t JudyL_array;
+ Word_t page_count;
+ unsigned short writers;
uv_rwlock_t lock;
/*
@@ -148,6 +150,7 @@ struct page_cache { /* TODO: add statistics */
};
extern void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr);
+extern void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
extern void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr);
extern unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
extern void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
@@ -162,7 +165,8 @@ extern void pg_cache_put_unsafe(struct rrdeng_page_descr *descr);
extern void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
extern void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
struct rrdeng_page_descr *descr);
-extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty);
+extern uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
+ uint8_t remove_dirty, uint8_t is_exclusive_holder, uuid_t *metric_id);
extern usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id,
usec_t start_time, usec_t end_time);
extern void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
@@ -184,6 +188,7 @@ extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index,
extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index);
extern unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx);
extern unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx);
+extern unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx);
static inline void
pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_timep, uint32_t *page_lengthp)
@@ -214,7 +219,7 @@ static inline void
/* The caller must hold a reference to the page and must have already set the new data */
static inline void pg_cache_atomic_set_pg_info(struct rrdeng_page_descr *descr, usec_t end_time, uint32_t page_length)
{
- assert(!(end_time & 1));
+ fatal_assert(!(end_time & 1));
__sync_synchronize();
descr->end_time |= 1; /* mark start of uncertainty period by adding 1 microsecond */
__sync_synchronize();
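
The three limit helpers declared in this header differ only in their baseline and in the per-producer allowance. A quick numeric illustration with arbitrary example values (these are not netdata defaults):

#include <stdio.h>

int main(void)
{
    unsigned long max_cache_pages = 1024;          /* example configuration, not a default */
    unsigned long low_watermark   = 1024 - 64;
    unsigned long producers       = 200;           /* ctx->metric_API_max_producers */

    /* two pinned pages per producer are tolerated on top of the configured ceiling */
    unsigned long hard      = max_cache_pages + 2 * producers;  /* pg_cache_hard_limit()           */
    unsigned long soft      = low_watermark   + 2 * producers;  /* pg_cache_soft_limit()           */
    unsigned long committed = low_watermark   + producers;      /* pg_cache_committed_hard_limit() */

    printf("hard=%lu soft=%lu committed=%lu\n", hard, soft, committed); /* 1424 1360 1160 */
    return 0;
}
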
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index b6b6548ec..43135ff01 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -6,9 +6,10 @@
rrdeng_stats_t global_io_errors = 0;
rrdeng_stats_t global_fs_errors = 0;
rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
-rrdeng_stats_t global_flushing_errors = 0;
+rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0;
+rrdeng_stats_t global_flushing_pressure_page_deletions = 0;
-void sanity_check(void)
+static void sanity_check(void)
{
/* Magic numbers must fit in the super-blocks */
BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ);
@@ -26,10 +27,188 @@ void sanity_check(void)
/* page count must fit in 8 bits */
BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255);
+ /* extent cache count must fit in 32 bits */
+ BUILD_BUG_ON(MAX_CACHED_EXTENTS > 32);
+
/* page info scratch space must be able to hold 2 32-bit integers */
BUILD_BUG_ON(sizeof(((struct rrdeng_page_info *)0)->scratch) < 2 * sizeof(uint32_t));
}
+/* always inserts into tail */
+static inline void xt_cache_replaceQ_insert(struct rrdengine_worker_config* wc,
+ struct extent_cache_element *xt_cache_elem)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+
+ xt_cache_elem->prev = NULL;
+ xt_cache_elem->next = NULL;
+
+ if (likely(NULL != xt_cache->replaceQ_tail)) {
+ xt_cache_elem->prev = xt_cache->replaceQ_tail;
+ xt_cache->replaceQ_tail->next = xt_cache_elem;
+ }
+ if (unlikely(NULL == xt_cache->replaceQ_head)) {
+ xt_cache->replaceQ_head = xt_cache_elem;
+ }
+ xt_cache->replaceQ_tail = xt_cache_elem;
+}
+
+static inline void xt_cache_replaceQ_delete(struct rrdengine_worker_config* wc,
+ struct extent_cache_element *xt_cache_elem)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *prev, *next;
+
+ prev = xt_cache_elem->prev;
+ next = xt_cache_elem->next;
+
+ if (likely(NULL != prev)) {
+ prev->next = next;
+ }
+ if (likely(NULL != next)) {
+ next->prev = prev;
+ }
+ if (unlikely(xt_cache_elem == xt_cache->replaceQ_head)) {
+ xt_cache->replaceQ_head = next;
+ }
+ if (unlikely(xt_cache_elem == xt_cache->replaceQ_tail)) {
+ xt_cache->replaceQ_tail = prev;
+ }
+ xt_cache_elem->prev = xt_cache_elem->next = NULL;
+}
+
+static inline void xt_cache_replaceQ_set_hot(struct rrdengine_worker_config* wc,
+ struct extent_cache_element *xt_cache_elem)
+{
+ xt_cache_replaceQ_delete(wc, xt_cache_elem);
+ xt_cache_replaceQ_insert(wc, xt_cache_elem);
+}
+
+/* Returns the index of the cached extent if it was successfully inserted in the extent cache, otherwise -1 */
+static int try_insert_into_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *xt_cache_elem;
+ unsigned idx;
+ int ret;
+
+ ret = find_first_zero(xt_cache->allocation_bitmap);
+ if (-1 == ret || ret >= MAX_CACHED_EXTENTS) {
+ for (xt_cache_elem = xt_cache->replaceQ_head ; NULL != xt_cache_elem ; xt_cache_elem = xt_cache_elem->next) {
+ idx = xt_cache_elem - xt_cache->extent_array;
+ if (!check_bit(xt_cache->inflight_bitmap, idx)) {
+ xt_cache_replaceQ_delete(wc, xt_cache_elem);
+ break;
+ }
+ }
+ if (NULL == xt_cache_elem)
+ return -1;
+ } else {
+ idx = (unsigned)ret;
+ xt_cache_elem = &xt_cache->extent_array[idx];
+ }
+ xt_cache_elem->extent = extent;
+ xt_cache_elem->fileno = extent->datafile->fileno;
+ xt_cache_elem->inflight_io_descr = NULL;
+ xt_cache_replaceQ_insert(wc, xt_cache_elem);
+ modify_bit(&xt_cache->allocation_bitmap, idx, 1);
+
+ return (int)idx;
+}
+
+/**
+ * Returns 0 if the extent was found in the extent cache, 1 otherwise.
+ * On success, sets *idx to the position of the extent inside the cache.
+ **/
+static uint8_t lookup_in_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent, unsigned *idx)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *xt_cache_elem;
+ unsigned i;
+
+ for (i = 0 ; i < MAX_CACHED_EXTENTS ; ++i) {
+ xt_cache_elem = &xt_cache->extent_array[i];
+ if (check_bit(xt_cache->allocation_bitmap, i) && xt_cache_elem->extent == extent &&
+ xt_cache_elem->fileno == extent->datafile->fileno) {
+ *idx = i;
+ return 0;
+ }
+ }
+ return 1;
+}
+
+#if 0 /* disabled code */
+static void delete_from_xt_cache(struct rrdengine_worker_config* wc, unsigned idx)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *xt_cache_elem;
+
+ xt_cache_elem = &xt_cache->extent_array[idx];
+ xt_cache_replaceQ_delete(wc, xt_cache_elem);
+ xt_cache_elem->extent = NULL;
+ modify_bit(&wc->xt_cache.allocation_bitmap, idx, 0); /* invalidate it */
+ modify_bit(&wc->xt_cache.inflight_bitmap, idx, 0); /* not in-flight anymore */
+}
+#endif
+
+void enqueue_inflight_read_to_xt_cache(struct rrdengine_worker_config* wc, unsigned idx,
+ struct extent_io_descriptor *xt_io_descr)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *xt_cache_elem;
+ struct extent_io_descriptor *old_next;
+
+ xt_cache_elem = &xt_cache->extent_array[idx];
+ old_next = xt_cache_elem->inflight_io_descr->next;
+ xt_cache_elem->inflight_io_descr->next = xt_io_descr;
+ xt_io_descr->next = old_next;
+}
+
+void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, struct extent_io_descriptor *xt_io_descr)
+{
+ unsigned i, j, page_offset;
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+ void *page;
+ struct extent_info *extent = xt_io_descr->descr_array[0]->extent;
+
+ for (i = 0 ; i < xt_io_descr->descr_count; ++i) {
+ page = mallocz(RRDENG_BLOCK_SIZE);
+ descr = xt_io_descr->descr_array[i];
+ for (j = 0, page_offset = 0 ; j < extent->number_of_pages ; ++j) {
+ /* care, we don't hold the descriptor mutex */
+ if (!uuid_compare(*extent->pages[j]->id, *descr->id) &&
+ extent->pages[j]->page_length == descr->page_length &&
+ extent->pages[j]->start_time == descr->start_time &&
+ extent->pages[j]->end_time == descr->end_time) {
+ break;
+ }
+ page_offset += extent->pages[j]->page_length;
+
+ }
+ /* care, we don't hold the descriptor mutex */
+ (void) memcpy(page, wc->xt_cache.extent_array[idx].pages + page_offset, descr->page_length);
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ pg_cache_descr->page = page;
+ pg_cache_descr->flags |= RRD_PAGE_POPULATED;
+ pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING;
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ pg_cache_replaceQ_insert(ctx, descr);
+ if (xt_io_descr->release_descr) {
+ pg_cache_put(ctx, descr);
+ } else {
+ debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
+ pg_cache_wake_up_waiters(ctx, descr);
+ }
+ }
+ if (xt_io_descr->completion)
+ complete(xt_io_descr->completion);
+ freez(xt_io_descr);
+}
+
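
The extent cache introduced above combines a fixed slot array, an allocation bitmap for finding a free slot, an inflight bitmap that keeps slots with reads still in progress from being recycled, and an LRU replaceQ for eviction. A reduced standalone model of the bitmap bookkeeping; the bit helpers are reimplemented here only for the sketch, and SLOTS stands in for MAX_CACHED_EXTENTS:

#include <stdint.h>
#include <stdio.h>

#define SLOTS 8 /* stand-in for MAX_CACHED_EXTENTS, which must fit in a 32-bit bitmap */

static int find_first_zero(uint32_t bitmap)
{
    for (int i = 0; i < SLOTS; ++i)
        if (!(bitmap & (1u << i)))
            return i;
    return -1;
}
static int check_bit(uint32_t bitmap, unsigned idx) { return !!(bitmap & (1u << idx)); }
static void modify_bit(uint32_t *bitmap, unsigned idx, int value)
{
    if (value) *bitmap |= (1u << idx); else *bitmap &= ~(1u << idx);
}

int main(void)
{
    uint32_t allocation = 0, inflight = 0;

    int idx = find_first_zero(allocation);       /* grab a free slot for a new extent */
    modify_bit(&allocation, (unsigned)idx, 1);
    modify_bit(&inflight, (unsigned)idx, 1);     /* a read has been queued for it */
    printf("slot %d allocated, inflight=%d\n", idx, check_bit(inflight, (unsigned)idx));

    modify_bit(&inflight, (unsigned)idx, 0);     /* the read completed: slot is reusable */
    printf("slot %d may now be recycled by the replaceQ LRU\n", idx);
    return 0;
}
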
void read_extent_cb(uv_fs_t* req)
{
struct rrdengine_worker_config* wc = req->loop->data;
@@ -40,7 +219,7 @@ void read_extent_cb(uv_fs_t* req)
int ret;
unsigned i, j, count;
void *page, *uncompressed_buf = NULL;
- uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length;
+ uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length = 0;
uint8_t have_read_error = 0;
/* persistent structures */
struct rrdeng_df_extent_header *header;
@@ -98,6 +277,33 @@ after_crc_check:
debug(D_RRDENGINE, "LZ4 decompressed %u bytes to %d bytes.", payload_length, ret);
/* care, we don't hold the descriptor mutex */
}
+ {
+ uint8_t xt_is_cached = 0;
+ unsigned xt_idx;
+ struct extent_info *extent = xt_io_descr->descr_array[0]->extent;
+
+ xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx);
+ if (xt_is_cached && check_bit(wc->xt_cache.inflight_bitmap, xt_idx)) {
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *xt_cache_elem = &xt_cache->extent_array[xt_idx];
+ struct extent_io_descriptor *curr, *next;
+
+ if (have_read_error) {
+ memset(xt_cache_elem->pages, 0, sizeof(xt_cache_elem->pages));
+ } else if (RRD_NO_COMPRESSION == header->compression_algorithm) {
+ (void)memcpy(xt_cache_elem->pages, xt_io_descr->buf + payload_offset, payload_length);
+ } else {
+ (void)memcpy(xt_cache_elem->pages, uncompressed_buf, uncompressed_payload_length);
+ }
+ /* complete all connected in-flight read requests */
+ for (curr = xt_cache_elem->inflight_io_descr->next ; curr ; curr = next) {
+ next = curr->next;
+ read_cached_extent_cb(wc, xt_idx, curr);
+ }
+ xt_cache_elem->inflight_io_descr = NULL;
+ modify_bit(&xt_cache->inflight_bitmap, xt_idx, 0); /* not in-flight anymore */
+ }
+ }
for (i = 0 ; i < xt_io_descr->descr_count; ++i) {
page = mallocz(RRDENG_BLOCK_SIZE);
@@ -121,19 +327,19 @@ after_crc_check:
} else {
(void) memcpy(page, uncompressed_buf + page_offset, descr->page_length);
}
- pg_cache_replaceQ_insert(ctx, descr);
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_descr = descr->pg_cache_descr;
pg_cache_descr->page = page;
pg_cache_descr->flags |= RRD_PAGE_POPULATED;
pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING;
- debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ pg_cache_replaceQ_insert(ctx, descr);
if (xt_io_descr->release_descr) {
- pg_cache_put_unsafe(descr);
+ pg_cache_put(ctx, descr);
} else {
- pg_cache_wake_up_waiters_unsafe(descr);
+ debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
+ pg_cache_wake_up_waiters(ctx, descr);
}
- rrdeng_page_descr_mutex_unlock(ctx, descr);
}
if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) {
freez(uncompressed_buf);
@@ -158,18 +364,15 @@ static void do_read_extent(struct rrdengine_worker_config* wc,
// uint32_t payload_length;
struct extent_io_descriptor *xt_io_descr;
struct rrdengine_datafile *datafile;
+ struct extent_info *extent = descr[0]->extent;
+ uint8_t xt_is_cached = 0, xt_is_inflight = 0;
+ unsigned xt_idx;
- datafile = descr[0]->extent->datafile;
- pos = descr[0]->extent->offset;
- size_bytes = descr[0]->extent->size;
+ datafile = extent->datafile;
+ pos = extent->offset;
+ size_bytes = extent->size;
- xt_io_descr = mallocz(sizeof(*xt_io_descr));
- ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
- if (unlikely(ret)) {
- fatal("posix_memalign:%s", strerror(ret));
- /* freez(xt_io_descr);
- return;*/
- }
+ xt_io_descr = callocz(1, sizeof(*xt_io_descr));
for (i = 0 ; i < count; ++i) {
rrdeng_page_descr_mutex_lock(ctx, descr[i]);
pg_cache_descr = descr[i]->pg_cache_descr;
@@ -187,10 +390,34 @@ static void do_read_extent(struct rrdengine_worker_config* wc,
/* xt_io_descr->descr_commit_idx_array[0] */
xt_io_descr->release_descr = release_descr;
+ xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx);
+ if (xt_is_cached) {
+ xt_cache_replaceQ_set_hot(wc, &wc->xt_cache.extent_array[xt_idx]);
+ xt_is_inflight = check_bit(wc->xt_cache.inflight_bitmap, xt_idx);
+ if (xt_is_inflight) {
+ enqueue_inflight_read_to_xt_cache(wc, xt_idx, xt_io_descr);
+ return;
+ }
+ return read_cached_extent_cb(wc, xt_idx, xt_io_descr);
+ } else {
+ ret = try_insert_into_xt_cache(wc, extent);
+ if (-1 != ret) {
+ xt_idx = (unsigned)ret;
+ modify_bit(&wc->xt_cache.inflight_bitmap, xt_idx, 1);
+ wc->xt_cache.extent_array[xt_idx].inflight_io_descr = xt_io_descr;
+ }
+ }
+
+ ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ /* freez(xt_io_descr);
+ return;*/
+ }
real_io_size = ALIGN_BYTES_CEILING(size_bytes);
xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
ret = uv_fs_read(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, pos, read_extent_cb);
- assert (-1 != ret);
+ fatal_assert(-1 != ret);
ctx->stats.io_read_bytes += real_io_size;
++ctx->stats.io_read_requests;
ctx->stats.io_read_extent_bytes += real_io_size;
@@ -243,11 +470,117 @@ static void do_commit_transaction(struct rrdengine_worker_config* wc, uint8_t ty
commit_data_extent(wc, (struct extent_io_descriptor *)data);
break;
default:
- assert(type == STORE_DATA);
+ fatal_assert(type == STORE_DATA);
break;
}
}
+static void after_invalidate_oldest_committed(struct rrdengine_worker_config* wc)
+{
+ int error;
+
+ error = uv_thread_join(wc->now_invalidating_dirty_pages);
+ if (error) {
+ error("uv_thread_join(): %s", uv_strerror(error));
+ }
+ freez(wc->now_invalidating_dirty_pages);
+ wc->now_invalidating_dirty_pages = NULL;
+ wc->cleanup_thread_invalidating_dirty_pages = 0;
+}
+
+static void invalidate_oldest_committed(void *arg)
+{
+ struct rrdengine_instance *ctx = arg;
+ struct rrdengine_worker_config *wc = &ctx->worker_config;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ int ret;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+ Pvoid_t *PValue;
+ Word_t Index;
+ unsigned nr_committed_pages;
+
+ do {
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ for (Index = 0,
+ PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue;
+
+ descr != NULL;
+
+ PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue) {
+ fatal_assert(0 != descr->page_length);
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING) && pg_cache_try_get_unsafe(descr, 1)) {
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0);
+ fatal_assert(1 == ret);
+ break;
+ }
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ }
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+
+ if (!descr) {
+ info("Failed to invalidate any dirty pages to relieve page cache pressure.");
+
+ goto out;
+ }
+ pg_cache_punch_hole(ctx, descr, 1, 1, NULL);
+
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ nr_committed_pages = --pg_cache->committed_page_index.nr_committed_pages;
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+ rrd_stat_atomic_add(&ctx->stats.flushing_pressure_page_deletions, 1);
+ rrd_stat_atomic_add(&global_flushing_pressure_page_deletions, 1);
+
+ } while (nr_committed_pages >= pg_cache_committed_hard_limit(ctx));
+out:
+ wc->cleanup_thread_invalidating_dirty_pages = 1;
+ /* wake up event loop */
+ fatal_assert(0 == uv_async_send(&wc->async));
+}
+
+void rrdeng_invalidate_oldest_committed(struct rrdengine_worker_config* wc)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned nr_committed_pages;
+ int error;
+
+ if (unlikely(ctx->quiesce != NO_QUIESCE)) /* Shutting down */
+ return;
+
+ uv_rwlock_rdlock(&pg_cache->committed_page_index.lock);
+ nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages;
+ uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock);
+
+ if (nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) {
+ /* delete the oldest page in memory */
+ if (wc->now_invalidating_dirty_pages) {
+ /* already deleting a page */
+ return;
+ }
+ errno = 0;
+ error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". "
+ "Metric data are being deleted, please reduce disk load or use a faster disk.", ctx->dbfiles_path);
+
+ wc->now_invalidating_dirty_pages = mallocz(sizeof(*wc->now_invalidating_dirty_pages));
+ wc->cleanup_thread_invalidating_dirty_pages = 0;
+
+ error = uv_thread_create(wc->now_invalidating_dirty_pages, invalidate_oldest_committed, ctx);
+ if (error) {
+ error("uv_thread_create(): %s", uv_strerror(error));
+ freez(wc->now_invalidating_dirty_pages);
+ wc->now_invalidating_dirty_pages = NULL;
+ }
+ }
+}
+
void flush_pages_cb(uv_fs_t* req)
{
struct rrdengine_worker_config* wc = req->loop->data;
@@ -294,6 +627,7 @@ void flush_pages_cb(uv_fs_t* req)
uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
pg_cache->committed_page_index.nr_committed_pages -= count;
uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+ wc->inflight_dirty_pages -= count;
}
/*
@@ -338,7 +672,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
descr = unlikely(NULL == PValue) ? NULL : *PValue) {
uint8_t page_write_pending;
- assert(0 != descr->page_length);
+ fatal_assert(0 != descr->page_length);
page_write_pending = 0;
rrdeng_page_descr_mutex_lock(ctx, descr);
@@ -355,7 +689,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
if (page_write_pending) {
ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0);
- assert(1 == ret);
+ fatal_assert(1 == ret);
}
}
uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
@@ -366,6 +700,8 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
complete(completion);
return 0;
}
+ wc->inflight_dirty_pages += count;
+
xt_io_descr = mallocz(sizeof(*xt_io_descr));
payload_offset = sizeof(*header) + count * sizeof(header->descr[0]);
switch (compression_algorithm) {
@@ -373,7 +709,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer);
break;
default: /* Compress */
- assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE);
+ fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE);
max_compressed_size = LZ4_compressBound(uncompressed_payload_length);
compressed_buf = mallocz(max_compressed_size);
size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer);
@@ -453,7 +789,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
real_io_size = ALIGN_BYTES_CEILING(size_bytes);
xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
ret = uv_fs_write(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, datafile->pos, flush_pages_cb);
- assert (-1 != ret);
+ fatal_assert(-1 != ret);
ctx->stats.io_write_bytes += real_io_size;
++ctx->stats.io_write_requests;
ctx->stats.io_write_extent_bytes += real_io_size;
@@ -466,17 +802,15 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
return ALIGN_BYTES_CEILING(size_bytes);
}
-static void after_delete_old_data(uv_work_t *req, int status)
+static void after_delete_old_data(struct rrdengine_worker_config* wc)
{
- struct rrdengine_instance *ctx = req->data;
- struct rrdengine_worker_config* wc = &ctx->worker_config;
+ struct rrdengine_instance *ctx = wc->ctx;
struct rrdengine_datafile *datafile;
struct rrdengine_journalfile *journalfile;
unsigned deleted_bytes, journalfile_bytes, datafile_bytes;
- int ret;
+ int ret, error;
char path[RRDENG_PATH_MAX];
- (void)status;
datafile = ctx->datafiles.first;
journalfile = datafile->journalfile;
datafile_bytes = datafile->pos;
@@ -503,19 +837,30 @@ static void after_delete_old_data(uv_work_t *req, int status)
ctx->disk_space -= deleted_bytes;
info("Reclaimed %u bytes of disk space.", deleted_bytes);
+ error = uv_thread_join(wc->now_deleting_files);
+ if (error) {
+ error("uv_thread_join(): %s", uv_strerror(error));
+ }
+ freez(wc->now_deleting_files);
/* unfreeze command processing */
- wc->now_deleting.data = NULL;
- /* wake up event loop */
- assert(0 == uv_async_send(&wc->async));
+ wc->now_deleting_files = NULL;
+
+ wc->cleanup_thread_deleting_files = 0;
+
+ /* interrupt event loop */
+ uv_stop(wc->loop);
}
-static void delete_old_data(uv_work_t *req)
+static void delete_old_data(void *arg)
{
- struct rrdengine_instance *ctx = req->data;
+ struct rrdengine_instance *ctx = arg;
+ struct rrdengine_worker_config* wc = &ctx->worker_config;
struct rrdengine_datafile *datafile;
struct extent_info *extent, *next;
struct rrdeng_page_descr *descr;
unsigned count, i;
+ uint8_t can_delete_metric;
+ uuid_t metric_id;
/* Safe to use since it will be deleted after we are done */
datafile = ctx->datafiles.first;
@@ -524,11 +869,21 @@ static void delete_old_data(uv_work_t *req)
count = extent->number_of_pages;
for (i = 0 ; i < count ; ++i) {
descr = extent->pages[i];
- pg_cache_punch_hole(ctx, descr, 0);
+ can_delete_metric = pg_cache_punch_hole(ctx, descr, 0, 0, &metric_id);
+ if (unlikely(can_delete_metric && ctx->metalog_ctx->initialized)) {
+ /*
+ * If the metric is empty, has no active writers and if the metadata log has been initialized then
+ * attempt to delete the corresponding netdata dimension.
+ */
+ metalog_delete_dimension_by_uuid(ctx->metalog_ctx, &metric_id);
+ }
}
next = extent->next;
freez(extent);
}
+ wc->cleanup_thread_deleting_files = 1;
+ /* wake up event loop */
+ fatal_assert(0 == uv_async_send(&wc->async));
}
void rrdeng_test_quota(struct rrdengine_worker_config* wc)
@@ -537,10 +892,11 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
struct rrdengine_datafile *datafile;
unsigned current_size, target_size;
uint8_t out_of_space, only_one_datafile;
- int ret;
+ int ret, error;
out_of_space = 0;
- if (unlikely(ctx->disk_space > ctx->max_disk_space)) {
+ /* Do not allow the pinned pages to exceed the disk space quota to avoid deadlocks */
+ if (unlikely(ctx->disk_space > MAX(ctx->max_disk_space, 2 * ctx->metric_API_max_producers * RRDENG_BLOCK_SIZE))) {
out_of_space = 1;
}
datafile = ctx->datafiles.last;
@@ -557,9 +913,9 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
++ctx->last_fileno;
}
}
- if (unlikely(out_of_space)) {
+ if (unlikely(out_of_space && NO_QUIESCE == ctx->quiesce)) {
/* delete old data */
- if (wc->now_deleting.data) {
+ if (wc->now_deleting_files) {
/* already deleting data */
return;
}
@@ -571,8 +927,39 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
}
info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
- wc->now_deleting.data = ctx;
- assert(0 == uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data));
+ wc->now_deleting_files = mallocz(sizeof(*wc->now_deleting_files));
+ wc->cleanup_thread_deleting_files = 0;
+
+ error = uv_thread_create(wc->now_deleting_files, delete_old_data, ctx);
+ if (error) {
+ error("uv_thread_create(): %s", uv_strerror(error));
+ freez(wc->now_deleting_files);
+ wc->now_deleting_files = NULL;
+ }
+ }
+}
+
+static inline int rrdeng_threads_alive(struct rrdengine_worker_config* wc)
+{
+ if (wc->now_invalidating_dirty_pages || wc->now_deleting_files) {
+ return 1;
+ }
+ return 0;
+}
+
+static void rrdeng_cleanup_finished_threads(struct rrdengine_worker_config* wc)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+
+ if (unlikely(wc->cleanup_thread_invalidating_dirty_pages)) {
+ after_invalidate_oldest_committed(wc);
+ }
+ if (unlikely(wc->cleanup_thread_deleting_files)) {
+ after_delete_old_data(wc);
+ }
+ if (unlikely(SET_QUIESCE == ctx->quiesce && !rrdeng_threads_alive(wc))) {
+ ctx->quiesce = QUIESCED;
+ complete(&ctx->rrdengine_completion);
}
}
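
Editor's note: the two helpers above replace the old uv_queue_work() usage. A helper thread raises a flag and pokes the event loop with uv_async_send(), and the loop joins and frees the uv_thread_t only after the flag is seen. A minimal sketch of that hand-off in plain libuv follows; helper_state, helper_main and reap_helper are illustrative names, not netdata code.

    #include <uv.h>
    #include <stdlib.h>

    /* Hypothetical state, mirroring the now_deleting_files / cleanup_thread_deleting_files pair. */
    struct helper_state {
        uv_async_t async;               /* initialized with uv_async_init() on the event loop */
        uv_thread_t *helper;            /* non-NULL while a helper thread exists */
        volatile unsigned long done;    /* the helper sets this to 1 just before returning */
    };

    static void helper_main(void *arg)
    {
        struct helper_state *s = arg;
        /* ... long-running work that must not block the event loop ... */
        s->done = 1;
        uv_async_send(&s->async);       /* thread-safe wake-up of the event loop */
    }

    /* called from the event loop after uv_run() returns, like rrdeng_cleanup_finished_threads() */
    static void reap_helper(struct helper_state *s)
    {
        if (s->done && s->helper) {
            uv_thread_join(s->helper);  /* cannot block for long: the thread has already finished */
            free(s->helper);
            s->helper = NULL;
            s->done = 0;
        }
    }

Joining only after the flag is set keeps uv_thread_join() from stalling the event loop, which is why the worker loop calls the cleanup function right after uv_run() returns.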
@@ -591,8 +978,8 @@ void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc)
{
wc->cmd_queue.head = wc->cmd_queue.tail = 0;
wc->queue_size = 0;
- assert(0 == uv_cond_init(&wc->cmd_cond));
- assert(0 == uv_mutex_init(&wc->cmd_mutex));
+ fatal_assert(0 == uv_cond_init(&wc->cmd_cond));
+ fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
}
void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd)
@@ -604,7 +991,7 @@ void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd)
while ((queue_size = wc->queue_size) == RRDENG_CMD_Q_MAX_SIZE) {
uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
}
- assert(queue_size < RRDENG_CMD_Q_MAX_SIZE);
+ fatal_assert(queue_size < RRDENG_CMD_Q_MAX_SIZE);
/* enqueue command */
wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
wc->cmd_queue.tail = wc->cmd_queue.tail != RRDENG_CMD_Q_MAX_SIZE - 1 ?
@@ -613,7 +1000,7 @@ void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd)
uv_mutex_unlock(&wc->cmd_mutex);
/* wake up event loop */
- assert(0 == uv_async_send(&wc->async));
+ fatal_assert(0 == uv_async_send(&wc->async));
}
struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc)
@@ -657,39 +1044,44 @@ void async_cb(uv_async_t *handle)
void timer_cb(uv_timer_t* handle)
{
struct rrdengine_worker_config* wc = handle->data;
+ struct rrdengine_instance *ctx = wc->ctx;
uv_stop(handle->loop);
uv_update_time(handle->loop);
+ if (unlikely(!ctx->metalog_ctx->initialized))
+ return; /* Wait for the metadata log to initialize */
rrdeng_test_quota(wc);
debug(D_RRDENGINE, "%s: timeout reached.", __func__);
- if (likely(!wc->now_deleting.data)) {
- /* There is free space so we can write to disk */
- struct rrdengine_instance *ctx = wc->ctx;
+ if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) {
+ /* There is free space so we can write to disk and we are not actively deleting dirty buffers */
struct page_cache *pg_cache = &ctx->pg_cache;
unsigned long total_bytes, bytes_written, nr_committed_pages, bytes_to_write = 0, producers, low_watermark,
high_watermark;
- uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ uv_rwlock_rdlock(&pg_cache->committed_page_index.lock);
nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages;
- uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
- producers = ctx->stats.metric_API_producers;
+ uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock);
+ producers = ctx->metric_API_max_producers;
/* are flushable pages more than 25% of the maximum page cache size */
high_watermark = (ctx->max_cache_pages * 25LLU) / 100;
low_watermark = (ctx->max_cache_pages * 5LLU) / 100; /* 5%, must be smaller than high_watermark */
- if (nr_committed_pages > producers &&
- /* committed to be written pages are more than the produced number */
- nr_committed_pages - producers > high_watermark) {
- /* Flushing speed must increase to stop page cache from filling with dirty pages */
- bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE;
- }
- bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write);
+ /* Flush more pages only if disk can keep up */
+ if (wc->inflight_dirty_pages < high_watermark + producers) {
+ if (nr_committed_pages > producers &&
+ /* committed to be written pages are more than the produced number */
+ nr_committed_pages - producers > high_watermark) {
+ /* Flushing speed must increase to stop page cache from filling with dirty pages */
+ bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE;
+ }
+ bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write);
- debug(D_RRDENGINE, "Flushing pages to disk.");
- for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL) ;
- bytes_written && (total_bytes < bytes_to_write) ;
- total_bytes += bytes_written) {
- bytes_written = do_flush_pages(wc, 0, NULL);
+ debug(D_RRDENGINE, "Flushing pages to disk.");
+ for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL);
+ bytes_written && (total_bytes < bytes_to_write);
+ total_bytes += bytes_written) {
+ bytes_written = do_flush_pages(wc, 0, NULL);
+ }
}
}
#ifdef NETDATA_INTERNAL_CHECKS
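
Editor's note: the throttling logic in this hunk is plain arithmetic on the page cache size. The sketch below restates it outside the diff noise; RRDENG_BLOCK_SIZE is assumed to be 4096 bytes for the example and flush_target_bytes() is an illustrative name.

    #define RRDENG_BLOCK_SIZE 4096   /* assumed for the example; the real value comes from rrddiskprotocol.h */

    static unsigned long flush_target_bytes(unsigned long max_cache_pages,
                                            unsigned long producers,
                                            unsigned long nr_committed_pages)
    {
        unsigned long high_watermark = (max_cache_pages * 25LLU) / 100;  /* 25% of the page cache */
        unsigned long low_watermark  = (max_cache_pages * 5LLU)  / 100;  /*  5%, drain down to here */
        unsigned long bytes_to_write = 0;

        if (nr_committed_pages > producers &&
            nr_committed_pages - producers > high_watermark)
            bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE;

        return bytes_to_write;   /* the caller still applies MAX(DATAFILE_IDEAL_IO_SIZE, ...) */
    }

With max_cache_pages = 8192, producers = 100 and nr_committed_pages = 3000 this yields (3000 - 100 - 409) * 4096, roughly 9.7 MiB of flushing work for one timer tick, and the new inflight_dirty_pages check skips even that when the disk has not caught up yet.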
@@ -730,7 +1122,12 @@ void rrdeng_worker(void* arg)
}
wc->async.data = wc;
- wc->now_deleting.data = NULL;
+ wc->now_deleting_files = NULL;
+ wc->cleanup_thread_deleting_files = 0;
+
+ wc->now_invalidating_dirty_pages = NULL;
+ wc->cleanup_thread_invalidating_dirty_pages = 0;
+ wc->inflight_dirty_pages = 0;
/* dirty page flushing timer */
ret = uv_timer_init(loop, &timer_req);
@@ -744,10 +1141,11 @@ void rrdeng_worker(void* arg)
/* wake up initialization thread */
complete(&ctx->rrdengine_completion);
- assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
+ fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
shutdown = 0;
- while (shutdown == 0 || uv_loop_alive(loop)) {
+ while (likely(shutdown == 0 || rrdeng_threads_alive(wc))) {
uv_run(loop, UV_RUN_DEFAULT);
+ rrdeng_cleanup_finished_threads(wc);
/* wait for commands */
cmd_batch_size = 0;
@@ -769,14 +1167,20 @@ void rrdeng_worker(void* arg)
break;
case RRDENG_SHUTDOWN:
shutdown = 1;
- /*
- * uv_async_send after uv_close does not seem to crash in linux at the moment,
- * it is however undocumented behaviour and we need to be aware if this becomes
- * an issue in the future.
- */
- uv_close((uv_handle_t *)&wc->async, NULL);
- assert(0 == uv_timer_stop(&timer_req));
+ break;
+ case RRDENG_QUIESCE:
+ ctx->drop_metrics_under_page_cache_pressure = 0;
+ ctx->quiesce = SET_QUIESCE;
+ fatal_assert(0 == uv_timer_stop(&timer_req));
uv_close((uv_handle_t *)&timer_req, NULL);
+ while (do_flush_pages(wc, 1, NULL)) {
+ ; /* Force flushing of all committed pages. */
+ }
+ wal_flush_transaction_buffer(wc);
+ if (!rrdeng_threads_alive(wc)) {
+ ctx->quiesce = QUIESCED;
+ complete(&ctx->rrdengine_completion);
+ }
break;
case RRDENG_READ_PAGE:
do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0);
@@ -788,16 +1192,16 @@ void rrdeng_worker(void* arg)
do_commit_transaction(wc, STORE_DATA, NULL);
break;
case RRDENG_FLUSH_PAGES: {
- unsigned bytes_written;
-
- /* First I/O should be enough to call completion */
- bytes_written = do_flush_pages(wc, 1, cmd.completion);
- if (bytes_written) {
- while (do_flush_pages(wc, 1, NULL) && likely(!wc->now_deleting.data)) {
- ; /* Force flushing of all committed pages if there is free space. */
- }
+ if (wc->now_invalidating_dirty_pages) {
+ /* Do not flush if the disk cannot keep up */
+ complete(cmd.completion);
+ } else {
+ (void)do_flush_pages(wc, 1, cmd.completion);
}
break;
+ case RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE:
+ rrdeng_invalidate_oldest_committed(wc);
+ break;
}
default:
debug(D_RRDENGINE, "%s: default.", __func__);
@@ -805,11 +1209,17 @@ void rrdeng_worker(void* arg)
}
} while (opcode != RRDENG_NOOP);
}
+
/* cleanup operations of the event loop */
- if (unlikely(wc->now_deleting.data)) {
- info("Postponing shutting RRD engine event loop down until after datafile deletion is finished.");
- }
info("Shutting down RRD engine event loop.");
+
+ /*
+ * uv_async_send after uv_close does not seem to crash in linux at the moment,
+ * it is however undocumented behaviour and we need to be aware if this becomes
+ * an issue in the future.
+ */
+ uv_close((uv_handle_t *)&wc->async, NULL);
+
while (do_flush_pages(wc, 1, NULL)) {
; /* Force flushing of all committed pages. */
}
@@ -820,7 +1230,7 @@ void rrdeng_worker(void* arg)
/* TODO: don't let the API block by waiting to enqueue commands */
uv_cond_destroy(&wc->cmd_cond);
/* uv_mutex_destroy(&wc->cmd_mutex); */
- assert(0 == uv_loop_close(loop));
+ fatal_assert(0 == uv_loop_close(loop));
freez(loop);
return;
@@ -828,7 +1238,7 @@ void rrdeng_worker(void* arg)
error_after_timer_init:
uv_close((uv_handle_t *)&wc->async, NULL);
error_after_async_init:
- assert(0 == uv_loop_close(loop));
+ fatal_assert(0 == uv_loop_close(loop));
error_after_loop_init:
freez(loop);
@@ -845,11 +1255,12 @@ void rrdengine_main(void)
int ret;
struct rrdengine_instance *ctx;
- ret = rrdeng_init(&ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB);
+ sanity_check();
+ ret = rrdeng_init(NULL, &ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB);
if (ret) {
exit(ret);
}
rrdeng_exit(ctx);
fprintf(stderr, "Hello world!");
exit(0);
-} \ No newline at end of file
+}
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
index 78a35a0bf..87af04bff 100644
--- a/database/engine/rrdengine.h
+++ b/database/engine/rrdengine.h
@@ -7,19 +7,17 @@
#define _GNU_SOURCE
#endif
#include <fcntl.h>
-#include <aio.h>
-#include <uv.h>
-#include <assert.h>
#include <lz4.h>
#include <Judy.h>
#include <openssl/sha.h>
#include <openssl/evp.h>
-#include <stdint.h>
+#include "../../daemon/common.h"
#include "../rrd.h"
#include "rrddiskprotocol.h"
#include "rrdenginelib.h"
#include "datafile.h"
#include "journalfile.h"
+#include "metadata_log/metadatalog.h"
#include "rrdengineapi.h"
#include "pagecache.h"
#include "rrdenglocking.h"
@@ -52,6 +50,8 @@ enum rrdeng_opcode {
RRDENG_COMMIT_PAGE,
RRDENG_FLUSH_PAGES,
RRDENG_SHUTDOWN,
+ RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE,
+ RRDENG_QUIESCE,
RRDENG_MAX_OPCODE
};
@@ -88,6 +88,7 @@ struct extent_io_descriptor {
int release_descr;
struct rrdeng_page_descr *descr_array[MAX_PAGES_PER_EXTENT];
Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT];
+ struct extent_io_descriptor *next; /* multiple requests to be served by the same cached extent */
};
struct generic_io_descriptor {
@@ -99,13 +100,43 @@ struct generic_io_descriptor {
struct completion *completion;
};
+struct extent_cache_element {
+ struct extent_info *extent; /* The ABA problem is avoided with the help of fileno below */
+ unsigned fileno;
+ struct extent_cache_element *prev; /* LRU */
+ struct extent_cache_element *next; /* LRU */
+ struct extent_io_descriptor *inflight_io_descr; /* I/O descriptor for in-flight extent */
+ uint8_t pages[MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE];
+};
+
+#define MAX_CACHED_EXTENTS 16 /* cannot exceed 32, so that the bitmaps fit in an unsigned int on 32-bit architectures */
+
+/* Initialize by setting the structure to zero */
+struct extent_cache {
+ struct extent_cache_element extent_array[MAX_CACHED_EXTENTS];
+ unsigned allocation_bitmap; /* 1 if the corresponding position in the extent_array is allocated */
+ unsigned inflight_bitmap; /* 1 if the corresponding position in the extent_array is waiting for I/O */
+
+ struct extent_cache_element *replaceQ_head; /* LRU */
+ struct extent_cache_element *replaceQ_tail; /* MRU */
+};
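
Editor's note: allocation_bitmap and inflight_bitmap track, one bit per slot, which of the at most 32 extent_array entries are in use or have an outstanding read. A hedged sketch of claiming a free slot from such a bitmap, using ffs() directly; the real code uses the find_first_zero()/modify_bit() helpers added to rrdenginelib.h later in this diff.

    #include <strings.h>   /* ffs() */

    #define MAX_CACHED_EXTENTS 16

    /* returns the index of a previously free slot, or -1 if the cache is full */
    static int claim_extent_slot(unsigned *allocation_bitmap)
    {
        int pos = ffs((int)~*allocation_bitmap) - 1;   /* first cleared bit, LSB first */
        if (pos < 0 || pos >= MAX_CACHED_EXTENTS)
            return -1;                                 /* all slots taken */
        *allocation_bitmap |= 1U << pos;               /* mark the slot as allocated */
        return pos;
    }

Releasing a slot clears the same bit; when no slot is free, the replaceQ_head/replaceQ_tail pointers give an LRU order for choosing which allocated extent to evict.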
+
struct rrdengine_worker_config {
struct rrdengine_instance *ctx;
uv_thread_t thread;
uv_loop_t* loop;
uv_async_t async;
- uv_work_t now_deleting;
+
+ /* file deletion thread */
+ uv_thread_t *now_deleting_files;
+ unsigned long cleanup_thread_deleting_files; /* set to 0 when now_deleting_files is still running */
+
+ /* dirty page deletion thread */
+ uv_thread_t *now_invalidating_dirty_pages;
+ /* set to 0 when now_invalidating_dirty_pages is still running */
+ unsigned long cleanup_thread_invalidating_dirty_pages;
+ unsigned inflight_dirty_pages;
/* FIFO command queue */
uv_mutex_t cmd_mutex;
@@ -113,6 +144,8 @@ struct rrdengine_worker_config {
volatile unsigned queue_size;
struct rrdeng_cmdqueue cmd_queue;
+ struct extent_cache xt_cache;
+
int error;
};
@@ -148,7 +181,8 @@ struct rrdengine_statistics {
rrdeng_stats_t page_cache_descriptors;
rrdeng_stats_t io_errors;
rrdeng_stats_t fs_errors;
- rrdeng_stats_t flushing_errors;
+ rrdeng_stats_t pg_cache_over_half_dirty_events;
+ rrdeng_stats_t flushing_pressure_page_deletions;
};
/* I/O errors global counter */
@@ -157,27 +191,38 @@ extern rrdeng_stats_t global_io_errors;
extern rrdeng_stats_t global_fs_errors;
/* number of File-Descriptors that have been reserved by dbengine */
extern rrdeng_stats_t rrdeng_reserved_file_descriptors;
-/* inability to flush global counter */
-extern rrdeng_stats_t global_flushing_errors;
+/* inability to flush global counters */
+extern rrdeng_stats_t global_pg_cache_over_half_dirty_events;
+extern rrdeng_stats_t global_flushing_pressure_page_deletions; /* number of deleted pages */
+
+#define NO_QUIESCE (0) /* initial state when all operations function normally */
+#define SET_QUIESCE (1) /* set it before shutting down the instance, quiesce long running operations */
+#define QUIESCED (2) /* is set after all threads have finished running */
struct rrdengine_instance {
+ struct metalog_instance *metalog_ctx;
struct rrdengine_worker_config worker_config;
struct completion rrdengine_completion;
struct page_cache pg_cache;
+ uint8_t drop_metrics_under_page_cache_pressure; /* boolean */
uint8_t global_compress_alg;
struct transaction_commit_log commit_log;
struct rrdengine_datafile_list datafiles;
- char dbfiles_path[FILENAME_MAX+1];
+ RRDHOST *host; /* the legacy host, or NULL for multi-host DB */
+ char dbfiles_path[FILENAME_MAX + 1];
+ char machine_guid[GUID_LEN + 1]; /* the unique ID of the corresponding host, or localhost for multihost DB */
uint64_t disk_space;
uint64_t max_disk_space;
unsigned last_fileno; /* newest index of datafile and journalfile */
unsigned long max_cache_pages;
unsigned long cache_pages_low_watermark;
+ unsigned long metric_API_max_producers;
+
+ uint8_t quiesce; /* set to SET_QUIESCE before shutdown of the engine */
struct rrdengine_statistics stats;
};
-extern void sanity_check(void);
extern int init_rrd_files(struct rrdengine_instance *ctx);
extern void finalize_rrd_files(struct rrdengine_instance *ctx);
extern void rrdeng_test_quota(struct rrdengine_worker_config* wc);
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index baf4a9973..7b2ff5b72 100644..100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -2,65 +2,148 @@
#include "rrdengine.h"
/* Default global database instance */
-static struct rrdengine_instance default_global_ctx;
+struct rrdengine_instance multidb_ctx;
int default_rrdeng_page_cache_mb = 32;
-int default_rrdeng_disk_quota_mb = RRDENG_MIN_DISK_SPACE_MB;
+int default_rrdeng_disk_quota_mb = 256;
+int default_multidb_disk_quota_mb = 256;
+/* Default behaviour is to unblock data collection by dropping metrics when the page cache is full of dirty pages */
+uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1;
-/*
- * Gets a handle for storing metrics to the database.
- * The handle must be released with rrdeng_store_metric_final().
- */
-void rrdeng_store_metric_init(RRDDIM *rd)
+static inline struct rrdengine_instance *get_rrdeng_ctx_from_host(RRDHOST *host)
+{
+ return host->rrdeng_ctx;
+}
+
+/* This UUID is not unique across hosts */
+void rrdeng_generate_legacy_uuid(const char *dim_id, char *chart_id, uuid_t *ret_uuid)
{
- struct rrdeng_collect_handle *handle;
- struct page_cache *pg_cache;
- struct rrdengine_instance *ctx;
- uuid_t temp_id;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
EVP_MD_CTX *evpctx;
unsigned char hash_value[EVP_MAX_MD_SIZE];
unsigned int hash_len;
- //&default_global_ctx; TODO: test this use case or remove it?
+ evpctx = EVP_MD_CTX_create();
+ EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL);
+ EVP_DigestUpdate(evpctx, dim_id, strlen(dim_id));
+ EVP_DigestUpdate(evpctx, chart_id, strlen(chart_id));
+ EVP_DigestFinal_ex(evpctx, hash_value, &hash_len);
+ EVP_MD_CTX_destroy(evpctx);
+ fatal_assert(hash_len > sizeof(uuid_t));
+ memcpy(ret_uuid, hash_value, sizeof(uuid_t));
+}
- ctx = rd->rrdset->rrdhost->rrdeng_ctx;
- pg_cache = &ctx->pg_cache;
- handle = &rd->state->handle.rrdeng;
- handle->ctx = ctx;
+/* Transform the legacy UUID to be unique across hosts deterministically */
+void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid, uuid_t *ret_uuid)
+{
+ EVP_MD_CTX *evpctx;
+ unsigned char hash_value[EVP_MAX_MD_SIZE];
+ unsigned int hash_len;
evpctx = EVP_MD_CTX_create();
EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL);
- EVP_DigestUpdate(evpctx, rd->id, strlen(rd->id));
- EVP_DigestUpdate(evpctx, rd->rrdset->id, strlen(rd->rrdset->id));
+ EVP_DigestUpdate(evpctx, machine_guid, GUID_LEN);
+ EVP_DigestUpdate(evpctx, *legacy_uuid, sizeof(uuid_t));
EVP_DigestFinal_ex(evpctx, hash_value, &hash_len);
EVP_MD_CTX_destroy(evpctx);
- assert(hash_len > sizeof(temp_id));
- memcpy(&temp_id, hash_value, sizeof(temp_id));
+ fatal_assert(hash_len > sizeof(uuid_t));
+ memcpy(ret_uuid, hash_value, sizeof(uuid_t));
+}
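
Editor's note: both helpers above hash with SHA-256 and keep only the first 16 bytes of the digest, so the resulting UUIDs are deterministic. A small usage sketch follows, assuming the file is linked against OpenSSL and libuuid as the rest of the engine already is; the dimension id, chart id and machine GUID are made up.

    #include "rrdengine.h"   /* pulls in the UUID helpers and libuuid */

    static void print_multihost_uuid_example(void)
    {
        uuid_t legacy, multihost;
        char machine_guid[GUID_LEN + 1] = "2f8ef296-0000-0000-0000-000000000000"; /* made-up GUID */
        char chart_id[] = "system.cpu";
        char out[37];

        rrdeng_generate_legacy_uuid("user", chart_id, &legacy);
        rrdeng_convert_legacy_uuid_to_multihost(machine_guid, &legacy, &multihost);

        uuid_unparse_lower(multihost, out);
        info("multihost dimension uuid: %s", out);
    }

Because the second hash is seeded with the machine GUID, the same chart/dimension pair on two different hosts maps to two different multi-host UUIDs, which is what the multi-host DB needs.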
- handle->descr = NULL;
- handle->prev_descr = NULL;
- handle->unaligned_page = 0;
+void rrdeng_metric_init(RRDDIM *rd, uuid_t *dim_uuid)
+{
+ struct page_cache *pg_cache;
+ struct rrdengine_instance *ctx;
+ uuid_t legacy_uuid;
+ uuid_t multihost_legacy_uuid;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index = NULL;
+ int is_multihost_child = 0;
+ RRDHOST *host = rd->rrdset->rrdhost;
+
+ ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost);
+ if (unlikely(!ctx)) {
+ error("Failed to fetch multidb context");
+ return;
+ }
+ pg_cache = &ctx->pg_cache;
+
+ rrdeng_generate_legacy_uuid(rd->id, rd->rrdset->id, &legacy_uuid);
+ rd->state->metric_uuid = dim_uuid;
+ if (host != localhost && host->rrdeng_ctx == &multidb_ctx)
+ is_multihost_child = 1;
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t));
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &legacy_uuid, sizeof(uuid_t));
if (likely(NULL != PValue)) {
page_index = *PValue;
}
uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
- if (NULL == PValue) {
- /* First time we see the UUID */
- uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t), PJE0);
- assert(NULL == *PValue); /* TODO: figure out concurrency model */
- *PValue = page_index = create_page_index(&temp_id);
- page_index->prev = pg_cache->metrics_index.last_page_index;
- pg_cache->metrics_index.last_page_index = page_index;
- uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
+ if (is_multihost_child || NULL == PValue) {
+ /* First time we see the legacy UUID, or the metric belongs to a child host in a multi-host DB.
+ * Drop legacy support and follow the normal path */
+
+ if (unlikely(!rd->state->metric_uuid))
+ rd->state->metric_uuid = create_dimension_uuid(rd->rrdset, rd);
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, rd->state->metric_uuid, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, rd->state->metric_uuid, sizeof(uuid_t), PJE0);
+ fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
+ *PValue = page_index = create_page_index(rd->state->metric_uuid);
+ page_index->prev = pg_cache->metrics_index.last_page_index;
+ pg_cache->metrics_index.last_page_index = page_index;
+ uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
+ }
+ } else {
+ /* There are legacy UUIDs in the database, implement backward compatibility */
+
+ rrdeng_convert_legacy_uuid_to_multihost(rd->rrdset->rrdhost->machine_guid, &legacy_uuid,
+ &multihost_legacy_uuid);
+
+ if (unlikely(!rd->state->metric_uuid))
+ rd->state->metric_uuid = mallocz(sizeof(uuid_t));
+
+ int need_to_store = (dim_uuid == NULL || uuid_compare(*rd->state->metric_uuid, multihost_legacy_uuid));
+
+ uuid_copy(*rd->state->metric_uuid, multihost_legacy_uuid);
+
+ if (unlikely(need_to_store))
+ (void)sql_store_dimension(rd->state->metric_uuid, rd->rrdset->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor,
+ rd->algorithm);
+
}
rd->state->rrdeng_uuid = &page_index->id;
- handle->page_index = page_index;
+ rd->state->page_index = page_index;
+}
+
+/*
+ * Gets a handle for storing metrics to the database.
+ * The handle must be released with rrdeng_store_metric_final().
+ */
+void rrdeng_store_metric_init(RRDDIM *rd)
+{
+ struct rrdeng_collect_handle *handle;
+ struct rrdengine_instance *ctx;
+ struct pg_cache_page_index *page_index;
+
+ ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost);
+ handle = &rd->state->handle.rrdeng;
+ handle->ctx = ctx;
+
+ handle->descr = NULL;
+ handle->prev_descr = NULL;
+ handle->unaligned_page = 0;
+
+ page_index = rd->state->page_index;
+ uv_rwlock_wrlock(&page_index->lock);
+ ++page_index->writers;
+ uv_rwlock_wrunlock(&page_index->lock);
}
/* The page must be populated and referenced */
@@ -88,12 +171,14 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
handle = &rd->state->handle.rrdeng;
ctx = handle->ctx;
+ if (unlikely(!ctx))
+ return;
descr = handle->descr;
if (unlikely(NULL == descr)) {
return;
}
if (likely(descr->page_length)) {
- int ret, page_is_empty;
+ int page_is_empty;
rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
@@ -107,17 +192,21 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
if (unlikely(debug_flags & D_RRDENGINE))
print_page_cache_descr(descr);
pg_cache_put(ctx, descr);
- pg_cache_punch_hole(ctx, descr, 1);
+ pg_cache_punch_hole(ctx, descr, 1, 0, NULL);
handle->prev_descr = NULL;
} else {
+ /*
+ * Disable pinning for now as it leads to deadlocks. When a collector stops collecting the extra pinned page
+ * eventually gets rotated but it cannot be destroyed due to the extra reference.
+ */
/* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */
- rrdeng_page_descr_mutex_lock(ctx, descr);
+/* rrdeng_page_descr_mutex_lock(ctx, descr);
ret = pg_cache_try_get_unsafe(descr, 0);
rrdeng_page_descr_mutex_unlock(ctx, descr);
- assert (1 == ret);
+ fatal_assert(1 == ret);*/
rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
- handle->prev_descr = descr;
+ /* handle->prev_descr = descr;*/
}
} else {
freez(descr->pg_cache_descr->page);
@@ -168,14 +257,12 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n
must_flush_unaligned_page)) {
rrdeng_store_metric_flush_current_page(rd);
- page = rrdeng_create_page(ctx, &handle->page_index->id, &descr);
- assert(page);
+ page = rrdeng_create_page(ctx, &rd->state->page_index->id, &descr);
+ fatal_assert(page);
handle->descr = descr;
- uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
- handle->page_correlation_id = pg_cache->committed_page_index.latest_corr_id++;
- uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+ handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1);
if (0 == rd->rrdset->rrddim_page_alignment) {
/* this is the leading dimension that defines chart alignment */
@@ -189,30 +276,53 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n
if (perfect_page_alignment)
rd->rrdset->rrddim_page_alignment = descr->page_length;
if (unlikely(INVALID_TIME == descr->start_time)) {
+ unsigned long new_metric_API_producers, old_metric_API_max_producers, ret_metric_API_max_producers;
descr->start_time = point_in_time;
- rrd_stat_atomic_add(&ctx->stats.metric_API_producers, 1);
- pg_cache_insert(ctx, handle->page_index, descr);
+ new_metric_API_producers = rrd_atomic_add_fetch(&ctx->stats.metric_API_producers, 1);
+ while (unlikely(new_metric_API_producers > (old_metric_API_max_producers = ctx->metric_API_max_producers))) {
+ /* Increase ctx->metric_API_max_producers */
+ ret_metric_API_max_producers = ulong_compare_and_swap(&ctx->metric_API_max_producers,
+ old_metric_API_max_producers,
+ new_metric_API_producers);
+ if (old_metric_API_max_producers == ret_metric_API_max_producers) {
+ /* success */
+ break;
+ }
+ }
+
+ pg_cache_insert(ctx, rd->state->page_index, descr);
} else {
- pg_cache_add_new_metric_time(handle->page_index, descr);
+ pg_cache_add_new_metric_time(rd->state->page_index, descr);
}
}
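
Editor's note: the compare-and-swap loop above implements an atomic maximum on ctx->metric_API_max_producers. Stated on its own, and using the GCC/Clang builtin directly instead of netdata's ulong_compare_and_swap() wrapper, the pattern looks like this (a sketch, not the engine's code).

    /* raise *max to at least value, tolerating concurrent writers */
    static void atomic_store_max(volatile unsigned long *max, unsigned long value)
    {
        unsigned long old = *max;
        while (value > old) {
            unsigned long seen = __sync_val_compare_and_swap(max, old, value);
            if (seen == old)
                break;      /* we won the race */
            old = seen;     /* somebody else raised it; re-check against the new value */
        }
    }

The tracked maximum feeds both the quota check in rrdeng_test_quota() and the producers term of the flushing watermarks.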
/*
* Releases the database reference from the handle for storing metrics.
+ * Returns 1 if it's safe to delete the dimension.
*/
-void rrdeng_store_metric_finalize(RRDDIM *rd)
+int rrdeng_store_metric_finalize(RRDDIM *rd)
{
struct rrdeng_collect_handle *handle;
struct rrdengine_instance *ctx;
+ struct pg_cache_page_index *page_index;
+ uint8_t can_delete_metric = 0;
handle = &rd->state->handle.rrdeng;
ctx = handle->ctx;
+ page_index = rd->state->page_index;
rrdeng_store_metric_flush_current_page(rd);
if (handle->prev_descr) {
/* unpin old second page */
pg_cache_put(ctx, handle->prev_descr);
}
+ uv_rwlock_wrlock(&page_index->lock);
+ if (!--page_index->writers && !page_index->page_count) {
+ can_delete_metric = 1;
+ }
+ uv_rwlock_wrunlock(&page_index->lock);
+
+ return can_delete_metric;
}
/* Returns 1 if the data collection interval is well defined, 0 otherwise */
@@ -253,7 +363,7 @@ static inline uint32_t *pginfo_to_points(struct rrdeng_page_info *page_info)
* @return number of regions with different data collection intervals.
*/
unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time,
- struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp)
+ struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp, struct context_param *context_param_list)
{
struct pg_cache_page_index *page_index;
struct rrdengine_instance *ctx;
@@ -266,15 +376,16 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e
struct rrdeng_region_info *region_info_array;
uint8_t is_first_region_initialized;
- ctx = st->rrdhost->rrdeng_ctx;
+ ctx = get_rrdeng_ctx_from_host(st->rrdhost);
regions = 1;
*max_intervalp = max_interval = 0;
region_info_array = NULL;
*region_info_arrayp = NULL;
page_info_array = NULL;
+ RRDDIM *temp_rd = context_param_list ? context_param_list->rd : NULL;
rrdset_rdlock(st);
- for(rd_iter = st->dimensions, rd = NULL, min_time = (usec_t)-1 ; rd_iter ; rd_iter = rd_iter->next) {
+ for(rd_iter = temp_rd?temp_rd:st->dimensions, rd = NULL, min_time = (usec_t)-1 ; rd_iter ; rd_iter = rd_iter->next) {
/*
* Choose oldest dimension as reference. This is not equivalent to the union of all dimensions
* but it is a best effort approximation with a bias towards older metrics in a chart. It
@@ -316,7 +427,7 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e
continue;
}
page_entries = curr->page_length / sizeof(storage_number);
- assert(0 != page_entries);
+ fatal_assert(0 != page_entries);
if (likely(1 != page_entries)) {
dt = (curr->end_time - curr->start_time) / (page_entries - 1);
*pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(dt);
@@ -352,7 +463,7 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e
if (1 == page_points)
first_valid_time_in_page = current_position_time;
if (unlikely(!is_first_region_initialized)) {
- assert(1 == regions);
+ fatal_assert(1 == regions);
/* this is the first region */
region_info_array[0].start_time = current_position_time;
is_first_region_initialized = 1;
@@ -365,7 +476,7 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e
}
if (unlikely(0 == *pginfo_to_dt(curr))) { /* unknown data collection interval */
- assert(1 == page_points);
+ fatal_assert(1 == page_points);
if (likely(NULL != prev)) { /* get interval from previous page */
*pginfo_to_dt(curr) = *pginfo_to_dt(prev);
@@ -428,7 +539,7 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand
struct rrdengine_instance *ctx;
unsigned pages_nr;
- ctx = rd->rrdset->rrdhost->rrdeng_ctx;
+ ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost);
rrdimm_handle->start_time = start_time;
rrdimm_handle->end_time = end_time;
handle = &rrdimm_handle->rrdeng;
@@ -452,7 +563,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
struct rrdeng_page_descr *descr;
storage_number *page, ret;
unsigned position, entries;
- usec_t next_page_time, current_position_time, page_end_time;
+ usec_t next_page_time = 0, current_position_time, page_end_time = 0;
uint32_t page_length;
handle = &rrdimm_handle->rrdeng;
@@ -473,17 +584,18 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
/* We need to get a new page */
if (descr) {
/* Drop old page's reference */
- handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1;
- if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) {
- goto no_more_metrics;
- }
- next_page_time = handle->next_page_time * USEC_PER_SEC;
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
pg_cache_put(ctx, descr);
handle->descr = NULL;
+ handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1;
+ if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) {
+ goto no_more_metrics;
+ }
+ next_page_time = handle->next_page_time * USEC_PER_SEC;
}
+
descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id,
next_page_time, rrdimm_handle->end_time * USEC_PER_SEC);
if (NULL == descr) {
@@ -520,7 +632,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
}
handle->position = position;
handle->now = current_position_time / USEC_PER_SEC;
-/* assert(handle->now >= rrdimm_handle->start_time && handle->now <= rrdimm_handle->end_time);
+/* fatal_assert(handle->now >= rrdimm_handle->start_time && handle->now <= rrdimm_handle->end_time);
The above assertion is an approximation and needs to take update_every into account */
if (unlikely(handle->now >= rrdimm_handle->end_time)) {
/* next calls will not load any more metrics */
@@ -564,21 +676,17 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
time_t rrdeng_metric_latest_time(RRDDIM *rd)
{
- struct rrdeng_collect_handle *handle;
struct pg_cache_page_index *page_index;
- handle = &rd->state->handle.rrdeng;
- page_index = handle->page_index;
+ page_index = rd->state->page_index;
return page_index->latest_time / USEC_PER_SEC;
}
time_t rrdeng_metric_oldest_time(RRDDIM *rd)
{
- struct rrdeng_collect_handle *handle;
struct pg_cache_page_index *page_index;
- handle = &rd->state->handle.rrdeng;
- page_index = handle->page_index;
+ page_index = rd->state->page_index;
return page_index->oldest_time / USEC_PER_SEC;
}
@@ -620,7 +728,7 @@ void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr
debug(D_RRDENGINE, "%s: page descriptor is NULL, page has already been force-committed.", __func__);
return;
}
- assert(descr->page_length);
+ fatal_assert(descr->page_length);
uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
PValue = JudyLIns(&pg_cache->committed_page_index.JudyL_array, page_correlation_id, PJE0);
@@ -628,16 +736,27 @@ void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr
nr_committed_pages = ++pg_cache->committed_page_index.nr_committed_pages;
uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
- if (nr_committed_pages >= (pg_cache_hard_limit(ctx) - (unsigned long)ctx->stats.metric_API_producers) / 2) {
- /* 50% of pages have not been committed yet */
- if (0 == (unsigned long)ctx->stats.flushing_errors) {
- /* only print the first time */
- error("Failed to flush quickly enough in dbengine instance \"%s\""
- ". Metric data will not be stored in the database"
- ", please reduce disk load or use a faster disk.", ctx->dbfiles_path);
+ if (nr_committed_pages >= pg_cache_hard_limit(ctx) / 2) {
+ /* over 50% of pages have not been committed yet */
+
+ if (ctx->drop_metrics_under_page_cache_pressure &&
+ nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) {
+ /* 100% of pages are dirty */
+ struct rrdeng_cmd cmd;
+
+ cmd.opcode = RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+ } else {
+ if (0 == (unsigned long) ctx->stats.pg_cache_over_half_dirty_events) {
+ /* only print the first time */
+ errno = 0;
+ error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". "
+ "Metric data at risk of not being stored in the database, "
+ "please reduce disk load or use a faster disk.", ctx->dbfiles_path);
+ }
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_over_half_dirty_events, 1);
+ rrd_stat_atomic_add(&global_pg_cache_over_half_dirty_events, 1);
}
- rrd_stat_atomic_add(&ctx->stats.flushing_errors, 1);
- rrd_stat_atomic_add(&global_flushing_errors, 1);
}
pg_cache_put(ctx, descr);
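
Editor's note: the branch above distinguishes two pressure levels, and the diff markers make it a little hard to follow. The sketch below only restates the decision; pg_cache_hard_limit() and pg_cache_committed_hard_limit() are the helpers declared in pagecache.h and are treated as opaque here.

    /* Sketch: decide what to do for a newly committed page, given the dirty-page count. */
    static void handle_dirty_pressure(struct rrdengine_instance *ctx, unsigned long nr_committed_pages)
    {
        if (nr_committed_pages < pg_cache_hard_limit(ctx) / 2)
            return;                                                  /* below 50% dirty: nothing to do */

        if (ctx->drop_metrics_under_page_cache_pressure &&
            nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) {
            struct rrdeng_cmd cmd;                                   /* 100% dirty: sacrifice a page */
            cmd.opcode = RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);
        } else {
            /* 50% to 100% dirty: warn once, then only bump the pg_cache_over_half_dirty_events counters */
        }
    }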
@@ -687,8 +806,11 @@ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_i
* You must not change the indices of the statistics or user code will break.
* You must not exceed RRDENG_NR_STATS or it will crash.
*/
-void rrdeng_get_35_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
+void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
{
+ if (ctx == NULL)
+ return;
+
struct page_cache *pg_cache = &ctx->pg_cache;
array[0] = (uint64_t)ctx->stats.metric_API_producers;
@@ -724,9 +846,11 @@ void rrdeng_get_35_statistics(struct rrdengine_instance *ctx, unsigned long long
array[30] = (uint64_t)global_io_errors;
array[31] = (uint64_t)global_fs_errors;
array[32] = (uint64_t)rrdeng_reserved_file_descriptors;
- array[33] = (uint64_t)ctx->stats.flushing_errors;
- array[34] = (uint64_t)global_flushing_errors;
- assert(RRDENG_NR_STATS == 35);
+ array[33] = (uint64_t)ctx->stats.pg_cache_over_half_dirty_events;
+ array[34] = (uint64_t)global_pg_cache_over_half_dirty_events;
+ array[35] = (uint64_t)ctx->stats.flushing_pressure_page_deletions;
+ array[36] = (uint64_t)global_flushing_pressure_page_deletions;
+ fatal_assert(RRDENG_NR_STATS == 37);
}
/* Releases reference to page */
@@ -739,21 +863,21 @@ void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle)
/*
* Returns 0 on success, negative on error
*/
-int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, unsigned disk_space_mb)
+int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
+ unsigned disk_space_mb)
{
struct rrdengine_instance *ctx;
int error;
uint32_t max_open_files;
- sanity_check();
-
max_open_files = rlimit_nofile.rlim_cur / 4;
/* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */
rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE);
if (rrdeng_reserved_file_descriptors > max_open_files) {
- error("Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.",
- (unsigned)rrdeng_reserved_file_descriptors, (unsigned)max_open_files);
+ error(
+ "Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.",
+ (unsigned)rrdeng_reserved_file_descriptors, (unsigned)max_open_files);
rrd_stat_atomic_add(&global_fs_errors, 1);
rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
@@ -761,8 +885,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
}
if (NULL == ctxp) {
- /* for testing */
- ctx = &default_global_ctx;
+ ctx = &multidb_ctx;
memset(ctx, 0, sizeof(*ctx));
} else {
*ctxp = ctx = callocz(1, sizeof(*ctx));
@@ -778,6 +901,16 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
ctx->max_disk_space = disk_space_mb * 1048576LLU;
strncpyz(ctx->dbfiles_path, dbfiles_path, sizeof(ctx->dbfiles_path) - 1);
ctx->dbfiles_path[sizeof(ctx->dbfiles_path) - 1] = '\0';
+ if (NULL == host)
+ strncpyz(ctx->machine_guid, registry_get_this_machine_guid(), GUID_LEN);
+ else
+ strncpyz(ctx->machine_guid, host->machine_guid, GUID_LEN);
+
+ ctx->drop_metrics_under_page_cache_pressure = rrdeng_drop_metrics_under_page_cache_pressure;
+ ctx->metric_API_max_producers = 0;
+ ctx->quiesce = NO_QUIESCE;
+ ctx->metalog_ctx = NULL; /* only set this after the metadata log has finished initializing */
+ ctx->host = host;
memset(&ctx->worker_config, 0, sizeof(ctx->worker_config));
ctx->worker_config.ctx = ctx;
@@ -789,20 +922,27 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
}
init_completion(&ctx->rrdengine_completion);
- assert(0 == uv_thread_create(&ctx->worker_config.thread, rrdeng_worker, &ctx->worker_config));
+ fatal_assert(0 == uv_thread_create(&ctx->worker_config.thread, rrdeng_worker, &ctx->worker_config));
/* wait for worker thread to initialize */
wait_for_completion(&ctx->rrdengine_completion);
destroy_completion(&ctx->rrdengine_completion);
+ uv_thread_set_name_np(ctx->worker_config.thread, "DBENGINE");
if (ctx->worker_config.error) {
goto error_after_rrdeng_worker;
}
+ error = metalog_init(ctx);
+ if (error) {
+ error("Failed to initialize metadata log file event loop.");
+ goto error_after_rrdeng_worker;
+ }
+
return 0;
error_after_rrdeng_worker:
finalize_rrd_files(ctx);
error_after_init_rrd_files:
free_page_cache(ctx);
- if (ctx != &default_global_ctx) {
+ if (ctx != &multidb_ctx) {
freez(ctx);
*ctxp = NULL;
}
@@ -825,14 +965,35 @@ int rrdeng_exit(struct rrdengine_instance *ctx)
cmd.opcode = RRDENG_SHUTDOWN;
rrdeng_enq_cmd(&ctx->worker_config, &cmd);
- assert(0 == uv_thread_join(&ctx->worker_config.thread));
+ fatal_assert(0 == uv_thread_join(&ctx->worker_config.thread));
finalize_rrd_files(ctx);
+ //metalog_exit(ctx->metalog_ctx);
free_page_cache(ctx);
- if (ctx != &default_global_ctx) {
+ if (ctx != &multidb_ctx) {
freez(ctx);
}
rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
return 0;
-} \ No newline at end of file
+}
+
+void rrdeng_prepare_exit(struct rrdengine_instance *ctx)
+{
+ struct rrdeng_cmd cmd;
+
+ if (NULL == ctx) {
+ return;
+ }
+
+ init_completion(&ctx->rrdengine_completion);
+ cmd.opcode = RRDENG_QUIESCE;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+
+ /* wait for dbengine to quiesce */
+ wait_for_completion(&ctx->rrdengine_completion);
+ destroy_completion(&ctx->rrdengine_completion);
+
+ //metalog_prepare_exit(ctx->metalog_ctx);
+}
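
Editor's note: rrdeng_prepare_exit() and rrdeng_exit() now form a two-phase shutdown, QUIESCE first to flush and park the instance, SHUTDOWN afterwards to tear down the event loop. A sketch of the expected call order from the host side (error handling omitted; rrdeng_ctx is whatever instance the caller owns):

    void host_shutdown_dbengine(struct rrdengine_instance *rrdeng_ctx)
    {
        /* 1. quiesce: stop the timer, flush all committed pages, wait for helper threads */
        rrdeng_prepare_exit(rrdeng_ctx);

        /* 2. shutdown: send RRDENG_SHUTDOWN, join the worker and release the instance */
        rrdeng_exit(rrdeng_ctx);
    }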
+
diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h
index 1ef0569d5..41375b980 100644
--- a/database/engine/rrdengineapi.h
+++ b/database/engine/rrdengineapi.h
@@ -6,14 +6,17 @@
#include "rrdengine.h"
#define RRDENG_MIN_PAGE_CACHE_SIZE_MB (8)
-#define RRDENG_MIN_DISK_SPACE_MB (256)
+#define RRDENG_MIN_DISK_SPACE_MB (64)
-#define RRDENG_NR_STATS (35)
+#define RRDENG_NR_STATS (37)
#define RRDENG_FD_BUDGET_PER_INSTANCE (50)
extern int default_rrdeng_page_cache_mb;
extern int default_rrdeng_disk_quota_mb;
+extern int default_multidb_disk_quota_mb;
+extern uint8_t rrdeng_drop_metrics_under_page_cache_pressure;
+extern struct rrdengine_instance multidb_ctx;
struct rrdeng_region_info {
time_t start_time;
@@ -27,13 +30,20 @@ extern void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_pag
extern void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle);
extern void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle);
extern void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle);
+
+extern void rrdeng_generate_legacy_uuid(const char *dim_id, char *chart_id, uuid_t *ret_uuid);
+extern void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid,
+ uuid_t *ret_uuid);
+
+
+extern void rrdeng_metric_init(RRDDIM *rd, uuid_t *dim_uuid);
extern void rrdeng_store_metric_init(RRDDIM *rd);
extern void rrdeng_store_metric_flush_current_page(RRDDIM *rd);
extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number);
-extern void rrdeng_store_metric_finalize(RRDDIM *rd);
+extern int rrdeng_store_metric_finalize(RRDDIM *rd);
extern unsigned
rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time,
- struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp);
+ struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp, struct context_param *context_param_list);
extern void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle,
time_t start_time, time_t end_time);
extern storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time);
@@ -41,12 +51,13 @@ extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_han
extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle);
extern time_t rrdeng_metric_latest_time(RRDDIM *rd);
extern time_t rrdeng_metric_oldest_time(RRDDIM *rd);
-extern void rrdeng_get_35_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
+extern void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
/* must call once before using anything */
-extern int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
+extern int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
unsigned disk_space_mb);
extern int rrdeng_exit(struct rrdengine_instance *ctx);
+extern void rrdeng_prepare_exit(struct rrdengine_instance *ctx);
-#endif /* NETDATA_RRDENGINEAPI_H */ \ No newline at end of file
+#endif /* NETDATA_RRDENGINEAPI_H */
diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c
index 0606dd938..287b86be8 100644
--- a/database/engine/rrdenginelib.c
+++ b/database/engine/rrdenginelib.c
@@ -60,7 +60,7 @@ int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size)
if (ret < 0) {
fatal("uv_fs_fstat: %s\n", uv_strerror(ret));
}
- assert(req.result == 0);
+ fatal_assert(req.result == 0);
s = req.ptr;
if (!(s->st_mode & S_IFREG)) {
error("Not a regular file.\n");
@@ -78,17 +78,22 @@ int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size)
return 0;
}
-/*
- * Tries to open a file in direct I/O mode, falls back to buffered mode if not possible.
- * Returns UV error number that is < 0 on failure.
- * On success sets (*file) to be the uv_file that was opened.
+/**
+ * Open file for I/O.
+ *
+ * @param path The full path of the file.
+ * @param flags Same flags as the open() system call uses.
+ * @param file On success sets (*file) to be the uv_file that was opened.
+ * @param direct Tries to open a file in direct I/O mode when direct=1, falls back to buffered mode if not possible.
+ * @return Returns UV error number that is < 0 on failure. 0 on success.
*/
-int open_file_direct_io(char *path, int flags, uv_file *file)
+int open_file_for_io(char *path, int flags, uv_file *file, int direct)
{
uv_fs_t req;
- int fd, current_flags, direct;
+ int fd = -1, current_flags;
- for (direct = 1 ; direct >= 0 ; --direct) {
+ fatal_assert(0 == direct || 1 == direct);
+ for ( ; direct >= 0 ; --direct) {
#ifdef __APPLE__
/* Apple OS does not support O_DIRECT */
direct = 0;
@@ -106,7 +111,7 @@ int open_file_direct_io(char *path, int flags, uv_file *file)
--direct; /* break the loop */
}
} else {
- assert(req.result >= 0);
+ fatal_assert(req.result >= 0);
*file = req.result;
#ifdef __APPLE__
info("Disabling OS X caching for file \"%s\".", path);
@@ -159,8 +164,10 @@ char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t si
"global_io_errors: %ld\n"
"global_fs_errors: %ld\n"
"rrdeng_reserved_file_descriptors: %ld\n"
- "flushing_errors: %ld\n"
- "global_flushing_errors: %ld\n",
+ "pg_cache_over_half_dirty_events: %ld\n"
+ "global_pg_cache_over_half_dirty_events: %ld\n"
+ "flushing_pressure_page_deletions: %ld\n"
+ "global_flushing_pressure_page_deletions: %ld\n",
(long)ctx->stats.metric_API_producers,
(long)ctx->stats.metric_API_consumers,
(long)pg_cache->page_descriptors,
@@ -194,8 +201,94 @@ char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t si
(long)global_io_errors,
(long)global_fs_errors,
(long)rrdeng_reserved_file_descriptors,
- (long)ctx->stats.flushing_errors,
- (long)global_flushing_errors
+ (long)ctx->stats.pg_cache_over_half_dirty_events,
+ (long)global_pg_cache_over_half_dirty_events,
+ (long)ctx->stats.flushing_pressure_page_deletions,
+ (long)global_flushing_pressure_page_deletions
);
return str;
}
+
+int is_legacy_child(const char *machine_guid)
+{
+ uuid_t uuid;
+ char dbengine_file[FILENAME_MAX+1];
+
+ if (unlikely(!strcmp(machine_guid, "unittest-dbengine") || !strcmp(machine_guid, "dbengine-dataset") ||
+ !strcmp(machine_guid, "dbengine-stress-test"))) {
+ return 1;
+ }
+ if (!uuid_parse(machine_guid, uuid)) {
+ uv_fs_t stat_req;
+ snprintfz(dbengine_file, FILENAME_MAX, "%s/%s/dbengine", netdata_configured_cache_dir, machine_guid);
+ int rc = uv_fs_stat(NULL, &stat_req, dbengine_file, NULL);
+ if (likely(rc == 0 && ((stat_req.statbuf.st_mode & S_IFMT) == S_IFDIR))) {
+ //info("Found legacy engine folder \"%s\"", dbengine_file);
+ return 1;
+ }
+ }
+ return 0;
+}
+
+int count_legacy_children(char *dbfiles_path)
+{
+ int ret;
+ uv_fs_t req;
+ uv_dirent_t dent;
+ int legacy_engines = 0;
+
+ ret = uv_fs_scandir(NULL, &req, dbfiles_path, 0, NULL);
+ if (ret < 0) {
+ uv_fs_req_cleanup(&req);
+ error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret));
+ return ret;
+ }
+
+ while(UV_EOF != uv_fs_scandir_next(&req, &dent)) {
+ if (dent.type == UV_DIRENT_DIR) {
+ if (is_legacy_child(dent.name))
+ legacy_engines++;
+ }
+ }
+ uv_fs_req_cleanup(&req);
+ return legacy_engines;
+}
+
+int compute_multidb_diskspace()
+{
+ char multidb_disk_space_file[FILENAME_MAX + 1];
+ FILE *fp;
+ int computed_multidb_disk_quota_mb = -1;
+
+ snprintfz(multidb_disk_space_file, FILENAME_MAX, "%s/dbengine_multihost_size", netdata_configured_varlib_dir);
+ fp = fopen(multidb_disk_space_file, "r");
+ if (likely(fp)) {
+ int rc = fscanf(fp, "%d", &computed_multidb_disk_quota_mb);
+ fclose(fp);
+ if (unlikely(rc != 1 || computed_multidb_disk_quota_mb < RRDENG_MIN_DISK_SPACE_MB)) {
+ errno = 0;
+ error("File '%s' contains invalid input, it will be rebuild", multidb_disk_space_file);
+ computed_multidb_disk_quota_mb = -1;
+ }
+ }
+
+ if (computed_multidb_disk_quota_mb == -1) {
+ int rc = count_legacy_children(netdata_configured_cache_dir);
+ if (likely(rc >= 0)) {
+ computed_multidb_disk_quota_mb = (rc + 1) * default_rrdeng_disk_quota_mb;
+ info("Found %d legacy dbengines, setting multidb diskspace to %dMB", rc, computed_multidb_disk_quota_mb);
+
+ fp = fopen(multidb_disk_space_file, "w");
+ if (likely(fp)) {
+ fprintf(fp, "%d", computed_multidb_disk_quota_mb);
+ info("Created file '%s' to store the computed value", multidb_disk_space_file);
+ fclose(fp);
+ } else
+ error("Failed to store the default multidb disk quota size on '%s'", multidb_disk_space_file);
+ }
+ else
+ computed_multidb_disk_quota_mb = default_rrdeng_disk_quota_mb;
+ }
+
+ return computed_multidb_disk_quota_mb;
+}
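
Editor's note: compute_multidb_diskspace() sizes the multi-host instance as one default quota per legacy child database plus one for the multi-host DB itself, and caches the result so the figure stays stable across restarts. For example, with the 256 MB default quota and three legacy children found under the cache directory:

    computed_multidb_disk_quota_mb = (3 + 1) * 256 = 1024 MB

Removing the dbengine_multihost_size file under netdata_configured_varlib_dir causes the value to be recomputed on the next startup.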
diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h
index 5685b65a5..ebab93c8f 100644
--- a/database/engine/rrdenginelib.h
+++ b/database/engine/rrdenginelib.h
@@ -3,10 +3,9 @@
#ifndef NETDATA_RRDENGINELIB_H
#define NETDATA_RRDENGINELIB_H
-#include "rrdengine.h"
-
/* Forward declarations */
struct rrdeng_page_descr;
+struct rrdengine_instance;
#define STR_HELPER(x) #x
#define STR(x) STR_HELPER(x)
@@ -28,11 +27,43 @@ struct rrdeng_page_descr;
typedef uintptr_t rrdeng_stats_t;
#ifdef __ATOMIC_RELAXED
-#define rrd_stat_atomic_add(p, n) do {(void) __atomic_fetch_add(p, n, __ATOMIC_RELAXED);} while(0)
+#define rrd_atomic_fetch_add(p, n) __atomic_fetch_add(p, n, __ATOMIC_RELAXED)
+#define rrd_atomic_add_fetch(p, n) __atomic_add_fetch(p, n, __ATOMIC_RELAXED)
#else
-#define rrd_stat_atomic_add(p, n) do {(void) __sync_fetch_and_add(p, n);} while(0)
+#define rrd_atomic_fetch_add(p, n) __sync_fetch_and_add(p, n)
+#define rrd_atomic_add_fetch(p, n) __sync_add_and_fetch(p, n)
#endif
+#define rrd_stat_atomic_add(p, n) rrd_atomic_fetch_add(p, n)
+
+/* returns -1 if it didn't find the first cleared bit, the position otherwise. Starts from LSB. */
+static inline int find_first_zero(unsigned x)
+{
+ return ffs((int)(~x)) - 1;
+}
+
+/* Starts from LSB. */
+static inline uint8_t check_bit(unsigned x, size_t pos)
+{
+ return !!(x & (1 << pos));
+}
+
+/* Starts from LSB. val is 0 or 1 */
+static inline void modify_bit(unsigned *x, unsigned pos, uint8_t val)
+{
+ switch(val) {
+ case 0:
+ *x &= ~(1U << pos);
+ break;
+ case 1:
+ *x |= 1U << pos;
+ break;
+ default:
+ error("modify_bit() called with invalid argument.");
+ break;
+ }
+}
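
Editor's note: the three bit helpers treat an unsigned int as a small bitmap with position 0 at the LSB. A quick illustration of their semantics (plain C; the expected values are shown in the comments).

    #include "rrdenginelib.h"   /* find_first_zero(), check_bit(), modify_bit() */

    static void bitmap_demo(void)
    {
        unsigned bitmap = 0;

        modify_bit(&bitmap, 3, 1);                   /* bitmap == 0x8 */
        modify_bit(&bitmap, 0, 1);                   /* bitmap == 0x9 */

        int first_free = find_first_zero(bitmap);    /* == 1, lowest cleared position */
        uint8_t bit3   = check_bit(bitmap, 3);       /* == 1 */

        modify_bit(&bitmap, 3, 0);                   /* bitmap == 0x1 again */
        (void)first_free; (void)bit3;
    }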
+
#define RRDENG_PATH_MAX (4096)
/* returns old *ptr value */
@@ -56,8 +87,8 @@ struct completion {
static inline void init_completion(struct completion *p)
{
p->completed = 0;
- assert(0 == uv_cond_init(&p->cond));
- assert(0 == uv_mutex_init(&p->mutex));
+ fatal_assert(0 == uv_cond_init(&p->cond));
+ fatal_assert(0 == uv_mutex_init(&p->mutex));
}
static inline void destroy_completion(struct completion *p)
@@ -72,7 +103,7 @@ static inline void wait_for_completion(struct completion *p)
while (0 == p->completed) {
uv_cond_wait(&p->cond, &p->mutex);
}
- assert(1 == p->completed);
+ fatal_assert(1 == p->completed);
uv_mutex_unlock(&p->mutex);
}
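
Editor's note: the completion is a small condition-variable wrapper used for one-shot handshakes, such as waiting for the worker thread to finish initializing or for the QUIESCE command to be acknowledged. A usage sketch, assuming the complete() counterpart that the engine calls elsewhere in this diff:

    static struct completion init_done;

    static void worker_thread(void *arg)
    {
        (void)arg;
        /* ... initialize ... */
        complete(&init_done);                 /* wake whoever is waiting */
    }

    static int start_worker(uv_thread_t *thread)
    {
        init_completion(&init_done);
        if (uv_thread_create(thread, worker_thread, NULL))
            return -1;
        wait_for_completion(&init_done);      /* block until the worker has signalled */
        destroy_completion(&init_done);
        return 0;
    }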
@@ -97,7 +128,17 @@ static inline void crc32set(void *crcp, uLong crc)
extern void print_page_cache_descr(struct rrdeng_page_descr *page_cache_descr);
extern void print_page_descr(struct rrdeng_page_descr *descr);
extern int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size);
-extern int open_file_direct_io(char *path, int flags, uv_file *file);
+extern int open_file_for_io(char *path, int flags, uv_file *file, int direct);
+static inline int open_file_direct_io(char *path, int flags, uv_file *file)
+{
+ return open_file_for_io(path, flags, file, 1);
+}
+static inline int open_file_buffered_io(char *path, int flags, uv_file *file)
+{
+ return open_file_for_io(path, flags, file, 0);
+}
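
Editor's note: the buffered wrapper is the new addition; direct I/O remains the default for data and journal files, with buffered mode as the fallback (and the only mode on macOS). A small usage sketch with a made-up path:

    static int open_example_datafile(uv_file *file)
    {
        int ret = open_file_direct_io("/tmp/example-datafile.ndf", O_RDWR | O_CREAT | O_TRUNC, file);
        if (ret < 0) {
            error("Could not open example file: %s", uv_strerror(ret));
            return ret;
        }
        /* ... do aligned I/O through libuv ... */
        uv_fs_t req;
        uv_fs_close(NULL, &req, *file, NULL);   /* synchronous close, same style as the scandir calls above */
        uv_fs_req_cleanup(&req);
        return 0;
    }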
extern char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size);
+extern int compute_multidb_diskspace();
+extern int is_legacy_child(const char *machine_guid);
#endif /* NETDATA_RRDENGINELIB_H */ \ No newline at end of file
diff --git a/database/engine/rrdenglocking.c b/database/engine/rrdenglocking.c
index 0eb9019b4..a23abf307 100644
--- a/database/engine/rrdenglocking.c
+++ b/database/engine/rrdenglocking.c
@@ -12,8 +12,8 @@ struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance
pg_cache_descr->prev = pg_cache_descr->next = NULL;
pg_cache_descr->refcnt = 0;
pg_cache_descr->waiters = 0;
- assert(0 == uv_cond_init(&pg_cache_descr->cond));
- assert(0 == uv_mutex_init(&pg_cache_descr->mutex));
+ fatal_assert(0 == uv_cond_init(&pg_cache_descr->cond));
+ fatal_assert(0 == uv_mutex_init(&pg_cache_descr->mutex));
return pg_cache_descr;
}
@@ -39,7 +39,7 @@ void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_
old_users = old_state >> PG_CACHE_DESCR_SHIFT;
if (unlikely(we_locked)) {
- assert(old_state & PG_CACHE_DESCR_LOCKED);
+ fatal_assert(old_state & PG_CACHE_DESCR_LOCKED);
new_state = (1 << PG_CACHE_DESCR_SHIFT) | PG_CACHE_DESCR_ALLOCATED;
ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
if (old_state == ret_state) {
@@ -49,7 +49,7 @@ void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_
continue; /* spin */
}
if (old_state & PG_CACHE_DESCR_LOCKED) {
- assert(0 == old_users);
+ fatal_assert(0 == old_users);
continue; /* spin */
}
if (0 == old_state) {
@@ -71,8 +71,9 @@ void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_
continue; /* spin */
}
/* page cache descriptor is already allocated */
- assert(old_state & PG_CACHE_DESCR_ALLOCATED);
-
+ if (unlikely(!(old_state & PG_CACHE_DESCR_ALLOCATED))) {
+ fatal("Invalid page cache descriptor locking state:%#lX", old_state);
+ }
new_state = (old_users + 1) << PG_CACHE_DESCR_SHIFT;
new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK;
@@ -94,7 +95,7 @@ void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_
void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
unsigned long old_state, new_state, ret_state, old_users;
- struct page_cache_descr *pg_cache_descr;
+ struct page_cache_descr *pg_cache_descr, *delete_pg_cache_descr = NULL;
uint8_t we_locked;
uv_mutex_unlock(&descr->pg_cache_descr->mutex);
@@ -105,35 +106,39 @@ void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrden
old_users = old_state >> PG_CACHE_DESCR_SHIFT;
if (unlikely(we_locked)) {
- assert(0 == old_users);
+ fatal_assert(0 == old_users);
ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, 0);
if (old_state == ret_state) {
/* success */
- break;
+ rrdeng_destroy_pg_cache_descr(ctx, delete_pg_cache_descr);
+ return;
}
continue; /* spin */
}
if (old_state & PG_CACHE_DESCR_LOCKED) {
- assert(0 == old_users);
+ fatal_assert(0 == old_users);
continue; /* spin */
}
- assert(old_state & PG_CACHE_DESCR_ALLOCATED);
+ fatal_assert(old_state & PG_CACHE_DESCR_ALLOCATED);
pg_cache_descr = descr->pg_cache_descr;
/* caller is the only page cache descriptor user and there are no pending references on the page */
if ((old_state & PG_CACHE_DESCR_DESTROY) && (1 == old_users) &&
!pg_cache_descr->flags && !pg_cache_descr->refcnt) {
+ fatal_assert(!pg_cache_descr->waiters);
+
new_state = PG_CACHE_DESCR_LOCKED;
ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
if (old_state == ret_state) {
we_locked = 1;
- rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+ delete_pg_cache_descr = pg_cache_descr;
+ descr->pg_cache_descr = NULL;
/* retry */
continue;
}
continue; /* spin */
}
- assert(old_users > 0);
+ fatal_assert(old_users > 0);
new_state = (old_users - 1) << PG_CACHE_DESCR_SHIFT;
new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK;
@@ -144,7 +149,6 @@ void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrden
}
/* spin */
}
-
}
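
Editor's note: all of these spin loops manipulate a single machine word, descr->pg_cache_descr_state. The low bits hold flags (LOCKED, ALLOCATED, DESTROY) and the bits above PG_CACHE_DESCR_SHIFT hold the number of concurrent users. A sketch of how the word decomposes; the macro values below are assumptions for illustration, the real ones live in rrdenglocking.h.

    /* illustrative values, mirroring the shape of the state word used above */
    #define PG_CACHE_DESCR_LOCKED      (1UL << 0)
    #define PG_CACHE_DESCR_ALLOCATED   (1UL << 1)
    #define PG_CACHE_DESCR_DESTROY     (1UL << 2)
    #define PG_CACHE_DESCR_FLAGS_MASK  ((1UL << 3) - 1)
    #define PG_CACHE_DESCR_SHIFT       (3)

    static inline unsigned long state_users(unsigned long state) { return state >> PG_CACHE_DESCR_SHIFT; }
    static inline unsigned long state_flags(unsigned long state) { return state & PG_CACHE_DESCR_FLAGS_MASK; }

    /* add one user while keeping the flags, as the lock path above does */
    static inline unsigned long state_add_user(unsigned long state)
    {
        return ((state_users(state) + 1) << PG_CACHE_DESCR_SHIFT) | state_flags(state);
    }

Every transition is attempted with ulong_compare_and_swap() and retried on failure, so the user count and the flags always change together as one atomic word.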
/*
@@ -155,34 +159,36 @@ void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrden
void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
unsigned long old_state, new_state, ret_state, old_users;
- struct page_cache_descr *pg_cache_descr;
- uint8_t just_locked, we_freed, must_unlock;
+ struct page_cache_descr *pg_cache_descr = NULL;
+ uint8_t just_locked, can_free, must_unlock;
just_locked = 0;
- we_freed = 0;
+ can_free = 0;
must_unlock = 0;
while (1) { /* spin */
old_state = descr->pg_cache_descr_state;
old_users = old_state >> PG_CACHE_DESCR_SHIFT;
if (unlikely(just_locked)) {
- assert(0 == old_users);
+ fatal_assert(0 == old_users);
must_unlock = 1;
just_locked = 0;
/* Try deallocate if there are no pending references on the page */
if (!pg_cache_descr->flags && !pg_cache_descr->refcnt) {
- rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
- we_freed = 1;
+ fatal_assert(!pg_cache_descr->waiters);
+
+ descr->pg_cache_descr = NULL;
+ can_free = 1;
/* success */
continue;
}
continue; /* spin */
}
if (unlikely(must_unlock)) {
- assert(0 == old_users);
+ fatal_assert(0 == old_users);
- if (we_freed) {
+ if (can_free) {
/* success */
new_state = 0;
} else {
@@ -192,6 +198,8 @@ void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct
ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
if (old_state == ret_state) {
/* unlocked */
+ if (can_free)
+ rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
return;
}
continue; /* spin */
@@ -201,16 +209,16 @@ void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct
return;
}
if (old_state & PG_CACHE_DESCR_LOCKED) {
- assert(0 == old_users);
+ fatal_assert(0 == old_users);
continue; /* spin */
}
- pg_cache_descr = descr->pg_cache_descr;
/* caller is the only page cache descriptor user */
if (0 == old_users) {
new_state = old_state | PG_CACHE_DESCR_LOCKED;
ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
if (old_state == ret_state) {
just_locked = 1;
+ pg_cache_descr = descr->pg_cache_descr;
/* retry */
continue;
}