From a8220ab2d293bb7f4b014b79d16b2fb05090fa93 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Feb 2021 12:45:55 +0100 Subject: Adding upstream version 1.29.0. Signed-off-by: Daniel Baumann --- database/engine/Makefile.am | 4 + database/engine/Makefile.in | 519 ------------------ database/engine/README.md | 202 ++++--- database/engine/datafile.c | 67 ++- database/engine/datafile.h | 3 +- database/engine/journalfile.c | 39 +- database/engine/journalfile.h | 1 + database/engine/metadata_log/Makefile.am | 8 + database/engine/metadata_log/README.md | 0 database/engine/metadata_log/compaction.c | 86 +++ database/engine/metadata_log/compaction.h | 14 + database/engine/metadata_log/logfile.c | 453 ++++++++++++++++ database/engine/metadata_log/logfile.h | 39 ++ database/engine/metadata_log/metadatalog.h | 28 + database/engine/metadata_log/metadatalogapi.c | 39 ++ database/engine/metadata_log/metadatalogapi.h | 12 + database/engine/metadata_log/metadatalogprotocol.h | 53 ++ database/engine/metadata_log/metalogpluginsd.c | 140 +++++ database/engine/metadata_log/metalogpluginsd.h | 33 ++ database/engine/pagecache.c | 155 ++++-- database/engine/pagecache.h | 9 +- database/engine/rrdengine.c | 587 ++++++++++++++++++--- database/engine/rrdengine.h | 65 ++- database/engine/rrdengineapi.c | 353 +++++++++---- database/engine/rrdengineapi.h | 25 +- database/engine/rrdenginelib.c | 119 ++++- database/engine/rrdenginelib.h | 57 +- database/engine/rrdenglocking.c | 56 +- 28 files changed, 2259 insertions(+), 907 deletions(-) delete mode 100644 database/engine/Makefile.in create mode 100644 database/engine/metadata_log/Makefile.am create mode 100644 database/engine/metadata_log/README.md create mode 100644 database/engine/metadata_log/compaction.c create mode 100644 database/engine/metadata_log/compaction.h create mode 100644 database/engine/metadata_log/logfile.c create mode 100644 database/engine/metadata_log/logfile.h create mode 100644 database/engine/metadata_log/metadatalog.h create mode 100755 database/engine/metadata_log/metadatalogapi.c create mode 100644 database/engine/metadata_log/metadatalogapi.h create mode 100644 database/engine/metadata_log/metadatalogprotocol.h create mode 100755 database/engine/metadata_log/metalogpluginsd.c create mode 100644 database/engine/metadata_log/metalogpluginsd.h mode change 100644 => 100755 database/engine/rrdengineapi.c (limited to 'database/engine') diff --git a/database/engine/Makefile.am b/database/engine/Makefile.am index 161784b8f..43405001d 100644 --- a/database/engine/Makefile.am +++ b/database/engine/Makefile.am @@ -3,6 +3,10 @@ AUTOMAKE_OPTIONS = subdir-objects MAINTAINERCLEANFILES = $(srcdir)/Makefile.in +SUBDIRS = \ + metadata_log \ + $(NULL) + dist_noinst_DATA = \ README.md \ $(NULL) diff --git a/database/engine/Makefile.in b/database/engine/Makefile.in deleted file mode 100644 index 3c9b33089..000000000 --- a/database/engine/Makefile.in +++ /dev/null @@ -1,519 +0,0 @@ -# Makefile.in generated by automake 1.15.1 from Makefile.am. -# @configure_input@ - -# Copyright (C) 1994-2017 Free Software Foundation, Inc. - -# This Makefile.in is free software; the Free Software Foundation -# gives unlimited permission to copy and/or distribute it, -# with or without modifications, as long as this notice is preserved. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY, to the extent permitted by law; without -# even the implied warranty of MERCHANTABILITY or FITNESS FOR A -# PARTICULAR PURPOSE. 
- -@SET_MAKE@ - -# SPDX-License-Identifier: GPL-3.0-or-later - -VPATH = @srcdir@ -am__is_gnu_make = { \ - if test -z '$(MAKELEVEL)'; then \ - false; \ - elif test -n '$(MAKE_HOST)'; then \ - true; \ - elif test -n '$(MAKE_VERSION)' && test -n '$(CURDIR)'; then \ - true; \ - else \ - false; \ - fi; \ -} -am__make_running_with_option = \ - case $${target_option-} in \ - ?) ;; \ - *) echo "am__make_running_with_option: internal error: invalid" \ - "target option '$${target_option-}' specified" >&2; \ - exit 1;; \ - esac; \ - has_opt=no; \ - sane_makeflags=$$MAKEFLAGS; \ - if $(am__is_gnu_make); then \ - sane_makeflags=$$MFLAGS; \ - else \ - case $$MAKEFLAGS in \ - *\\[\ \ ]*) \ - bs=\\; \ - sane_makeflags=`printf '%s\n' "$$MAKEFLAGS" \ - | sed "s/$$bs$$bs[$$bs $$bs ]*//g"`;; \ - esac; \ - fi; \ - skip_next=no; \ - strip_trailopt () \ - { \ - flg=`printf '%s\n' "$$flg" | sed "s/$$1.*$$//"`; \ - }; \ - for flg in $$sane_makeflags; do \ - test $$skip_next = yes && { skip_next=no; continue; }; \ - case $$flg in \ - *=*|--*) continue;; \ - -*I) strip_trailopt 'I'; skip_next=yes;; \ - -*I?*) strip_trailopt 'I';; \ - -*O) strip_trailopt 'O'; skip_next=yes;; \ - -*O?*) strip_trailopt 'O';; \ - -*l) strip_trailopt 'l'; skip_next=yes;; \ - -*l?*) strip_trailopt 'l';; \ - -[dEDm]) skip_next=yes;; \ - -[JT]) skip_next=yes;; \ - esac; \ - case $$flg in \ - *$$target_option*) has_opt=yes; break;; \ - esac; \ - done; \ - test $$has_opt = yes -am__make_dryrun = (target_option=n; $(am__make_running_with_option)) -am__make_keepgoing = (target_option=k; $(am__make_running_with_option)) -pkgdatadir = $(datadir)/@PACKAGE@ -pkgincludedir = $(includedir)/@PACKAGE@ -pkglibdir = $(libdir)/@PACKAGE@ -pkglibexecdir = $(libexecdir)/@PACKAGE@ -am__cd = CDPATH="$${ZSH_VERSION+.}$(PATH_SEPARATOR)" && cd -install_sh_DATA = $(install_sh) -c -m 644 -install_sh_PROGRAM = $(install_sh) -c -install_sh_SCRIPT = $(install_sh) -c -INSTALL_HEADER = $(INSTALL_DATA) -transform = $(program_transform_name) -NORMAL_INSTALL = : -PRE_INSTALL = : -POST_INSTALL = : -NORMAL_UNINSTALL = : -PRE_UNINSTALL = : -POST_UNINSTALL = : -build_triplet = @build@ -host_triplet = @host@ -subdir = database/engine -ACLOCAL_M4 = $(top_srcdir)/aclocal.m4 -am__aclocal_m4_deps = $(top_srcdir)/build/m4/ax_c___atomic.m4 \ - $(top_srcdir)/build/m4/ax_c__generic.m4 \ - $(top_srcdir)/build/m4/ax_c_lto.m4 \ - $(top_srcdir)/build/m4/ax_c_mallinfo.m4 \ - $(top_srcdir)/build/m4/ax_c_mallopt.m4 \ - $(top_srcdir)/build/m4/ax_check_compile_flag.m4 \ - $(top_srcdir)/build/m4/ax_gcc_func_attribute.m4 \ - $(top_srcdir)/build/m4/ax_pthread.m4 \ - $(top_srcdir)/build/m4/jemalloc.m4 \ - $(top_srcdir)/build/m4/tcmalloc.m4 $(top_srcdir)/configure.ac -am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \ - $(ACLOCAL_M4) -DIST_COMMON = $(srcdir)/Makefile.am $(dist_noinst_DATA) \ - $(am__DIST_COMMON) -mkinstalldirs = $(install_sh) -d -CONFIG_HEADER = $(top_builddir)/config.h -CONFIG_CLEAN_FILES = -CONFIG_CLEAN_VPATH_FILES = -AM_V_P = $(am__v_P_@AM_V@) -am__v_P_ = $(am__v_P_@AM_DEFAULT_V@) -am__v_P_0 = false -am__v_P_1 = : -AM_V_GEN = $(am__v_GEN_@AM_V@) -am__v_GEN_ = $(am__v_GEN_@AM_DEFAULT_V@) -am__v_GEN_0 = @echo " GEN " $@; -am__v_GEN_1 = -AM_V_at = $(am__v_at_@AM_V@) -am__v_at_ = $(am__v_at_@AM_DEFAULT_V@) -am__v_at_0 = @ -am__v_at_1 = -SOURCES = -DIST_SOURCES = -am__can_run_installinfo = \ - case $$AM_UPDATE_INFO_DIR in \ - n|no|NO) false;; \ - *) (install-info --version) >/dev/null 2>&1;; \ - esac -DATA = $(dist_noinst_DATA) -am__tagged_files = $(HEADERS) 
$(SOURCES) $(TAGS_FILES) $(LISP) -am__DIST_COMMON = $(srcdir)/Makefile.in -DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST) -ACLOCAL = @ACLOCAL@ -AMTAR = @AMTAR@ -AM_DEFAULT_VERBOSITY = @AM_DEFAULT_VERBOSITY@ -AUTOCONF = @AUTOCONF@ -AUTOHEADER = @AUTOHEADER@ -AUTOMAKE = @AUTOMAKE@ -AWK = @AWK@ -CC = @CC@ -CCDEPMODE = @CCDEPMODE@ -CFLAGS = @CFLAGS@ -CMOCKA_CFLAGS = @CMOCKA_CFLAGS@ -CMOCKA_LIBS = @CMOCKA_LIBS@ -CPP = @CPP@ -CPPFLAGS = @CPPFLAGS@ -CUPSCONFIG = @CUPSCONFIG@ -CXX = @CXX@ -CXXDEPMODE = @CXXDEPMODE@ -CXXFLAGS = @CXXFLAGS@ -CXX_BINARY = @CXX_BINARY@ -CYGPATH_W = @CYGPATH_W@ -DEFS = @DEFS@ -DEPDIR = @DEPDIR@ -ECHO_C = @ECHO_C@ -ECHO_N = @ECHO_N@ -ECHO_T = @ECHO_T@ -EGREP = @EGREP@ -ENABLE_UNITTESTS = @ENABLE_UNITTESTS@ -EXEEXT = @EXEEXT@ -GREP = @GREP@ -INSTALL = @INSTALL@ -INSTALL_DATA = @INSTALL_DATA@ -INSTALL_PROGRAM = @INSTALL_PROGRAM@ -INSTALL_SCRIPT = @INSTALL_SCRIPT@ -INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@ -IPMIMONITORING_CFLAGS = @IPMIMONITORING_CFLAGS@ -IPMIMONITORING_LIBS = @IPMIMONITORING_LIBS@ -JSON_CFLAGS = @JSON_CFLAGS@ -JSON_LIBS = @JSON_LIBS@ -LDFLAGS = @LDFLAGS@ -LIBCAP_CFLAGS = @LIBCAP_CFLAGS@ -LIBCAP_LIBS = @LIBCAP_LIBS@ -LIBCRYPTO_CFLAGS = @LIBCRYPTO_CFLAGS@ -LIBCRYPTO_LIBS = @LIBCRYPTO_LIBS@ -LIBCURL_CFLAGS = @LIBCURL_CFLAGS@ -LIBCURL_LIBS = @LIBCURL_LIBS@ -LIBMNL_CFLAGS = @LIBMNL_CFLAGS@ -LIBMNL_LIBS = @LIBMNL_LIBS@ -LIBMONGOC_CFLAGS = @LIBMONGOC_CFLAGS@ -LIBMONGOC_LIBS = @LIBMONGOC_LIBS@ -LIBOBJS = @LIBOBJS@ -LIBS = @LIBS@ -LIBSSL_CFLAGS = @LIBSSL_CFLAGS@ -LIBSSL_LIBS = @LIBSSL_LIBS@ -LTLIBOBJS = @LTLIBOBJS@ -MAINT = @MAINT@ -MAKEINFO = @MAKEINFO@ -MATH_CFLAGS = @MATH_CFLAGS@ -MATH_LIBS = @MATH_LIBS@ -MKDIR_P = @MKDIR_P@ -NFACCT_CFLAGS = @NFACCT_CFLAGS@ -NFACCT_LIBS = @NFACCT_LIBS@ -OBJEXT = @OBJEXT@ -OPTIONAL_CUPS_CFLAGS = @OPTIONAL_CUPS_CFLAGS@ -OPTIONAL_CUPS_LIBS = @OPTIONAL_CUPS_LIBS@ -OPTIONAL_IPMIMONITORING_CFLAGS = @OPTIONAL_IPMIMONITORING_CFLAGS@ -OPTIONAL_IPMIMONITORING_LIBS = @OPTIONAL_IPMIMONITORING_LIBS@ -OPTIONAL_JSONC_LIBS = @OPTIONAL_JSONC_LIBS@ -OPTIONAL_JUDY_LIBS = @OPTIONAL_JUDY_LIBS@ -OPTIONAL_KINESIS_CFLAGS = @OPTIONAL_KINESIS_CFLAGS@ -OPTIONAL_KINESIS_LIBS = @OPTIONAL_KINESIS_LIBS@ -OPTIONAL_LIBCAP_CFLAGS = @OPTIONAL_LIBCAP_CFLAGS@ -OPTIONAL_LIBCAP_LIBS = @OPTIONAL_LIBCAP_LIBS@ -OPTIONAL_LZ4_LIBS = @OPTIONAL_LZ4_LIBS@ -OPTIONAL_MATH_CFLAGS = @OPTIONAL_MATH_CFLAGS@ -OPTIONAL_MATH_LIBS = @OPTIONAL_MATH_LIBS@ -OPTIONAL_MONGOC_CFLAGS = @OPTIONAL_MONGOC_CFLAGS@ -OPTIONAL_MONGOC_LIBS = @OPTIONAL_MONGOC_LIBS@ -OPTIONAL_NFACCT_CFLAGS = @OPTIONAL_NFACCT_CFLAGS@ -OPTIONAL_NFACCT_LIBS = @OPTIONAL_NFACCT_LIBS@ -OPTIONAL_PROMETHEUS_REMOTE_WRITE_CFLAGS = @OPTIONAL_PROMETHEUS_REMOTE_WRITE_CFLAGS@ -OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS = @OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS@ -OPTIONAL_SSL_LIBS = @OPTIONAL_SSL_LIBS@ -OPTIONAL_UUID_CFLAGS = @OPTIONAL_UUID_CFLAGS@ -OPTIONAL_UUID_LIBS = @OPTIONAL_UUID_LIBS@ -OPTIONAL_UV_LIBS = @OPTIONAL_UV_LIBS@ -OPTIONAL_XENSTAT_CFLAGS = @OPTIONAL_XENSTAT_CFLAGS@ -OPTIONAL_XENSTAT_LIBS = @OPTIONAL_XENSTAT_LIBS@ -OPTIONAL_ZLIB_CFLAGS = @OPTIONAL_ZLIB_CFLAGS@ -OPTIONAL_ZLIB_LIBS = @OPTIONAL_ZLIB_LIBS@ -PACKAGE = @PACKAGE@ -PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@ -PACKAGE_NAME = @PACKAGE_NAME@ -PACKAGE_RPM_VERSION = @PACKAGE_RPM_VERSION@ -PACKAGE_STRING = @PACKAGE_STRING@ -PACKAGE_TARNAME = @PACKAGE_TARNAME@ -PACKAGE_URL = @PACKAGE_URL@ -PACKAGE_VERSION = @PACKAGE_VERSION@ -PATH_SEPARATOR = @PATH_SEPARATOR@ -PKG_CONFIG = @PKG_CONFIG@ -PKG_CONFIG_LIBDIR = @PKG_CONFIG_LIBDIR@ -PKG_CONFIG_PATH = 
@PKG_CONFIG_PATH@ -PROTOBUF_CFLAGS = @PROTOBUF_CFLAGS@ -PROTOBUF_LIBS = @PROTOBUF_LIBS@ -PROTOC = @PROTOC@ -PTHREAD_CC = @PTHREAD_CC@ -PTHREAD_CFLAGS = @PTHREAD_CFLAGS@ -PTHREAD_LIBS = @PTHREAD_LIBS@ -SET_MAKE = @SET_MAKE@ -SHELL = @SHELL@ -SSE_CANDIDATE = @SSE_CANDIDATE@ -STRIP = @STRIP@ -TEST_CFLAGS = @TEST_CFLAGS@ -TEST_LIBS = @TEST_LIBS@ -UUID_CFLAGS = @UUID_CFLAGS@ -UUID_LIBS = @UUID_LIBS@ -VERSION = @VERSION@ -XENLIGHT_CFLAGS = @XENLIGHT_CFLAGS@ -XENLIGHT_LIBS = @XENLIGHT_LIBS@ -YAJL_CFLAGS = @YAJL_CFLAGS@ -YAJL_LIBS = @YAJL_LIBS@ -ZLIB_CFLAGS = @ZLIB_CFLAGS@ -ZLIB_LIBS = @ZLIB_LIBS@ -abs_builddir = @abs_builddir@ -abs_srcdir = @abs_srcdir@ -abs_top_builddir = @abs_top_builddir@ -abs_top_srcdir = @abs_top_srcdir@ -ac_ct_CC = @ac_ct_CC@ -ac_ct_CXX = @ac_ct_CXX@ -am__include = @am__include@ -am__leading_dot = @am__leading_dot@ -am__quote = @am__quote@ -am__tar = @am__tar@ -am__untar = @am__untar@ -ax_pthread_config = @ax_pthread_config@ -bindir = @bindir@ -build = @build@ -build_alias = @build_alias@ -build_cpu = @build_cpu@ -build_os = @build_os@ -build_target = @build_target@ -build_vendor = @build_vendor@ -builddir = @builddir@ -cachedir = @cachedir@ -chartsdir = @chartsdir@ -configdir = @configdir@ -datadir = @datadir@ -datarootdir = @datarootdir@ -docdir = @docdir@ -dvidir = @dvidir@ -exec_prefix = @exec_prefix@ -has_jemalloc = @has_jemalloc@ -has_tcmalloc = @has_tcmalloc@ -host = @host@ -host_alias = @host_alias@ -host_cpu = @host_cpu@ -host_os = @host_os@ -host_vendor = @host_vendor@ -htmldir = @htmldir@ -includedir = @includedir@ -infodir = @infodir@ -install_sh = @install_sh@ -libconfigdir = @libconfigdir@ -libdir = @libdir@ -libexecdir = @libexecdir@ -localedir = @localedir@ -localstatedir = @localstatedir@ -logdir = @logdir@ -mandir = @mandir@ -mkdir_p = @mkdir_p@ -nodedir = @nodedir@ -oldincludedir = @oldincludedir@ -pdfdir = @pdfdir@ -pluginsdir = @pluginsdir@ -prefix = @prefix@ -program_transform_name = @program_transform_name@ -psdir = @psdir@ -pythondir = @pythondir@ -registrydir = @registrydir@ -runstatedir = @runstatedir@ -sbindir = @sbindir@ -sharedstatedir = @sharedstatedir@ -srcdir = @srcdir@ -sysconfdir = @sysconfdir@ -target_alias = @target_alias@ -top_build_prefix = @top_build_prefix@ -top_builddir = @top_builddir@ -top_srcdir = @top_srcdir@ -varlibdir = @varlibdir@ -webdir = @webdir@ -AUTOMAKE_OPTIONS = subdir-objects -MAINTAINERCLEANFILES = $(srcdir)/Makefile.in -dist_noinst_DATA = \ - README.md \ - $(NULL) - -all: all-am - -.SUFFIXES: -$(srcdir)/Makefile.in: @MAINTAINER_MODE_TRUE@ $(srcdir)/Makefile.am $(am__configure_deps) - @for dep in $?; do \ - case '$(am__configure_deps)' in \ - *$$dep*) \ - ( cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh ) \ - && { if test -f $@; then exit 0; else break; fi; }; \ - exit 1;; \ - esac; \ - done; \ - echo ' cd $(top_srcdir) && $(AUTOMAKE) --gnu database/engine/Makefile'; \ - $(am__cd) $(top_srcdir) && \ - $(AUTOMAKE) --gnu database/engine/Makefile -Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status - @case '$?' 
in \ - *config.status*) \ - cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh;; \ - *) \ - echo ' cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__depfiles_maybe)'; \ - cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__depfiles_maybe);; \ - esac; - -$(top_builddir)/config.status: $(top_srcdir)/configure $(CONFIG_STATUS_DEPENDENCIES) - cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh - -$(top_srcdir)/configure: @MAINTAINER_MODE_TRUE@ $(am__configure_deps) - cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh -$(ACLOCAL_M4): @MAINTAINER_MODE_TRUE@ $(am__aclocal_m4_deps) - cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh -$(am__aclocal_m4_deps): -tags TAGS: - -ctags CTAGS: - -cscope cscopelist: - - -distdir: $(DISTFILES) - @srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \ - topsrcdirstrip=`echo "$(top_srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \ - list='$(DISTFILES)'; \ - dist_files=`for file in $$list; do echo $$file; done | \ - sed -e "s|^$$srcdirstrip/||;t" \ - -e "s|^$$topsrcdirstrip/|$(top_builddir)/|;t"`; \ - case $$dist_files in \ - */*) $(MKDIR_P) `echo "$$dist_files" | \ - sed '/\//!d;s|^|$(distdir)/|;s,/[^/]*$$,,' | \ - sort -u` ;; \ - esac; \ - for file in $$dist_files; do \ - if test -f $$file || test -d $$file; then d=.; else d=$(srcdir); fi; \ - if test -d $$d/$$file; then \ - dir=`echo "/$$file" | sed -e 's,/[^/]*$$,,'`; \ - if test -d "$(distdir)/$$file"; then \ - find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \ - fi; \ - if test -d $(srcdir)/$$file && test $$d != $(srcdir); then \ - cp -fpR $(srcdir)/$$file "$(distdir)$$dir" || exit 1; \ - find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \ - fi; \ - cp -fpR $$d/$$file "$(distdir)$$dir" || exit 1; \ - else \ - test -f "$(distdir)/$$file" \ - || cp -p $$d/$$file "$(distdir)/$$file" \ - || exit 1; \ - fi; \ - done -check-am: all-am -check: check-am -all-am: Makefile $(DATA) -installdirs: -install: install-am -install-exec: install-exec-am -install-data: install-data-am -uninstall: uninstall-am - -install-am: all-am - @$(MAKE) $(AM_MAKEFLAGS) install-exec-am install-data-am - -installcheck: installcheck-am -install-strip: - if test -z '$(STRIP)'; then \ - $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \ - install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \ - install; \ - else \ - $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \ - install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \ - "INSTALL_PROGRAM_ENV=STRIPPROG='$(STRIP)'" install; \ - fi -mostlyclean-generic: - -clean-generic: - -distclean-generic: - -test -z "$(CONFIG_CLEAN_FILES)" || rm -f $(CONFIG_CLEAN_FILES) - -test . = "$(srcdir)" || test -z "$(CONFIG_CLEAN_VPATH_FILES)" || rm -f $(CONFIG_CLEAN_VPATH_FILES) - -maintainer-clean-generic: - @echo "This command is intended for maintainers to use" - @echo "it deletes files that may require special tools to rebuild." 
- -test -z "$(MAINTAINERCLEANFILES)" || rm -f $(MAINTAINERCLEANFILES) -clean: clean-am - -clean-am: clean-generic mostlyclean-am - -distclean: distclean-am - -rm -f Makefile -distclean-am: clean-am distclean-generic - -dvi: dvi-am - -dvi-am: - -html: html-am - -html-am: - -info: info-am - -info-am: - -install-data-am: - -install-dvi: install-dvi-am - -install-dvi-am: - -install-exec-am: - -install-html: install-html-am - -install-html-am: - -install-info: install-info-am - -install-info-am: - -install-man: - -install-pdf: install-pdf-am - -install-pdf-am: - -install-ps: install-ps-am - -install-ps-am: - -installcheck-am: - -maintainer-clean: maintainer-clean-am - -rm -f Makefile -maintainer-clean-am: distclean-am maintainer-clean-generic - -mostlyclean: mostlyclean-am - -mostlyclean-am: mostlyclean-generic - -pdf: pdf-am - -pdf-am: - -ps: ps-am - -ps-am: - -uninstall-am: - -.MAKE: install-am install-strip - -.PHONY: all all-am check check-am clean clean-generic cscopelist-am \ - ctags-am distclean distclean-generic distdir dvi dvi-am html \ - html-am info info-am install install-am install-data \ - install-data-am install-dvi install-dvi-am install-exec \ - install-exec-am install-html install-html-am install-info \ - install-info-am install-man install-pdf install-pdf-am \ - install-ps install-ps-am install-strip installcheck \ - installcheck-am installdirs maintainer-clean \ - maintainer-clean-generic mostlyclean mostlyclean-generic pdf \ - pdf-am ps ps-am tags-am uninstall uninstall-am - -.PRECIOUS: Makefile - - -# Tell versions [3.59,3.63) of GNU make to not export all variables. -# Otherwise a system limit (for SysV at least) may be exceeded. -.NOEXPORT: diff --git a/database/engine/README.md b/database/engine/README.md index 78f3b15ec..191366a46 100644 --- a/database/engine/README.md +++ b/database/engine/README.md @@ -1,93 +1,102 @@ -# Database engine - -The Database Engine works like a traditional database. There is some amount of RAM dedicated to data caching and -indexing and the rest of the data reside compressed on disk. The number of history entries is not fixed in this case, -but depends on the configured disk space and the effective compression ratio of the data stored. This is the **only -mode** that supports changing the data collection update frequency (`update_every`) **without losing** the previously -stored metrics. - -## Files + -With the DB engine memory mode the metric data are stored in database files. These files are organized in pairs, the -datafiles and their corresponding journalfiles, e.g.: +# Database engine -```sh -datafile-1-0000000001.ndf -journalfile-1-0000000001.njf -datafile-1-0000000002.ndf -journalfile-1-0000000002.njf -datafile-1-0000000003.ndf -journalfile-1-0000000003.njf -... -``` +The Database Engine works like a traditional database. It dedicates a certain amount of RAM to data caching and +indexing, while the rest of the data resides compressed on disk. Unlike other [memory modes](/database/README.md), the +amount of historical metrics stored is based on the amount of disk space you allocate and the effective compression +ratio, not a fixed number of metrics collected. -They are located under their host's cache directory in the directory `./dbengine` (e.g. for localhost the default -location is `/var/cache/netdata/dbengine/*`). The higher numbered filenames contain more recent metric data. The user -can safely delete some pairs of files when Netdata is stopped to manually free up some space. 
+By using both RAM and disk space, the database engine allows for long-term storage of per-second metrics inside of the +Agent itself. -_Users should_ **back up** _their `./dbengine` folders if they consider this data to be important._ +In addition, the database engine is the only memory mode that supports changing the data collection update frequency +(`update_every`) without losing the metrics your Agent already gathered and stored. ## Configuration -There is one DB engine instance per Netdata host/node. That is, there is one `./dbengine` folder per node, and all -charts of `dbengine` memory mode in such a host share the same storage space and DB engine instance memory state. You -can select the memory mode for localhost by editing netdata.conf and setting: +To use the database engine, open `netdata.conf` and set `memory mode` to `dbengine`. ```conf [global] memory mode = dbengine ``` -For setting the memory mode for the rest of the nodes you should look at -[streaming](../../streaming/). +To configure the database engine, look for the `page cache size` and `dbengine multihost disk space` settings in the +`[global]` section of your `netdata.conf`. The Agent ignores the `history` setting when using the database engine. -The `history` configuration option is meaningless for `memory mode = dbengine` and is ignored for any metrics being -stored in the DB engine. +```conf +[global] + page cache size = 32 + dbengine multihost disk space = 256 +``` + +The above values are the default values for Page Cache size and DB engine disk space quota. Both numbers are +in **MiB**. + +The `page cache size` option determines the amount of RAM in **MiB** dedicated to caching Netdata metric values. The +actual page cache size will be slightly larger than this figure—see the [memory requirements](#memory-requirements) +section for details. -All DB engine instances, for localhost and all other streaming recipient nodes inherit their configuration from -`netdata.conf`: +The `dbengine multihost disk space` option determines the amount of disk space in **MiB** that is dedicated to storing +Netdata metric values and all related metadata describing them. You can use the [**database engine +calculator**](/docs/store/change-metrics-storage.md#calculate-the-system-resources-RAM-disk-space-needed-to-store-metrics) +to correctly set `dbengine multihost disk space` based on your metrics retention policy. The calculator gives an +accurate estimate based on how many child nodes you have, how many metrics your Agent collects, and more. + +### Legacy configuration + +The deprecated `dbengine disk space` option determines the amount of disk space in **MiB** that is dedicated to storing +Netdata metric values per legacy database engine instance (see [details on the legacy mode](#legacy-mode) below). ```conf [global] - page cache size = 32 dbengine disk space = 256 ``` -The above values are the default and minimum values for Page Cache size and DB engine disk space quota. Both numbers are -in **MiB**. All DB engine instances will allocate the configured resources separately. +### Streaming metrics to the database engine -The `page cache size` option determines the amount of RAM in **MiB** that is dedicated to caching Netdata metric values -themselves as far as queries are concerned. The total page cache size will be greater since data collection itself will -consume additional memory as is described in the [Memory requirements](#memory-requirements) section. 
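As a rough, hedged illustration of the note above, the sketch below estimates the effective page cache footprint for a hypothetical node; the 2000-dimension count and the default 32 MiB `page cache size` are assumed figures, and the `dimensions x 4096 x 2` overhead is the formula given in the [memory requirements](#memory-requirements) section.

```sh
# Back-of-the-envelope sketch (not an official Netdata tool): estimate the
# effective page cache RAM use for an assumed node collecting 2000 dimensions
# with the default `page cache size = 32` (MiB).
DIMENSIONS=2000
PAGE_CACHE_MIB=32
OVERHEAD_MIB=$(( DIMENSIONS * 4096 * 2 / 1024 / 1024 ))   # collector pages kept in RAM
echo "configured page cache : ${PAGE_CACHE_MIB} MiB"
echo "collection overhead   : ${OVERHEAD_MIB} MiB"
echo "approximate total     : $(( PAGE_CACHE_MIB + OVERHEAD_MIB )) MiB"
```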
+When using the multihost database engine, all parent and child nodes share the same `page cache size` and `dbengine +multihost disk space` in a single dbengine instance. The [**database engine +calculator**](/docs/store/change-metrics-storage.md#calculate-the-system-resources-RAM-disk-space-needed-to-store-metrics) +helps you properly set `page cache size` and `dbengine multihost disk space` on your parent node to allocate enough +resources based on your metrics retention policy and how many child nodes you have. -The `dbengine disk space` option determines the amount of disk space in **MiB** that is dedicated to storing Netdata -metric values and all related metadata describing them. +#### Legacy mode -## Operation +_For Netdata Agents earlier than v1.23.2_, the Agent on the parent node uses one dbengine instance for itself, and +another instance for every child node it receives metrics from. If you had four streaming nodes, you would have five +instances in total (`1 parent + 4 child nodes = 5 instances`). -The DB engine stores chart metric values in 4096-byte pages in memory. Each chart dimension gets its own page to store -consecutive values generated from the data collectors. Those pages comprise the **Page Cache**. +The Agent allocates resources for each instance separately using the `dbengine disk space` (**deprecated**) setting. If +`dbengine disk space`(**deprecated**) is set to the default `256`, each instance is given 256 MiB in disk space, which +means the total disk space required to store all instances is, roughly, `256 MiB * 1 parent * 4 child nodes = 1280 MiB`. -When those pages fill up they are slowly compressed and flushed to disk. It can take `4096 / 4 = 1024 seconds = 17 -minutes`, for a chart dimension that is being collected every 1 second, to fill a page. Pages can be cut short when we -stop Netdata or the DB engine instance so as to not lose the data. When we query the DB engine for data we trigger disk -read I/O requests that fill the Page Cache with the requested pages and potentially evict cold (not recently used) -pages. +#### Backward compatibility -When the disk quota is exceeded the oldest values are removed from the DB engine at real time, by automatically deleting -the oldest datafile and journalfile pair. Any corresponding pages residing in the Page Cache will also be invalidated -and removed. The DB engine logic will try to maintain between 10 and 20 file pairs at any point in time. +All existing metrics belonging to child nodes are automatically converted to legacy dbengine instances and the localhost +metrics are transferred to the multihost dbengine instance. -The Database Engine uses direct I/O to avoid polluting the OS filesystem caches and does not generate excessive I/O -traffic so as to create the minimum possible interference with other applications. +All new child nodes are automatically transferred to the multihost dbengine instance and share its page cache and disk +space. If you want to migrate a child node from its legacy dbengine instance to the multihost dbengine instance, you +must delete the instance's directory, which is located in `/var/cache/netdata/MACHINE_GUID/dbengine`, after stopping the +Agent. + +##### Information + +For more information about setting `memory mode` on your nodes, in addition to other streaming configurations, see +[streaming](/streaming/README.md). 
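To make the disk-space arithmetic above concrete, the hedged sketch below compares the two layouts for the example topology used in this section (1 parent plus 4 children); the 256 MiB quotas are simply the defaults quoted earlier, not a recommendation.

```sh
# Illustrative comparison for an assumed topology of 1 parent + 4 children.
# Legacy mode keeps one dbengine instance per node, so per-instance quotas add
# up; multihost mode shares one instance and a single quota between all nodes.
CHILDREN=4
LEGACY_QUOTA_MIB=256        # deprecated per-instance `dbengine disk space`
MULTIHOST_QUOTA_MIB=256     # shared `dbengine multihost disk space`
LEGACY_TOTAL_MIB=$(( (CHILDREN + 1) * LEGACY_QUOTA_MIB ))   # 5 instances = 1280 MiB
echo "legacy layout total    : ${LEGACY_TOTAL_MIB} MiB across $(( CHILDREN + 1 )) instances"
echo "multihost layout total : ${MULTIHOST_QUOTA_MIB} MiB in one shared instance"
```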
-## Memory requirements +### Memory requirements Using memory mode `dbengine` we can overcome most memory restrictions and store a dataset that is much larger than the available memory. -There are explicit memory requirements **per** DB engine **instance**, meaning **per** Netdata **node** (e.g. localhost -and streaming recipient nodes): +There are explicit memory requirements **per** DB engine **instance**: - The total page cache memory footprint will be an additional `#dimensions-being-collected x 4096 x 2` bytes over what the user configured with `page cache size`. @@ -99,18 +108,30 @@ and streaming recipient nodes): - for very highly compressible data (compression ratio > 90%) this RAM overhead is comparable to the disk space footprint. -An important observation is that RAM usage depends on both the `page cache size` and the `dbengine disk space` options. +An important observation is that RAM usage depends on both the `page cache size` and the `dbengine multihost disk space` +options. -## File descriptor requirements +You can use our [database engine +calculator](/docs/store/change-metrics-storage.md#calculate-the-system-resources-RAM-disk-space-needed-to-store-metrics) +to validate the memory requirements for your particular system(s) and configuration (**out-of-date**). -The Database Engine may keep a **significant** amount of files open per instance (e.g. per streaming slave or master -server). When configuring your system you should make sure there are at least 50 file descriptors available per +### Disk space requirements + +There are explicit disk space requirements **per** DB engine **instance**: + +- The total disk space footprint will be the maximum between `#dimensions-being-collected x 4096 x 2` bytes or what + the user configured with `dbengine multihost disk space` or `dbengine disk space`. + +### File descriptor requirements + +The Database Engine may keep a **significant** amount of files open per instance (e.g. per streaming child or +parent server). When configuring your system you should make sure there are at least 50 file descriptors available per `dbengine` instance. Netdata allocates 25% of the available file descriptors to its Database Engine instances. This means that only 25% of the file descriptors that are available to the Netdata service are accessible by dbengine instances. You should take that into account when configuring your service or system-wide file descriptor limits. You can roughly estimate that the -Netdata service needs 2048 file descriptors for every 10 streaming slave hosts when streaming is configured to use +Netdata service needs 2048 file descriptors for every 10 streaming child hosts when streaming is configured to use `memory mode = dbengine`. If for example one wants to allocate 65536 file descriptors to the Netdata service on a systemd system one needs to @@ -143,12 +164,53 @@ kern.maxfiles=65536 You can apply the settings by running `sysctl -p` or by rebooting. +## Files + +With the DB engine memory mode the metric data are stored in database files. These files are organized in pairs, the +datafiles and their corresponding journalfiles, e.g.: + +```sh +datafile-1-0000000001.ndf +journalfile-1-0000000001.njf +datafile-1-0000000002.ndf +journalfile-1-0000000002.njf +datafile-1-0000000003.ndf +journalfile-1-0000000003.njf +... +``` + +They are located under their host's cache directory in the directory `./dbengine` (e.g. for localhost the default +location is `/var/cache/netdata/dbengine/*`). 
The higher numbered filenames contain more recent metric data. The user +can safely delete some pairs of files when Netdata is stopped to manually free up some space. + +_Users should_ **back up** _their `./dbengine` folders if they consider this data to be important._ You can also set up +one or more [exporting connectors](/exporting/README.md) to send your Netdata metrics to other databases for long-term +storage at lower granularity. + +## Operation + +The DB engine stores chart metric values in 4096-byte pages in memory. Each chart dimension gets its own page to store +consecutive values generated from the data collectors. Those pages comprise the **Page Cache**. + +When those pages fill up they are slowly compressed and flushed to disk. It can take `4096 / 4 = 1024 seconds = 17 +minutes`, for a chart dimension that is being collected every 1 second, to fill a page. Pages can be cut short when we +stop Netdata or the DB engine instance so as to not lose the data. When we query the DB engine for data we trigger disk +read I/O requests that fill the Page Cache with the requested pages and potentially evict cold (not recently used) +pages. + +When the disk quota is exceeded the oldest values are removed from the DB engine at real time, by automatically deleting +the oldest datafile and journalfile pair. Any corresponding pages residing in the Page Cache will also be invalidated +and removed. The DB engine logic will try to maintain between 10 and 20 file pairs at any point in time. + +The Database Engine uses direct I/O to avoid polluting the OS filesystem caches and does not generate excessive I/O +traffic so as to create the minimum possible interference with other applications. + ## Evaluation -We have evaluated the performance of the `dbengine` API that the netdata daemon uses internally. This is **not** the -web API of netdata. Our benchmarks ran on a **single** `dbengine` instance, multiple of which can be running in a -netdata master server. We used a server with an AMD Ryzen Threadripper 2950X 16-Core Processor and 2 disk drives, a -Seagate Constellation ES.3 2TB magnetic HDD and a SAMSUNG MZQLB960HAJR-00007 960GB NAND Flash SSD. +We have evaluated the performance of the `dbengine` API that the netdata daemon uses internally. This is **not** the web +API of netdata. Our benchmarks ran on a **single** `dbengine` instance, multiple of which can be running in a Netdata +parent node. We used a server with an AMD Ryzen Threadripper 2950X 16-Core Processor and 2 disk drives, a Seagate +Constellation ES.3 2TB magnetic HDD and a SAMSUNG MZQLB960HAJR-00007 960GB NAND Flash SSD. For our workload, we defined 32 charts with 128 metrics each, giving us a total of 4096 metrics. We defined 1 worker thread per chart (32 threads) that generates new data points with a data generation interval of 1 second. The time axis @@ -170,10 +232,10 @@ so as to avoid all disk bottlenecks. 
The reported numbers are the following: | device | page cache | dataset | reads/sec | writes/sec | -| :---: | :---: | ---: | ---: | ---: | -| HDD | 64 MiB | 4.1 GiB | 813K | 18.0M | -| SSD | 64 MiB | 9.8 GiB | 1.7M | 43.0M | -| N/A | 16 GiB | 6.8 GiB |118.2M | 30.2M | +| :----: | :--------: | ------: | --------: | ---------: | +| HDD | 64 MiB | 4.1 GiB | 813K | 18.0M | +| SSD | 64 MiB | 9.8 GiB | 1.7M | 43.0M | +| N/A | 16 GiB | 6.8 GiB | 118.2M | 30.2M | where "reads/sec" is the number of metric data points being read from the database via its API per second and "writes/sec" is the number of metric data points being written to the database per second. diff --git a/database/engine/datafile.c b/database/engine/datafile.c index 8ef4ed599..7a052f963 100644 --- a/database/engine/datafile.c +++ b/database/engine/datafile.c @@ -30,7 +30,7 @@ void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_dataf struct rrdengine_datafile *next; next = datafile->next; - assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile)); + fatal_assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile)); ctx->datafiles.first = next; } @@ -38,7 +38,7 @@ void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_dataf static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_instance *ctx, unsigned tier, unsigned fileno) { - assert(tier == 1); + fatal_assert(tier == 1); datafile->tier = tier; datafile->fileno = fileno; datafile->file = (uv_file)0; @@ -75,6 +75,27 @@ int close_data_file(struct rrdengine_datafile *datafile) return ret; } +int unlink_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ++ctx->stats.datafile_deletions; + + return ret; +} int destroy_data_file(struct rrdengine_datafile *datafile) { @@ -146,7 +167,7 @@ int create_data_file(struct rrdengine_datafile *datafile) ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { - assert(req.result < 0); + fatal_assert(req.result < 0); error("uv_fs_write: %s", uv_strerror(ret)); ++ctx->stats.io_errors; rrd_stat_atomic_add(&global_io_errors, 1); @@ -184,7 +205,7 @@ static int check_data_file_superblock(uv_file file) uv_fs_req_cleanup(&req); goto error; } - assert(req.result >= 0); + fatal_assert(req.result >= 0); uv_fs_req_cleanup(&req); if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) || @@ -271,7 +292,7 @@ static int scan_data_files(struct rrdengine_instance *ctx) ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL); if (ret < 0) { - assert(req.result < 0); + fatal_assert(req.result < 0); uv_fs_req_cleanup(&req); error("uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret)); ++ctx->stats.fs_errors; @@ -305,33 +326,47 @@ static int scan_data_files(struct rrdengine_instance *ctx) ctx->last_fileno = datafiles[matched_files - 1]->fileno; for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { + uint8_t must_delete_pair = 0; + datafile = datafiles[i]; ret = load_data_file(datafile); if (0 != ret) { - freez(datafile); - ++failed_to_load; - break; + must_delete_pair = 1; } journalfile = 
mallocz(sizeof(*journalfile)); datafile->journalfile = journalfile; journalfile_init(journalfile, datafile); ret = load_journal_file(ctx, journalfile, datafile); if (0 != ret) { - close_data_file(datafile); - freez(datafile); + if (!must_delete_pair) /* If datafile is still open close it */ + close_data_file(datafile); + must_delete_pair = 1; + } + if (must_delete_pair) { + char path[RRDENG_PATH_MAX]; + + error("Deleting invalid data and journal file pair."); + ret = unlink_journal_file(journalfile); + if (!ret) { + generate_journalfilepath(datafile, path, sizeof(path)); + info("Deleted journal file \"%s\".", path); + } + ret = unlink_data_file(datafile); + if (!ret) { + generate_datafilepath(datafile, path, sizeof(path)); + info("Deleted data file \"%s\".", path); + } freez(journalfile); + freez(datafile); ++failed_to_load; - break; + continue; } + datafile_list_insert(ctx, datafile); ctx->disk_space += datafile->pos + journalfile->pos; } + matched_files -= failed_to_load; freez(datafiles); - if (failed_to_load) { - error("%u datafiles failed to load.", failed_to_load); - finalize_data_files(ctx); - return UV_EIO; - } return matched_files; } diff --git a/database/engine/datafile.h b/database/engine/datafile.h index eeb11310b..ae94bfdd0 100644 --- a/database/engine/datafile.h +++ b/database/engine/datafile.h @@ -14,7 +14,7 @@ struct rrdengine_instance; #define DATAFILE_EXTENSION ".ndf" #define MAX_DATAFILE_SIZE (1073741824LU) -#define MIN_DATAFILE_SIZE (16777216LU) +#define MIN_DATAFILE_SIZE (4194304LU) #define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */ #define TARGET_DATAFILES (20) @@ -57,6 +57,7 @@ extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengin extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); extern void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); extern int close_data_file(struct rrdengine_datafile *datafile); +extern int unlink_data_file(struct rrdengine_datafile *datafile); extern int destroy_data_file(struct rrdengine_datafile *datafile); extern int create_data_file(struct rrdengine_datafile *datafile); extern int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno); diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index fac680aa0..9fecc48ff 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -52,7 +52,7 @@ void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc) io_descr->iov = uv_buf_init((void *)io_descr->buf, size); ret = uv_fs_write(wc->loop, &io_descr->req, journalfile->file, &io_descr->iov, 1, journalfile->pos, flush_transaction_buffer_cb); - assert (-1 != ret); + fatal_assert(-1 != ret); journalfile->pos += RRDENG_BLOCK_SIZE; ctx->disk_space += RRDENG_BLOCK_SIZE; ctx->commit_log.buf = NULL; @@ -64,9 +64,9 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s { struct rrdengine_instance *ctx = wc->ctx; int ret; - unsigned buf_pos, buf_size; + unsigned buf_pos = 0, buf_size; - assert(size); + fatal_assert(size); if (ctx->commit_log.buf) { unsigned remaining; @@ -125,6 +125,29 @@ int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengi return ret; } +int unlink_journal_file(struct rrdengine_journalfile *journalfile) +{ + struct rrdengine_datafile *datafile = journalfile->datafile; + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char 
path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ++ctx->stats.journalfile_deletions; + + return ret; +} + int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; @@ -194,7 +217,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { - assert(req.result < 0); + fatal_assert(req.result < 0); error("uv_fs_write: %s", uv_strerror(ret)); ++ctx->stats.io_errors; rrd_stat_atomic_add(&global_io_errors, 1); @@ -232,7 +255,7 @@ static int check_journal_file_superblock(uv_file file) uv_fs_req_cleanup(&req); goto error; } - assert(req.result >= 0); + fatal_assert(req.result >= 0); uv_fs_req_cleanup(&req); if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) || @@ -275,7 +298,7 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden for (i = 0, valid_pages = 0 ; i < count ; ++i) { uuid_t *temp_id; Pvoid_t *PValue; - struct pg_cache_page_index *page_index; + struct pg_cache_page_index *page_index = NULL; if (PAGE_METRICS != jf_metric_data->descr[i].type) { error("Unknown page type encountered."); @@ -293,7 +316,7 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden /* First time we see the UUID */ uv_rwlock_wrlock(&pg_cache->metrics_index.lock); PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0); - assert(NULL == *PValue); /* TODO: figure out concurrency model */ + fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */ *PValue = page_index = create_page_index(temp_id); page_index->prev = pg_cache->metrics_index.last_page_index; pg_cache->metrics_index.last_page_index = page_index; @@ -408,7 +431,7 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde fatal("uv_fs_read: %s", uv_strerror(ret)); /*uv_fs_req_cleanup(&req);*/ } - assert(req.result >= 0); + fatal_assert(req.result >= 0); uv_fs_req_cleanup(&req); ctx->stats.io_read_bytes += size_bytes; ++ctx->stats.io_read_requests; diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h index 0df66304d..f6c43cd16 100644 --- a/database/engine/journalfile.h +++ b/database/engine/journalfile.h @@ -38,6 +38,7 @@ extern void journalfile_init(struct rrdengine_journalfile *journalfile, struct r extern void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size); extern void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc); extern int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); +extern int unlink_journal_file(struct rrdengine_journalfile *journalfile); extern int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); extern int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); extern int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, diff --git a/database/engine/metadata_log/Makefile.am b/database/engine/metadata_log/Makefile.am new file mode 100644 index 000000000..161784b8f --- /dev/null +++ 
b/database/engine/metadata_log/Makefile.am @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +dist_noinst_DATA = \ + README.md \ + $(NULL) diff --git a/database/engine/metadata_log/README.md b/database/engine/metadata_log/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/database/engine/metadata_log/compaction.c b/database/engine/metadata_log/compaction.c new file mode 100644 index 000000000..ba19e1edf --- /dev/null +++ b/database/engine/metadata_log/compaction.c @@ -0,0 +1,86 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#define NETDATA_RRD_INTERNALS + +#include "metadatalog.h" + +/* Return 0 on success. */ +int compaction_failure_recovery(struct metalog_instance *ctx, struct metadata_logfile **metalogfiles, + unsigned *matched_files) +{ + int ret; + unsigned starting_fileno, fileno, i, j, recovered_files; + struct metadata_logfile *metalogfile = NULL, *compactionfile = NULL, **tmp_metalogfiles; + char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path; + + for (i = 0 ; i < *matched_files ; ++i) { + metalogfile = metalogfiles[i]; + if (0 == metalogfile->starting_fileno) + continue; /* skip standard metadata log files */ + break; /* this is a compaction temporary file */ + } + if (i == *matched_files) /* no recovery needed */ + return 0; + info("Starting metadata log file failure recovery procedure in \"%s\".", dbfiles_path); + + if (*matched_files - i > 1) { /* Can't have more than 1 temporary compaction files */ + error("Metadata log files are in an invalid state. Cannot proceed."); + return 1; + } + compactionfile = metalogfile; + starting_fileno = compactionfile->starting_fileno; + fileno = compactionfile->fileno; + /* scratchpad space to move file pointers around */ + tmp_metalogfiles = callocz(*matched_files, sizeof(*tmp_metalogfiles)); + + for (j = 0, recovered_files = 0 ; j < i ; ++j) { + metalogfile = metalogfiles[j]; + fatal_assert(0 == metalogfile->starting_fileno); + if (metalogfile->fileno < starting_fileno) { + tmp_metalogfiles[recovered_files++] = metalogfile; + continue; + } + break; /* reached compaction file serial number */ + } + + if ((j == i) /* Shouldn't be possible, invalid compaction temporary file */ || + (metalogfile->fileno == starting_fileno && metalogfile->fileno == fileno)) { + error("Deleting invalid compaction temporary file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL + METALOG_EXTENSION"\"", dbfiles_path, starting_fileno, fileno); + unlink_metadata_logfile(compactionfile); + freez(compactionfile); + freez(tmp_metalogfiles); + --*matched_files; /* delete the last one */ + + info("Finished metadata log file failure recovery procedure in \"%s\".", dbfiles_path); + return 0; + } + + for ( ; j < i ; ++j) { /* continue iterating through normal metadata log files */ + metalogfile = metalogfiles[j]; + fatal_assert(0 == metalogfile->starting_fileno); + if (metalogfile->fileno < fileno) { /* It has already been compacted */ + error("Deleting invalid metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL + METALOG_EXTENSION"\"", dbfiles_path, 0U, metalogfile->fileno); + unlink_metadata_logfile(metalogfile); + freez(metalogfile); + continue; + } + tmp_metalogfiles[recovered_files++] = metalogfile; + } + + /* compaction temporary file is valid */ + tmp_metalogfiles[recovered_files++] = compactionfile; + ret = rename_metadata_logfile(compactionfile, 0, starting_fileno); + if (ret < 0) { + error("Cannot rename temporary compaction 
files. Cannot proceed."); + freez(tmp_metalogfiles); + return 1; + } + + memcpy(metalogfiles, tmp_metalogfiles, recovered_files * sizeof(*metalogfiles)); + *matched_files = recovered_files; + freez(tmp_metalogfiles); + + info("Finished metadata log file failure recovery procedure in \"%s\".", dbfiles_path); + return 0; +} diff --git a/database/engine/metadata_log/compaction.h b/database/engine/metadata_log/compaction.h new file mode 100644 index 000000000..d04613440 --- /dev/null +++ b/database/engine/metadata_log/compaction.h @@ -0,0 +1,14 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_COMPACTION_H +#define NETDATA_COMPACTION_H + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include "../rrdengine.h" + +extern int compaction_failure_recovery(struct metalog_instance *ctx, struct metadata_logfile **metalogfiles, + unsigned *matched_files); + +#endif /* NETDATA_COMPACTION_H */ diff --git a/database/engine/metadata_log/logfile.c b/database/engine/metadata_log/logfile.c new file mode 100644 index 000000000..b7c5c0618 --- /dev/null +++ b/database/engine/metadata_log/logfile.c @@ -0,0 +1,453 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include +#include "metadatalog.h" +#include "metalogpluginsd.h" + + +void generate_metadata_logfile_path(struct metadata_logfile *metalogfile, char *str, size_t maxlen) +{ + (void) snprintf(str, maxlen, "%s/" METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL METALOG_EXTENSION, + metalogfile->ctx->rrdeng_ctx->dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno); +} + +void metadata_logfile_init(struct metadata_logfile *metalogfile, struct metalog_instance *ctx, unsigned starting_fileno, + unsigned fileno) +{ + metalogfile->starting_fileno = starting_fileno; + metalogfile->fileno = fileno; + metalogfile->file = (uv_file)0; + metalogfile->pos = 0; + metalogfile->next = NULL; + metalogfile->ctx = ctx; +} + +int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno, unsigned new_fileno) +{ + //struct metalog_instance *ctx = metalogfile->ctx; + uv_fs_t req; + int ret; + char oldpath[RRDENG_PATH_MAX], newpath[RRDENG_PATH_MAX]; + unsigned backup_starting_fileno, backup_fileno; + + backup_starting_fileno = metalogfile->starting_fileno; + backup_fileno = metalogfile->fileno; + generate_metadata_logfile_path(metalogfile, oldpath, sizeof(oldpath)); + metalogfile->starting_fileno = new_starting_fileno; + metalogfile->fileno = new_fileno; + generate_metadata_logfile_path(metalogfile, newpath, sizeof(newpath)); + + info("Renaming metadata log file \"%s\" to \"%s\".", oldpath, newpath); + ret = uv_fs_rename(NULL, &req, oldpath, newpath, NULL); + if (ret < 0) { + error("uv_fs_rename(%s): %s", oldpath, uv_strerror(ret)); + //++ctx->stats.fs_errors; /* this is racy, may miss some errors */ + rrd_stat_atomic_add(&global_fs_errors, 1); + /* restore previous values */ + metalogfile->starting_fileno = backup_starting_fileno; + metalogfile->fileno = backup_fileno; + } + uv_fs_req_cleanup(&req); + + return ret; +} + +int unlink_metadata_logfile(struct metadata_logfile *metalogfile) +{ + //struct metalog_instance *ctx = metalogfile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_metadata_logfile_path(metalogfile, path, sizeof(path)); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); +// ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + return ret; +} + +static 
int check_metadata_logfile_superblock(uv_file file) +{ + int ret; + struct rrdeng_metalog_sb *superblock; + uv_buf_t iov; + uv_fs_t req; + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + error("uv_fs_read: %s", uv_strerror(ret)); + uv_fs_req_cleanup(&req); + goto error; + } + fatal_assert(req.result >= 0); + uv_fs_req_cleanup(&req); + + if (strncmp(superblock->magic_number, RRDENG_METALOG_MAGIC, RRDENG_MAGIC_SZ)) { + error("File has invalid superblock."); + ret = UV_EINVAL; + } else { + ret = 0; + } + if (superblock->version > RRDENG_METALOG_VER) { + error("File has unknown version %"PRIu16". Compatibility is not guaranteed.", superblock->version); + } +error: + free(superblock); + return ret; +} + +void replay_record(struct metadata_logfile *metalogfile, struct rrdeng_metalog_record_header *header, void *payload) +{ + struct metalog_instance *ctx = metalogfile->ctx; + char *line, *nextline, *record_end; + int ret; + + debug(D_METADATALOG, "RECORD contents: %.*s", (int)header->payload_length, (char *)payload); + record_end = (char *)payload + header->payload_length - 1; + *record_end = '\0'; + + for (line = payload ; line ; line = nextline) { + nextline = strchr(line, '\n'); + if (nextline) { + *nextline++ = '\0'; + } + ret = parser_action(ctx->metalog_parser_object->parser, line); + debug(D_METADATALOG, "parser_action ret:%d", ret); + if (ret) + return; /* skip record due to error */ + }; +} + +/* This function only works with buffered I/O */ +static inline int metalogfile_read(struct metadata_logfile *metalogfile, void *buf, size_t len, uint64_t offset) +{ +// struct metalog_instance *ctx; + uv_file file; + uv_buf_t iov; + uv_fs_t req; + int ret; + +// ctx = metalogfile->ctx; + file = metalogfile->file; + iov = uv_buf_init(buf, len); + ret = uv_fs_read(NULL, &req, file, &iov, 1, offset, NULL); + if (unlikely(ret < 0 && ret != req.result)) { + fatal("uv_fs_read: %s", uv_strerror(ret)); + } + if (req.result < 0) { +// ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); + error("%s: uv_fs_read - %s - record at offset %"PRIu64"(%u) in metadata logfile %u-%u.", __func__, + uv_strerror((int)req.result), offset, (unsigned)len, metalogfile->starting_fileno, metalogfile->fileno); + } + uv_fs_req_cleanup(&req); +// ctx->stats.io_read_bytes += len; +// ++ctx->stats.io_read_requests; + + return ret; +} + +/* Return 0 on success */ +static int metadata_record_integrity_check(void *record) +{ + int ret; + uint32_t data_size; + struct rrdeng_metalog_record_header *header; + struct rrdeng_metalog_record_trailer *trailer; + uLong crc; + + header = record; + data_size = header->header_length + header->payload_length; + trailer = record + data_size; + + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, record, data_size); + ret = crc32cmp(trailer->checksum, crc); + + return ret; +} + +#define MAX_READ_BYTES (RRDENG_BLOCK_SIZE * 32) /* no record should be over 128KiB in this version */ + +/* + * Iterates metadata log file records and creates database objects (host/chart/dimension) + */ +static void iterate_records(struct metadata_logfile *metalogfile) +{ + uint32_t file_size, pos, bytes_remaining, record_size; + void *buf; + struct rrdeng_metalog_record_header *header; + struct metalog_instance *ctx = metalogfile->ctx; + struct metalog_pluginsd_state 
*state = ctx->metalog_parser_object->private; + const size_t min_header_size = offsetof(struct rrdeng_metalog_record_header, header_length) + + sizeof(header->header_length); + + file_size = metalogfile->pos; + state->metalogfile = metalogfile; + + buf = mallocz(MAX_READ_BYTES); + + for (pos = sizeof(struct rrdeng_metalog_sb) ; pos < file_size ; pos += record_size) { + bytes_remaining = file_size - pos; + if (bytes_remaining < min_header_size) { + error("%s: unexpected end of file in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno, + metalogfile->fileno); + break; + } + if (metalogfile_read(metalogfile, buf, min_header_size, pos) < 0) + break; + header = (struct rrdeng_metalog_record_header *)buf; + if (METALOG_STORE_PADDING == header->type) { + info("%s: Skipping padding in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno, + metalogfile->fileno); + record_size = ALIGN_BYTES_FLOOR(pos + RRDENG_BLOCK_SIZE) - pos; + continue; + } + if (metalogfile_read(metalogfile, buf + min_header_size, sizeof(*header) - min_header_size, + pos + min_header_size) < 0) + break; + record_size = header->header_length + header->payload_length + sizeof(struct rrdeng_metalog_record_trailer); + if (header->header_length < min_header_size || record_size > bytes_remaining) { + error("%s: Corrupted record in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno, + metalogfile->fileno); + break; + } + if (record_size > MAX_READ_BYTES) { + error("%s: Record is too long (%u bytes) in metadata logfile %u-%u.", __func__, record_size, + metalogfile->starting_fileno, metalogfile->fileno); + continue; + } + if (metalogfile_read(metalogfile, buf + sizeof(*header), record_size - sizeof(*header), + pos + sizeof(*header)) < 0) + break; + if (metadata_record_integrity_check(buf)) { + error("%s: Record at offset %"PRIu32" was read from disk. CRC32 check: FAILED", __func__, pos); + continue; + } + debug(D_METADATALOG, "%s: Record at offset %"PRIu32" was read from disk. 
CRC32 check: SUCCEEDED", __func__, + pos); + + replay_record(metalogfile, header, buf + header->header_length); + } + + freez(buf); +} + +int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *metalogfile) +{ + UNUSED(ctx); + uv_fs_t req; + uv_file file; + int ret, fd, error; + uint64_t file_size; + char path[RRDENG_PATH_MAX]; + + generate_metadata_logfile_path(metalogfile, path, sizeof(path)); + if (file_is_migrated(path)) + return 0; + + fd = open_file_buffered_io(path, O_RDWR, &file); + if (fd < 0) { +// ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; + } + info("Loading metadata log \"%s\".", path); + + ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_metalog_sb)); + if (ret) + goto error; + + ret = check_metadata_logfile_superblock(file); + if (ret) + goto error; +// ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb); +// ++ctx->stats.io_read_requests; + + metalogfile->file = file; + metalogfile->pos = file_size; + + iterate_records(metalogfile); + + info("Metadata log \"%s\" migrated to the database (size:%"PRIu64").", path, file_size); + add_migrated_file(path, file_size); + return 0; + +error: + error = ret; + ret = uv_fs_close(NULL, &req, file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); +// ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + return error; +} + +static int scan_metalog_files_cmp(const void *a, const void *b) +{ + struct metadata_logfile *file1, *file2; + char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX]; + + file1 = *(struct metadata_logfile **)a; + file2 = *(struct metadata_logfile **)b; + generate_metadata_logfile_path(file1, path1, sizeof(path1)); + generate_metadata_logfile_path(file2, path2, sizeof(path2)); + return strcmp(path1, path2); +} + +/* Returns number of metadata logfiles that were loaded or < 0 on error */ +static int scan_metalog_files(struct metalog_instance *ctx) +{ + int ret; + unsigned starting_no, no, matched_files, i, failed_to_load; + static uv_fs_t req; + uv_dirent_t dent; + struct metadata_logfile **metalogfiles, *metalogfile; + char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path; + + ret = uv_fs_scandir(NULL, &req, dbfiles_path, 0, NULL); + if (ret < 0) { + fatal_assert(req.result < 0); + uv_fs_req_cleanup(&req); + error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret)); +// ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return ret; + } + info("Found %d files in path %s", ret, dbfiles_path); + + metalogfiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*metalogfiles)); + for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { + info("Scanning file \"%s/%s\"", dbfiles_path, dent.name); + ret = sscanf(dent.name, METALOG_PREFIX METALOG_FILE_NUMBER_SCAN_TMPL METALOG_EXTENSION, &starting_no, &no); + if (2 == ret) { + info("Matched file \"%s/%s\"", dbfiles_path, dent.name); + metalogfile = mallocz(sizeof(*metalogfile)); + metadata_logfile_init(metalogfile, ctx, starting_no, no); + metalogfiles[matched_files++] = metalogfile; + } + } + uv_fs_req_cleanup(&req); + + if (0 == matched_files) { + freez(metalogfiles); + return 0; + } + if (matched_files == MAX_DATAFILES) { + error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); + } + qsort(metalogfiles, matched_files, sizeof(*metalogfiles), scan_metalog_files_cmp); + ret = compaction_failure_recovery(ctx, metalogfiles, 
&matched_files); + if (ret) { /* If the files are corrupted fail */ + for (i = 0 ; i < matched_files ; ++i) { + freez(metalogfiles[i]); + } + freez(metalogfiles); + return UV_EINVAL; + } + //ctx->last_fileno = metalogfiles[matched_files - 1]->fileno; + + struct plugind cd = { + .enabled = 1, + .update_every = 0, + .pid = 0, + .serial_failures = 0, + .successful_collections = 0, + .obsolete = 0, + .started_t = INVALID_TIME, + .next = NULL, + .version = 0, + }; + + struct metalog_pluginsd_state metalog_parser_state; + metalog_pluginsd_state_init(&metalog_parser_state, ctx); + + PARSER_USER_OBJECT metalog_parser_object; + metalog_parser_object.enabled = cd.enabled; + metalog_parser_object.host = ctx->rrdeng_ctx->host; + metalog_parser_object.cd = &cd; + metalog_parser_object.trust_durations = 0; + metalog_parser_object.private = &metalog_parser_state; + + PARSER *parser = parser_init(metalog_parser_object.host, &metalog_parser_object, NULL, PARSER_INPUT_SPLIT); + if (unlikely(!parser)) { + error("Failed to initialize metadata log parser."); + failed_to_load = matched_files; + goto after_failed_to_parse; + } + parser_add_keyword(parser, PLUGINSD_KEYWORD_HOST, metalog_pluginsd_host); + parser_add_keyword(parser, PLUGINSD_KEYWORD_GUID, pluginsd_guid); + parser_add_keyword(parser, PLUGINSD_KEYWORD_CONTEXT, pluginsd_context); + parser_add_keyword(parser, PLUGINSD_KEYWORD_TOMBSTONE, pluginsd_tombstone); + parser->plugins_action->dimension_action = &metalog_pluginsd_dimension_action; + parser->plugins_action->chart_action = &metalog_pluginsd_chart_action; + parser->plugins_action->guid_action = &metalog_pluginsd_guid_action; + parser->plugins_action->context_action = &metalog_pluginsd_context_action; + parser->plugins_action->tombstone_action = &metalog_pluginsd_tombstone_action; + parser->plugins_action->host_action = &metalog_pluginsd_host_action; + + + metalog_parser_object.parser = parser; + ctx->metalog_parser_object = &metalog_parser_object; + + for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { + metalogfile = metalogfiles[i]; + db_lock(); + db_execute("BEGIN TRANSACTION;"); + ret = load_metadata_logfile(ctx, metalogfile); + if (0 != ret) { + error("Deleting invalid metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL + METALOG_EXTENSION"\"", dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno); + unlink_metadata_logfile(metalogfile); + ++failed_to_load; + db_execute("ROLLBACK TRANSACTION;"); + } + else + db_execute("COMMIT TRANSACTION;"); + db_unlock(); + freez(metalogfile); + } + matched_files -= failed_to_load; + debug(D_METADATALOG, "PARSER ended"); + + parser_destroy(parser); + + size_t count __maybe_unused = metalog_parser_object.count; + + debug(D_METADATALOG, "Parsing count=%u", (unsigned)count); +after_failed_to_parse: + + freez(metalogfiles); + + return matched_files; +} + +/* Return 0 on success. 
*/ +int init_metalog_files(struct metalog_instance *ctx) +{ + int ret; + char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path; + + ret = scan_metalog_files(ctx); + if (ret < 0) { + error("Failed to scan path \"%s\".", dbfiles_path); + return ret; + }/* else if (0 == ret) { + ctx->last_fileno = 1; + }*/ + + return 0; +} diff --git a/database/engine/metadata_log/logfile.h b/database/engine/metadata_log/logfile.h new file mode 100644 index 000000000..df12ac714 --- /dev/null +++ b/database/engine/metadata_log/logfile.h @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_LOGFILE_H +#define NETDATA_LOGFILE_H + +#include "metadatalogprotocol.h" +#include "../rrdengine.h" + +/* Forward declarations */ +struct metadata_logfile; +struct metalog_worker_config; + +#define METALOG_PREFIX "metadatalog-" +#define METALOG_EXTENSION ".mlf" + +/* only one event loop is supported for now */ +struct metadata_logfile { + unsigned fileno; /* Starts at 1 */ + unsigned starting_fileno; /* 0 for normal files, starting number during compaction */ + uv_file file; + uint64_t pos; + struct metalog_instance *ctx; + struct metadata_logfile *next; +}; + +struct metadata_logfile_list { + struct metadata_logfile *first; /* oldest */ + struct metadata_logfile *last; /* newest */ +}; + +extern void generate_metadata_logfile_path(struct metadata_logfile *metadatalog, char *str, size_t maxlen); +extern int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno, + unsigned new_fileno); +extern int unlink_metadata_logfile(struct metadata_logfile *metalogfile); +extern int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *logfile); +extern int init_metalog_files(struct metalog_instance *ctx); + + +#endif /* NETDATA_LOGFILE_H */ diff --git a/database/engine/metadata_log/metadatalog.h b/database/engine/metadata_log/metadatalog.h new file mode 100644 index 000000000..b484686de --- /dev/null +++ b/database/engine/metadata_log/metadatalog.h @@ -0,0 +1,28 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_METADATALOG_H +#define NETDATA_METADATALOG_H + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include "../rrdengine.h" +#include "metadatalogprotocol.h" +#include "logfile.h" +#include "metadatalogapi.h" +#include "compaction.h" + +/* Forward declarations */ +struct metalog_instance; +struct parser_user_object; + +#define METALOG_FILE_NUMBER_SCAN_TMPL "%5u-%5u" +#define METALOG_FILE_NUMBER_PRINT_TMPL "%5.5u-%5.5u" + +struct metalog_instance { + struct rrdengine_instance *rrdeng_ctx; + struct parser_user_object *metalog_parser_object; + uint8_t initialized; /* set to 1 to mark context initialized */ +}; + +#endif /* NETDATA_METADATALOG_H */ diff --git a/database/engine/metadata_log/metadatalogapi.c b/database/engine/metadata_log/metadatalogapi.c new file mode 100755 index 000000000..b206cca05 --- /dev/null +++ b/database/engine/metadata_log/metadatalogapi.c @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#define NETDATA_RRD_INTERNALS + +#include "metadatalog.h" + +/* + * Returns 0 on success, negative on error + */ +int metalog_init(struct rrdengine_instance *rrdeng_parent_ctx) +{ + struct metalog_instance *ctx; + int error; + + ctx = callocz(1, sizeof(*ctx)); + ctx->initialized = 0; + rrdeng_parent_ctx->metalog_ctx = ctx; + + ctx->rrdeng_ctx = rrdeng_parent_ctx; + error = init_metalog_files(ctx); + if (error) { + goto error_after_init_rrd_files; + } + ctx->initialized = 1; /* notify dbengine that the metadata
log has finished initializing */ + return 0; + +error_after_init_rrd_files: + freez(ctx); + return UV_EIO; +} + +/* This function is called by dbengine rotation logic when the metric has no writers */ +void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid) +{ + uuid_t multihost_uuid; + + delete_dimension_uuid(metric_uuid); + rrdeng_convert_legacy_uuid_to_multihost(ctx->rrdeng_ctx->machine_guid, metric_uuid, &multihost_uuid); + delete_dimension_uuid(&multihost_uuid); +} diff --git a/database/engine/metadata_log/metadatalogapi.h b/database/engine/metadata_log/metadatalogapi.h new file mode 100644 index 000000000..d558b9317 --- /dev/null +++ b/database/engine/metadata_log/metadatalogapi.h @@ -0,0 +1,12 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_METADATALOGAPI_H +#define NETDATA_METADATALOGAPI_H + +extern void metalog_commit_delete_chart(RRDSET *st); +extern void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid); + +/* must call once before using anything */ +extern int metalog_init(struct rrdengine_instance *rrdeng_parent_ctx); + +#endif /* NETDATA_METADATALOGAPI_H */ diff --git a/database/engine/metadata_log/metadatalogprotocol.h b/database/engine/metadata_log/metadatalogprotocol.h new file mode 100644 index 000000000..1017213ae --- /dev/null +++ b/database/engine/metadata_log/metadatalogprotocol.h @@ -0,0 +1,53 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_METADATALOGPROTOCOL_H +#define NETDATA_METADATALOGPROTOCOL_H + +#include "../rrddiskprotocol.h" + +#define RRDENG_METALOG_MAGIC "netdata-metadata-log" + +#define RRDENG_METALOG_VER (1) + +#define RRDENG_METALOG_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + sizeof(uint16_t))) +/* + * Metadata log persistent super-block + */ +struct rrdeng_metalog_sb { + char magic_number[RRDENG_MAGIC_SZ]; + uint16_t version; + uint8_t padding[RRDENG_METALOG_SB_PADDING_SZ]; +} __attribute__ ((packed)); + +/* + * Metadata log record types + */ +#define METALOG_STORE_PADDING (0) +#define METALOG_CREATE_OBJECT (1) +#define METALOG_DELETE_OBJECT (2) +#define METALOG_OTHER (3) /* reserved */ + +/* + * Metadata log record header + */ +struct rrdeng_metalog_record_header { + /* when set to METALOG_STORE_PADDING jump to start of next block */ + uint8_t type; + + uint16_t header_length; + uint32_t payload_length; + /****************************************************** + * No fields above this point can ever change. * + ****************************************************** + * All fields below this point are subject to change. 
* + ******************************************************/ +} __attribute__ ((packed)); + +/* + * Metadata log record trailer + */ +struct rrdeng_metalog_record_trailer { + uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */ +} __attribute__ ((packed)); + +#endif /* NETDATA_METADATALOGPROTOCOL_H */ diff --git a/database/engine/metadata_log/metalogpluginsd.c b/database/engine/metadata_log/metalogpluginsd.c new file mode 100755 index 000000000..88c1453a9 --- /dev/null +++ b/database/engine/metadata_log/metalogpluginsd.c @@ -0,0 +1,140 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#define NETDATA_RRD_INTERNALS + +#include "metadatalog.h" +#include "metalogpluginsd.h" + +extern struct config stream_config; + +PARSER_RC metalog_pluginsd_host_action( + void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os, char *timezone, + char *tags) +{ + struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; + + RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0); + if (host) { + if (unlikely(host->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)) { + error("Archived host '%s' has memory mode '%s', but the archived one is '%s'. Ignoring archived state.", + host->hostname, rrd_memory_mode_name(host->rrd_memory_mode), + rrd_memory_mode_name(RRD_MEMORY_MODE_DBENGINE)); + ((PARSER_USER_OBJECT *) user)->host = NULL; /* Ignore objects if memory mode is not dbengine */ + } + ((PARSER_USER_OBJECT *) user)->host = host; + return PARSER_RC_OK; + } + + if (strcmp(machine_guid, registry_get_this_machine_guid()) == 0) { + ((PARSER_USER_OBJECT *) user)->host = host; + return PARSER_RC_OK; + } + + if (likely(!uuid_parse(machine_guid, state->host_uuid))) { + int rc = sql_store_host(&state->host_uuid, hostname, registry_hostname, update_every, os, timezone, tags); + if (unlikely(rc)) { + errno = 0; + error("Failed to store host %s with UUID %s in the database", hostname, machine_guid); + } + } + else { + errno = 0; + error("Host machine GUID %s is not valid", machine_guid); + } + + return PARSER_RC_OK; +} + +PARSER_RC metalog_pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family, char *context, + char *title, char *units, char *plugin, char *module, int priority, + int update_every, RRDSET_TYPE chart_type, char *options) +{ + UNUSED(options); + + struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; + RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; + + if (unlikely(uuid_is_null(state->host_uuid))) { + debug(D_METADATALOG, "Ignoring chart belonging to missing or ignored host."); + return PARSER_RC_OK; + } + uuid_copy(state->chart_uuid, state->uuid); + uuid_clear(state->uuid); /* Consume UUID */ + (void) sql_store_chart(&state->chart_uuid, &state->host_uuid, + type, id, name, family, context, title, units, + plugin, module, priority, update_every, + chart_type, RRD_MEMORY_MODE_DBENGINE, host ? 
host->rrd_history_entries : 1); + ((PARSER_USER_OBJECT *)user)->st_exists = 1; + + return PARSER_RC_OK; +} + +PARSER_RC metalog_pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm, + long multiplier, long divisor, char *options, RRD_ALGORITHM algorithm_type) +{ + struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; + UNUSED(user); + UNUSED(options); + UNUSED(algorithm); + UNUSED(st); + + if (unlikely(uuid_is_null(state->chart_uuid))) { + debug(D_METADATALOG, "Ignoring dimension belonging to missing or ignored chart."); + info("Ignoring dimension belonging to missing or ignored chart."); + return PARSER_RC_OK; + } + + if (unlikely(uuid_is_null(state->uuid))) { + debug(D_METADATALOG, "Ignoring dimension with unknown UUID"); + info("Ignoring dimension with unknown UUID"); + return PARSER_RC_OK; + } + + (void) sql_store_dimension(&state->uuid, &state->chart_uuid, id, name, multiplier, divisor, algorithm_type); + uuid_clear(state->uuid); /* Consume UUID */ + + return PARSER_RC_OK; +} + +PARSER_RC metalog_pluginsd_guid_action(void *user, uuid_t *uuid) +{ + struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; + + uuid_copy(state->uuid, *uuid); + + return PARSER_RC_OK; +} + +PARSER_RC metalog_pluginsd_context_action(void *user, uuid_t *uuid) +{ + struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; + + int rc = find_uuid_type(uuid); + + if (rc == 1) { + uuid_copy(state->host_uuid, *uuid); + ((PARSER_USER_OBJECT *)user)->st_exists = 0; + ((PARSER_USER_OBJECT *)user)->host_exists = 1; + } else if (rc == 2) { + uuid_copy(state->chart_uuid, *uuid); + ((PARSER_USER_OBJECT *)user)->st_exists = 1; + } else + uuid_copy(state->uuid, *uuid); + + return PARSER_RC_OK; +} + +PARSER_RC metalog_pluginsd_tombstone_action(void *user, uuid_t *uuid) +{ + UNUSED(user); + UNUSED(uuid); + + return PARSER_RC_OK; +} + +void metalog_pluginsd_state_init(struct metalog_pluginsd_state *state, struct metalog_instance *ctx) +{ + state->ctx = ctx; + state->skip_record = 0; + uuid_clear(state->uuid); + state->metalogfile = NULL; +} diff --git a/database/engine/metadata_log/metalogpluginsd.h b/database/engine/metadata_log/metalogpluginsd.h new file mode 100644 index 000000000..96808aaa2 --- /dev/null +++ b/database/engine/metadata_log/metalogpluginsd.h @@ -0,0 +1,33 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_METALOGPLUGINSD_H +#define NETDATA_METALOGPLUGINSD_H + +#include "../../../collectors/plugins.d/pluginsd_parser.h" +#include "../../../collectors/plugins.d/plugins_d.h" +#include "../../../parser/parser.h" + +struct metalog_pluginsd_state { + struct metalog_instance *ctx; + uuid_t uuid; + uuid_t host_uuid; + uuid_t chart_uuid; + uint8_t skip_record; /* skip this record due to errors in parsing */ + struct metadata_logfile *metalogfile; /* current metadata log file being replayed */ +}; + +extern void metalog_pluginsd_state_init(struct metalog_pluginsd_state *state, struct metalog_instance *ctx); + +extern PARSER_RC metalog_pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family, + char *context, char *title, char *units, char *plugin, char *module, + int priority, int update_every, RRDSET_TYPE chart_type, char *options); +extern PARSER_RC metalog_pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm, + long multiplier, long divisor, char *options, + RRD_ALGORITHM algorithm_type); +extern PARSER_RC
metalog_pluginsd_guid_action(void *user, uuid_t *uuid); +extern PARSER_RC metalog_pluginsd_context_action(void *user, uuid_t *uuid); +extern PARSER_RC metalog_pluginsd_tombstone_action(void *user, uuid_t *uuid); +extern PARSER_RC metalog_pluginsd_host(char **words, void *user, PLUGINSD_ACTION *plugins_action); +extern PARSER_RC metalog_pluginsd_host_action(void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os, char *timezone, char *tags); + +#endif /* NETDATA_METALOGPLUGINSD_H */ diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index 449138b4d..a18207100 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -101,6 +101,13 @@ void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr) uv_cond_broadcast(&pg_cache_descr->cond); } +void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_wake_up_waiters_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); +} + /* * The caller must hold page descriptor lock. * The lock will be released and re-acquired. The descriptor is not guaranteed @@ -115,6 +122,24 @@ void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr) --pg_cache_descr->waiters; } +/* + * The caller must hold page descriptor lock. + * The lock will be released and re-acquired. The descriptor is not guaranteed + * to exist after this function returns. + * Returns UV_ETIMEDOUT if timeout_sec seconds pass. + */ +int pg_cache_timedwait_event_unsafe(struct rrdeng_page_descr *descr, uint64_t timeout_sec) +{ + int ret; + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + ++pg_cache_descr->waiters; + ret = uv_cond_timedwait(&pg_cache_descr->cond, &pg_cache_descr->mutex, timeout_sec * NSEC_PER_SEC); + --pg_cache_descr->waiters; + + return ret; +} + /* * Returns page flags. * The lock will be released and re-acquired. The descriptor is not guaranteed @@ -135,10 +160,8 @@ unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_ /* * The caller must hold page descriptor lock. - * Gets a reference to the page descriptor. - * Returns 1 on success and 0 on failure. */ -int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access) +int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access) { struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; @@ -146,25 +169,25 @@ int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_acces (exclusive_access && pg_cache_descr->refcnt)) { return 0; } - if (exclusive_access) - pg_cache_descr->flags |= RRD_PAGE_LOCKED; - ++pg_cache_descr->refcnt; return 1; } /* * The caller must hold page descriptor lock. - * Same return values as pg_cache_try_get_unsafe() without doing anything. + * Gets a reference to the page descriptor. + * Returns 1 on success and 0 on failure. 
*/ -int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access) +int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access) { struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; - if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) || - (exclusive_access && pg_cache_descr->refcnt)) { + if (!pg_cache_can_get_unsafe(descr, exclusive_access)) return 0; - } + + if (exclusive_access) + pg_cache_descr->flags |= RRD_PAGE_LOCKED; + ++pg_cache_descr->refcnt; return 1; } @@ -212,23 +235,31 @@ static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned numb /* * This function returns the maximum number of pages allowed in the page cache. - * The caller must hold the page cache lock. */ unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx) { /* it's twice the number of producers since we pin 2 pages per producer */ - return ctx->max_cache_pages + 2 * (unsigned long)ctx->stats.metric_API_producers; + return ctx->max_cache_pages + 2 * (unsigned long)ctx->metric_API_max_producers; } /* * This function returns the low watermark number of pages in the page cache. The page cache should strive to keep the * number of pages below that number. - * The caller must hold the page cache lock. */ unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx) { /* it's twice the number of producers since we pin 2 pages per producer */ - return ctx->cache_pages_low_watermark + 2 * (unsigned long)ctx->stats.metric_API_producers; + return ctx->cache_pages_low_watermark + 2 * (unsigned long)ctx->metric_API_max_producers; +} + +/* + * This function returns the maximum number of dirty pages that are committed to be written to disk allowed in the page + * cache. + */ +unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx) +{ + /* We remove the active pages of the producers from the calculation and only allow the extra pinned pages */ + return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers; } /* @@ -370,31 +401,52 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx) return 0; } -void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty) +/** + * Deletes a page from the database. + * Callers of this function need to make sure they're not deleting the same descriptor concurrently. + * @param ctx is the database instance. + * @param descr is the page descriptor. + * @param remove_dirty must be non-zero if the page to be deleted is dirty. + * @param is_exclusive_holder must be non-zero if the caller holds an exclusive page reference. + * @param metric_id is set to the metric the page belongs to, if it's safe to delete the metric and metric_id is not + * NULL. Otherwise, metric_id is not set. + * @return 1 if it's safe to delete the metric, 0 otherwise. 
+ */ +uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty, + uint8_t is_exclusive_holder, uuid_t *metric_id) { struct page_cache *pg_cache = &ctx->pg_cache; struct page_cache_descr *pg_cache_descr = NULL; Pvoid_t *PValue; - struct pg_cache_page_index *page_index; + struct pg_cache_page_index *page_index = NULL; int ret; + uint8_t can_delete_metric = 0; uv_rwlock_rdlock(&pg_cache->metrics_index.lock); PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t)); - assert(NULL != PValue); + fatal_assert(NULL != PValue); page_index = *PValue; uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); uv_rwlock_wrlock(&page_index->lock); ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0); - uv_rwlock_wrunlock(&page_index->lock); if (unlikely(0 == ret)) { + uv_rwlock_wrunlock(&page_index->lock); error("Page under deletion was not in index."); if (unlikely(debug_flags & D_RRDENGINE)) { print_page_descr(descr); } goto destroy; } - assert(1 == ret); + --page_index->page_count; + if (!page_index->writers && !page_index->page_count) { + can_delete_metric = 1; + if (metric_id) { + memcpy(metric_id, page_index->id, sizeof(uuid_t)); + } + } + uv_rwlock_wrunlock(&page_index->lock); + fatal_assert(1 == ret); uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); ++ctx->stats.pg_cache_deletions; @@ -403,13 +455,18 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_desc rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_descr = descr->pg_cache_descr; - while (!pg_cache_try_get_unsafe(descr, 1)) { - debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__); - if (unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); - pg_cache_wait_event_unsafe(descr); + if (!is_exclusive_holder) { + /* If we don't hold an exclusive page reference get one */ + while (!pg_cache_try_get_unsafe(descr, 1)) { + debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + pg_cache_wait_event_unsafe(descr); + } } - if (!remove_dirty) { + if (remove_dirty) { + pg_cache_descr->flags &= ~RRD_PAGE_DIRTY; + } else { /* even a locked page could be dirty */ while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) { debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__); @@ -429,11 +486,16 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_desc uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); } pg_cache_put(ctx, descr); - - rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); + rrdeng_try_deallocate_pg_cache_descr(ctx, descr); + while (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) { + rrdeng_try_deallocate_pg_cache_descr(ctx, descr); /* spin */ + (void)sleep_usec(1000); /* 1 msec */ + } destroy: freez(descr); pg_cache_update_metric_times(page_index); + + return can_delete_metric; } static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time) @@ -521,7 +583,7 @@ void pg_cache_update_metric_times(struct pg_cache_page_index *page_index) uv_rwlock_rdunlock(&page_index->lock); if (unlikely(NULL == firstPValue)) { - assert(NULL == lastPValue); + fatal_assert(NULL == lastPValue); page_index->oldest_time = page_index->latest_time = INVALID_TIME; return; } @@ -542,7 +604,7 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index /* there is page cache descriptor pre-allocated state 
*/ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; - assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED); + fatal_assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED); if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { pg_cache_reserve_pages(ctx, 1); if (!(pg_cache_descr->flags & RRD_PAGE_DIRTY)) @@ -553,7 +615,7 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index if (unlikely(NULL == index)) { uv_rwlock_rdlock(&pg_cache->metrics_index.lock); PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t)); - assert(NULL != PValue); + fatal_assert(NULL != PValue); page_index = *PValue; uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); } else { @@ -563,6 +625,7 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index uv_rwlock_wrlock(&page_index->lock); PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0); *PValue = descr; + ++page_index->page_count; pg_cache_add_new_metric_time(page_index, descr); uv_rwlock_wrunlock(&page_index->lock); @@ -577,7 +640,7 @@ usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, struct page_cache *pg_cache = &ctx->pg_cache; struct rrdeng_page_descr *descr = NULL; Pvoid_t *PValue; - struct pg_cache_page_index *page_index; + struct pg_cache_page_index *page_index = NULL; uv_rwlock_rdlock(&pg_cache->metrics_index.lock); PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); @@ -617,7 +680,7 @@ void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_c Word_t Index; (void)pg_cache; - assert(NULL != page_index); + fatal_assert(NULL != page_index); Index = (Word_t)(point_in_time / USEC_PER_SEC); uv_rwlock_rdlock(&page_index->lock); @@ -658,11 +721,11 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta unsigned i, j, k, preload_count, count, page_info_array_max_size; unsigned long flags; Pvoid_t *PValue; - struct pg_cache_page_index *page_index; + struct pg_cache_page_index *page_index = NULL; Word_t Index; uint8_t failed_to_reserve; - assert(NULL != ret_page_indexp); + fatal_assert(NULL != ret_page_indexp); uv_rwlock_rdlock(&pg_cache->metrics_index.lock); PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); @@ -804,7 +867,7 @@ struct rrdeng_page_descr * struct page_cache_descr *pg_cache_descr = NULL; unsigned long flags; Pvoid_t *PValue; - struct pg_cache_page_index *page_index; + struct pg_cache_page_index *page_index = NULL; Word_t Index; uint8_t page_not_in_cache; @@ -911,7 +974,7 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index struct page_cache_descr *pg_cache_descr = NULL; unsigned long flags; Pvoid_t *PValue; - struct pg_cache_page_index *page_index; + struct pg_cache_page_index *page_index = NULL; uint8_t page_not_in_cache; if (unlikely(NULL == index)) { @@ -1003,10 +1066,12 @@ struct pg_cache_page_index *create_page_index(uuid_t *id) page_index = mallocz(sizeof(*page_index)); page_index->JudyL_array = (Pvoid_t) NULL; uuid_copy(page_index->id, *id); - assert(0 == uv_rwlock_init(&page_index->lock)); + fatal_assert(0 == uv_rwlock_init(&page_index->lock)); page_index->oldest_time = INVALID_TIME; page_index->latest_time = INVALID_TIME; page_index->prev = NULL; + page_index->page_count = 0; + page_index->writers = 0; return page_index; } @@ -1017,7 +1082,7 @@ static void init_metrics_index(struct rrdengine_instance *ctx) pg_cache->metrics_index.JudyHS_array = 
(Pvoid_t) NULL; pg_cache->metrics_index.last_page_index = NULL; - assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock)); + fatal_assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock)); } static void init_replaceQ(struct rrdengine_instance *ctx) @@ -1026,7 +1091,7 @@ static void init_replaceQ(struct rrdengine_instance *ctx) pg_cache->replaceQ.head = NULL; pg_cache->replaceQ.tail = NULL; - assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock)); + fatal_assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock)); } static void init_committed_page_index(struct rrdengine_instance *ctx) @@ -1034,7 +1099,7 @@ static void init_committed_page_index(struct rrdengine_instance *ctx) struct page_cache *pg_cache = &ctx->pg_cache; pg_cache->committed_page_index.JudyL_array = (Pvoid_t) NULL; - assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock)); + fatal_assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock)); pg_cache->committed_page_index.latest_corr_id = 0; pg_cache->committed_page_index.nr_committed_pages = 0; } @@ -1045,7 +1110,7 @@ void init_page_cache(struct rrdengine_instance *ctx) pg_cache->page_descriptors = 0; pg_cache->populated_pages = 0; - assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock)); + fatal_assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock)); init_metrics_index(ctx); init_replaceQ(ctx); @@ -1064,7 +1129,7 @@ void free_page_cache(struct rrdengine_instance *ctx) /* Free committed page index */ ret_Judy = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0); - assert(NULL == pg_cache->committed_page_index.JudyL_array); + fatal_assert(NULL == pg_cache->committed_page_index.JudyL_array); bytes_freed += ret_Judy; for (page_index = pg_cache->metrics_index.last_page_index ; @@ -1099,14 +1164,14 @@ void free_page_cache(struct rrdengine_instance *ctx) /* Free page index */ ret_Judy = JudyLFreeArray(&page_index->JudyL_array, PJE0); - assert(NULL == page_index->JudyL_array); + fatal_assert(NULL == page_index->JudyL_array); bytes_freed += ret_Judy; freez(page_index); bytes_freed += sizeof(*page_index); } /* Free metrics index */ ret_Judy = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0); - assert(NULL == pg_cache->metrics_index.JudyHS_array); + fatal_assert(NULL == pg_cache->metrics_index.JudyHS_array); bytes_freed += ret_Judy; info("Freed %lu bytes of memory from page cache.", bytes_freed); diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h index 7d8fa2a1d..31e9739da 100644 --- a/database/engine/pagecache.h +++ b/database/engine/pagecache.h @@ -85,6 +85,8 @@ struct pg_cache_page_index { * TODO: examine if we want to support better granularity than seconds */ Pvoid_t JudyL_array; + Word_t page_count; + unsigned short writers; uv_rwlock_t lock; /* @@ -148,6 +150,7 @@ struct page_cache { /* TODO: add statistics */ }; extern void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr); +extern void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); extern void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr); extern unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); extern void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx, @@ -162,7 +165,8 @@ extern void pg_cache_put_unsafe(struct rrdeng_page_descr *descr); extern void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); extern void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, struct 
rrdeng_page_descr *descr); -extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty); +extern uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, + uint8_t remove_dirty, uint8_t is_exclusive_holder, uuid_t *metric_id); extern usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time); extern void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index, @@ -184,6 +188,7 @@ extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index); extern unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx); extern unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx); +extern unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx); static inline void pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_timep, uint32_t *page_lengthp) @@ -214,7 +219,7 @@ static inline void /* The caller must hold a reference to the page and must have already set the new data */ static inline void pg_cache_atomic_set_pg_info(struct rrdeng_page_descr *descr, usec_t end_time, uint32_t page_length) { - assert(!(end_time & 1)); + fatal_assert(!(end_time & 1)); __sync_synchronize(); descr->end_time |= 1; /* mark start of uncertainty period by adding 1 microsecond */ __sync_synchronize(); diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index b6b6548ec..43135ff01 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -6,9 +6,10 @@ rrdeng_stats_t global_io_errors = 0; rrdeng_stats_t global_fs_errors = 0; rrdeng_stats_t rrdeng_reserved_file_descriptors = 0; -rrdeng_stats_t global_flushing_errors = 0; +rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0; +rrdeng_stats_t global_flushing_pressure_page_deletions = 0; -void sanity_check(void) +static void sanity_check(void) { /* Magic numbers must fit in the super-blocks */ BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ); @@ -26,10 +27,188 @@ void sanity_check(void) /* page count must fit in 8 bits */ BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255); + /* extent cache count must fit in 32 bits */ + BUILD_BUG_ON(MAX_CACHED_EXTENTS > 32); + /* page info scratch space must be able to hold 2 32-bit integers */ BUILD_BUG_ON(sizeof(((struct rrdeng_page_info *)0)->scratch) < 2 * sizeof(uint32_t)); } +/* always inserts into tail */ +static inline void xt_cache_replaceQ_insert(struct rrdengine_worker_config* wc, + struct extent_cache_element *xt_cache_elem) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + + xt_cache_elem->prev = NULL; + xt_cache_elem->next = NULL; + + if (likely(NULL != xt_cache->replaceQ_tail)) { + xt_cache_elem->prev = xt_cache->replaceQ_tail; + xt_cache->replaceQ_tail->next = xt_cache_elem; + } + if (unlikely(NULL == xt_cache->replaceQ_head)) { + xt_cache->replaceQ_head = xt_cache_elem; + } + xt_cache->replaceQ_tail = xt_cache_elem; +} + +static inline void xt_cache_replaceQ_delete(struct rrdengine_worker_config* wc, + struct extent_cache_element *xt_cache_elem) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *prev, *next; + + prev = xt_cache_elem->prev; + next = xt_cache_elem->next; + + if (likely(NULL != prev)) { + prev->next = next; + } + if (likely(NULL != next)) { + next->prev = prev; + } + if 
(unlikely(xt_cache_elem == xt_cache->replaceQ_head)) { + xt_cache->replaceQ_head = next; + } + if (unlikely(xt_cache_elem == xt_cache->replaceQ_tail)) { + xt_cache->replaceQ_tail = prev; + } + xt_cache_elem->prev = xt_cache_elem->next = NULL; +} + +static inline void xt_cache_replaceQ_set_hot(struct rrdengine_worker_config* wc, + struct extent_cache_element *xt_cache_elem) +{ + xt_cache_replaceQ_delete(wc, xt_cache_elem); + xt_cache_replaceQ_insert(wc, xt_cache_elem); +} + +/* Returns the index of the cached extent if it was successfully inserted in the extent cache, otherwise -1 */ +static int try_insert_into_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + unsigned idx; + int ret; + + ret = find_first_zero(xt_cache->allocation_bitmap); + if (-1 == ret || ret >= MAX_CACHED_EXTENTS) { + for (xt_cache_elem = xt_cache->replaceQ_head ; NULL != xt_cache_elem ; xt_cache_elem = xt_cache_elem->next) { + idx = xt_cache_elem - xt_cache->extent_array; + if (!check_bit(xt_cache->inflight_bitmap, idx)) { + xt_cache_replaceQ_delete(wc, xt_cache_elem); + break; + } + } + if (NULL == xt_cache_elem) + return -1; + } else { + idx = (unsigned)ret; + xt_cache_elem = &xt_cache->extent_array[idx]; + } + xt_cache_elem->extent = extent; + xt_cache_elem->fileno = extent->datafile->fileno; + xt_cache_elem->inflight_io_descr = NULL; + xt_cache_replaceQ_insert(wc, xt_cache_elem); + modify_bit(&xt_cache->allocation_bitmap, idx, 1); + + return (int)idx; +} + +/** + * Returns 0 if the cached extent was found in the extent cache, 1 otherwise. + * Sets *idx to point to the position of the extent inside the cache. + **/ +static uint8_t lookup_in_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent, unsigned *idx) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + unsigned i; + + for (i = 0 ; i < MAX_CACHED_EXTENTS ; ++i) { + xt_cache_elem = &xt_cache->extent_array[i]; + if (check_bit(xt_cache->allocation_bitmap, i) && xt_cache_elem->extent == extent && + xt_cache_elem->fileno == extent->datafile->fileno) { + *idx = i; + return 0; + } + } + return 1; +} + +#if 0 /* disabled code */ +static void delete_from_xt_cache(struct rrdengine_worker_config* wc, unsigned idx) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + + xt_cache_elem = &xt_cache->extent_array[idx]; + xt_cache_replaceQ_delete(wc, xt_cache_elem); + xt_cache_elem->extent = NULL; + modify_bit(&wc->xt_cache.allocation_bitmap, idx, 0); /* invalidate it */ + modify_bit(&wc->xt_cache.inflight_bitmap, idx, 0); /* not in-flight anymore */ +} +#endif + +void enqueue_inflight_read_to_xt_cache(struct rrdengine_worker_config* wc, unsigned idx, + struct extent_io_descriptor *xt_io_descr) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + struct extent_io_descriptor *old_next; + + xt_cache_elem = &xt_cache->extent_array[idx]; + old_next = xt_cache_elem->inflight_io_descr->next; + xt_cache_elem->inflight_io_descr->next = xt_io_descr; + xt_io_descr->next = old_next; +} + +void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, struct extent_io_descriptor *xt_io_descr) +{ + unsigned i, j, page_offset; + struct rrdengine_instance *ctx = wc->ctx; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + void *page; + struct extent_info *extent = 
xt_io_descr->descr_array[0]->extent; + + for (i = 0 ; i < xt_io_descr->descr_count; ++i) { + page = mallocz(RRDENG_BLOCK_SIZE); + descr = xt_io_descr->descr_array[i]; + for (j = 0, page_offset = 0 ; j < extent->number_of_pages ; ++j) { + /* care, we don't hold the descriptor mutex */ + if (!uuid_compare(*extent->pages[j]->id, *descr->id) && + extent->pages[j]->page_length == descr->page_length && + extent->pages[j]->start_time == descr->start_time && + extent->pages[j]->end_time == descr->end_time) { + break; + } + page_offset += extent->pages[j]->page_length; + + } + /* care, we don't hold the descriptor mutex */ + (void) memcpy(page, wc->xt_cache.extent_array[idx].pages + page_offset, descr->page_length); + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + pg_cache_descr->page = page; + pg_cache_descr->flags |= RRD_PAGE_POPULATED; + pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING; + rrdeng_page_descr_mutex_unlock(ctx, descr); + pg_cache_replaceQ_insert(ctx, descr); + if (xt_io_descr->release_descr) { + pg_cache_put(ctx, descr); + } else { + debug(D_RRDENGINE, "%s: Waking up waiters.", __func__); + pg_cache_wake_up_waiters(ctx, descr); + } + } + if (xt_io_descr->completion) + complete(xt_io_descr->completion); + freez(xt_io_descr); +} + void read_extent_cb(uv_fs_t* req) { struct rrdengine_worker_config* wc = req->loop->data; @@ -40,7 +219,7 @@ void read_extent_cb(uv_fs_t* req) int ret; unsigned i, j, count; void *page, *uncompressed_buf = NULL; - uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length; + uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length = 0; uint8_t have_read_error = 0; /* persistent structures */ struct rrdeng_df_extent_header *header; @@ -98,6 +277,33 @@ after_crc_check: debug(D_RRDENGINE, "LZ4 decompressed %u bytes to %d bytes.", payload_length, ret); /* care, we don't hold the descriptor mutex */ } + { + uint8_t xt_is_cached = 0; + unsigned xt_idx; + struct extent_info *extent = xt_io_descr->descr_array[0]->extent; + + xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx); + if (xt_is_cached && check_bit(wc->xt_cache.inflight_bitmap, xt_idx)) { + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem = &xt_cache->extent_array[xt_idx]; + struct extent_io_descriptor *curr, *next; + + if (have_read_error) { + memset(xt_cache_elem->pages, 0, sizeof(xt_cache_elem->pages)); + } else if (RRD_NO_COMPRESSION == header->compression_algorithm) { + (void)memcpy(xt_cache_elem->pages, xt_io_descr->buf + payload_offset, payload_length); + } else { + (void)memcpy(xt_cache_elem->pages, uncompressed_buf, uncompressed_payload_length); + } + /* complete all connected in-flight read requests */ + for (curr = xt_cache_elem->inflight_io_descr->next ; curr ; curr = next) { + next = curr->next; + read_cached_extent_cb(wc, xt_idx, curr); + } + xt_cache_elem->inflight_io_descr = NULL; + modify_bit(&xt_cache->inflight_bitmap, xt_idx, 0); /* not in-flight anymore */ + } + } for (i = 0 ; i < xt_io_descr->descr_count; ++i) { page = mallocz(RRDENG_BLOCK_SIZE); @@ -121,19 +327,19 @@ after_crc_check: } else { (void) memcpy(page, uncompressed_buf + page_offset, descr->page_length); } - pg_cache_replaceQ_insert(ctx, descr); rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_descr = descr->pg_cache_descr; pg_cache_descr->page = page; pg_cache_descr->flags |= RRD_PAGE_POPULATED; pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING; - debug(D_RRDENGINE, "%s: Waking up waiters.", 
__func__); + rrdeng_page_descr_mutex_unlock(ctx, descr); + pg_cache_replaceQ_insert(ctx, descr); if (xt_io_descr->release_descr) { - pg_cache_put_unsafe(descr); + pg_cache_put(ctx, descr); } else { - pg_cache_wake_up_waiters_unsafe(descr); + debug(D_RRDENGINE, "%s: Waking up waiters.", __func__); + pg_cache_wake_up_waiters(ctx, descr); } - rrdeng_page_descr_mutex_unlock(ctx, descr); } if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) { freez(uncompressed_buf); @@ -158,18 +364,15 @@ static void do_read_extent(struct rrdengine_worker_config* wc, // uint32_t payload_length; struct extent_io_descriptor *xt_io_descr; struct rrdengine_datafile *datafile; + struct extent_info *extent = descr[0]->extent; + uint8_t xt_is_cached = 0, xt_is_inflight = 0; + unsigned xt_idx; - datafile = descr[0]->extent->datafile; - pos = descr[0]->extent->offset; - size_bytes = descr[0]->extent->size; + datafile = extent->datafile; + pos = extent->offset; + size_bytes = extent->size; - xt_io_descr = mallocz(sizeof(*xt_io_descr)); - ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); - if (unlikely(ret)) { - fatal("posix_memalign:%s", strerror(ret)); - /* freez(xt_io_descr); - return;*/ - } + xt_io_descr = callocz(1, sizeof(*xt_io_descr)); for (i = 0 ; i < count; ++i) { rrdeng_page_descr_mutex_lock(ctx, descr[i]); pg_cache_descr = descr[i]->pg_cache_descr; @@ -187,10 +390,34 @@ static void do_read_extent(struct rrdengine_worker_config* wc, /* xt_io_descr->descr_commit_idx_array[0] */ xt_io_descr->release_descr = release_descr; + xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx); + if (xt_is_cached) { + xt_cache_replaceQ_set_hot(wc, &wc->xt_cache.extent_array[xt_idx]); + xt_is_inflight = check_bit(wc->xt_cache.inflight_bitmap, xt_idx); + if (xt_is_inflight) { + enqueue_inflight_read_to_xt_cache(wc, xt_idx, xt_io_descr); + return; + } + return read_cached_extent_cb(wc, xt_idx, xt_io_descr); + } else { + ret = try_insert_into_xt_cache(wc, extent); + if (-1 != ret) { + xt_idx = (unsigned)ret; + modify_bit(&wc->xt_cache.inflight_bitmap, xt_idx, 1); + wc->xt_cache.extent_array[xt_idx].inflight_io_descr = xt_io_descr; + } + } + + ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + /* freez(xt_io_descr); + return;*/ + } real_io_size = ALIGN_BYTES_CEILING(size_bytes); xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); ret = uv_fs_read(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, pos, read_extent_cb); - assert (-1 != ret); + fatal_assert(-1 != ret); ctx->stats.io_read_bytes += real_io_size; ++ctx->stats.io_read_requests; ctx->stats.io_read_extent_bytes += real_io_size; @@ -243,11 +470,117 @@ static void do_commit_transaction(struct rrdengine_worker_config* wc, uint8_t ty commit_data_extent(wc, (struct extent_io_descriptor *)data); break; default: - assert(type == STORE_DATA); + fatal_assert(type == STORE_DATA); break; } } +static void after_invalidate_oldest_committed(struct rrdengine_worker_config* wc) +{ + int error; + + error = uv_thread_join(wc->now_invalidating_dirty_pages); + if (error) { + error("uv_thread_join(): %s", uv_strerror(error)); + } + freez(wc->now_invalidating_dirty_pages); + wc->now_invalidating_dirty_pages = NULL; + wc->cleanup_thread_invalidating_dirty_pages = 0; +} + +static void invalidate_oldest_committed(void *arg) +{ + struct rrdengine_instance *ctx = arg; + 
struct rrdengine_worker_config *wc = &ctx->worker_config; + struct page_cache *pg_cache = &ctx->pg_cache; + int ret; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + Pvoid_t *PValue; + Word_t Index; + unsigned nr_committed_pages; + + do { + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + for (Index = 0, + PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue; + + descr != NULL; + + PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue) { + fatal_assert(0 != descr->page_length); + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING) && pg_cache_try_get_unsafe(descr, 1)) { + rrdeng_page_descr_mutex_unlock(ctx, descr); + + ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0); + fatal_assert(1 == ret); + break; + } + rrdeng_page_descr_mutex_unlock(ctx, descr); + } + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + + if (!descr) { + info("Failed to invalidate any dirty pages to relieve page cache pressure."); + + goto out; + } + pg_cache_punch_hole(ctx, descr, 1, 1, NULL); + + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + nr_committed_pages = --pg_cache->committed_page_index.nr_committed_pages; + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + rrd_stat_atomic_add(&ctx->stats.flushing_pressure_page_deletions, 1); + rrd_stat_atomic_add(&global_flushing_pressure_page_deletions, 1); + + } while (nr_committed_pages >= pg_cache_committed_hard_limit(ctx)); +out: + wc->cleanup_thread_invalidating_dirty_pages = 1; + /* wake up event loop */ + fatal_assert(0 == uv_async_send(&wc->async)); +} + +void rrdeng_invalidate_oldest_committed(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned nr_committed_pages; + int error; + + if (unlikely(ctx->quiesce != NO_QUIESCE)) /* Shutting down */ + return; + + uv_rwlock_rdlock(&pg_cache->committed_page_index.lock); + nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages; + uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock); + + if (nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) { + /* delete the oldest page in memory */ + if (wc->now_invalidating_dirty_pages) { + /* already deleting a page */ + return; + } + errno = 0; + error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". 
" + "Metric data are being deleted, please reduce disk load or use a faster disk.", ctx->dbfiles_path); + + wc->now_invalidating_dirty_pages = mallocz(sizeof(*wc->now_invalidating_dirty_pages)); + wc->cleanup_thread_invalidating_dirty_pages = 0; + + error = uv_thread_create(wc->now_invalidating_dirty_pages, invalidate_oldest_committed, ctx); + if (error) { + error("uv_thread_create(): %s", uv_strerror(error)); + freez(wc->now_invalidating_dirty_pages); + wc->now_invalidating_dirty_pages = NULL; + } + } +} + void flush_pages_cb(uv_fs_t* req) { struct rrdengine_worker_config* wc = req->loop->data; @@ -294,6 +627,7 @@ void flush_pages_cb(uv_fs_t* req) uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); pg_cache->committed_page_index.nr_committed_pages -= count; uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + wc->inflight_dirty_pages -= count; } /* @@ -338,7 +672,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct descr = unlikely(NULL == PValue) ? NULL : *PValue) { uint8_t page_write_pending; - assert(0 != descr->page_length); + fatal_assert(0 != descr->page_length); page_write_pending = 0; rrdeng_page_descr_mutex_lock(ctx, descr); @@ -355,7 +689,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct if (page_write_pending) { ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0); - assert(1 == ret); + fatal_assert(1 == ret); } } uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); @@ -366,6 +700,8 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct complete(completion); return 0; } + wc->inflight_dirty_pages += count; + xt_io_descr = mallocz(sizeof(*xt_io_descr)); payload_offset = sizeof(*header) + count * sizeof(header->descr[0]); switch (compression_algorithm) { @@ -373,7 +709,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer); break; default: /* Compress */ - assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE); + fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE); max_compressed_size = LZ4_compressBound(uncompressed_payload_length); compressed_buf = mallocz(max_compressed_size); size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer); @@ -453,7 +789,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct real_io_size = ALIGN_BYTES_CEILING(size_bytes); xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); ret = uv_fs_write(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, datafile->pos, flush_pages_cb); - assert (-1 != ret); + fatal_assert(-1 != ret); ctx->stats.io_write_bytes += real_io_size; ++ctx->stats.io_write_requests; ctx->stats.io_write_extent_bytes += real_io_size; @@ -466,17 +802,15 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct return ALIGN_BYTES_CEILING(size_bytes); } -static void after_delete_old_data(uv_work_t *req, int status) +static void after_delete_old_data(struct rrdengine_worker_config* wc) { - struct rrdengine_instance *ctx = req->data; - struct rrdengine_worker_config* wc = &ctx->worker_config; + struct rrdengine_instance *ctx = wc->ctx; struct rrdengine_datafile *datafile; struct rrdengine_journalfile *journalfile; unsigned deleted_bytes, journalfile_bytes, datafile_bytes; - int ret; + int ret, error; char path[RRDENG_PATH_MAX]; - (void)status; datafile = 
ctx->datafiles.first; journalfile = datafile->journalfile; datafile_bytes = datafile->pos; @@ -503,19 +837,30 @@ static void after_delete_old_data(uv_work_t *req, int status) ctx->disk_space -= deleted_bytes; info("Reclaimed %u bytes of disk space.", deleted_bytes); + error = uv_thread_join(wc->now_deleting_files); + if (error) { + error("uv_thread_join(): %s", uv_strerror(error)); + } + freez(wc->now_deleting_files); /* unfreeze command processing */ - wc->now_deleting.data = NULL; - /* wake up event loop */ - assert(0 == uv_async_send(&wc->async)); + wc->now_deleting_files = NULL; + + wc->cleanup_thread_deleting_files = 0; + + /* interrupt event loop */ + uv_stop(wc->loop); } -static void delete_old_data(uv_work_t *req) +static void delete_old_data(void *arg) { - struct rrdengine_instance *ctx = req->data; + struct rrdengine_instance *ctx = arg; + struct rrdengine_worker_config* wc = &ctx->worker_config; struct rrdengine_datafile *datafile; struct extent_info *extent, *next; struct rrdeng_page_descr *descr; unsigned count, i; + uint8_t can_delete_metric; + uuid_t metric_id; /* Safe to use since it will be deleted after we are done */ datafile = ctx->datafiles.first; @@ -524,11 +869,21 @@ static void delete_old_data(uv_work_t *req) count = extent->number_of_pages; for (i = 0 ; i < count ; ++i) { descr = extent->pages[i]; - pg_cache_punch_hole(ctx, descr, 0); + can_delete_metric = pg_cache_punch_hole(ctx, descr, 0, 0, &metric_id); + if (unlikely(can_delete_metric && ctx->metalog_ctx->initialized)) { + /* + * If the metric is empty, has no active writers and if the metadata log has been initialized then + * attempt to delete the corresponding netdata dimension. + */ + metalog_delete_dimension_by_uuid(ctx->metalog_ctx, &metric_id); + } } next = extent->next; freez(extent); } + wc->cleanup_thread_deleting_files = 1; + /* wake up event loop */ + fatal_assert(0 == uv_async_send(&wc->async)); } void rrdeng_test_quota(struct rrdengine_worker_config* wc) @@ -537,10 +892,11 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc) struct rrdengine_datafile *datafile; unsigned current_size, target_size; uint8_t out_of_space, only_one_datafile; - int ret; + int ret, error; out_of_space = 0; - if (unlikely(ctx->disk_space > ctx->max_disk_space)) { + /* Do not allow the pinned pages to exceed the disk space quota to avoid deadlocks */ + if (unlikely(ctx->disk_space > MAX(ctx->max_disk_space, 2 * ctx->metric_API_max_producers * RRDENG_BLOCK_SIZE))) { out_of_space = 1; } datafile = ctx->datafiles.last; @@ -557,9 +913,9 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc) ++ctx->last_fileno; } } - if (unlikely(out_of_space)) { + if (unlikely(out_of_space && NO_QUIESCE == ctx->quiesce)) { /* delete old data */ - if (wc->now_deleting.data) { + if (wc->now_deleting_files) { /* already deleting data */ return; } @@ -571,8 +927,39 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc) } info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".", ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); - wc->now_deleting.data = ctx; - assert(0 == uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data)); + wc->now_deleting_files = mallocz(sizeof(*wc->now_deleting_files)); + wc->cleanup_thread_deleting_files = 0; + + error = uv_thread_create(wc->now_deleting_files, delete_old_data, ctx); + if (error) { + error("uv_thread_create(): %s", uv_strerror(error)); + freez(wc->now_deleting_files); + 
wc->now_deleting_files = NULL; + } + } +} + +static inline int rrdeng_threads_alive(struct rrdengine_worker_config* wc) +{ + if (wc->now_invalidating_dirty_pages || wc->now_deleting_files) { + return 1; + } + return 0; +} + +static void rrdeng_cleanup_finished_threads(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + + if (unlikely(wc->cleanup_thread_invalidating_dirty_pages)) { + after_invalidate_oldest_committed(wc); + } + if (unlikely(wc->cleanup_thread_deleting_files)) { + after_delete_old_data(wc); + } + if (unlikely(SET_QUIESCE == ctx->quiesce && !rrdeng_threads_alive(wc))) { + ctx->quiesce = QUIESCED; + complete(&ctx->rrdengine_completion); } } @@ -591,8 +978,8 @@ void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc) { wc->cmd_queue.head = wc->cmd_queue.tail = 0; wc->queue_size = 0; - assert(0 == uv_cond_init(&wc->cmd_cond)); - assert(0 == uv_mutex_init(&wc->cmd_mutex)); + fatal_assert(0 == uv_cond_init(&wc->cmd_cond)); + fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex)); } void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd) @@ -604,7 +991,7 @@ void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd) while ((queue_size = wc->queue_size) == RRDENG_CMD_Q_MAX_SIZE) { uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex); } - assert(queue_size < RRDENG_CMD_Q_MAX_SIZE); + fatal_assert(queue_size < RRDENG_CMD_Q_MAX_SIZE); /* enqueue command */ wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; wc->cmd_queue.tail = wc->cmd_queue.tail != RRDENG_CMD_Q_MAX_SIZE - 1 ? @@ -613,7 +1000,7 @@ void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd) uv_mutex_unlock(&wc->cmd_mutex); /* wake up event loop */ - assert(0 == uv_async_send(&wc->async)); + fatal_assert(0 == uv_async_send(&wc->async)); } struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc) @@ -657,39 +1044,44 @@ void async_cb(uv_async_t *handle) void timer_cb(uv_timer_t* handle) { struct rrdengine_worker_config* wc = handle->data; + struct rrdengine_instance *ctx = wc->ctx; uv_stop(handle->loop); uv_update_time(handle->loop); + if (unlikely(!ctx->metalog_ctx->initialized)) + return; /* Wait for the metadata log to initialize */ rrdeng_test_quota(wc); debug(D_RRDENGINE, "%s: timeout reached.", __func__); - if (likely(!wc->now_deleting.data)) { - /* There is free space so we can write to disk */ - struct rrdengine_instance *ctx = wc->ctx; + if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) { + /* There is free space so we can write to disk and we are not actively deleting dirty buffers */ struct page_cache *pg_cache = &ctx->pg_cache; unsigned long total_bytes, bytes_written, nr_committed_pages, bytes_to_write = 0, producers, low_watermark, high_watermark; - uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + uv_rwlock_rdlock(&pg_cache->committed_page_index.lock); nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages; - uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); - producers = ctx->stats.metric_API_producers; + uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock); + producers = ctx->metric_API_max_producers; /* are flushable pages more than 25% of the maximum page cache size */ high_watermark = (ctx->max_cache_pages * 25LLU) / 100; low_watermark = (ctx->max_cache_pages * 5LLU) / 100; /* 5%, must be smaller than high_watermark */ - if (nr_committed_pages > producers && - /* committed to be written pages are more than the produced number */ - 
nr_committed_pages - producers > high_watermark) { - /* Flushing speed must increase to stop page cache from filling with dirty pages */ - bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE; - } - bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write); + /* Flush more pages only if disk can keep up */ + if (wc->inflight_dirty_pages < high_watermark + producers) { + if (nr_committed_pages > producers && + /* committed to be written pages are more than the produced number */ + nr_committed_pages - producers > high_watermark) { + /* Flushing speed must increase to stop page cache from filling with dirty pages */ + bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE; + } + bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write); - debug(D_RRDENGINE, "Flushing pages to disk."); - for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL) ; - bytes_written && (total_bytes < bytes_to_write) ; - total_bytes += bytes_written) { - bytes_written = do_flush_pages(wc, 0, NULL); + debug(D_RRDENGINE, "Flushing pages to disk."); + for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL); + bytes_written && (total_bytes < bytes_to_write); + total_bytes += bytes_written) { + bytes_written = do_flush_pages(wc, 0, NULL); + } } } #ifdef NETDATA_INTERNAL_CHECKS @@ -730,7 +1122,12 @@ void rrdeng_worker(void* arg) } wc->async.data = wc; - wc->now_deleting.data = NULL; + wc->now_deleting_files = NULL; + wc->cleanup_thread_deleting_files = 0; + + wc->now_invalidating_dirty_pages = NULL; + wc->cleanup_thread_invalidating_dirty_pages = 0; + wc->inflight_dirty_pages = 0; /* dirty page flushing timer */ ret = uv_timer_init(loop, &timer_req); @@ -744,10 +1141,11 @@ void rrdeng_worker(void* arg) /* wake up initialization thread */ complete(&ctx->rrdengine_completion); - assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); + fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); shutdown = 0; - while (shutdown == 0 || uv_loop_alive(loop)) { + while (likely(shutdown == 0 || rrdeng_threads_alive(wc))) { uv_run(loop, UV_RUN_DEFAULT); + rrdeng_cleanup_finished_threads(wc); /* wait for commands */ cmd_batch_size = 0; @@ -769,14 +1167,20 @@ void rrdeng_worker(void* arg) break; case RRDENG_SHUTDOWN: shutdown = 1; - /* - * uv_async_send after uv_close does not seem to crash in linux at the moment, - * it is however undocumented behaviour and we need to be aware if this becomes - * an issue in the future. - */ - uv_close((uv_handle_t *)&wc->async, NULL); - assert(0 == uv_timer_stop(&timer_req)); + break; + case RRDENG_QUIESCE: + ctx->drop_metrics_under_page_cache_pressure = 0; + ctx->quiesce = SET_QUIESCE; + fatal_assert(0 == uv_timer_stop(&timer_req)); uv_close((uv_handle_t *)&timer_req, NULL); + while (do_flush_pages(wc, 1, NULL)) { + ; /* Force flushing of all committed pages. 
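In the reworked timer_cb() above, the flush rate is derived from the dirty-page backlog: nothing extra is scheduled while too many flushes are already in flight, flushing only ramps up once committed-minus-producers exceeds a high watermark of 25% of the page cache, the byte budget then aims back down at the 5% low watermark, and it never drops below a single DATAFILE_IDEAL_IO_SIZE write. The sketch below reproduces that arithmetic with stand-in constants (the real RRDENG_BLOCK_SIZE and DATAFILE_IDEAL_IO_SIZE values are defined elsewhere in the engine):

    /* Stand-alone version of the dirty-page flush budget from timer_cb(). */
    #include <stdio.h>

    #define BLOCK_SIZE_BYTES 4096UL                 /* stand-in for RRDENG_BLOCK_SIZE */
    #define IDEAL_IO_BYTES   (16UL * 1024 * 1024)   /* stand-in for DATAFILE_IDEAL_IO_SIZE */

    static unsigned long flush_budget(unsigned long max_cache_pages,
                                      unsigned long committed_pages,
                                      unsigned long producers,
                                      unsigned long inflight_dirty_pages)
    {
        unsigned long high = max_cache_pages * 25UL / 100;  /* 25% of the page cache */
        unsigned long low  = max_cache_pages *  5UL / 100;  /* 5%, always below high */
        unsigned long bytes = 0;

        if (inflight_dirty_pages >= high + producers)
            return 0;                                /* the disk is not keeping up, back off */

        if (committed_pages > producers && committed_pages - producers > high)
            bytes = (committed_pages - producers - low) * BLOCK_SIZE_BYTES;

        return bytes > IDEAL_IO_BYTES ? bytes : IDEAL_IO_BYTES;  /* at least one ideal-sized write */
    }

    int main(void)
    {
        /* 8192-page cache, 8000 dirty pages, 100 collectors, nothing in flight:
         * the backlog of 7900 exceeds the 2048-page high watermark, so the budget
         * targets the 409-page low watermark: about 29 MiB of writes this tick. */
        printf("budget: %lu bytes\n", flush_budget(8192, 8000, 100, 0));
        return 0;
    }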
*/ + } + wal_flush_transaction_buffer(wc); + if (!rrdeng_threads_alive(wc)) { + ctx->quiesce = QUIESCED; + complete(&ctx->rrdengine_completion); + } break; case RRDENG_READ_PAGE: do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0); @@ -788,16 +1192,16 @@ void rrdeng_worker(void* arg) do_commit_transaction(wc, STORE_DATA, NULL); break; case RRDENG_FLUSH_PAGES: { - unsigned bytes_written; - - /* First I/O should be enough to call completion */ - bytes_written = do_flush_pages(wc, 1, cmd.completion); - if (bytes_written) { - while (do_flush_pages(wc, 1, NULL) && likely(!wc->now_deleting.data)) { - ; /* Force flushing of all committed pages if there is free space. */ - } + if (wc->now_invalidating_dirty_pages) { + /* Do not flush if the disk cannot keep up */ + complete(cmd.completion); + } else { + (void)do_flush_pages(wc, 1, cmd.completion); } break; + case RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE: + rrdeng_invalidate_oldest_committed(wc); + break; } default: debug(D_RRDENGINE, "%s: default.", __func__); @@ -805,11 +1209,17 @@ void rrdeng_worker(void* arg) } } while (opcode != RRDENG_NOOP); } + /* cleanup operations of the event loop */ - if (unlikely(wc->now_deleting.data)) { - info("Postponing shutting RRD engine event loop down until after datafile deletion is finished."); - } info("Shutting down RRD engine event loop."); + + /* + * uv_async_send after uv_close does not seem to crash in linux at the moment, + * it is however undocumented behaviour and we need to be aware if this becomes + * an issue in the future. + */ + uv_close((uv_handle_t *)&wc->async, NULL); + while (do_flush_pages(wc, 1, NULL)) { ; /* Force flushing of all committed pages. */ } @@ -820,7 +1230,7 @@ void rrdeng_worker(void* arg) /* TODO: don't let the API block by waiting to enqueue commands */ uv_cond_destroy(&wc->cmd_cond); /* uv_mutex_destroy(&wc->cmd_mutex); */ - assert(0 == uv_loop_close(loop)); + fatal_assert(0 == uv_loop_close(loop)); freez(loop); return; @@ -828,7 +1238,7 @@ void rrdeng_worker(void* arg) error_after_timer_init: uv_close((uv_handle_t *)&wc->async, NULL); error_after_async_init: - assert(0 == uv_loop_close(loop)); + fatal_assert(0 == uv_loop_close(loop)); error_after_loop_init: freez(loop); @@ -845,11 +1255,12 @@ void rrdengine_main(void) int ret; struct rrdengine_instance *ctx; - ret = rrdeng_init(&ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB); + sanity_check(); + ret = rrdeng_init(NULL, &ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB); if (ret) { exit(ret); } rrdeng_exit(ctx); fprintf(stderr, "Hello world!"); exit(0); -} \ No newline at end of file +} diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h index 78a35a0bf..87af04bff 100644 --- a/database/engine/rrdengine.h +++ b/database/engine/rrdengine.h @@ -7,19 +7,17 @@ #define _GNU_SOURCE #endif #include -#include -#include -#include #include #include #include #include -#include +#include "../../daemon/common.h" #include "../rrd.h" #include "rrddiskprotocol.h" #include "rrdenginelib.h" #include "datafile.h" #include "journalfile.h" +#include "metadata_log/metadatalog.h" #include "rrdengineapi.h" #include "pagecache.h" #include "rrdenglocking.h" @@ -52,6 +50,8 @@ enum rrdeng_opcode { RRDENG_COMMIT_PAGE, RRDENG_FLUSH_PAGES, RRDENG_SHUTDOWN, + RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE, + RRDENG_QUIESCE, RRDENG_MAX_OPCODE }; @@ -88,6 +88,7 @@ struct extent_io_descriptor { int release_descr; struct rrdeng_page_descr *descr_array[MAX_PAGES_PER_EXTENT]; Word_t 
descr_commit_idx_array[MAX_PAGES_PER_EXTENT]; + struct extent_io_descriptor *next; /* multiple requests to be served by the same cached extent */ }; struct generic_io_descriptor { @@ -99,13 +100,43 @@ struct generic_io_descriptor { struct completion *completion; }; +struct extent_cache_element { + struct extent_info *extent; /* The ABA problem is avoided with the help of fileno below */ + unsigned fileno; + struct extent_cache_element *prev; /* LRU */ + struct extent_cache_element *next; /* LRU */ + struct extent_io_descriptor *inflight_io_descr; /* I/O descriptor for in-flight extent */ + uint8_t pages[MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE]; +}; + +#define MAX_CACHED_EXTENTS 16 /* cannot be over 32 to fit in 32-bit architectures */ + +/* Initialize by setting the structure to zero */ +struct extent_cache { + struct extent_cache_element extent_array[MAX_CACHED_EXTENTS]; + unsigned allocation_bitmap; /* 1 if the corresponding position in the extent_array is allocated */ + unsigned inflight_bitmap; /* 1 if the corresponding position in the extent_array is waiting for I/O */ + + struct extent_cache_element *replaceQ_head; /* LRU */ + struct extent_cache_element *replaceQ_tail; /* MRU */ +}; + struct rrdengine_worker_config { struct rrdengine_instance *ctx; uv_thread_t thread; uv_loop_t* loop; uv_async_t async; - uv_work_t now_deleting; + + /* file deletion thread */ + uv_thread_t *now_deleting_files; + unsigned long cleanup_thread_deleting_files; /* set to 0 when now_deleting_files is still running */ + + /* dirty page deletion thread */ + uv_thread_t *now_invalidating_dirty_pages; + /* set to 0 when now_invalidating_dirty_pages is still running */ + unsigned long cleanup_thread_invalidating_dirty_pages; + unsigned inflight_dirty_pages; /* FIFO command queue */ uv_mutex_t cmd_mutex; @@ -113,6 +144,8 @@ struct rrdengine_worker_config { volatile unsigned queue_size; struct rrdeng_cmdqueue cmd_queue; + struct extent_cache xt_cache; + int error; }; @@ -148,7 +181,8 @@ struct rrdengine_statistics { rrdeng_stats_t page_cache_descriptors; rrdeng_stats_t io_errors; rrdeng_stats_t fs_errors; - rrdeng_stats_t flushing_errors; + rrdeng_stats_t pg_cache_over_half_dirty_events; + rrdeng_stats_t flushing_pressure_page_deletions; }; /* I/O errors global counter */ @@ -157,27 +191,38 @@ extern rrdeng_stats_t global_io_errors; extern rrdeng_stats_t global_fs_errors; /* number of File-Descriptors that have been reserved by dbengine */ extern rrdeng_stats_t rrdeng_reserved_file_descriptors; -/* inability to flush global counter */ -extern rrdeng_stats_t global_flushing_errors; +/* inability to flush global counters */ +extern rrdeng_stats_t global_pg_cache_over_half_dirty_events; +extern rrdeng_stats_t global_flushing_pressure_page_deletions; /* number of deleted pages */ + +#define NO_QUIESCE (0) /* initial state when all operations function normally */ +#define SET_QUIESCE (1) /* set it before shutting down the instance, quiesce long running operations */ +#define QUIESCED (2) /* is set after all threads have finished running */ struct rrdengine_instance { + struct metalog_instance *metalog_ctx; struct rrdengine_worker_config worker_config; struct completion rrdengine_completion; struct page_cache pg_cache; + uint8_t drop_metrics_under_page_cache_pressure; /* boolean */ uint8_t global_compress_alg; struct transaction_commit_log commit_log; struct rrdengine_datafile_list datafiles; - char dbfiles_path[FILENAME_MAX+1]; + RRDHOST *host; /* the legacy host, or NULL for multi-host DB */ + char 
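The new struct extent_cache above keeps MAX_CACHED_EXTENTS fixed slots and tracks their state in two bitmaps, allocation_bitmap and inflight_bitmap; a free slot is simply the first cleared bit, which is what the find_first_zero()/check_bit()/modify_bit() helpers added to rrdenginelib.h later in this patch compute. A minimal, self-contained illustration of that bookkeeping follows (the slot contents and helper bodies are simplified and the LRU list is omitted):

    /* One 32-bit word tracks which of up to 16 fixed cache slots are in use. */
    #include <stdio.h>
    #include <strings.h>    /* ffs() */

    #define MAX_SLOTS 16

    static int find_first_zero(unsigned x) { return ffs((int)~x) - 1; }   /* -1 when no bit is clear */
    static unsigned char check_bit(unsigned x, unsigned pos) { return !!(x & (1U << pos)); }
    static void modify_bit(unsigned *x, unsigned pos, unsigned char val)
    {
        if (val) *x |=  (1U << pos);
        else     *x &= ~(1U << pos);
    }

    struct slot_cache {
        int payload[MAX_SLOTS];      /* stands in for struct extent_cache_element */
        unsigned allocation_bitmap;  /* bit i set => slot i is in use */
    };

    static int cache_alloc_slot(struct slot_cache *c)
    {
        int pos = find_first_zero(c->allocation_bitmap);
        if (pos < 0 || pos >= MAX_SLOTS)
            return -1;               /* cache full, the caller must evict (LRU in the real code) */
        modify_bit(&c->allocation_bitmap, (unsigned)pos, 1);
        return pos;
    }

    static void cache_free_slot(struct slot_cache *c, int pos)
    {
        modify_bit(&c->allocation_bitmap, (unsigned)pos, 0);
    }

    int main(void)
    {
        struct slot_cache c = { .allocation_bitmap = 0 };
        int a = cache_alloc_slot(&c);
        int b = cache_alloc_slot(&c);
        printf("allocated slots %d and %d, slot 0 busy: %u\n", a, b, check_bit(c.allocation_bitmap, 0));
        cache_free_slot(&c, a);
        printf("after freeing, slot 0 busy: %u\n", check_bit(c.allocation_bitmap, 0));
        return 0;
    }

As the struct's own comment notes, pairing each cached extent pointer with its fileno is what lets the real cache detect recycled slots and avoid the ABA problem.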
dbfiles_path[FILENAME_MAX + 1]; + char machine_guid[GUID_LEN + 1]; /* the unique ID of the corresponding host, or localhost for multihost DB */ uint64_t disk_space; uint64_t max_disk_space; unsigned last_fileno; /* newest index of datafile and journalfile */ unsigned long max_cache_pages; unsigned long cache_pages_low_watermark; + unsigned long metric_API_max_producers; + + uint8_t quiesce; /* set to SET_QUIESCE before shutdown of the engine */ struct rrdengine_statistics stats; }; -extern void sanity_check(void); extern int init_rrd_files(struct rrdengine_instance *ctx); extern void finalize_rrd_files(struct rrdengine_instance *ctx); extern void rrdeng_test_quota(struct rrdengine_worker_config* wc); diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c old mode 100644 new mode 100755 index baf4a9973..7b2ff5b72 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -2,65 +2,148 @@ #include "rrdengine.h" /* Default global database instance */ -static struct rrdengine_instance default_global_ctx; +struct rrdengine_instance multidb_ctx; int default_rrdeng_page_cache_mb = 32; -int default_rrdeng_disk_quota_mb = RRDENG_MIN_DISK_SPACE_MB; +int default_rrdeng_disk_quota_mb = 256; +int default_multidb_disk_quota_mb = 256; +/* Default behaviour is to unblock data collection if the page cache is full of dirty pages by dropping metrics */ +uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1; -/* - * Gets a handle for storing metrics to the database. - * The handle must be released with rrdeng_store_metric_final(). - */ -void rrdeng_store_metric_init(RRDDIM *rd) +static inline struct rrdengine_instance *get_rrdeng_ctx_from_host(RRDHOST *host) +{ + return host->rrdeng_ctx; +} + +/* This UUID is not unique across hosts */ +void rrdeng_generate_legacy_uuid(const char *dim_id, char *chart_id, uuid_t *ret_uuid) { - struct rrdeng_collect_handle *handle; - struct page_cache *pg_cache; - struct rrdengine_instance *ctx; - uuid_t temp_id; - Pvoid_t *PValue; - struct pg_cache_page_index *page_index; EVP_MD_CTX *evpctx; unsigned char hash_value[EVP_MAX_MD_SIZE]; unsigned int hash_len; - //&default_global_ctx; TODO: test this use case or remove it? 
+ evpctx = EVP_MD_CTX_create(); + EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL); + EVP_DigestUpdate(evpctx, dim_id, strlen(dim_id)); + EVP_DigestUpdate(evpctx, chart_id, strlen(chart_id)); + EVP_DigestFinal_ex(evpctx, hash_value, &hash_len); + EVP_MD_CTX_destroy(evpctx); + fatal_assert(hash_len > sizeof(uuid_t)); + memcpy(ret_uuid, hash_value, sizeof(uuid_t)); +} - ctx = rd->rrdset->rrdhost->rrdeng_ctx; - pg_cache = &ctx->pg_cache; - handle = &rd->state->handle.rrdeng; - handle->ctx = ctx; +/* Transform legacy UUID to be unique across hosts deterministacally */ +void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid, uuid_t *ret_uuid) +{ + EVP_MD_CTX *evpctx; + unsigned char hash_value[EVP_MAX_MD_SIZE]; + unsigned int hash_len; evpctx = EVP_MD_CTX_create(); EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL); - EVP_DigestUpdate(evpctx, rd->id, strlen(rd->id)); - EVP_DigestUpdate(evpctx, rd->rrdset->id, strlen(rd->rrdset->id)); + EVP_DigestUpdate(evpctx, machine_guid, GUID_LEN); + EVP_DigestUpdate(evpctx, *legacy_uuid, sizeof(uuid_t)); EVP_DigestFinal_ex(evpctx, hash_value, &hash_len); EVP_MD_CTX_destroy(evpctx); - assert(hash_len > sizeof(temp_id)); - memcpy(&temp_id, hash_value, sizeof(temp_id)); + fatal_assert(hash_len > sizeof(uuid_t)); + memcpy(ret_uuid, hash_value, sizeof(uuid_t)); +} - handle->descr = NULL; - handle->prev_descr = NULL; - handle->unaligned_page = 0; +void rrdeng_metric_init(RRDDIM *rd, uuid_t *dim_uuid) +{ + struct page_cache *pg_cache; + struct rrdengine_instance *ctx; + uuid_t legacy_uuid; + uuid_t multihost_legacy_uuid; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + int is_multihost_child = 0; + RRDHOST *host = rd->rrdset->rrdhost; + + ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); + if (unlikely(!ctx)) { + error("Failed to fetch multidb context"); + return; + } + pg_cache = &ctx->pg_cache; + + rrdeng_generate_legacy_uuid(rd->id, rd->rrdset->id, &legacy_uuid); + rd->state->metric_uuid = dim_uuid; + if (host != localhost && host->rrdeng_ctx == &multidb_ctx) + is_multihost_child = 1; uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t)); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &legacy_uuid, sizeof(uuid_t)); if (likely(NULL != PValue)) { page_index = *PValue; } uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - if (NULL == PValue) { - /* First time we see the UUID */ - uv_rwlock_wrlock(&pg_cache->metrics_index.lock); - PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t), PJE0); - assert(NULL == *PValue); /* TODO: figure out concurrency model */ - *PValue = page_index = create_page_index(&temp_id); - page_index->prev = pg_cache->metrics_index.last_page_index; - pg_cache->metrics_index.last_page_index = page_index; - uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); + if (is_multihost_child || NULL == PValue) { + /* First time we see the legacy UUID or metric belongs to child host in multi-host DB. 
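Both UUID helpers above derive a 16-byte identifier deterministically by hashing their inputs with SHA-256 and keeping only the first sizeof(uuid_t) bytes of the digest, so the same dimension/chart pair (or machine GUID plus legacy UUID) always yields the same value. A standalone sketch of the same derivation through OpenSSL's EVP interface follows; the function name and inputs are illustrative (build with -lcrypto -luuid):

    /* Derive a deterministic 16-byte ID from two strings: leading bytes of SHA-256. */
    #include <stdio.h>
    #include <string.h>
    #include <openssl/evp.h>
    #include <uuid/uuid.h>

    static void derive_id(const char *dim_id, const char *chart_id, uuid_t out)
    {
        EVP_MD_CTX *evpctx = EVP_MD_CTX_create();
        unsigned char hash[EVP_MAX_MD_SIZE];
        unsigned int hash_len = 0;

        EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL);
        EVP_DigestUpdate(evpctx, dim_id, strlen(dim_id));
        EVP_DigestUpdate(evpctx, chart_id, strlen(chart_id));
        EVP_DigestFinal_ex(evpctx, hash, &hash_len);
        EVP_MD_CTX_destroy(evpctx);

        memcpy(out, hash, sizeof(uuid_t));   /* SHA-256 yields 32 bytes, a uuid_t needs only 16 */
    }

    int main(void)
    {
        uuid_t id;
        char text[37];

        derive_id("user", "system.cpu", id);
        uuid_unparse_lower(id, text);
        printf("%s\n", text);                /* identical inputs always print the same ID */
        return 0;
    }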
+ * Drop legacy support, normal path */ + + if (unlikely(!rd->state->metric_uuid)) + rd->state->metric_uuid = create_dimension_uuid(rd->rrdset, rd); + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, rd->state->metric_uuid, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + uv_rwlock_wrlock(&pg_cache->metrics_index.lock); + PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, rd->state->metric_uuid, sizeof(uuid_t), PJE0); + fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */ + *PValue = page_index = create_page_index(rd->state->metric_uuid); + page_index->prev = pg_cache->metrics_index.last_page_index; + pg_cache->metrics_index.last_page_index = page_index; + uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); + } + } else { + /* There are legacy UUIDs in the database, implement backward compatibility */ + + rrdeng_convert_legacy_uuid_to_multihost(rd->rrdset->rrdhost->machine_guid, &legacy_uuid, + &multihost_legacy_uuid); + + if (unlikely(!rd->state->metric_uuid)) + rd->state->metric_uuid = mallocz(sizeof(uuid_t)); + + int need_to_store = (dim_uuid == NULL || uuid_compare(*rd->state->metric_uuid, multihost_legacy_uuid)); + + uuid_copy(*rd->state->metric_uuid, multihost_legacy_uuid); + + if (unlikely(need_to_store)) + (void)sql_store_dimension(rd->state->metric_uuid, rd->rrdset->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor, + rd->algorithm); + } rd->state->rrdeng_uuid = &page_index->id; - handle->page_index = page_index; + rd->state->page_index = page_index; +} + +/* + * Gets a handle for storing metrics to the database. + * The handle must be released with rrdeng_store_metric_final(). + */ +void rrdeng_store_metric_init(RRDDIM *rd) +{ + struct rrdeng_collect_handle *handle; + struct rrdengine_instance *ctx; + struct pg_cache_page_index *page_index; + + ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); + handle = &rd->state->handle.rrdeng; + handle->ctx = ctx; + + handle->descr = NULL; + handle->prev_descr = NULL; + handle->unaligned_page = 0; + + page_index = rd->state->page_index; + uv_rwlock_wrlock(&page_index->lock); + ++page_index->writers; + uv_rwlock_wrunlock(&page_index->lock); } /* The page must be populated and referenced */ @@ -88,12 +171,14 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd) handle = &rd->state->handle.rrdeng; ctx = handle->ctx; + if (unlikely(!ctx)) + return; descr = handle->descr; if (unlikely(NULL == descr)) { return; } if (likely(descr->page_length)) { - int ret, page_is_empty; + int page_is_empty; rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); @@ -107,17 +192,21 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd) if (unlikely(debug_flags & D_RRDENGINE)) print_page_cache_descr(descr); pg_cache_put(ctx, descr); - pg_cache_punch_hole(ctx, descr, 1); + pg_cache_punch_hole(ctx, descr, 1, 0, NULL); handle->prev_descr = NULL; } else { + /* + * Disable pinning for now as it leads to deadlocks. When a collector stops collecting the extra pinned page + * eventually gets rotated but it cannot be destroyed due to the extra reference. 
+ */ /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */ - rrdeng_page_descr_mutex_lock(ctx, descr); +/* rrdeng_page_descr_mutex_lock(ctx, descr); ret = pg_cache_try_get_unsafe(descr, 0); rrdeng_page_descr_mutex_unlock(ctx, descr); - assert (1 == ret); + fatal_assert(1 == ret);*/ rrdeng_commit_page(ctx, descr, handle->page_correlation_id); - handle->prev_descr = descr; + /* handle->prev_descr = descr;*/ } } else { freez(descr->pg_cache_descr->page); @@ -168,14 +257,12 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n must_flush_unaligned_page)) { rrdeng_store_metric_flush_current_page(rd); - page = rrdeng_create_page(ctx, &handle->page_index->id, &descr); - assert(page); + page = rrdeng_create_page(ctx, &rd->state->page_index->id, &descr); + fatal_assert(page); handle->descr = descr; - uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); - handle->page_correlation_id = pg_cache->committed_page_index.latest_corr_id++; - uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1); if (0 == rd->rrdset->rrddim_page_alignment) { /* this is the leading dimension that defines chart alignment */ @@ -189,30 +276,53 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n if (perfect_page_alignment) rd->rrdset->rrddim_page_alignment = descr->page_length; if (unlikely(INVALID_TIME == descr->start_time)) { + unsigned long new_metric_API_producers, old_metric_API_max_producers, ret_metric_API_max_producers; descr->start_time = point_in_time; - rrd_stat_atomic_add(&ctx->stats.metric_API_producers, 1); - pg_cache_insert(ctx, handle->page_index, descr); + new_metric_API_producers = rrd_atomic_add_fetch(&ctx->stats.metric_API_producers, 1); + while (unlikely(new_metric_API_producers > (old_metric_API_max_producers = ctx->metric_API_max_producers))) { + /* Increase ctx->metric_API_max_producers */ + ret_metric_API_max_producers = ulong_compare_and_swap(&ctx->metric_API_max_producers, + old_metric_API_max_producers, + new_metric_API_producers); + if (old_metric_API_max_producers == ret_metric_API_max_producers) { + /* success */ + break; + } + } + + pg_cache_insert(ctx, rd->state->page_index, descr); } else { - pg_cache_add_new_metric_time(handle->page_index, descr); + pg_cache_add_new_metric_time(rd->state->page_index, descr); } } /* * Releases the database reference from the handle for storing metrics. + * Returns 1 if it's safe to delete the dimension. */ -void rrdeng_store_metric_finalize(RRDDIM *rd) +int rrdeng_store_metric_finalize(RRDDIM *rd) { struct rrdeng_collect_handle *handle; struct rrdengine_instance *ctx; + struct pg_cache_page_index *page_index; + uint8_t can_delete_metric = 0; handle = &rd->state->handle.rrdeng; ctx = handle->ctx; + page_index = rd->state->page_index; rrdeng_store_metric_flush_current_page(rd); if (handle->prev_descr) { /* unpin old second page */ pg_cache_put(ctx, handle->prev_descr); } + uv_rwlock_wrlock(&page_index->lock); + if (!--page_index->writers && !page_index->page_count) { + can_delete_metric = 1; + } + uv_rwlock_wrunlock(&page_index->lock); + + return can_delete_metric; } /* Returns 1 if the data collection interval is well defined, 0 otherwise */ @@ -253,7 +363,7 @@ static inline uint32_t *pginfo_to_points(struct rrdeng_page_info *page_info) * @return number of regions with different data collection intervals. 
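The loop above keeps metric_API_max_producers as a lock-free running maximum: it re-reads the current value and retries ulong_compare_and_swap() until either its own, larger value is published or another thread has already stored one at least as large. The generic sketch below shows the same monotonic-max update; the compiler builtin __sync_val_compare_and_swap stands in for the engine's ulong_compare_and_swap() helper:

    /* Lock-free "running maximum": retry CAS until our candidate is published
     * or a greater or equal value is already there. */
    #include <stdio.h>

    static volatile unsigned long observed_max;

    static void update_max(unsigned long candidate)
    {
        unsigned long current, previous;

        while (candidate > (current = observed_max)) {
            /* returns the value held by observed_max before the attempt */
            previous = __sync_val_compare_and_swap(&observed_max, current, candidate);
            if (previous == current)
                break;          /* success, the new maximum is visible to everyone */
            /* otherwise another thread won the race: re-read and retry */
        }
    }

    int main(void)
    {
        update_max(5);
        update_max(3);          /* ignored, 5 is already the maximum */
        update_max(12);
        printf("max = %lu\n", observed_max);   /* prints 12 */
        return 0;
    }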
*/ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time, - struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp) + struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp, struct context_param *context_param_list) { struct pg_cache_page_index *page_index; struct rrdengine_instance *ctx; @@ -266,15 +376,16 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e struct rrdeng_region_info *region_info_array; uint8_t is_first_region_initialized; - ctx = st->rrdhost->rrdeng_ctx; + ctx = get_rrdeng_ctx_from_host(st->rrdhost); regions = 1; *max_intervalp = max_interval = 0; region_info_array = NULL; *region_info_arrayp = NULL; page_info_array = NULL; + RRDDIM *temp_rd = context_param_list ? context_param_list->rd : NULL; rrdset_rdlock(st); - for(rd_iter = st->dimensions, rd = NULL, min_time = (usec_t)-1 ; rd_iter ; rd_iter = rd_iter->next) { + for(rd_iter = temp_rd?temp_rd:st->dimensions, rd = NULL, min_time = (usec_t)-1 ; rd_iter ; rd_iter = rd_iter->next) { /* * Choose oldest dimension as reference. This is not equivalent to the union of all dimensions * but it is a best effort approximation with a bias towards older metrics in a chart. It @@ -316,7 +427,7 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e continue; } page_entries = curr->page_length / sizeof(storage_number); - assert(0 != page_entries); + fatal_assert(0 != page_entries); if (likely(1 != page_entries)) { dt = (curr->end_time - curr->start_time) / (page_entries - 1); *pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(dt); @@ -352,7 +463,7 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e if (1 == page_points) first_valid_time_in_page = current_position_time; if (unlikely(!is_first_region_initialized)) { - assert(1 == regions); + fatal_assert(1 == regions); /* this is the first region */ region_info_array[0].start_time = current_position_time; is_first_region_initialized = 1; @@ -365,7 +476,7 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e } if (unlikely(0 == *pginfo_to_dt(curr))) { /* unknown data collection interval */ - assert(1 == page_points); + fatal_assert(1 == page_points); if (likely(NULL != prev)) { /* get interval from previous page */ *pginfo_to_dt(curr) = *pginfo_to_dt(prev); @@ -428,7 +539,7 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand struct rrdengine_instance *ctx; unsigned pages_nr; - ctx = rd->rrdset->rrdhost->rrdeng_ctx; + ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); rrdimm_handle->start_time = start_time; rrdimm_handle->end_time = end_time; handle = &rrdimm_handle->rrdeng; @@ -452,7 +563,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle struct rrdeng_page_descr *descr; storage_number *page, ret; unsigned position, entries; - usec_t next_page_time, current_position_time, page_end_time; + usec_t next_page_time = 0, current_position_time, page_end_time = 0; uint32_t page_length; handle = &rrdimm_handle->rrdeng; @@ -473,17 +584,18 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle /* We need to get a new page */ if (descr) { /* Drop old page's reference */ - handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1; - if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) { - goto no_more_metrics; - } - next_page_time = handle->next_page_time * USEC_PER_SEC; #ifdef NETDATA_INTERNAL_CHECKS 
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); #endif pg_cache_put(ctx, descr); handle->descr = NULL; + handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1; + if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) { + goto no_more_metrics; + } + next_page_time = handle->next_page_time * USEC_PER_SEC; } + descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, next_page_time, rrdimm_handle->end_time * USEC_PER_SEC); if (NULL == descr) { @@ -520,7 +632,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle } handle->position = position; handle->now = current_position_time / USEC_PER_SEC; -/* assert(handle->now >= rrdimm_handle->start_time && handle->now <= rrdimm_handle->end_time); +/* fatal_assert(handle->now >= rrdimm_handle->start_time && handle->now <= rrdimm_handle->end_time); The above assertion is an approximation and needs to take update_every into account */ if (unlikely(handle->now >= rrdimm_handle->end_time)) { /* next calls will not load any more metrics */ @@ -564,21 +676,17 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) time_t rrdeng_metric_latest_time(RRDDIM *rd) { - struct rrdeng_collect_handle *handle; struct pg_cache_page_index *page_index; - handle = &rd->state->handle.rrdeng; - page_index = handle->page_index; + page_index = rd->state->page_index; return page_index->latest_time / USEC_PER_SEC; } time_t rrdeng_metric_oldest_time(RRDDIM *rd) { - struct rrdeng_collect_handle *handle; struct pg_cache_page_index *page_index; - handle = &rd->state->handle.rrdeng; - page_index = handle->page_index; + page_index = rd->state->page_index; return page_index->oldest_time / USEC_PER_SEC; } @@ -620,7 +728,7 @@ void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr debug(D_RRDENGINE, "%s: page descriptor is NULL, page has already been force-committed.", __func__); return; } - assert(descr->page_length); + fatal_assert(descr->page_length); uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); PValue = JudyLIns(&pg_cache->committed_page_index.JudyL_array, page_correlation_id, PJE0); @@ -628,16 +736,27 @@ void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr nr_committed_pages = ++pg_cache->committed_page_index.nr_committed_pages; uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); - if (nr_committed_pages >= (pg_cache_hard_limit(ctx) - (unsigned long)ctx->stats.metric_API_producers) / 2) { - /* 50% of pages have not been committed yet */ - if (0 == (unsigned long)ctx->stats.flushing_errors) { - /* only print the first time */ - error("Failed to flush quickly enough in dbengine instance \"%s\"" - ". Metric data will not be stored in the database" - ", please reduce disk load or use a faster disk.", ctx->dbfiles_path); + if (nr_committed_pages >= pg_cache_hard_limit(ctx) / 2) { + /* over 50% of pages have not been committed yet */ + + if (ctx->drop_metrics_under_page_cache_pressure && + nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) { + /* 100% of pages are dirty */ + struct rrdeng_cmd cmd; + + cmd.opcode = RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + } else { + if (0 == (unsigned long) ctx->stats.pg_cache_over_half_dirty_events) { + /* only print the first time */ + errno = 0; + error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". 
" + "Metric data at risk of not being stored in the database, " + "please reduce disk load or use a faster disk.", ctx->dbfiles_path); + } + rrd_stat_atomic_add(&ctx->stats.pg_cache_over_half_dirty_events, 1); + rrd_stat_atomic_add(&global_pg_cache_over_half_dirty_events, 1); } - rrd_stat_atomic_add(&ctx->stats.flushing_errors, 1); - rrd_stat_atomic_add(&global_flushing_errors, 1); } pg_cache_put(ctx, descr); @@ -687,8 +806,11 @@ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_i * You must not change the indices of the statistics or user code will break. * You must not exceed RRDENG_NR_STATS or it will crash. */ -void rrdeng_get_35_statistics(struct rrdengine_instance *ctx, unsigned long long *array) +void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array) { + if (ctx == NULL) + return; + struct page_cache *pg_cache = &ctx->pg_cache; array[0] = (uint64_t)ctx->stats.metric_API_producers; @@ -724,9 +846,11 @@ void rrdeng_get_35_statistics(struct rrdengine_instance *ctx, unsigned long long array[30] = (uint64_t)global_io_errors; array[31] = (uint64_t)global_fs_errors; array[32] = (uint64_t)rrdeng_reserved_file_descriptors; - array[33] = (uint64_t)ctx->stats.flushing_errors; - array[34] = (uint64_t)global_flushing_errors; - assert(RRDENG_NR_STATS == 35); + array[33] = (uint64_t)ctx->stats.pg_cache_over_half_dirty_events; + array[34] = (uint64_t)global_pg_cache_over_half_dirty_events; + array[35] = (uint64_t)ctx->stats.flushing_pressure_page_deletions; + array[36] = (uint64_t)global_flushing_pressure_page_deletions; + fatal_assert(RRDENG_NR_STATS == 37); } /* Releases reference to page */ @@ -739,21 +863,21 @@ void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle) /* * Returns 0 on success, negative on error */ -int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, unsigned disk_space_mb) +int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, + unsigned disk_space_mb) { struct rrdengine_instance *ctx; int error; uint32_t max_open_files; - sanity_check(); - max_open_files = rlimit_nofile.rlim_cur / 4; /* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE); if (rrdeng_reserved_file_descriptors > max_open_files) { - error("Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.", - (unsigned)rrdeng_reserved_file_descriptors, (unsigned)max_open_files); + error( + "Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.", + (unsigned)rrdeng_reserved_file_descriptors, (unsigned)max_open_files); rrd_stat_atomic_add(&global_fs_errors, 1); rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); @@ -761,8 +885,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p } if (NULL == ctxp) { - /* for testing */ - ctx = &default_global_ctx; + ctx = &multidb_ctx; memset(ctx, 0, sizeof(*ctx)); } else { *ctxp = ctx = callocz(1, sizeof(*ctx)); @@ -778,6 +901,16 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p ctx->max_disk_space = disk_space_mb * 1048576LLU; strncpyz(ctx->dbfiles_path, dbfiles_path, sizeof(ctx->dbfiles_path) - 1); ctx->dbfiles_path[sizeof(ctx->dbfiles_path) - 1] = '\0'; + if (NULL == host) + strncpyz(ctx->machine_guid, 
registry_get_this_machine_guid(), GUID_LEN); + else + strncpyz(ctx->machine_guid, host->machine_guid, GUID_LEN); + + ctx->drop_metrics_under_page_cache_pressure = rrdeng_drop_metrics_under_page_cache_pressure; + ctx->metric_API_max_producers = 0; + ctx->quiesce = NO_QUIESCE; + ctx->metalog_ctx = NULL; /* only set this after the metadata log has finished initializing */ + ctx->host = host; memset(&ctx->worker_config, 0, sizeof(ctx->worker_config)); ctx->worker_config.ctx = ctx; @@ -789,20 +922,27 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p } init_completion(&ctx->rrdengine_completion); - assert(0 == uv_thread_create(&ctx->worker_config.thread, rrdeng_worker, &ctx->worker_config)); + fatal_assert(0 == uv_thread_create(&ctx->worker_config.thread, rrdeng_worker, &ctx->worker_config)); /* wait for worker thread to initialize */ wait_for_completion(&ctx->rrdengine_completion); destroy_completion(&ctx->rrdengine_completion); + uv_thread_set_name_np(ctx->worker_config.thread, "DBENGINE"); if (ctx->worker_config.error) { goto error_after_rrdeng_worker; } + error = metalog_init(ctx); + if (error) { + error("Failed to initialize metadata log file event loop."); + goto error_after_rrdeng_worker; + } + return 0; error_after_rrdeng_worker: finalize_rrd_files(ctx); error_after_init_rrd_files: free_page_cache(ctx); - if (ctx != &default_global_ctx) { + if (ctx != &multidb_ctx) { freez(ctx); *ctxp = NULL; } @@ -825,14 +965,35 @@ int rrdeng_exit(struct rrdengine_instance *ctx) cmd.opcode = RRDENG_SHUTDOWN; rrdeng_enq_cmd(&ctx->worker_config, &cmd); - assert(0 == uv_thread_join(&ctx->worker_config.thread)); + fatal_assert(0 == uv_thread_join(&ctx->worker_config.thread)); finalize_rrd_files(ctx); + //metalog_exit(ctx->metalog_ctx); free_page_cache(ctx); - if (ctx != &default_global_ctx) { + if (ctx != &multidb_ctx) { freez(ctx); } rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); return 0; -} \ No newline at end of file +} + +void rrdeng_prepare_exit(struct rrdengine_instance *ctx) +{ + struct rrdeng_cmd cmd; + + if (NULL == ctx) { + return; + } + + init_completion(&ctx->rrdengine_completion); + cmd.opcode = RRDENG_QUIESCE; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + + /* wait for dbengine to quiesce */ + wait_for_completion(&ctx->rrdengine_completion); + destroy_completion(&ctx->rrdengine_completion); + + //metalog_prepare_exit(ctx->metalog_ctx); +} + diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h index 1ef0569d5..41375b980 100644 --- a/database/engine/rrdengineapi.h +++ b/database/engine/rrdengineapi.h @@ -6,14 +6,17 @@ #include "rrdengine.h" #define RRDENG_MIN_PAGE_CACHE_SIZE_MB (8) -#define RRDENG_MIN_DISK_SPACE_MB (256) +#define RRDENG_MIN_DISK_SPACE_MB (64) -#define RRDENG_NR_STATS (35) +#define RRDENG_NR_STATS (37) #define RRDENG_FD_BUDGET_PER_INSTANCE (50) extern int default_rrdeng_page_cache_mb; extern int default_rrdeng_disk_quota_mb; +extern int default_multidb_disk_quota_mb; +extern uint8_t rrdeng_drop_metrics_under_page_cache_pressure; +extern struct rrdengine_instance multidb_ctx; struct rrdeng_region_info { time_t start_time; @@ -27,13 +30,20 @@ extern void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_pag extern void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle); extern void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle); extern void rrdeng_put_page(struct rrdengine_instance 
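rrdeng_prepare_exit() above reuses the completion object that rrdeng_init() already uses to wait for the worker thread: the caller re-initializes it, enqueues RRDENG_QUIESCE and blocks until the event loop (or the last finishing helper thread) marks the instance QUIESCED and signals completion. The completion itself, declared further down in rrdenginelib.h, is just a mutex/condition-variable pair. A self-contained sketch of that handshake with libuv primitives follows; the thread body and the names are illustrative (build with: cc sketch.c -luv):

    /* One thread blocks on a mutex/condvar pair until another signals it is done. */
    #include <stdio.h>
    #include <uv.h>

    struct completion {
        uv_mutex_t mutex;
        uv_cond_t cond;
        volatile unsigned completed;
    };

    static void init_completion(struct completion *p)
    {
        p->completed = 0;
        uv_cond_init(&p->cond);
        uv_mutex_init(&p->mutex);
    }

    static void complete(struct completion *p)
    {
        uv_mutex_lock(&p->mutex);
        p->completed = 1;
        uv_mutex_unlock(&p->mutex);
        uv_cond_broadcast(&p->cond);
    }

    static void wait_for_completion(struct completion *p)
    {
        uv_mutex_lock(&p->mutex);
        while (!p->completed)
            uv_cond_wait(&p->cond, &p->mutex);
        uv_mutex_unlock(&p->mutex);
    }

    static void destroy_completion(struct completion *p)
    {
        uv_cond_destroy(&p->cond);
        uv_mutex_destroy(&p->mutex);
    }

    static void quiescing_thread(void *arg)
    {
        /* ... flush dirty pages, stop timers, wait for helper threads ... */
        complete(arg);                   /* wake whoever asked us to quiesce */
    }

    int main(void)
    {
        struct completion done;
        uv_thread_t worker;

        init_completion(&done);
        uv_thread_create(&worker, quiescing_thread, &done);
        wait_for_completion(&done);      /* blocks until the worker signals */
        uv_thread_join(&worker);
        destroy_completion(&done);
        printf("engine quiesced\n");
        return 0;
    }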
*ctx, void *handle); + +extern void rrdeng_generate_legacy_uuid(const char *dim_id, char *chart_id, uuid_t *ret_uuid); +extern void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid, + uuid_t *ret_uuid); + + +extern void rrdeng_metric_init(RRDDIM *rd, uuid_t *dim_uuid); extern void rrdeng_store_metric_init(RRDDIM *rd); extern void rrdeng_store_metric_flush_current_page(RRDDIM *rd); extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number); -extern void rrdeng_store_metric_finalize(RRDDIM *rd); +extern int rrdeng_store_metric_finalize(RRDDIM *rd); extern unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time, - struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp); + struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp, struct context_param *context_param_list); extern void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle, time_t start_time, time_t end_time); extern storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time); @@ -41,12 +51,13 @@ extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_han extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle); extern time_t rrdeng_metric_latest_time(RRDDIM *rd); extern time_t rrdeng_metric_oldest_time(RRDDIM *rd); -extern void rrdeng_get_35_statistics(struct rrdengine_instance *ctx, unsigned long long *array); +extern void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array); /* must call once before using anything */ -extern int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, +extern int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, unsigned disk_space_mb); extern int rrdeng_exit(struct rrdengine_instance *ctx); +extern void rrdeng_prepare_exit(struct rrdengine_instance *ctx); -#endif /* NETDATA_RRDENGINEAPI_H */ \ No newline at end of file +#endif /* NETDATA_RRDENGINEAPI_H */ diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c index 0606dd938..287b86be8 100644 --- a/database/engine/rrdenginelib.c +++ b/database/engine/rrdenginelib.c @@ -60,7 +60,7 @@ int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size) if (ret < 0) { fatal("uv_fs_fstat: %s\n", uv_strerror(ret)); } - assert(req.result == 0); + fatal_assert(req.result == 0); s = req.ptr; if (!(s->st_mode & S_IFREG)) { error("Not a regular file.\n"); @@ -78,17 +78,22 @@ int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size) return 0; } -/* - * Tries to open a file in direct I/O mode, falls back to buffered mode if not possible. - * Returns UV error number that is < 0 on failure. - * On success sets (*file) to be the uv_file that was opened. +/** + * Open file for I/O. + * + * @param path The full path of the file. + * @param flags Same flags as the open() system call uses. + * @param file On success sets (*file) to be the uv_file that was opened. + * @param direct Tries to open a file in direct I/O mode when direct=1, falls back to buffered mode if not possible. + * @return Returns UV error number that is < 0 on failure. 0 on success. 
*/ -int open_file_direct_io(char *path, int flags, uv_file *file) +int open_file_for_io(char *path, int flags, uv_file *file, int direct) { uv_fs_t req; - int fd, current_flags, direct; + int fd = -1, current_flags; - for (direct = 1 ; direct >= 0 ; --direct) { + fatal_assert(0 == direct || 1 == direct); + for ( ; direct >= 0 ; --direct) { #ifdef __APPLE__ /* Apple OS does not support O_DIRECT */ direct = 0; @@ -106,7 +111,7 @@ int open_file_direct_io(char *path, int flags, uv_file *file) --direct; /* break the loop */ } } else { - assert(req.result >= 0); + fatal_assert(req.result >= 0); *file = req.result; #ifdef __APPLE__ info("Disabling OS X caching for file \"%s\".", path); @@ -159,8 +164,10 @@ char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t si "global_io_errors: %ld\n" "global_fs_errors: %ld\n" "rrdeng_reserved_file_descriptors: %ld\n" - "flushing_errors: %ld\n" - "global_flushing_errors: %ld\n", + "pg_cache_over_half_dirty_events: %ld\n" + "global_pg_cache_over_half_dirty_events: %ld\n" + "flushing_pressure_page_deletions: %ld\n" + "global_flushing_pressure_page_deletions: %ld\n", (long)ctx->stats.metric_API_producers, (long)ctx->stats.metric_API_consumers, (long)pg_cache->page_descriptors, @@ -194,8 +201,94 @@ char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t si (long)global_io_errors, (long)global_fs_errors, (long)rrdeng_reserved_file_descriptors, - (long)ctx->stats.flushing_errors, - (long)global_flushing_errors + (long)ctx->stats.pg_cache_over_half_dirty_events, + (long)global_pg_cache_over_half_dirty_events, + (long)ctx->stats.flushing_pressure_page_deletions, + (long)global_flushing_pressure_page_deletions ); return str; } + +int is_legacy_child(const char *machine_guid) +{ + uuid_t uuid; + char dbengine_file[FILENAME_MAX+1]; + + if (unlikely(!strcmp(machine_guid, "unittest-dbengine") || !strcmp(machine_guid, "dbengine-dataset") || + !strcmp(machine_guid, "dbengine-stress-test"))) { + return 1; + } + if (!uuid_parse(machine_guid, uuid)) { + uv_fs_t stat_req; + snprintfz(dbengine_file, FILENAME_MAX, "%s/%s/dbengine", netdata_configured_cache_dir, machine_guid); + int rc = uv_fs_stat(NULL, &stat_req, dbengine_file, NULL); + if (likely(rc == 0 && ((stat_req.statbuf.st_mode & S_IFMT) == S_IFDIR))) { + //info("Found legacy engine folder \"%s\"", dbengine_file); + return 1; + } + } + return 0; +} + +int count_legacy_children(char *dbfiles_path) +{ + int ret; + uv_fs_t req; + uv_dirent_t dent; + int legacy_engines = 0; + + ret = uv_fs_scandir(NULL, &req, dbfiles_path, 0, NULL); + if (ret < 0) { + uv_fs_req_cleanup(&req); + error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret)); + return ret; + } + + while(UV_EOF != uv_fs_scandir_next(&req, &dent)) { + if (dent.type == UV_DIRENT_DIR) { + if (is_legacy_child(dent.name)) + legacy_engines++; + } + } + uv_fs_req_cleanup(&req); + return legacy_engines; +} + +int compute_multidb_diskspace() +{ + char multidb_disk_space_file[FILENAME_MAX + 1]; + FILE *fp; + int computed_multidb_disk_quota_mb = -1; + + snprintfz(multidb_disk_space_file, FILENAME_MAX, "%s/dbengine_multihost_size", netdata_configured_varlib_dir); + fp = fopen(multidb_disk_space_file, "r"); + if (likely(fp)) { + int rc = fscanf(fp, "%d", &computed_multidb_disk_quota_mb); + fclose(fp); + if (unlikely(rc != 1 || computed_multidb_disk_quota_mb < RRDENG_MIN_DISK_SPACE_MB)) { + errno = 0; + error("File '%s' contains invalid input, it will be rebuild", multidb_disk_space_file); + computed_multidb_disk_quota_mb = -1; + } 
+ } + + if (computed_multidb_disk_quota_mb == -1) { + int rc = count_legacy_children(netdata_configured_cache_dir); + if (likely(rc >= 0)) { + computed_multidb_disk_quota_mb = (rc + 1) * default_rrdeng_disk_quota_mb; + info("Found %d legacy dbengines, setting multidb diskspace to %dMB", rc, computed_multidb_disk_quota_mb); + + fp = fopen(multidb_disk_space_file, "w"); + if (likely(fp)) { + fprintf(fp, "%d", computed_multidb_disk_quota_mb); + info("Created file '%s' to store the computed value", multidb_disk_space_file); + fclose(fp); + } else + error("Failed to store the default multidb disk quota size on '%s'", multidb_disk_space_file); + } + else + computed_multidb_disk_quota_mb = default_rrdeng_disk_quota_mb; + } + + return computed_multidb_disk_quota_mb; +} diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h index 5685b65a5..ebab93c8f 100644 --- a/database/engine/rrdenginelib.h +++ b/database/engine/rrdenginelib.h @@ -3,10 +3,9 @@ #ifndef NETDATA_RRDENGINELIB_H #define NETDATA_RRDENGINELIB_H -#include "rrdengine.h" - /* Forward declarations */ struct rrdeng_page_descr; +struct rrdengine_instance; #define STR_HELPER(x) #x #define STR(x) STR_HELPER(x) @@ -28,11 +27,43 @@ struct rrdeng_page_descr; typedef uintptr_t rrdeng_stats_t; #ifdef __ATOMIC_RELAXED -#define rrd_stat_atomic_add(p, n) do {(void) __atomic_fetch_add(p, n, __ATOMIC_RELAXED);} while(0) +#define rrd_atomic_fetch_add(p, n) __atomic_fetch_add(p, n, __ATOMIC_RELAXED) +#define rrd_atomic_add_fetch(p, n) __atomic_add_fetch(p, n, __ATOMIC_RELAXED) #else -#define rrd_stat_atomic_add(p, n) do {(void) __sync_fetch_and_add(p, n);} while(0) +#define rrd_atomic_fetch_add(p, n) __sync_fetch_and_add(p, n) +#define rrd_atomic_add_fetch(p, n) __sync_add_and_fetch(p, n) #endif +#define rrd_stat_atomic_add(p, n) rrd_atomic_fetch_add(p, n) + +/* returns -1 if it didn't find the first cleared bit, the position otherwise. Starts from LSB. */ +static inline int find_first_zero(unsigned x) +{ + return ffs((int)(~x)) - 1; +} + +/* Starts from LSB. */ +static inline uint8_t check_bit(unsigned x, size_t pos) +{ + return !!(x & (1 << pos)); +} + +/* Starts from LSB. 
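compute_multidb_diskspace() above persists its result in <netdata_configured_varlib_dir>/dbengine_multihost_size: the stored number is trusted only if it parses and meets RRDENG_MIN_DISK_SPACE_MB, otherwise the legacy per-child dbengine directories are re-counted and (children + 1) * default_rrdeng_disk_quota_mb is written back for the next start-up. The sketch below shows the same read-validate-recompute-persist flow; the path, the minimum and the child count are stand-ins:

    /* Reuse a previously stored quota when it is sane, otherwise recompute and cache it. */
    #include <stdio.h>

    #define CACHE_FILE       "/tmp/dbengine_multihost_size"  /* stand-in path */
    #define MIN_QUOTA_MB     64
    #define DEFAULT_QUOTA_MB 256

    static int count_legacy_children(void) { return 3; }     /* stand-in for the directory scan */

    static int multidb_quota_mb(void)
    {
        int quota = -1;
        FILE *fp = fopen(CACHE_FILE, "r");

        if (fp) {
            if (fscanf(fp, "%d", &quota) != 1 || quota < MIN_QUOTA_MB)
                quota = -1;               /* stored value is unusable, rebuild it */
            fclose(fp);
        }

        if (quota == -1) {
            quota = (count_legacy_children() + 1) * DEFAULT_QUOTA_MB;
            fp = fopen(CACHE_FILE, "w");
            if (fp) {
                fprintf(fp, "%d", quota); /* persist the computed value */
                fclose(fp);
            }
        }
        return quota;
    }

    int main(void)
    {
        printf("multihost dbengine quota: %d MB\n", multidb_quota_mb());
        return 0;
    }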
val is 0 or 1 */ +static inline void modify_bit(unsigned *x, unsigned pos, uint8_t val) +{ + switch(val) { + case 0: + *x &= ~(1U << pos); + break; + case 1: + *x |= 1U << pos; + break; + default: + error("modify_bit() called with invalid argument."); + break; + } +} + #define RRDENG_PATH_MAX (4096) /* returns old *ptr value */ @@ -56,8 +87,8 @@ struct completion { static inline void init_completion(struct completion *p) { p->completed = 0; - assert(0 == uv_cond_init(&p->cond)); - assert(0 == uv_mutex_init(&p->mutex)); + fatal_assert(0 == uv_cond_init(&p->cond)); + fatal_assert(0 == uv_mutex_init(&p->mutex)); } static inline void destroy_completion(struct completion *p) @@ -72,7 +103,7 @@ static inline void wait_for_completion(struct completion *p) while (0 == p->completed) { uv_cond_wait(&p->cond, &p->mutex); } - assert(1 == p->completed); + fatal_assert(1 == p->completed); uv_mutex_unlock(&p->mutex); } @@ -97,7 +128,17 @@ static inline void crc32set(void *crcp, uLong crc) extern void print_page_cache_descr(struct rrdeng_page_descr *page_cache_descr); extern void print_page_descr(struct rrdeng_page_descr *descr); extern int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size); -extern int open_file_direct_io(char *path, int flags, uv_file *file); +extern int open_file_for_io(char *path, int flags, uv_file *file, int direct); +static inline int open_file_direct_io(char *path, int flags, uv_file *file) +{ + return open_file_for_io(path, flags, file, 1); +} +static inline int open_file_buffered_io(char *path, int flags, uv_file *file) +{ + return open_file_for_io(path, flags, file, 0); +} extern char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size); +extern int compute_multidb_diskspace(); +extern int is_legacy_child(const char *machine_guid); #endif /* NETDATA_RRDENGINELIB_H */ \ No newline at end of file diff --git a/database/engine/rrdenglocking.c b/database/engine/rrdenglocking.c index 0eb9019b4..a23abf307 100644 --- a/database/engine/rrdenglocking.c +++ b/database/engine/rrdenglocking.c @@ -12,8 +12,8 @@ struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance pg_cache_descr->prev = pg_cache_descr->next = NULL; pg_cache_descr->refcnt = 0; pg_cache_descr->waiters = 0; - assert(0 == uv_cond_init(&pg_cache_descr->cond)); - assert(0 == uv_mutex_init(&pg_cache_descr->mutex)); + fatal_assert(0 == uv_cond_init(&pg_cache_descr->cond)); + fatal_assert(0 == uv_mutex_init(&pg_cache_descr->mutex)); return pg_cache_descr; } @@ -39,7 +39,7 @@ void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_ old_users = old_state >> PG_CACHE_DESCR_SHIFT; if (unlikely(we_locked)) { - assert(old_state & PG_CACHE_DESCR_LOCKED); + fatal_assert(old_state & PG_CACHE_DESCR_LOCKED); new_state = (1 << PG_CACHE_DESCR_SHIFT) | PG_CACHE_DESCR_ALLOCATED; ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); if (old_state == ret_state) { @@ -49,7 +49,7 @@ void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_ continue; /* spin */ } if (old_state & PG_CACHE_DESCR_LOCKED) { - assert(0 == old_users); + fatal_assert(0 == old_users); continue; /* spin */ } if (0 == old_state) { @@ -71,8 +71,9 @@ void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_ continue; /* spin */ } /* page cache descriptor is already allocated */ - assert(old_state & PG_CACHE_DESCR_ALLOCATED); - + if (unlikely(!(old_state & PG_CACHE_DESCR_ALLOCATED))) { + fatal("Invalid page 
cache descriptor locking state:%#lX", old_state); + } new_state = (old_users + 1) << PG_CACHE_DESCR_SHIFT; new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK; @@ -94,7 +95,7 @@ void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_ void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) { unsigned long old_state, new_state, ret_state, old_users; - struct page_cache_descr *pg_cache_descr; + struct page_cache_descr *pg_cache_descr, *delete_pg_cache_descr = NULL; uint8_t we_locked; uv_mutex_unlock(&descr->pg_cache_descr->mutex); @@ -105,35 +106,39 @@ void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrden old_users = old_state >> PG_CACHE_DESCR_SHIFT; if (unlikely(we_locked)) { - assert(0 == old_users); + fatal_assert(0 == old_users); ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, 0); if (old_state == ret_state) { /* success */ - break; + rrdeng_destroy_pg_cache_descr(ctx, delete_pg_cache_descr); + return; } continue; /* spin */ } if (old_state & PG_CACHE_DESCR_LOCKED) { - assert(0 == old_users); + fatal_assert(0 == old_users); continue; /* spin */ } - assert(old_state & PG_CACHE_DESCR_ALLOCATED); + fatal_assert(old_state & PG_CACHE_DESCR_ALLOCATED); pg_cache_descr = descr->pg_cache_descr; /* caller is the only page cache descriptor user and there are no pending references on the page */ if ((old_state & PG_CACHE_DESCR_DESTROY) && (1 == old_users) && !pg_cache_descr->flags && !pg_cache_descr->refcnt) { + fatal_assert(!pg_cache_descr->waiters); + new_state = PG_CACHE_DESCR_LOCKED; ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); if (old_state == ret_state) { we_locked = 1; - rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); + delete_pg_cache_descr = pg_cache_descr; + descr->pg_cache_descr = NULL; /* retry */ continue; } continue; /* spin */ } - assert(old_users > 0); + fatal_assert(old_users > 0); new_state = (old_users - 1) << PG_CACHE_DESCR_SHIFT; new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK; @@ -144,7 +149,6 @@ void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrden } /* spin */ } - } /* @@ -155,34 +159,36 @@ void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrden void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) { unsigned long old_state, new_state, ret_state, old_users; - struct page_cache_descr *pg_cache_descr; - uint8_t just_locked, we_freed, must_unlock; + struct page_cache_descr *pg_cache_descr = NULL; + uint8_t just_locked, can_free, must_unlock; just_locked = 0; - we_freed = 0; + can_free = 0; must_unlock = 0; while (1) { /* spin */ old_state = descr->pg_cache_descr_state; old_users = old_state >> PG_CACHE_DESCR_SHIFT; if (unlikely(just_locked)) { - assert(0 == old_users); + fatal_assert(0 == old_users); must_unlock = 1; just_locked = 0; /* Try deallocate if there are no pending references on the page */ if (!pg_cache_descr->flags && !pg_cache_descr->refcnt) { - rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); - we_freed = 1; + fatal_assert(!pg_cache_descr->waiters); + + descr->pg_cache_descr = NULL; + can_free = 1; /* success */ continue; } continue; /* spin */ } if (unlikely(must_unlock)) { - assert(0 == old_users); + fatal_assert(0 == old_users); - if (we_freed) { + if (can_free) { /* success */ new_state = 0; } else { @@ -192,6 +198,8 @@ void rrdeng_try_deallocate_pg_cache_descr(struct 
rrdengine_instance *ctx, struct ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); if (old_state == ret_state) { /* unlocked */ + if (can_free) + rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); return; } continue; /* spin */ @@ -201,16 +209,16 @@ void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct return; } if (old_state & PG_CACHE_DESCR_LOCKED) { - assert(0 == old_users); + fatal_assert(0 == old_users); continue; /* spin */ } - pg_cache_descr = descr->pg_cache_descr; /* caller is the only page cache descriptor user */ if (0 == old_users) { new_state = old_state | PG_CACHE_DESCR_LOCKED; ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); if (old_state == ret_state) { just_locked = 1; + pg_cache_descr = descr->pg_cache_descr; /* retry */ continue; } -- cgit v1.2.3
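The locking hunks at the end of this patch treat descr->pg_cache_descr_state as a single packed word: everything above PG_CACHE_DESCR_SHIFT is a reference (user) count and the low bits are flags (LOCKED, ALLOCATED, DESTROY), and every transition is attempted with a compare-and-swap retry loop so readers, writers and the destroyer never need an extra lock around the state itself. The reduced sketch below shows only an acquire/release pair on such a packed word; the shift, the flag values and the function names are made up for illustration:

    /* Packed state word: low bits are flags, the rest is a user count, all CAS-updated. */
    #include <stdio.h>

    #define STATE_SHIFT      3
    #define STATE_LOCKED     (1UL << 0)
    #define STATE_ALLOCATED  (1UL << 1)
    #define STATE_FLAGS_MASK ((1UL << STATE_SHIFT) - 1)

    static volatile unsigned long state = STATE_ALLOCATED;    /* zero users, allocated */

    static void acquire_user(void)    /* add one user, spin while the transient LOCKED bit is set */
    {
        unsigned long old_state, new_state;

        for (;;) {
            old_state = state;
            if (old_state & STATE_LOCKED)
                continue;                                     /* spin */
            new_state = (((old_state >> STATE_SHIFT) + 1) << STATE_SHIFT) |
                        (old_state & STATE_FLAGS_MASK);
            if (__sync_val_compare_and_swap(&state, old_state, new_state) == old_state)
                return;                                       /* success */
        }
    }

    static void release_user(void)    /* drop one user */
    {
        unsigned long old_state, new_state;

        for (;;) {
            old_state = state;
            new_state = (((old_state >> STATE_SHIFT) - 1) << STATE_SHIFT) |
                        (old_state & STATE_FLAGS_MASK);
            if (__sync_val_compare_and_swap(&state, old_state, new_state) == old_state)
                return;
        }
    }

    int main(void)
    {
        acquire_user();
        acquire_user();
        printf("users=%lu flags=%#lx\n", state >> STATE_SHIFT, state & STATE_FLAGS_MASK);
        release_user();
        release_user();
        printf("users=%lu flags=%#lx\n", state >> STATE_SHIFT, state & STATE_FLAGS_MASK);
        return 0;
    }

The change in rrdeng_page_descr_mutex_unlock() that defers rrdeng_destroy_pg_cache_descr() until after the state word has been reset follows the same discipline: the descriptor may only be freed once no further CAS transition can still observe it.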