summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2021-02-07 11:49:00 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2021-02-07 12:42:05 +0000
commit2e85f9325a797977eea9dfea0a925775ddd211d9 (patch)
tree452c7f30d62fca5755f659b99e4e53c7b03afc21 /streaming
parentReleasing debian version 1.19.0-4. (diff)
downloadnetdata-2e85f9325a797977eea9dfea0a925775ddd211d9.tar.xz
netdata-2e85f9325a797977eea9dfea0a925775ddd211d9.zip
Merging upstream version 1.29.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/Makefile.in576
-rw-r--r--streaming/README.md322
-rw-r--r--streaming/receiver.c499
-rw-r--r--streaming/rrdpush.c1098
-rw-r--r--streaming/rrdpush.h94
-rw-r--r--streaming/sender.c723
-rw-r--r--streaming/stream.conf49
7 files changed, 1727 insertions, 1634 deletions
diff --git a/streaming/Makefile.in b/streaming/Makefile.in
deleted file mode 100644
index bdef4166c..000000000
--- a/streaming/Makefile.in
+++ /dev/null
@@ -1,576 +0,0 @@
-# Makefile.in generated by automake 1.15.1 from Makefile.am.
-# @configure_input@
-
-# Copyright (C) 1994-2017 Free Software Foundation, Inc.
-
-# This Makefile.in is free software; the Free Software Foundation
-# gives unlimited permission to copy and/or distribute it,
-# with or without modifications, as long as this notice is preserved.
-
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY, to the extent permitted by law; without
-# even the implied warranty of MERCHANTABILITY or FITNESS FOR A
-# PARTICULAR PURPOSE.
-
-@SET_MAKE@
-
-# SPDX-License-Identifier: GPL-3.0-or-later
-
-VPATH = @srcdir@
-am__is_gnu_make = { \
- if test -z '$(MAKELEVEL)'; then \
- false; \
- elif test -n '$(MAKE_HOST)'; then \
- true; \
- elif test -n '$(MAKE_VERSION)' && test -n '$(CURDIR)'; then \
- true; \
- else \
- false; \
- fi; \
-}
-am__make_running_with_option = \
- case $${target_option-} in \
- ?) ;; \
- *) echo "am__make_running_with_option: internal error: invalid" \
- "target option '$${target_option-}' specified" >&2; \
- exit 1;; \
- esac; \
- has_opt=no; \
- sane_makeflags=$$MAKEFLAGS; \
- if $(am__is_gnu_make); then \
- sane_makeflags=$$MFLAGS; \
- else \
- case $$MAKEFLAGS in \
- *\\[\ \ ]*) \
- bs=\\; \
- sane_makeflags=`printf '%s\n' "$$MAKEFLAGS" \
- | sed "s/$$bs$$bs[$$bs $$bs ]*//g"`;; \
- esac; \
- fi; \
- skip_next=no; \
- strip_trailopt () \
- { \
- flg=`printf '%s\n' "$$flg" | sed "s/$$1.*$$//"`; \
- }; \
- for flg in $$sane_makeflags; do \
- test $$skip_next = yes && { skip_next=no; continue; }; \
- case $$flg in \
- *=*|--*) continue;; \
- -*I) strip_trailopt 'I'; skip_next=yes;; \
- -*I?*) strip_trailopt 'I';; \
- -*O) strip_trailopt 'O'; skip_next=yes;; \
- -*O?*) strip_trailopt 'O';; \
- -*l) strip_trailopt 'l'; skip_next=yes;; \
- -*l?*) strip_trailopt 'l';; \
- -[dEDm]) skip_next=yes;; \
- -[JT]) skip_next=yes;; \
- esac; \
- case $$flg in \
- *$$target_option*) has_opt=yes; break;; \
- esac; \
- done; \
- test $$has_opt = yes
-am__make_dryrun = (target_option=n; $(am__make_running_with_option))
-am__make_keepgoing = (target_option=k; $(am__make_running_with_option))
-pkgdatadir = $(datadir)/@PACKAGE@
-pkgincludedir = $(includedir)/@PACKAGE@
-pkglibdir = $(libdir)/@PACKAGE@
-pkglibexecdir = $(libexecdir)/@PACKAGE@
-am__cd = CDPATH="$${ZSH_VERSION+.}$(PATH_SEPARATOR)" && cd
-install_sh_DATA = $(install_sh) -c -m 644
-install_sh_PROGRAM = $(install_sh) -c
-install_sh_SCRIPT = $(install_sh) -c
-INSTALL_HEADER = $(INSTALL_DATA)
-transform = $(program_transform_name)
-NORMAL_INSTALL = :
-PRE_INSTALL = :
-POST_INSTALL = :
-NORMAL_UNINSTALL = :
-PRE_UNINSTALL = :
-POST_UNINSTALL = :
-build_triplet = @build@
-host_triplet = @host@
-subdir = streaming
-ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
-am__aclocal_m4_deps = $(top_srcdir)/build/m4/ax_c___atomic.m4 \
- $(top_srcdir)/build/m4/ax_c__generic.m4 \
- $(top_srcdir)/build/m4/ax_c_lto.m4 \
- $(top_srcdir)/build/m4/ax_c_mallinfo.m4 \
- $(top_srcdir)/build/m4/ax_c_mallopt.m4 \
- $(top_srcdir)/build/m4/ax_check_compile_flag.m4 \
- $(top_srcdir)/build/m4/ax_gcc_func_attribute.m4 \
- $(top_srcdir)/build/m4/ax_pthread.m4 \
- $(top_srcdir)/build/m4/jemalloc.m4 \
- $(top_srcdir)/build/m4/tcmalloc.m4 $(top_srcdir)/configure.ac
-am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \
- $(ACLOCAL_M4)
-DIST_COMMON = $(srcdir)/Makefile.am $(dist_libconfig_DATA) \
- $(dist_noinst_DATA) $(am__DIST_COMMON)
-mkinstalldirs = $(install_sh) -d
-CONFIG_HEADER = $(top_builddir)/config.h
-CONFIG_CLEAN_FILES =
-CONFIG_CLEAN_VPATH_FILES =
-AM_V_P = $(am__v_P_@AM_V@)
-am__v_P_ = $(am__v_P_@AM_DEFAULT_V@)
-am__v_P_0 = false
-am__v_P_1 = :
-AM_V_GEN = $(am__v_GEN_@AM_V@)
-am__v_GEN_ = $(am__v_GEN_@AM_DEFAULT_V@)
-am__v_GEN_0 = @echo " GEN " $@;
-am__v_GEN_1 =
-AM_V_at = $(am__v_at_@AM_V@)
-am__v_at_ = $(am__v_at_@AM_DEFAULT_V@)
-am__v_at_0 = @
-am__v_at_1 =
-SOURCES =
-DIST_SOURCES =
-am__can_run_installinfo = \
- case $$AM_UPDATE_INFO_DIR in \
- n|no|NO) false;; \
- *) (install-info --version) >/dev/null 2>&1;; \
- esac
-am__vpath_adj_setup = srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`;
-am__vpath_adj = case $$p in \
- $(srcdir)/*) f=`echo "$$p" | sed "s|^$$srcdirstrip/||"`;; \
- *) f=$$p;; \
- esac;
-am__strip_dir = f=`echo $$p | sed -e 's|^.*/||'`;
-am__install_max = 40
-am__nobase_strip_setup = \
- srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*|]/\\\\&/g'`
-am__nobase_strip = \
- for p in $$list; do echo "$$p"; done | sed -e "s|$$srcdirstrip/||"
-am__nobase_list = $(am__nobase_strip_setup); \
- for p in $$list; do echo "$$p $$p"; done | \
- sed "s| $$srcdirstrip/| |;"' / .*\//!s/ .*/ ./; s,\( .*\)/[^/]*$$,\1,' | \
- $(AWK) 'BEGIN { files["."] = "" } { files[$$2] = files[$$2] " " $$1; \
- if (++n[$$2] == $(am__install_max)) \
- { print $$2, files[$$2]; n[$$2] = 0; files[$$2] = "" } } \
- END { for (dir in files) print dir, files[dir] }'
-am__base_list = \
- sed '$$!N;$$!N;$$!N;$$!N;$$!N;$$!N;$$!N;s/\n/ /g' | \
- sed '$$!N;$$!N;$$!N;$$!N;s/\n/ /g'
-am__uninstall_files_from_dir = { \
- test -z "$$files" \
- || { test ! -d "$$dir" && test ! -f "$$dir" && test ! -r "$$dir"; } \
- || { echo " ( cd '$$dir' && rm -f" $$files ")"; \
- $(am__cd) "$$dir" && rm -f $$files; }; \
- }
-am__installdirs = "$(DESTDIR)$(libconfigdir)"
-DATA = $(dist_libconfig_DATA) $(dist_noinst_DATA)
-am__tagged_files = $(HEADERS) $(SOURCES) $(TAGS_FILES) $(LISP)
-am__DIST_COMMON = $(srcdir)/Makefile.in
-DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST)
-ACLOCAL = @ACLOCAL@
-AMTAR = @AMTAR@
-AM_DEFAULT_VERBOSITY = @AM_DEFAULT_VERBOSITY@
-AUTOCONF = @AUTOCONF@
-AUTOHEADER = @AUTOHEADER@
-AUTOMAKE = @AUTOMAKE@
-AWK = @AWK@
-CC = @CC@
-CCDEPMODE = @CCDEPMODE@
-CFLAGS = @CFLAGS@
-CMOCKA_CFLAGS = @CMOCKA_CFLAGS@
-CMOCKA_LIBS = @CMOCKA_LIBS@
-CPP = @CPP@
-CPPFLAGS = @CPPFLAGS@
-CUPSCONFIG = @CUPSCONFIG@
-CXX = @CXX@
-CXXDEPMODE = @CXXDEPMODE@
-CXXFLAGS = @CXXFLAGS@
-CXX_BINARY = @CXX_BINARY@
-CYGPATH_W = @CYGPATH_W@
-DEFS = @DEFS@
-DEPDIR = @DEPDIR@
-ECHO_C = @ECHO_C@
-ECHO_N = @ECHO_N@
-ECHO_T = @ECHO_T@
-EGREP = @EGREP@
-ENABLE_UNITTESTS = @ENABLE_UNITTESTS@
-EXEEXT = @EXEEXT@
-GREP = @GREP@
-INSTALL = @INSTALL@
-INSTALL_DATA = @INSTALL_DATA@
-INSTALL_PROGRAM = @INSTALL_PROGRAM@
-INSTALL_SCRIPT = @INSTALL_SCRIPT@
-INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@
-IPMIMONITORING_CFLAGS = @IPMIMONITORING_CFLAGS@
-IPMIMONITORING_LIBS = @IPMIMONITORING_LIBS@
-JSON_CFLAGS = @JSON_CFLAGS@
-JSON_LIBS = @JSON_LIBS@
-LDFLAGS = @LDFLAGS@
-LIBCAP_CFLAGS = @LIBCAP_CFLAGS@
-LIBCAP_LIBS = @LIBCAP_LIBS@
-LIBCRYPTO_CFLAGS = @LIBCRYPTO_CFLAGS@
-LIBCRYPTO_LIBS = @LIBCRYPTO_LIBS@
-LIBCURL_CFLAGS = @LIBCURL_CFLAGS@
-LIBCURL_LIBS = @LIBCURL_LIBS@
-LIBMNL_CFLAGS = @LIBMNL_CFLAGS@
-LIBMNL_LIBS = @LIBMNL_LIBS@
-LIBMONGOC_CFLAGS = @LIBMONGOC_CFLAGS@
-LIBMONGOC_LIBS = @LIBMONGOC_LIBS@
-LIBOBJS = @LIBOBJS@
-LIBS = @LIBS@
-LIBSSL_CFLAGS = @LIBSSL_CFLAGS@
-LIBSSL_LIBS = @LIBSSL_LIBS@
-LTLIBOBJS = @LTLIBOBJS@
-MAINT = @MAINT@
-MAKEINFO = @MAKEINFO@
-MATH_CFLAGS = @MATH_CFLAGS@
-MATH_LIBS = @MATH_LIBS@
-MKDIR_P = @MKDIR_P@
-NFACCT_CFLAGS = @NFACCT_CFLAGS@
-NFACCT_LIBS = @NFACCT_LIBS@
-OBJEXT = @OBJEXT@
-OPTIONAL_CUPS_CFLAGS = @OPTIONAL_CUPS_CFLAGS@
-OPTIONAL_CUPS_LIBS = @OPTIONAL_CUPS_LIBS@
-OPTIONAL_IPMIMONITORING_CFLAGS = @OPTIONAL_IPMIMONITORING_CFLAGS@
-OPTIONAL_IPMIMONITORING_LIBS = @OPTIONAL_IPMIMONITORING_LIBS@
-OPTIONAL_JSONC_LIBS = @OPTIONAL_JSONC_LIBS@
-OPTIONAL_JUDY_LIBS = @OPTIONAL_JUDY_LIBS@
-OPTIONAL_KINESIS_CFLAGS = @OPTIONAL_KINESIS_CFLAGS@
-OPTIONAL_KINESIS_LIBS = @OPTIONAL_KINESIS_LIBS@
-OPTIONAL_LIBCAP_CFLAGS = @OPTIONAL_LIBCAP_CFLAGS@
-OPTIONAL_LIBCAP_LIBS = @OPTIONAL_LIBCAP_LIBS@
-OPTIONAL_LZ4_LIBS = @OPTIONAL_LZ4_LIBS@
-OPTIONAL_MATH_CFLAGS = @OPTIONAL_MATH_CFLAGS@
-OPTIONAL_MATH_LIBS = @OPTIONAL_MATH_LIBS@
-OPTIONAL_MONGOC_CFLAGS = @OPTIONAL_MONGOC_CFLAGS@
-OPTIONAL_MONGOC_LIBS = @OPTIONAL_MONGOC_LIBS@
-OPTIONAL_NFACCT_CFLAGS = @OPTIONAL_NFACCT_CFLAGS@
-OPTIONAL_NFACCT_LIBS = @OPTIONAL_NFACCT_LIBS@
-OPTIONAL_PROMETHEUS_REMOTE_WRITE_CFLAGS = @OPTIONAL_PROMETHEUS_REMOTE_WRITE_CFLAGS@
-OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS = @OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS@
-OPTIONAL_SSL_LIBS = @OPTIONAL_SSL_LIBS@
-OPTIONAL_UUID_CFLAGS = @OPTIONAL_UUID_CFLAGS@
-OPTIONAL_UUID_LIBS = @OPTIONAL_UUID_LIBS@
-OPTIONAL_UV_LIBS = @OPTIONAL_UV_LIBS@
-OPTIONAL_XENSTAT_CFLAGS = @OPTIONAL_XENSTAT_CFLAGS@
-OPTIONAL_XENSTAT_LIBS = @OPTIONAL_XENSTAT_LIBS@
-OPTIONAL_ZLIB_CFLAGS = @OPTIONAL_ZLIB_CFLAGS@
-OPTIONAL_ZLIB_LIBS = @OPTIONAL_ZLIB_LIBS@
-PACKAGE = @PACKAGE@
-PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@
-PACKAGE_NAME = @PACKAGE_NAME@
-PACKAGE_RPM_VERSION = @PACKAGE_RPM_VERSION@
-PACKAGE_STRING = @PACKAGE_STRING@
-PACKAGE_TARNAME = @PACKAGE_TARNAME@
-PACKAGE_URL = @PACKAGE_URL@
-PACKAGE_VERSION = @PACKAGE_VERSION@
-PATH_SEPARATOR = @PATH_SEPARATOR@
-PKG_CONFIG = @PKG_CONFIG@
-PKG_CONFIG_LIBDIR = @PKG_CONFIG_LIBDIR@
-PKG_CONFIG_PATH = @PKG_CONFIG_PATH@
-PROTOBUF_CFLAGS = @PROTOBUF_CFLAGS@
-PROTOBUF_LIBS = @PROTOBUF_LIBS@
-PROTOC = @PROTOC@
-PTHREAD_CC = @PTHREAD_CC@
-PTHREAD_CFLAGS = @PTHREAD_CFLAGS@
-PTHREAD_LIBS = @PTHREAD_LIBS@
-SET_MAKE = @SET_MAKE@
-SHELL = @SHELL@
-SSE_CANDIDATE = @SSE_CANDIDATE@
-STRIP = @STRIP@
-TEST_CFLAGS = @TEST_CFLAGS@
-TEST_LIBS = @TEST_LIBS@
-UUID_CFLAGS = @UUID_CFLAGS@
-UUID_LIBS = @UUID_LIBS@
-VERSION = @VERSION@
-XENLIGHT_CFLAGS = @XENLIGHT_CFLAGS@
-XENLIGHT_LIBS = @XENLIGHT_LIBS@
-YAJL_CFLAGS = @YAJL_CFLAGS@
-YAJL_LIBS = @YAJL_LIBS@
-ZLIB_CFLAGS = @ZLIB_CFLAGS@
-ZLIB_LIBS = @ZLIB_LIBS@
-abs_builddir = @abs_builddir@
-abs_srcdir = @abs_srcdir@
-abs_top_builddir = @abs_top_builddir@
-abs_top_srcdir = @abs_top_srcdir@
-ac_ct_CC = @ac_ct_CC@
-ac_ct_CXX = @ac_ct_CXX@
-am__include = @am__include@
-am__leading_dot = @am__leading_dot@
-am__quote = @am__quote@
-am__tar = @am__tar@
-am__untar = @am__untar@
-ax_pthread_config = @ax_pthread_config@
-bindir = @bindir@
-build = @build@
-build_alias = @build_alias@
-build_cpu = @build_cpu@
-build_os = @build_os@
-build_target = @build_target@
-build_vendor = @build_vendor@
-builddir = @builddir@
-cachedir = @cachedir@
-chartsdir = @chartsdir@
-configdir = @configdir@
-datadir = @datadir@
-datarootdir = @datarootdir@
-docdir = @docdir@
-dvidir = @dvidir@
-exec_prefix = @exec_prefix@
-has_jemalloc = @has_jemalloc@
-has_tcmalloc = @has_tcmalloc@
-host = @host@
-host_alias = @host_alias@
-host_cpu = @host_cpu@
-host_os = @host_os@
-host_vendor = @host_vendor@
-htmldir = @htmldir@
-includedir = @includedir@
-infodir = @infodir@
-install_sh = @install_sh@
-libconfigdir = @libconfigdir@
-libdir = @libdir@
-libexecdir = @libexecdir@
-localedir = @localedir@
-localstatedir = @localstatedir@
-logdir = @logdir@
-mandir = @mandir@
-mkdir_p = @mkdir_p@
-nodedir = @nodedir@
-oldincludedir = @oldincludedir@
-pdfdir = @pdfdir@
-pluginsdir = @pluginsdir@
-prefix = @prefix@
-program_transform_name = @program_transform_name@
-psdir = @psdir@
-pythondir = @pythondir@
-registrydir = @registrydir@
-runstatedir = @runstatedir@
-sbindir = @sbindir@
-sharedstatedir = @sharedstatedir@
-srcdir = @srcdir@
-sysconfdir = @sysconfdir@
-target_alias = @target_alias@
-top_build_prefix = @top_build_prefix@
-top_builddir = @top_builddir@
-top_srcdir = @top_srcdir@
-varlibdir = @varlibdir@
-webdir = @webdir@
-AUTOMAKE_OPTIONS = subdir-objects
-MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
-dist_libconfig_DATA = \
- stream.conf \
- $(NULL)
-
-dist_noinst_DATA = \
- README.md \
- $(NULL)
-
-all: all-am
-
-.SUFFIXES:
-$(srcdir)/Makefile.in: @MAINTAINER_MODE_TRUE@ $(srcdir)/Makefile.am $(am__configure_deps)
- @for dep in $?; do \
- case '$(am__configure_deps)' in \
- *$$dep*) \
- ( cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh ) \
- && { if test -f $@; then exit 0; else break; fi; }; \
- exit 1;; \
- esac; \
- done; \
- echo ' cd $(top_srcdir) && $(AUTOMAKE) --gnu streaming/Makefile'; \
- $(am__cd) $(top_srcdir) && \
- $(AUTOMAKE) --gnu streaming/Makefile
-Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status
- @case '$?' in \
- *config.status*) \
- cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh;; \
- *) \
- echo ' cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__depfiles_maybe)'; \
- cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__depfiles_maybe);; \
- esac;
-
-$(top_builddir)/config.status: $(top_srcdir)/configure $(CONFIG_STATUS_DEPENDENCIES)
- cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
-
-$(top_srcdir)/configure: @MAINTAINER_MODE_TRUE@ $(am__configure_deps)
- cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
-$(ACLOCAL_M4): @MAINTAINER_MODE_TRUE@ $(am__aclocal_m4_deps)
- cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
-$(am__aclocal_m4_deps):
-install-dist_libconfigDATA: $(dist_libconfig_DATA)
- @$(NORMAL_INSTALL)
- @list='$(dist_libconfig_DATA)'; test -n "$(libconfigdir)" || list=; \
- if test -n "$$list"; then \
- echo " $(MKDIR_P) '$(DESTDIR)$(libconfigdir)'"; \
- $(MKDIR_P) "$(DESTDIR)$(libconfigdir)" || exit 1; \
- fi; \
- for p in $$list; do \
- if test -f "$$p"; then d=; else d="$(srcdir)/"; fi; \
- echo "$$d$$p"; \
- done | $(am__base_list) | \
- while read files; do \
- echo " $(INSTALL_DATA) $$files '$(DESTDIR)$(libconfigdir)'"; \
- $(INSTALL_DATA) $$files "$(DESTDIR)$(libconfigdir)" || exit $$?; \
- done
-
-uninstall-dist_libconfigDATA:
- @$(NORMAL_UNINSTALL)
- @list='$(dist_libconfig_DATA)'; test -n "$(libconfigdir)" || list=; \
- files=`for p in $$list; do echo $$p; done | sed -e 's|^.*/||'`; \
- dir='$(DESTDIR)$(libconfigdir)'; $(am__uninstall_files_from_dir)
-tags TAGS:
-
-ctags CTAGS:
-
-cscope cscopelist:
-
-
-distdir: $(DISTFILES)
- @srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \
- topsrcdirstrip=`echo "$(top_srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \
- list='$(DISTFILES)'; \
- dist_files=`for file in $$list; do echo $$file; done | \
- sed -e "s|^$$srcdirstrip/||;t" \
- -e "s|^$$topsrcdirstrip/|$(top_builddir)/|;t"`; \
- case $$dist_files in \
- */*) $(MKDIR_P) `echo "$$dist_files" | \
- sed '/\//!d;s|^|$(distdir)/|;s,/[^/]*$$,,' | \
- sort -u` ;; \
- esac; \
- for file in $$dist_files; do \
- if test -f $$file || test -d $$file; then d=.; else d=$(srcdir); fi; \
- if test -d $$d/$$file; then \
- dir=`echo "/$$file" | sed -e 's,/[^/]*$$,,'`; \
- if test -d "$(distdir)/$$file"; then \
- find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \
- fi; \
- if test -d $(srcdir)/$$file && test $$d != $(srcdir); then \
- cp -fpR $(srcdir)/$$file "$(distdir)$$dir" || exit 1; \
- find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \
- fi; \
- cp -fpR $$d/$$file "$(distdir)$$dir" || exit 1; \
- else \
- test -f "$(distdir)/$$file" \
- || cp -p $$d/$$file "$(distdir)/$$file" \
- || exit 1; \
- fi; \
- done
-check-am: all-am
-check: check-am
-all-am: Makefile $(DATA)
-installdirs:
- for dir in "$(DESTDIR)$(libconfigdir)"; do \
- test -z "$$dir" || $(MKDIR_P) "$$dir"; \
- done
-install: install-am
-install-exec: install-exec-am
-install-data: install-data-am
-uninstall: uninstall-am
-
-install-am: all-am
- @$(MAKE) $(AM_MAKEFLAGS) install-exec-am install-data-am
-
-installcheck: installcheck-am
-install-strip:
- if test -z '$(STRIP)'; then \
- $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
- install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
- install; \
- else \
- $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
- install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
- "INSTALL_PROGRAM_ENV=STRIPPROG='$(STRIP)'" install; \
- fi
-mostlyclean-generic:
-
-clean-generic:
-
-distclean-generic:
- -test -z "$(CONFIG_CLEAN_FILES)" || rm -f $(CONFIG_CLEAN_FILES)
- -test . = "$(srcdir)" || test -z "$(CONFIG_CLEAN_VPATH_FILES)" || rm -f $(CONFIG_CLEAN_VPATH_FILES)
-
-maintainer-clean-generic:
- @echo "This command is intended for maintainers to use"
- @echo "it deletes files that may require special tools to rebuild."
- -test -z "$(MAINTAINERCLEANFILES)" || rm -f $(MAINTAINERCLEANFILES)
-clean: clean-am
-
-clean-am: clean-generic mostlyclean-am
-
-distclean: distclean-am
- -rm -f Makefile
-distclean-am: clean-am distclean-generic
-
-dvi: dvi-am
-
-dvi-am:
-
-html: html-am
-
-html-am:
-
-info: info-am
-
-info-am:
-
-install-data-am: install-dist_libconfigDATA
-
-install-dvi: install-dvi-am
-
-install-dvi-am:
-
-install-exec-am:
-
-install-html: install-html-am
-
-install-html-am:
-
-install-info: install-info-am
-
-install-info-am:
-
-install-man:
-
-install-pdf: install-pdf-am
-
-install-pdf-am:
-
-install-ps: install-ps-am
-
-install-ps-am:
-
-installcheck-am:
-
-maintainer-clean: maintainer-clean-am
- -rm -f Makefile
-maintainer-clean-am: distclean-am maintainer-clean-generic
-
-mostlyclean: mostlyclean-am
-
-mostlyclean-am: mostlyclean-generic
-
-pdf: pdf-am
-
-pdf-am:
-
-ps: ps-am
-
-ps-am:
-
-uninstall-am: uninstall-dist_libconfigDATA
-
-.MAKE: install-am install-strip
-
-.PHONY: all all-am check check-am clean clean-generic cscopelist-am \
- ctags-am distclean distclean-generic distdir dvi dvi-am html \
- html-am info info-am install install-am install-data \
- install-data-am install-dist_libconfigDATA install-dvi \
- install-dvi-am install-exec install-exec-am install-html \
- install-html-am install-info install-info-am install-man \
- install-pdf install-pdf-am install-ps install-ps-am \
- install-strip installcheck installcheck-am installdirs \
- maintainer-clean maintainer-clean-generic mostlyclean \
- mostlyclean-generic pdf pdf-am ps ps-am tags-am uninstall \
- uninstall-am uninstall-dist_libconfigDATA
-
-.PRECIOUS: Makefile
-
-
-# Tell versions [3.59,3.63) of GNU make to not export all variables.
-# Otherwise a system limit (for SysV at least) may be exceeded.
-.NOEXPORT:
diff --git a/streaming/README.md b/streaming/README.md
index 7fba3552a..94ab1f2e8 100644
--- a/streaming/README.md
+++ b/streaming/README.md
@@ -1,57 +1,70 @@
+<!--
+title: "Streaming and replication"
+description: "Replicate and mirror Netdata's metrics through real-time streaming from child to parent nodes. Then combine, correlate, and export."
+custom_edit_url: https://github.com/netdata/netdata/edit/master/streaming/README.md
+-->
+
# Streaming and replication
Each Netdata is able to replicate/mirror its database to another Netdata, by streaming collected
metrics, in real-time to it. This is quite different to [data archiving to third party time-series
-databases](../backends).
+databases](/exporting/README.md).
+
+When Netdata streams metrics to another Netdata, the receiving one is able to perform everything a Netdata instance is
+capable of:
-When Netdata streams metrics to another Netdata, the receiving one is able to perform everything a Netdata instance is capable of:
+- Visualize metrics with a dashboard
+- Run health checks that trigger alarms and send alarm notifications
+- Export metrics to a external time-series database
-- visualize them with a dashboard
-- run health checks that trigger alarms and send alarm notifications
-- archive metrics to a backend time-series database
+The nodes that send metrics are called **child** nodes, and the nodes that receive metrics are called **parent** nodes.
+There are also **proxies**, which collects metrics from a child and sends it to a parent.
## Supported configurations
### Netdata without a database or web API (headless collector)
-Local Netdata (`slave`), **without any database or alarms**, collects metrics and sends them to
-another Netdata (`master`).
+Local Netdata (child), **without any database or alarms**, collects metrics and sends them to another Netdata
+(parent).
-The node menu shows a list of all "databases streamed to" the master. Clicking one of those links allows the user to view the full dashboard of the `slave` Netdata. The URL has the form <http://master-host:master-port/host/slave-host/>.
+The node menu shows a list of all "databases streamed to" the parent. Clicking one of those links allows the user to
+view the full dashboard of the child node. The URL has the form
+`http://parent-host:parent-port/host/child-host/`.
-Alarms for the `slave` are served by the `master`.
+Alarms for the child are served by the parent.
-In this mode the `slave` is just a plain data collector. It spawns all external plugins, but instead
-of maintaining a local database and accepting dashboard requests, it streams all metrics to the
-`master`. The memory footprint is reduced significantly, to between 6 MiB and 40 MiB, depending on the enabled plugins. To reduce the memory usage as much as possible, refer to [running Netdata in embedded devices](../docs/Performance.md#running-netdata-in-embedded-devices).
+In this mode the child is just a plain data collector. It spawns all external plugins, but instead of maintaining a
+local database and accepting dashboard requests, it streams all metrics to the parent. The memory footprint is reduced
+significantly, to between 6 MiB and 40 MiB, depending on the enabled plugins. To reduce the memory usage as much as
+possible, refer to the [performance optimization guide](/docs/guides/configure/performance.md).
-The same `master` can collect data for any number of `slaves`.
+The same parent can collect data for any number of child nodes.
-### database replication
+### Database Replication
-Local Netdata (`slave`), **with a local database (and possibly alarms)**, collects metrics and
-sends them to another Netdata (`master`).
+Local Netdata (child), **with a local database (and possibly alarms)**, collects metrics and
+sends them to another Netdata (parent).
-The user can use all the functions **at both** <http://slave-ip:slave-port/> and
-<http://master-host:master-port/host/slave-host/>.
+The user can use all the functions **at both** `http://child-ip:child-port/` and
+`http://parent-host:parent-port/host/child-host/`.
-The `slave` and the `master` may have different data retention policies for the same metrics.
+The child and the parent may have different data retention policies for the same metrics.
-Alarms for the `slave` are triggered by **both** the `slave` and the `master` (and actually
+Alarms for the child are triggered by **both** the child and the parent (and actually
each can have different alarms configurations or have alarms disabled).
-Take a note, that custom chart names, configured on the `slave`, should be in the form `type.name` to work correctly. The `master` will truncate the `type` part and substitute the original chart `type` to store the name in the database.
+Take a note, that custom chart names, configured on the child, should be in the form `type.name` to work correctly. The parent will truncate the `type` part and substitute the original chart `type` to store the name in the database.
### Netdata proxies
-Local Netdata (`slave`), with or without a database, collects metrics and sends them to another
-Netdata (`proxy`), which may or may not maintain a database, which forwards them to another
-Netdata (`master`).
+Local Netdata (child), with or without a database, collects metrics and sends them to another
+Netdata (**proxy**), which may or may not maintain a database, which forwards them to another
+Netdata (parent).
-Alarms for the slave can be triggered by any of the involved hosts that maintains a database.
+Alarms for the child can be triggered by any of the involved hosts that maintains a database.
Any number of daisy chaining Netdata servers are supported, each with or without a database and
-with or without alarms for the `slave` metrics.
+with or without alarms for the child metrics.
### mix and match with backends
@@ -89,7 +102,9 @@ monitoring (there cannot be health monitoring without a database).
`[web].mode = none` disables the API (Netdata will not listen to any ports).
This also disables the registry (there cannot be a registry without an API).
-`accept a streaming request every seconds` can be used to set a limit on how often a master Netdata server will accept streaming requests from the slaves. 0 sets no limit, 1 means maximum once every second. If this is set, you may see error log entries "... too busy to accept new streaming request. Will be allowed in X secs".
+`accept a streaming request every seconds` can be used to set a limit on how often a parent node will accept streaming
+requests from its child nodes. 0 sets no limit, 1 means maximum once every second. If this is set, you may see error log
+entries "... too busy to accept new streaming request. Will be allowed in X secs".
```
[backend]
@@ -104,7 +119,7 @@ this host).
### streaming configuration
-A new file is introduced: [stream.conf](stream.conf) (to edit it on your system run
+A new file is introduced: `stream.conf` (to edit it on your system run
`/etc/netdata/edit-config stream.conf`). This file holds streaming configuration for both the
sending and the receiving Netdata.
@@ -119,7 +134,7 @@ sending-receiving Netdata.
This is the section for the sending Netdata. On the receiving node, `[stream].enabled` can be `no`.
If it is `yes`, the receiving node will also stream the metrics to another node (i.e. it will be
-a `proxy`).
+a proxy).
```
[stream]
@@ -137,7 +152,7 @@ This is an overview of how these options can be combined:
| proxy with db|not `none`|not `none`|`yes`|possible|possible|yes|
| central netdata|not `none`|not `none`|`no`|possible|possible|yes|
-For the options to encrypt the data stream between the slave and the master, refer to [securing the communication](#securing-streaming-communications)
+For the options to encrypt the data stream between the child and the parent, refer to [securing the communication](#securing-streaming-communications)
##### options for the receiving node
@@ -159,7 +174,7 @@ all hosts pushed with this API key.
You can also add sections like this:
```sh
-# replace MACHINE_GUID with the slave /var/lib/netdata/registry/netdata.public.unique.id
+# replace MACHINE_GUID with the child /var/lib/netdata/registry/netdata.public.unique.id
[MACHINE_GUID]
enabled = yes
history = 3600
@@ -168,18 +183,18 @@ You can also add sections like this:
allow from = *
```
-The above is the receiver configuration of a single host, at the receiver end. `MACHINE_GUID` is
+The above is the parent configuration of a single host, at the parent end. `MACHINE_GUID` is
the unique id the Netdata generating the metrics (i.e. the Netdata that originally collects
them `/var/lib/netdata/registry/netdata.unique.id`). So, metrics for Netdata `A` that pass through
any number of other Netdata, will have the same `MACHINE_GUID`.
You can also use `default memory mode = dbengine` for an API key or `memory mode = dbengine` for
- a single host. The additional `page cache size` and `dbengine disk space` configuration options
+ a single host. The additional `page cache size` and `dbengine multihost disk space` configuration options
are inherited from the global Netdata configuration.
##### allow from
-`allow from` settings are [Netdata simple patterns](../libnetdata/simple_pattern): string matches
+`allow from` settings are [Netdata simple patterns](/libnetdata/simple_pattern/README.md): string matches
that use `*` as wildcard (any number of times) and a `!` prefix for a negative match.
So: `allow from = !10.1.2.3 10.*` will allow all IPs in `10.*` except `10.1.2.3`. The order is
important: left to right, the first positive or negative match is used.
@@ -188,7 +203,7 @@ important: left to right, the first positive or negative match is used.
##### tracing
-When a `slave` is trying to push metrics to a `master` or `proxy`, it logs entries like these:
+When a child is trying to push metrics to a parent or proxy, it logs entries like these:
```
2017-02-25 01:57:44: netdata: ERROR: Failed to connect to '10.11.12.1', port '19999' (errno 111, Connection refused)
@@ -200,7 +215,7 @@ When a `slave` is trying to push metrics to a `master` or `proxy`, it logs entri
2017-02-25 01:58:14: netdata: INFO : STREAM costa-pc [send]: ready - sending metrics...
```
-The receiving end (`proxy` or `master`) logs entries like these:
+The receiving end (proxy or parent) logs entries like these:
```
2017-02-25 01:58:04: netdata: INFO : STREAM [receive from [10.11.12.11]:33554]: new client connection.
@@ -214,21 +229,23 @@ For Netdata v1.9+, streaming can also be monitored via `access.log`.
### Securing streaming communications
-Netdata does not activate TLS encryption by default. To encrypt streaming connections, you first need to [enable TLS support](../web/server/#enabling-tls-support) on the master. With encryption enabled on the receiving side, you need to instruct the slave to use TLS/SSL as well. On the slave's `stream.conf`, configure the destination as follows:
+Netdata does not activate TLS encryption by default. To encrypt streaming connections, you first need to [enable TLS support](/web/server/README.md#enabling-tls-support) on the parent. With encryption enabled on the receiving side, you need to instruct the child to use TLS/SSL as well. On the child's `stream.conf`, configure the destination as follows:
```
[stream]
destination = host:port:SSL
```
-The word `SSL` appended to the end of the destination tells the slave that connections must be encrypted.
+The word `SSL` appended to the end of the destination tells the child that connections must be encrypted.
-??? info "Differences in TLS and SSL terminology"
- While Netdata uses Transport Layer Security (TLS) 1.2 to encrypt communications rather than the obsolete SSL protocol, it's still common practice to refer to encrypted web connections as `SSL`. Many vendors, like Nginx and even Netdata itself, use `SSL` in configuration files, whereas documentation will always refer to encrypted communications as `TLS` or `TLS/SSL`.
+> While Netdata uses Transport Layer Security (TLS) 1.2 to encrypt communications rather than the obsolete SSL protocol,
+> it's still common practice to refer to encrypted web connections as `SSL`. Many vendors, like Nginx and even Netdata
+> itself, use `SSL` in configuration files, whereas documentation will always refer to encrypted communications as `TLS`
+> or `TLS/SSL`.
#### Certificate verification
-When TLS/SSL is enabled on the slave, the default behavior will be to not connect with the master unless the server's certificate can be verified via the default chain. In case you want to avoid this check, add the following to the slave's `stream.conf` file:
+When TLS/SSL is enabled on the child, the default behavior will be to not connect with the parent unless the server's certificate can be verified via the default chain. In case you want to avoid this check, add the following to the child's `stream.conf` file:
```
[stream]
@@ -243,15 +260,15 @@ Given these known issues, you have two options. If you trust your certificate, y
For more details about these options, you can read about [verify locations](https://www.openssl.org/docs/man1.1.1/man3/SSL_CTX_load_verify_locations.html).
-Before you changed your streaming configuration, you need to copy your trusted certificate to your slave system and add the certificate to OpenSSL's list.
+Before you changed your streaming configuration, you need to copy your trusted certificate to your child system and add the certificate to OpenSSL's list.
On most Linux distributions, the `update-ca-certificates` command searches inside the `/usr/share/ca-certificates` directory for certificates. You should double-check by reading the `update-ca-certificate` manual (`man update-ca-certificate`), and then change the directory in the below commands if needed.
-If you have `sudo` configured on your slave system, you can use that to run the following commands. If not, you'll have to log in as `root` to complete them.
+If you have `sudo` configured on your child system, you can use that to run the following commands. If not, you'll have to log in as `root` to complete them.
```
# mkdir /usr/share/ca-certificates/netdata
-# cp master_cert.pem /usr/share/ca-certificates/netdata/master_cert.crt
+# cp parent_cert.pem /usr/share/ca-certificates/netdata/parent_cert.crt
# chown -R netdata.netdata /usr/share/ca-certificates/netdata/
```
@@ -260,7 +277,7 @@ First, you create a new directory to store your certificates for Netdata. Next,
Next, edit the file `/etc/ca-certificates.conf` and add the following line:
```
-netdata/master_cert.crt
+netdata/parent_cert.crt
```
Now you update the list of certificates running the following, again either as `sudo` or `root`:
@@ -269,35 +286,35 @@ Now you update the list of certificates running the following, again either as `
# update-ca-certificates
```
-!!! note
- Some Linux distributions have different methods of updating the certificate list. For more details, please read this guide on [addding trusted root certificates](https://github.com/Busindre/How-to-Add-trusted-root-certificates).
+> Some Linux distributions have different methods of updating the certificate list. For more details, please read this
+> guide on [adding trusted root certificates](https://github.com/Busindre/How-to-Add-trusted-root-certificates).
-Once you update your certificate list, you can set the stream parameters for Netdata to trust the master certificate. Open `stream.conf` for editing and change the following lines:
+Once you update your certificate list, you can set the stream parameters for Netdata to trust the parent certificate. Open `stream.conf` for editing and change the following lines:
```
[stream]
CApath = /etc/ssl/certs/
- CAfile = /etc/ssl/certs/master_cert.pem
+ CAfile = /etc/ssl/certs/parent_cert.pem
```
-With this configuration, the `CApath` option tells Netdata to search for trusted certificates inside `/etc/ssl/certs`. The `CAfile` option specifies the Netdata master certificate is located at `/etc/ssl/certs/master_cert.pem`. With this configuration, you can skip using the system's entire list of certificates and use Netdata's master certificate instead.
+With this configuration, the `CApath` option tells Netdata to search for trusted certificates inside `/etc/ssl/certs`. The `CAfile` option specifies the Netdata parent certificate is located at `/etc/ssl/certs/parent_cert.pem`. With this configuration, you can skip using the system's entire list of certificates and use Netdata's parent certificate instead.
#### Expected behaviors
-With the introduction of TLS/SSL, the master-slave communication behaves as shown in the table below, depending on the following configurations:
+With the introduction of TLS/SSL, the parent-child communication behaves as shown in the table below, depending on the following configurations:
-- **Master TLS (Yes/No)**: Whether the `[web]` section in `netdata.conf` has `ssl key` and `ssl certificate`.
-- **Master port TLS (-/force/optional)**: Depends on whether the `[web]` section `bind to` contains a `^SSL=force` or `^SSL=optional` directive on the port(s) used for streaming.
-- **Slave TLS (Yes/No)**: Whether the destination in the slave's `stream.conf` has `:SSL` at the end.
-- **Slave TLS Verification (yes/no)**: Value of the slave's `stream.conf` `ssl skip certificate verification` parameter (default is no).
+- **Parent TLS (Yes/No)**: Whether the `[web]` section in `netdata.conf` has `ssl key` and `ssl certificate`.
+- **Parent port TLS (-/force/optional)**: Depends on whether the `[web]` section `bind to` contains a `^SSL=force` or `^SSL=optional` directive on the port(s) used for streaming.
+- **Child TLS (Yes/No)**: Whether the destination in the child's `stream.conf` has `:SSL` at the end.
+- **Child TLS Verification (yes/no)**: Value of the child's `stream.conf` `ssl skip certificate verification` parameter (default is no).
-| Master TLS enabled|Master port SSL|Slave TLS|Slave SSL Ver.|Behavior|
+| Parent TLS enabled|Parent port SSL|Child TLS|Child SSL Ver.|Behavior|
|:----------------:|:-------------:|:-------:|:------------:|:-------|
-| No|-|No|no|Legacy behavior. The master-slave stream is unencrypted.|
-| Yes|force|No|no|The master rejects the slave connection.|
-| Yes|-/optional|No|no|The master-slave stream is unencrypted (expected situation for legacy slaves and newer masters)|
-| Yes|-/force/optional|Yes|no|The master-slave stream is encrypted, provided that the master has a valid TLS/SSL certificate. Otherwise, the slave refuses to connect.|
-| Yes|-/force/optional|Yes|yes|The master-slave stream is encrypted.|
+| No|-|No|no|Legacy behavior. The parent-child stream is unencrypted.|
+| Yes|force|No|no|The parent rejects the child connection.|
+| Yes|-/optional|No|no|The parent-child stream is unencrypted (expected situation for legacy child nodes and newer parent nodes)|
+| Yes|-/force/optional|Yes|no|The parent-child stream is encrypted, provided that the parent has a valid TLS/SSL certificate. Otherwise, the child refuses to connect.|
+| Yes|-/force/optional|Yes|yes|The parent-child stream is encrypted.|
## Viewing remote host dashboards, using mirrored databases
@@ -314,9 +331,7 @@ Auto-scaling is probably the most trendy service deployment strategy these days.
Auto-scaling detects the need for additional resources and boots VMs on demand, based on a template. Soon after they start running the applications, a load balancer starts distributing traffic to them, allowing the service to grow horizontally to the scale needed to handle the load. When demands falls, auto-scaling starts shutting down VMs that are no longer needed.
-<p align="center">
-<img src="https://cloud.githubusercontent.com/assets/2662304/23627426/65a9074a-02b9-11e7-9664-cd8f258a00af.png"/>
-</p>
+![Monitoring ephemeral nodes with Netdata](https://cloud.githubusercontent.com/assets/2662304/23627426/65a9074a-02b9-11e7-9664-cd8f258a00af.png)
What a fantastic feature for controlling infrastructure costs! Pay only for what you need for the time you need it!
@@ -339,84 +354,83 @@ Following the Netdata way of monitoring, we wanted:
All monitoring solutions, including Netdata, work like this:
-1. `collect metrics`, from the system and the running applications
-2. `store metrics`, in a time-series database
-3. `examine metrics` periodically, for triggering alarms and sending alarm notifications
-4. `visualize metrics`, so that users can see what exactly is happening
+1. Collect metrics from the system and the running applications
+2. Store metrics in a time-series database
+3. Examine metrics periodically, for triggering alarms and sending alarm notifications
+4. Visualize metrics so that users can see what exactly is happening
Netdata used to be self-contained, so that all these functions were handled entirely by each server. The changes we made, allow each Netdata to be configured independently for each function. So, each Netdata can now act as:
-- a `self contained system`, much like it used to be.
-- a `data collector`, that collects metrics from a host and pushes them to another Netdata (with or without a local database and alarms).
-- a `proxy`, that receives metrics from other hosts and pushes them immediately to other Netdata servers. Netdata proxies can also be `store and forward proxies` meaning that they are able to maintain a local database for all metrics passing through them (with or without alarms).
-- a `time-series database` node, where data are kept, alarms are run and queries are served to visualise the metrics.
+- A self-contained system, much like it used to be.
+- A data collector that collects metrics from a host and pushes them to another Netdata (with or without a local database and alarms).
+- A proxy, which receives metrics from other hosts and pushes them immediately to other Netdata servers. Netdata proxies can also be `store and forward proxies` meaning that they are able to maintain a local database for all metrics passing through them (with or without alarms).
+- A time-series database node, where data are kept, alarms are run and queries are served to visualise the metrics.
### Configuring an auto-scaling setup
-<p align="center">
-<img src="https://cloud.githubusercontent.com/assets/2662304/23627468/96daf7ba-02b9-11e7-95ac-1f767dd8dab8.png"/>
-</p>
+![A diagram of an auto-scaling setup with Netdata](https://user-images.githubusercontent.com/1153921/84290043-0c1c1600-aaf8-11ea-9757-dd8dd8a8ec6c.png)
-You need a Netdata `master`. This node should not be ephemeral. It will be the node where all ephemeral nodes (let's call them `slaves`) will be sending their metrics.
+You need a Netdata parent. This node should not be ephemeral. It will be the node where all ephemeral child
+nodes will send their metrics.
-The master will need to authorize the slaves for accepting their metrics. This is done with an API key.
+The parent will need to authorize child nodes to receive their metrics. This is done with an API key.
#### API keys
-API keys are just random GUIDs. Use the Linux command `uuidgen` to generate one. You can use the same API key for all your `slaves`, or you can configure one API for each of them. This is entirely your decision.
+API keys are just random GUIDs. Use the Linux command `uuidgen` to generate one. You can use the same API key for all your child nodes, or you can configure one API for each of them. This is entirely your decision.
We suggest to use the same API key for each ephemeral node template you have, so that all replicas of the same ephemeral node will have exactly the same configuration.
I will use this API_KEY: `11111111-2222-3333-4444-555555555555`. Replace it with your own.
-#### Configuring the `master`
+#### Configuring the parent
-On the master, edit `/etc/netdata/stream.conf` (to edit it on your system run `/etc/netdata/edit-config stream.conf`) and set these:
+On the parent, edit `/etc/netdata/stream.conf` (to edit it on your system run `/etc/netdata/edit-config stream.conf`) and set these:
```bash
[11111111-2222-3333-4444-555555555555]
# enable/disable this API key
enabled = yes
- # one hour of data for each of the slaves
+ # one hour of data for each of the child nodes
default history = 3600
- # do not save slave metrics on disk
+ # do not save child metrics on disk
default memory = ram
- # alarms checks, only while the slave is connected
+ # alarms checks, only while the child is connected
health enabled by default = auto
```
-_`stream.conf` on master, to enable receiving metrics from slaves using the API key._
+_`stream.conf` on the parent, to enable receiving metrics from its child nodes using the API key._
If you used many API keys, you can add one such section for each API key.
-When done, restart Netdata on the `master` node. It is now ready to receive metrics.
+When done, restart Netdata on the parent node. It is now ready to receive metrics.
-Note that `health enabled by default = auto` will still trigger `last_collected` alarms, if a connected slave does not exit gracefully. If the `netdata` process running on the slave is
-stopped, it will close the connection to the master, ensuring that no `last_collected` alarms are triggered. For example, a proper container restart would first terminate
-the `netdata` process, but a system power issue would leave the connection open on the master side. In the second case, you will still receive alarms.
+Note that `health enabled by default = auto` will still trigger `last_collected` alarms, if a connected child does not exit gracefully. If the `netdata` process running on the child is
+stopped, it will close the connection to the parent, ensuring that no `last_collected` alarms are triggered. For example, a proper container restart would first terminate
+the `netdata` process, but a system power issue would leave the connection open on the parent side. In the second case, you will still receive alarms.
-#### Configuring the `slaves`
+#### Configuring the child nodes
-On each of the slaves, edit `/etc/netdata/stream.conf` (to edit it on your system run `/etc/netdata/edit-config stream.conf`) and set these:
+On each of the child nodes, edit `/etc/netdata/stream.conf` (to edit it on your system run `/etc/netdata/edit-config stream.conf`) and set these:
```bash
[stream]
# stream metrics to another Netdata
enabled = yes
- # the IP and PORT of the master
+ # the IP and PORT of the parent
destination = 10.11.12.13:19999
# the API key to use
api key = 11111111-2222-3333-4444-555555555555
```
-_`stream.conf` on slaves, to enable pushing metrics to master at `10.11.12.13:19999`._
+_`stream.conf` on child nodes, to enable pushing metrics to their parent at `10.11.12.13:19999`._
-Using just the above configuration, the `slaves` will be pushing their metrics to the `master` Netdata, but they will still maintain a local database of the metrics and run health checks. To disable them, edit `/etc/netdata/netdata.conf` and set:
+Using just the above configuration, the child nodes will be pushing their metrics to the parent Netdata, but they will still maintain a local database of the metrics and run health checks. To disable them, edit `/etc/netdata/netdata.conf` and set:
```bash
[global]
@@ -428,9 +442,9 @@ Using just the above configuration, the `slaves` will be pushing their metrics t
enabled = no
```
-_`netdata.conf` configuration on slaves, to disable the local database and health checks._
+_`netdata.conf` configuration on child nodes, to disable the local database and health checks._
-Keep in mind that setting `memory mode = none` will also force `[health].enabled = no` (health checks require access to a local database). But you can keep the database and disable health checks if you need to. You are however sending all the metrics to the master server, which can handle the health checking (`[health].enabled = yes`)
+Keep in mind that setting `memory mode = none` will also force `[health].enabled = no` (health checks require access to a local database). But you can keep the database and disable health checks if you need to. You are however sending all the metrics to the parent node, which can handle the health checking (`[health].enabled = yes`)
#### Netdata unique id
@@ -440,15 +454,15 @@ The file `/var/lib/netdata/registry/netdata.public.unique.id` contains a random
#### Troubleshooting metrics streaming
-Both the sender and the receiver of metrics log information at `/var/log/netdata/error.log`.
+Both parent and child nodes log information at `/var/log/netdata/error.log`.
-On both master and slave do this:
+Run the following on both the parent and child nodes:
```
tail -f /var/log/netdata/error.log | grep STREAM
```
-If the slave manages to connect to the master you will see something like (on the master):
+If the child manages to connect to the parent you will see something like (on the parent):
```
2017-03-09 09:38:52: netdata: INFO : STREAM [receive from [10.11.12.86]:38564]: new client connection.
@@ -458,7 +472,7 @@ If the slave manages to connect to the master you will see something like (on th
2017-03-09 09:38:52: netdata: INFO : STREAM xxx [receive from [10.11.12.86]:38564]: receiving metrics...
```
-and something like this on the slave:
+and something like this on the child:
```
2017-03-09 09:38:28: netdata: INFO : STREAM xxx [send to box:19999]: connecting...
@@ -469,7 +483,8 @@ and something like this on the slave:
### Archiving to a time-series database
-The `master` Netdata node can also archive metrics, for all `slaves`, to a time-series database. At the time of this writing, Netdata supports:
+The parent Netdata node can also archive metrics, for all its child nodes, to a time-series database. At the time of
+this writing, Netdata supports:
- graphite
- opentsdb
@@ -477,13 +492,12 @@ The `master` Netdata node can also archive metrics, for all `slaves`, to a time-
- json document DBs
- all the compatibles to the above (e.g. kairosdb, influxdb, etc)
-Check the Netdata [backends documentation](../backends) for configuring this.
+Check the Netdata [exporting documentation](/docs/export/external-databases.md) for configuring this.
This is how such a solution will work:
-<p align="center">
-<img src="https://cloud.githubusercontent.com/assets/2662304/23627295/e3569adc-02b8-11e7-9d55-4014bf98c1b3.png"/>
-</p>
+![Diagram showing an example configuration for archiving to a time-series
+database](https://user-images.githubusercontent.com/1153921/84291308-c2ccc600-aaf9-11ea-98a9-89ccbf3a62dd.png)
### An advanced setup
@@ -495,7 +509,7 @@ This means a setup like the following is also possible:
<img src="https://cloud.githubusercontent.com/assets/2662304/23629551/bb1fd9c2-02c0-11e7-90f5-cab5a3ed4c53.png"/>
</p>
-## proxies
+## Proxies
A proxy is a Netdata instance that is receiving metrics from a Netdata, and streams them to another Netdata.
@@ -504,11 +518,107 @@ When they maintain a database, they can also run health checks (alarms and notif
for the remote host that is streaming the metrics.
To configure a proxy, configure it as a receiving and a sending Netdata at the same time,
-using [stream.conf](stream.conf).
+using `stream.conf`.
The sending side of a Netdata proxy, connects and disconnects to the final destination of the
metrics, following the same pattern of the receiving side.
For a practical example see [Monitoring ephemeral nodes](#monitoring-ephemeral-nodes).
+## Troubleshooting streaming connections
+
+This section describes the most common issues you might encounter when connecting parent and child nodes.
+
+### Slow connections between parent and child
+
+When you have a slow connection between parent and child, Netdata raises a few different errors. Most of the
+errors will appear in the child's `error.log`.
+
+```bash
+netdata ERROR : STREAM_SENDER[CHILD HOSTNAME] : STREAM CHILD HOSTNAME [send to PARENT IP:PARENT PORT]: too many data pending - buffer is X bytes long,
+Y unsent - we have sent Z bytes in total, W on this connection. Closing connection to flush the data.
+```
+
+On the parent side, you may see various error messages, most commonly the following:
+
+```
+netdata ERROR : STREAM_PARENT[CHILD HOSTNAME,[CHILD IP]:CHILD PORT] : read failed: end of file
+```
+
+Another common problem in slow connections is the CHILD sending a partial message to the parent. In this case,
+the parent will write the following in its `error.log`:
+
+```
+ERROR : STREAM_RECEIVER[CHILD HOSTNAME,[CHILD IP]:CHILD PORT] : sent command 'B' which is not known by netdata, for host 'HOSTNAME'. Disabling it.
+```
+
+In this example, `B` was part of a `BEGIN` message that was cut due to connection problems.
+
+Slow connections can also cause problems when the parent misses a message and then receives a command related to the
+missed message. For example, a parent might miss a message containing the child's charts, and then doesn't know
+what to do with the `SET` message that follows. When that happens, the parent will show a message like this:
+
+```
+ERROR : STREAM_RECEIVER[CHILD HOSTNAME,[CHILD IP]:CHILD PORT] : requested a SET on chart 'CHART NAME' of host 'HOSTNAME', without a dimension. Disabling it.
+```
+
+### Child cannot connect to parent
+
+When the child can't connect to a parent for any reason (misconfiguration, networking, firewalls, parent
+down), you will see the following in the child's `error.log`.
+
+```
+ERROR : STREAM_SENDER[HOSTNAME] : Failed to connect to 'PARENT IP', port 'PARENT PORT' (errno 113, No route to host)
+```
+
+### 'Is this a Netdata?'
+
+This question can appear when Netdata starts the stream and receives an unexpected response. This error can appear when
+the parent is using SSL and the child tries to connect using plain text. You will also see this message when
+Netdata connects to another server that isn't Netdata. The complete error message will look like this:
+
+```
+ERROR : STREAM_SENDER[CHILD HOSTNAME] : STREAM child HOSTNAME [send to PARENT HOSTNAME:PARENT PORT]: server is not replying properly (is it a netdata?).
+```
+
+### Stream charts wrong
+
+Chart data needs to be consistent between child and parent nodes. If there are differences between chart data on
+a parent and a child, such as gaps in metrics collection, it most often means your child's `memory mode`
+does not match the parent's. To learn more about the different ways Netdata can store metrics, and thus keep chart
+data consistent, read our [memory mode documentation](/database/README.md).
+
+### Forbidding access
+
+You may see errors about "forbidding access" for a number of reasons. It could be because of a slow connection between
+the parent and child nodes, but it could also be due to other failures. Look in your parent's `error.log` for errors
+that look like this:
+
+```
+STREAM [receive from [child HOSTNAME]:child IP]: `MESSAGE`. Forbidding access."
+```
+
+`MESSAGE` will have one of the following patterns:
+
+- `request without KEY` : The message received is incomplete and the KEY value can be API, hostname, machine GUID.
+- `API key 'VALUE' is not valid GUID`: The UUID received from child does not have the format defined in [RFC 4122]
+ (https://tools.ietf.org/html/rfc4122)
+- `machine GUID 'VALUE' is not GUID.`: This error with machine GUID is like the previous one.
+- `API key 'VALUE' is not allowed`: This stream has a wrong API key.
+- `API key 'VALUE' is not permitted from this IP`: The IP is not allowed to use STREAM with this parent.
+- `machine GUID 'VALUE' is not allowed.`: The GUID that is trying to send stream is not allowed.
+- `Machine GUID 'VALUE' is not permitted from this IP. `: The IP does not match the pattern or IP allowed to connect
+ to use stream.
+
+### Netdata could not create a stream
+
+The connection between parent and child is a stream. When the parent can't convert the initial connection into
+a stream, it will write the following message inside `error.log`:
+
+```
+file descriptor given is not a valid stream
+```
+
+After logging this error, Netdata will close the stream.
+
[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fstreaming%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>)
diff --git a/streaming/receiver.c b/streaming/receiver.c
new file mode 100644
index 000000000..3ea15806d
--- /dev/null
+++ b/streaming/receiver.c
@@ -0,0 +1,499 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "rrdpush.h"
+
+extern struct config stream_config;
+
+void destroy_receiver_state(struct receiver_state *rpt) {
+ freez(rpt->key);
+ freez(rpt->hostname);
+ freez(rpt->registry_hostname);
+ freez(rpt->machine_guid);
+ freez(rpt->os);
+ freez(rpt->timezone);
+ freez(rpt->tags);
+ freez(rpt->client_ip);
+ freez(rpt->client_port);
+ freez(rpt->program_name);
+ freez(rpt->program_version);
+#ifdef ENABLE_HTTPS
+ if(rpt->ssl.conn){
+ SSL_free(rpt->ssl.conn);
+ }
+#endif
+ freez(rpt);
+}
+
+static void rrdpush_receiver_thread_cleanup(void *ptr) {
+ static __thread int executed = 0;
+ if(!executed) {
+ executed = 1;
+ struct receiver_state *rpt = (struct receiver_state *) ptr;
+ // If the shutdown sequence has started, and this receiver is still attached to the host then we cannot touch
+ // the host pointer as it is unpredicable when the RRDHOST is deleted. Do the cleanup from rrdhost_free().
+ if (netdata_exit && rpt->host) {
+ rpt->exited = 1;
+ return;
+ }
+
+ // Make sure that we detach this thread and don't kill a freshly arriving receiver
+ if (!netdata_exit && rpt->host) {
+ netdata_mutex_lock(&rpt->host->receiver_lock);
+ if (rpt->host->receiver == rpt)
+ rpt->host->receiver = NULL;
+ netdata_mutex_unlock(&rpt->host->receiver_lock);
+ }
+
+ info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
+ destroy_receiver_state(rpt);
+ }
+}
+
+#include "../collectors/plugins.d/pluginsd_parser.h"
+
+PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+{
+ UNUSED(plugins_action);
+ char *remote_time_txt = words[1];
+ time_t remote_time = 0;
+ RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
+ struct plugind *cd = ((PARSER_USER_OBJECT *)user)->cd;
+ if (cd->version < VERSION_GAP_FILLING ) {
+ error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", host->hostname, cd->cmd,
+ cd->version);
+ return PARSER_RC_OK; // Ignore error and continue stream
+ }
+ if (remote_time_txt && *remote_time_txt) {
+ remote_time = str2ull(remote_time_txt);
+ time_t now = now_realtime_sec(), prev = rrdhost_last_entry_t(host);
+ time_t gap = 0;
+ if (prev == 0)
+ info("STREAM %s from %s: Initial connection (no gap to check), remote=%ld local=%ld slew=%ld",
+ host->hostname, cd->cmd, remote_time, now, now-remote_time);
+ else {
+ gap = now - prev;
+ info("STREAM %s from %s: Checking for gaps... remote=%ld local=%ld..%ld slew=%ld %ld-sec gap",
+ host->hostname, cd->cmd, remote_time, prev, now, remote_time - now, gap);
+ }
+ char message[128];
+ sprintf(message,"REPLICATE %ld %ld\n", remote_time - gap, remote_time);
+ int ret;
+#ifdef ENABLE_HTTPS
+ SSL *conn = host->stream_ssl.conn ;
+ if(conn && !host->stream_ssl.flags) {
+ ret = SSL_write(conn, message, strlen(message));
+ } else {
+ ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT);
+ }
+#else
+ ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT);
+#endif
+ if (ret != (int)strlen(message))
+ error("Failed to send initial timestamp - gaps may appear in charts");
+ return PARSER_RC_OK;
+ }
+ return PARSER_RC_ERROR;
+}
+
+#define CLAIMED_ID_MIN_WORDS 3
+PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+{
+ UNUSED(plugins_action);
+
+ int i;
+ uuid_t uuid;
+ RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
+
+ for (i = 0; words[i]; i++) ;
+ if (i != CLAIMED_ID_MIN_WORDS) {
+ error("Command CLAIMED_ID came malformed %d parameters are expected but %d received", CLAIMED_ID_MIN_WORDS - 1, i - 1);
+ return PARSER_RC_ERROR;
+ }
+
+ // We don't need the parsed UUID
+ // just do it to check the format
+ if(uuid_parse(words[1], uuid)) {
+ error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", words[1]);
+ return PARSER_RC_ERROR;
+ }
+ if(uuid_parse(words[2], uuid) && strcmp(words[2], "NULL")) {
+ error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", words[2]);
+ return PARSER_RC_ERROR;
+ }
+
+ if(strcmp(words[1], host->machine_guid)) {
+ error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", words[1], host->machine_guid);
+ return PARSER_RC_OK; //the message is OK problem must be somewehere else
+ }
+
+ rrdhost_aclk_state_lock(host);
+ if (host->aclk_state.claimed_id)
+ freez(host->aclk_state.claimed_id);
+ host->aclk_state.claimed_id = strcmp(words[2], "NULL") ? strdupz(words[2]) : NULL;
+ rrdhost_aclk_state_unlock(host);
+
+ rrdpush_claimed_id(host);
+
+ return PARSER_RC_OK;
+}
+
+/* The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing.
+ */
+static int receiver_read(struct receiver_state *r, FILE *fp) {
+#ifdef ENABLE_HTTPS
+ if (r->ssl.conn && !r->ssl.flags) {
+ ERR_clear_error();
+ int desired = sizeof(r->read_buffer) - r->read_len - 1;
+ int ret = SSL_read(r->ssl.conn, r->read_buffer + r->read_len, desired);
+ if (ret > 0 ) {
+ r->read_len += ret;
+ return 0;
+ }
+ // Don't treat SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE differently on blocking socket
+ u_long err;
+ char buf[256];
+ while ((err = ERR_get_error()) != 0) {
+ ERR_error_string_n(err, buf, sizeof(buf));
+ error("STREAM %s [receive from %s] ssl error: %s", r->hostname, r->client_ip, buf);
+ }
+ return 1;
+ }
+#endif
+ if (!fgets(r->read_buffer, sizeof(r->read_buffer), fp))
+ return 1;
+ r->read_len = strlen(r->read_buffer);
+ return 0;
+}
+
+/* Produce a full line if one exists, statefully return where we start next time.
+ * When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
+ */
+static char *receiver_next_line(struct receiver_state *r, int *pos) {
+ int start = *pos, scan = *pos;
+ if (scan >= r->read_len) {
+ r->read_len = 0;
+ return NULL;
+ }
+ while (scan < r->read_len && r->read_buffer[scan] != '\n')
+ scan++;
+ if (scan < r->read_len && r->read_buffer[scan] == '\n') {
+ *pos = scan+1;
+ r->read_buffer[scan] = 0;
+ return &r->read_buffer[start];
+ }
+ memmove(r->read_buffer, &r->read_buffer[start], r->read_len - start);
+ r->read_len -= start;
+ return NULL;
+}
+
+
+size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp) {
+ size_t result;
+ PARSER_USER_OBJECT *user = callocz(1, sizeof(*user));
+ user->enabled = cd->enabled;
+ user->host = rpt->host;
+ user->opaque = rpt;
+ user->cd = cd;
+ user->trust_durations = 0;
+
+ PARSER *parser = parser_init(rpt->host, user, fp, PARSER_INPUT_SPLIT);
+ parser_add_keyword(parser, "TIMESTAMP", streaming_timestamp);
+ parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id);
+
+ if (unlikely(!parser)) {
+ error("Failed to initialize parser");
+ cd->serial_failures++;
+ freez(user);
+ return 0;
+ }
+
+ parser->plugins_action->begin_action = &pluginsd_begin_action;
+ parser->plugins_action->flush_action = &pluginsd_flush_action;
+ parser->plugins_action->end_action = &pluginsd_end_action;
+ parser->plugins_action->disable_action = &pluginsd_disable_action;
+ parser->plugins_action->variable_action = &pluginsd_variable_action;
+ parser->plugins_action->dimension_action = &pluginsd_dimension_action;
+ parser->plugins_action->label_action = &pluginsd_label_action;
+ parser->plugins_action->overwrite_action = &pluginsd_overwrite_action;
+ parser->plugins_action->chart_action = &pluginsd_chart_action;
+ parser->plugins_action->set_action = &pluginsd_set_action;
+
+ user->parser = parser;
+
+ do {
+ if (receiver_read(rpt, fp))
+ break;
+ int pos = 0;
+ char *line;
+ while ((line = receiver_next_line(rpt, &pos))) {
+ if (unlikely(netdata_exit || rpt->shutdown || parser_action(parser, line)))
+ goto done;
+ }
+ rpt->last_msg_t = now_realtime_sec();
+ }
+ while(!netdata_exit);
+done:
+ result= user->count;
+ freez(user);
+ parser_destroy(parser);
+ return result;
+}
+
+
+static int rrdpush_receive(struct receiver_state *rpt)
+{
+ int history = default_rrd_history_entries;
+ RRD_MEMORY_MODE mode = default_rrd_memory_mode;
+ int health_enabled = default_health_enabled;
+ int rrdpush_enabled = default_rrdpush_enabled;
+ char *rrdpush_destination = default_rrdpush_destination;
+ char *rrdpush_api_key = default_rrdpush_api_key;
+ char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching;
+ time_t alarms_delay = 60;
+
+ rpt->update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->update_every);
+ if(rpt->update_every < 0) rpt->update_every = 1;
+
+ history = (int)appconfig_get_number(&stream_config, rpt->key, "default history", history);
+ history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", history);
+ if(history < 5) history = 5;
+
+ mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(mode)));
+ mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(mode)));
+
+ health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", health_enabled);
+ health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", health_enabled);
+
+ alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", alarms_delay);
+ alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", alarms_delay);
+
+ rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rrdpush_enabled);
+ rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rrdpush_enabled);
+
+ rrdpush_destination = appconfig_get(&stream_config, rpt->key, "default proxy destination", rrdpush_destination);
+ rrdpush_destination = appconfig_get(&stream_config, rpt->machine_guid, "proxy destination", rrdpush_destination);
+
+ rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "default proxy api key", rrdpush_api_key);
+ rrdpush_api_key = appconfig_get(&stream_config, rpt->machine_guid, "proxy api key", rrdpush_api_key);
+
+ rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching);
+ rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rrdpush_send_charts_matching);
+
+ (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:"");
+
+ if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) {
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "DENIED - ATTEMPT TO RECEIVE METRICS FROM MACHINE_GUID IDENTICAL TO PARENT");
+ error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the parent/proxy machine GUID to a child?", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->machine_guid);
+ close(rpt->fd);
+ return 1;
+ }
+
+ if (rpt->host==NULL) {
+
+ rpt->host = rrdhost_find_or_create(
+ rpt->hostname
+ , rpt->registry_hostname
+ , rpt->machine_guid
+ , rpt->os
+ , rpt->timezone
+ , rpt->tags
+ , rpt->program_name
+ , rpt->program_version
+ , rpt->update_every
+ , history
+ , mode
+ , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO)
+ , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key)
+ , rrdpush_destination
+ , rrdpush_api_key
+ , rrdpush_send_charts_matching
+ , rpt->system_info
+ );
+
+ if(!rpt->host) {
+ close(rpt->fd);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "FAILED - CANNOT ACQUIRE HOST");
+ error("STREAM %s [receive from [%s]:%s]: failed to find/create host structure.", rpt->hostname, rpt->client_ip, rpt->client_port);
+ return 1;
+ }
+
+ netdata_mutex_lock(&rpt->host->receiver_lock);
+ if (rpt->host->receiver == NULL)
+ rpt->host->receiver = rpt;
+ else {
+ error("Multiple receivers connected for %s concurrently, cancelling this one...", rpt->machine_guid);
+ netdata_mutex_unlock(&rpt->host->receiver_lock);
+ close(rpt->fd);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "FAILED - BEATEN TO HOST CREATION");
+ return 1;
+ }
+ netdata_mutex_unlock(&rpt->host->receiver_lock);
+ }
+
+ int ssl = 0;
+#ifdef ENABLE_HTTPS
+ if (rpt->ssl.conn != NULL)
+ ssl = 1;
+#endif
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'"
+ , rpt->hostname
+ , rpt->client_ip
+ , rpt->client_port
+ , rpt->host->hostname
+ , rpt->host->machine_guid
+ , rpt->host->rrd_update_every
+ , rpt->host->rrd_history_entries
+ , rrd_memory_mode_name(rpt->host->rrd_memory_mode)
+ , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
+ , ssl ? " SSL," : ""
+ , rpt->host->tags?rpt->host->tags:""
+ );
+#endif // NETDATA_INTERNAL_CHECKS
+
+
+ struct plugind cd = {
+ .enabled = 1,
+ .update_every = default_rrd_update_every,
+ .pid = 0,
+ .serial_failures = 0,
+ .successful_collections = 0,
+ .obsolete = 0,
+ .started_t = now_realtime_sec(),
+ .next = NULL,
+ .version = 0,
+ };
+
+ // put the client IP and port into the buffers used by plugins.d
+ snprintfz(cd.id, CONFIG_MAX_NAME, "%s:%s", rpt->client_ip, rpt->client_port);
+ snprintfz(cd.filename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
+ snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
+ snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
+
+ info("STREAM %s [receive from [%s]:%s]: initializing communication...", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ char initial_response[HTTP_HEADER_SIZE];
+ if (rpt->stream_version > 1) {
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version);
+ sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->stream_version);
+ } else if (rpt->stream_version == 1) {
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version);
+ sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2);
+ } else {
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ sprintf(initial_response, "%s", START_STREAMING_PROMPT);
+ }
+ debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response);
+ #ifdef ENABLE_HTTPS
+ rpt->host->stream_ssl.conn = rpt->ssl.conn;
+ rpt->host->stream_ssl.flags = rpt->ssl.flags;
+ if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
+#else
+ if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
+#endif
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY");
+ error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ close(rpt->fd);
+ return 0;
+ }
+
+ // remove the non-blocking flag from the socket
+ if(sock_delnonblock(rpt->fd) < 0)
+ error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd);
+
+ // convert the socket to a FILE *
+ FILE *fp = fdopen(rpt->fd, "r");
+ if(!fp) {
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - SOCKET ERROR");
+ error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd);
+ close(rpt->fd);
+ return 0;
+ }
+
+ rrdhost_wrlock(rpt->host);
+/* if(rpt->host->connected_senders > 0) {
+ rrdhost_unlock(rpt->host);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "REJECTED - ALREADY CONNECTED");
+ info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ fclose(fp);
+ return 0;
+ }
+*/
+
+// rpt->host->connected_senders++;
+ rpt->host->labels.labels_flag = (rpt->stream_version > 0)?LABEL_FLAG_UPDATE_STREAM:LABEL_FLAG_STOP_STREAM;
+
+ if(health_enabled != CONFIG_BOOLEAN_NO) {
+ if(alarms_delay > 0) {
+ rpt->host->health_delay_up_to = now_realtime_sec() + alarms_delay;
+ info("Postponing health checks for %ld seconds, on host '%s', because it was just connected."
+ , alarms_delay
+ , rpt->host->hostname
+ );
+ }
+ }
+ rrdhost_unlock(rpt->host);
+
+ // call the plugins.d processor to receive the metrics
+ info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "CONNECTED");
+
+ cd.version = rpt->stream_version;
+
+#ifdef ENABLE_ACLK
+ // in case we have cloud connection we inform cloud
+ // new slave connected
+ if (netdata_cloud_setting)
+ aclk_host_state_update(rpt->host, ACLK_CMD_CHILD_CONNECT);
+#endif
+
+ size_t count = streaming_parser(rpt, &cd, fp);
+
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname,
+ "DISCONNECTED");
+ error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip,
+ rpt->client_port, count);
+
+#ifdef ENABLE_ACLK
+ // in case we have cloud connection we inform cloud
+ // new slave connected
+ if (netdata_cloud_setting)
+ aclk_host_state_update(rpt->host, ACLK_CMD_CHILD_DISCONNECT);
+#endif
+
+ // During a shutdown there is cleanup code in rrdhost that will cancel the sender thread
+ if (!netdata_exit && rpt->host) {
+ rrd_rdlock();
+ rrdhost_wrlock(rpt->host);
+ netdata_mutex_lock(&rpt->host->receiver_lock);
+ if (rpt->host->receiver == rpt) {
+ rpt->host->senders_disconnected_time = now_realtime_sec();
+ rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN);
+ if(health_enabled == CONFIG_BOOLEAN_AUTO)
+ rpt->host->health_enabled = 0;
+ }
+ rrdhost_unlock(rpt->host);
+ if (rpt->host->receiver == rpt) {
+ rrdpush_sender_thread_stop(rpt->host);
+ }
+ netdata_mutex_unlock(&rpt->host->receiver_lock);
+ rrd_unlock();
+ }
+
+ // cleanup
+ fclose(fp);
+ return (int)count;
+}
+
+void *rrdpush_receiver_thread(void *ptr) {
+ netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr);
+
+ struct receiver_state *rpt = (struct receiver_state *)ptr;
+ info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
+
+ rrdpush_receive(rpt);
+
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}
+
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index e6d2aca0a..f54fc609e 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
+#include "../parser/parser.h"
/*
* rrdpush
@@ -25,15 +26,9 @@
*
*/
-#define START_STREAMING_PROMPT "Hit me baby, push them over..."
-
-typedef enum {
- RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW,
- RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW
-} RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY;
-
-static struct config stream_config = {
- .sections = NULL,
+struct config stream_config = {
+ .first_section = NULL,
+ .last_section = NULL,
.mutex = NETDATA_MUTEX_INITIALIZER,
.index = {
.avl_tree = {
@@ -57,12 +52,12 @@ char *netdata_ssl_ca_file = NULL;
static void load_stream_conf() {
errno = 0;
char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf");
- if(!appconfig_load(&stream_config, filename, 0)) {
+ if(!appconfig_load(&stream_config, filename, 0, NULL)) {
info("CONFIG: cannot load user config '%s'. Will try stock config.", filename);
freez(filename);
filename = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf");
- if(!appconfig_load(&stream_config, filename, 0))
+ if(!appconfig_load(&stream_config, filename, 0, NULL))
info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename);
}
freez(filename);
@@ -79,6 +74,7 @@ int rrdpush_init() {
default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");
rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time);
+
if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
error("STREAM [send]: cannot enable sending thread - information is missing.");
default_rrdpush_enabled = 0;
@@ -96,6 +92,7 @@ int rrdpush_init() {
}
char *invalid_certificate = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", "no");
+
if ( !strcmp(invalid_certificate,"yes")){
if (netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE){
info("Netdata is configured to accept invalid SSL certificate.");
@@ -110,8 +107,6 @@ int rrdpush_init() {
return default_rrdpush_enabled;
}
-#define CONNECTED_TO_SIZE 100
-
// data collection happens from multiple threads
// each of these threads calls rrdset_done()
// which in turn calls rrdset_done_push()
@@ -126,8 +121,6 @@ int rrdpush_init() {
// this is for the first iterations of each chart
unsigned int remote_clock_resync_iterations = 60;
-#define rrdpush_buffer_lock(host) netdata_mutex_lock(&((host)->rrdpush_sender_buffer_mutex))
-#define rrdpush_buffer_unlock(host) netdata_mutex_unlock(&((host)->rrdpush_sender_buffer_mutex))
static inline int should_send_chart_matching(RRDSET *st) {
if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED))) {
@@ -151,6 +144,25 @@ static inline int should_send_chart_matching(RRDSET *st) {
return(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND));
}
+int configured_as_parent() {
+ struct section *section = NULL;
+ int is_parent = 0;
+
+ appconfig_wrlock(&stream_config);
+ for (section = stream_config.first_section; section; section = section->next) {
+ uuid_t uuid;
+
+ if (uuid_parse(section->name, uuid) != -1 &&
+ appconfig_get_boolean_by_section(section, "enabled", 0)) {
+ is_parent = 1;
+ break;
+ }
+ }
+ appconfig_unlock(&stream_config);
+
+ return is_parent;
+}
+
// checks if the current chart definition has been sent
static inline int need_to_send_chart_definition(RRDSET *st) {
rrdset_check_rdlock(st);
@@ -171,7 +183,8 @@ static inline int need_to_send_chart_definition(RRDSET *st) {
return 0;
}
-// sends the current chart definition
+// Send the current chart definition.
+// Assumes that collector thread has already called sender_start for mutex / buffer state.
static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
RRDHOST *host = st->rrdhost;
@@ -190,11 +203,9 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
}
}
- // info("CHART '%s' '%s'", st->id, name);
-
// send the chart
buffer_sprintf(
- host->rrdpush_sender_buffer
+ host->sender->build
, "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
, st->id
, name
@@ -217,7 +228,7 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
buffer_sprintf(
- host->rrdpush_sender_buffer
+ host->sender->build
, "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n"
, rd->id
, rd->name
@@ -238,7 +249,7 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
calculated_number *value = (calculated_number *) rs->value;
buffer_sprintf(
- host->rrdpush_sender_buffer
+ host->sender->build
, "VARIABLE CHART %s = " CALCULATED_NUMBER_FORMAT "\n"
, rs->variable
, *value
@@ -250,25 +261,29 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
}
// sends the current chart dimensions
-static inline void rrdpush_send_chart_metrics_nolock(RRDSET *st) {
+static inline void rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_state *s) {
RRDHOST *host = st->rrdhost;
- buffer_sprintf(host->rrdpush_sender_buffer, "BEGIN \"%s\" %llu\n", st->id, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0);
+ buffer_sprintf(host->sender->build, "BEGIN \"%s\" %llu", st->id, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0);
+ if (s->version >= VERSION_GAP_FILLING)
+ buffer_sprintf(host->sender->build, " %ld\n", st->last_collected_time.tv_sec);
+ else
+ buffer_strcat(host->sender->build, "\n");
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
if(rd->updated && rd->exposed)
- buffer_sprintf(host->rrdpush_sender_buffer
+ buffer_sprintf(host->sender->build
, "SET \"%s\" = " COLLECTED_NUMBER_FORMAT "\n"
, rd->id
, rd->collected_value
);
}
-
- buffer_strcat(host->rrdpush_sender_buffer, "END\n");
+ buffer_strcat(host->sender->build, "END\n");
}
static void rrdpush_sender_thread_spawn(RRDHOST *host);
+// Called from the internal collectors to mark a chart obsolete.
void rrdset_push_chart_definition_now(RRDSET *st) {
RRDHOST *host = st->rrdhost;
@@ -276,9 +291,9 @@ void rrdset_push_chart_definition_now(RRDSET *st) {
return;
rrdset_rdlock(st);
- rrdpush_buffer_lock(host);
+ sender_start(host->sender);
rrdpush_send_chart_definition_nolock(st);
- rrdpush_buffer_unlock(host);
+ sender_commit(host->sender);
rrdset_unlock(st);
}
@@ -288,18 +303,14 @@ void rrdset_done_push(RRDSET *st) {
RRDHOST *host = st->rrdhost;
- rrdpush_buffer_lock(host);
-
if(unlikely(host->rrdpush_send_enabled && !host->rrdpush_sender_spawn))
rrdpush_sender_thread_spawn(host);
- if(unlikely(!host->rrdpush_sender_buffer || !host->rrdpush_sender_connected)) {
+ // Handle non-connected case
+ if(unlikely(!host->rrdpush_sender_connected)) {
if(unlikely(!host->rrdpush_sender_error_shown))
error("STREAM %s [send]: not ready - discarding collected metrics.", host->hostname);
-
host->rrdpush_sender_error_shown = 1;
-
- rrdpush_buffer_unlock(host);
return;
}
else if(unlikely(host->rrdpush_sender_error_shown)) {
@@ -307,105 +318,82 @@ void rrdset_done_push(RRDSET *st) {
host->rrdpush_sender_error_shown = 0;
}
+ sender_start(host->sender);
+
if(need_to_send_chart_definition(st))
rrdpush_send_chart_definition_nolock(st);
- rrdpush_send_chart_metrics_nolock(st);
+ rrdpush_send_chart_metrics_nolock(st, host->sender);
// signal the sender there are more data
if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
- rrdpush_buffer_unlock(host);
+ sender_commit(host->sender);
}
-// ----------------------------------------------------------------------------
-// rrdpush sender thread
-
-static inline void rrdpush_sender_add_host_variable_to_buffer_nolock(RRDHOST *host, RRDVAR *rv) {
- calculated_number *value = (calculated_number *)rv->value;
+// labels
+void rrdpush_send_labels(RRDHOST *host) {
+ if (!host->labels.head || !(host->labels.labels_flag & LABEL_FLAG_UPDATE_STREAM) || (host->labels.labels_flag & LABEL_FLAG_STOP_STREAM))
+ return;
- buffer_sprintf(
- host->rrdpush_sender_buffer
- , "VARIABLE HOST %s = " CALCULATED_NUMBER_FORMAT "\n"
- , rv->name
- , *value
- );
+ sender_start(host->sender);
+ rrdhost_rdlock(host);
+ netdata_rwlock_rdlock(&host->labels.labels_rwlock);
- debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " CALCULATED_NUMBER_FORMAT, rv->name, *value);
-}
+ struct label *label_i = host->labels.head;
+ while(label_i) {
+ buffer_sprintf(host->sender->build
+ , "LABEL \"%s\" = %d %s\n"
+ , label_i->key
+ , (int)label_i->label_source
+ , label_i->value);
-void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv) {
- if(host->rrdpush_send_enabled && host->rrdpush_sender_spawn && host->rrdpush_sender_connected) {
- rrdpush_buffer_lock(host);
- rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv);
- rrdpush_buffer_unlock(host);
+ label_i = label_i->next;
}
-}
-
-static int rrdpush_sender_thread_custom_host_variables_callback(void *rrdvar_ptr, void *host_ptr) {
- RRDVAR *rv = (RRDVAR *)rrdvar_ptr;
- RRDHOST *host = (RRDHOST *)host_ptr;
- if(unlikely(rv->options & RRDVAR_OPTION_CUSTOM_HOST_VAR && rv->type == RRDVAR_TYPE_CALCULATED)) {
- rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv);
+ buffer_sprintf(host->sender->build
+ , "OVERWRITE %s\n", "labels");
- // return 1, so that the traversal will return the number of variables sent
- return 1;
- }
-
- // returning a negative number will break the traversal
- return 0;
-}
+ netdata_rwlock_unlock(&host->labels.labels_rwlock);
+ rrdhost_unlock(host);
+ sender_commit(host->sender);
-static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
- int ret = rrdvar_callback_for_all_host_variables(host, rrdpush_sender_thread_custom_host_variables_callback, host);
- (void)ret;
+ if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
+ error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
- debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
+ host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
}
-// resets all the chart, so that their definitions
-// will be resent to the central netdata
-static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
- rrdhost_rdlock(host);
-
- RRDSET *st;
- rrdset_foreach_read(st, host) {
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+void rrdpush_claimed_id(RRDHOST *host)
+{
+ if(unlikely(!host->rrdpush_send_enabled || !host->rrdpush_sender_connected))
+ return;
+
+ if(host->sender->version < STREAM_VERSION_CLAIM)
+ return;
- st->upstream_resync_time = 0;
+ sender_start(host->sender);
+ rrdhost_aclk_state_lock(host);
- rrdset_rdlock(st);
+ buffer_sprintf(host->sender->build, "CLAIMED_ID %s %s\n", host->machine_guid, (host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "NULL") );
- RRDDIM *rd;
- rrddim_foreach_read(rd, st)
- rd->exposed = 0;
+ rrdhost_aclk_state_unlock(host);
+ sender_commit(host->sender);
- rrdset_unlock(st);
- }
-
- rrdhost_unlock(host);
+ // signal the sender there are more data
+ if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
+ error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
}
-static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
- rrdpush_buffer_lock(host);
-
- if(buffer_strlen(host->rrdpush_sender_buffer))
- error("STREAM %s [send]: discarding %zu bytes of metrics already in the buffer.", host->hostname, buffer_strlen(host->rrdpush_sender_buffer));
-
- buffer_flush(host->rrdpush_sender_buffer);
-
- rrdpush_sender_thread_reset_all_charts(host);
- rrdpush_sender_thread_send_custom_host_variables(host);
-
- rrdpush_buffer_unlock(host);
-}
+// ----------------------------------------------------------------------------
+// rrdpush sender thread
+// Either the receiver lost the connection or the host is being destroyed.
+// The sender mutex guards thread creation, any spurious data is wiped on reconnection.
void rrdpush_sender_thread_stop(RRDHOST *host) {
- rrdpush_buffer_lock(host);
- rrdhost_wrlock(host);
+ netdata_mutex_lock(&host->sender->mutex);
netdata_thread_t thr = 0;
if(host->rrdpush_sender_spawn) {
@@ -422,8 +410,7 @@ void rrdpush_sender_thread_stop(RRDHOST *host) {
netdata_thread_cancel(host->rrdpush_sender_thread);
}
- rrdhost_unlock(host);
- rrdpush_buffer_unlock(host);
+ netdata_mutex_unlock(&host->sender->mutex);
if(thr != 0) {
info("STREAM %s [send]: waiting for the sending thread to stop...", host->hostname);
@@ -433,828 +420,28 @@ void rrdpush_sender_thread_stop(RRDHOST *host) {
}
}
-static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
- host->rrdpush_sender_connected = 0;
-
- if(host->rrdpush_sender_socket != -1) {
- close(host->rrdpush_sender_socket);
- host->rrdpush_sender_socket = -1;
- }
-}
-
-//called from client side
-static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_port, int timeout, size_t *reconnects_counter, char *connected_to, size_t connected_to_size) {
- struct timeval tv = {
- .tv_sec = timeout,
- .tv_usec = 0
- };
-
- // make sure the socket is closed
- rrdpush_sender_thread_close_socket(host);
-
- debug(D_STREAM, "STREAM: Attempting to connect...");
- info("STREAM %s [send to %s]: connecting...", host->hostname, host->rrdpush_send_destination);
-
- host->rrdpush_sender_socket = connect_to_one_of(
- host->rrdpush_send_destination
- , default_port
- , &tv
- , reconnects_counter
- , connected_to
- , connected_to_size
- );
-
- if(unlikely(host->rrdpush_sender_socket == -1)) {
- error("STREAM %s [send to %s]: failed to connect", host->hostname, host->rrdpush_send_destination);
- return 0;
- }
-
- info("STREAM %s [send to %s]: initializing communication...", host->hostname, connected_to);
-
-#ifdef ENABLE_HTTPS
- if( netdata_client_ctx ){
- host->ssl.flags = NETDATA_SSL_START;
- if (!host->ssl.conn){
- host->ssl.conn = SSL_new(netdata_client_ctx);
- if(!host->ssl.conn){
- error("Failed to allocate SSL structure.");
- host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
- }
- }
- else{
- SSL_clear(host->ssl.conn);
- }
-
- if (host->ssl.conn)
- {
- if (SSL_set_fd(host->ssl.conn, host->rrdpush_sender_socket) != 1) {
- error("Failed to set the socket to the SSL on socket fd %d.", host->rrdpush_sender_socket);
- host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
- } else{
- host->ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE;
- }
- }
- }
- else {
- host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
- }
-#endif
-
- #define HTTP_HEADER_SIZE 8192
- char http[HTTP_HEADER_SIZE + 1];
- int eol = snprintfz(http, HTTP_HEADER_SIZE,
- "STREAM key=%s&hostname=%s&registry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&tags=%s"
- "&NETDATA_SYSTEM_OS_NAME=%s"
- "&NETDATA_SYSTEM_OS_ID=%s"
- "&NETDATA_SYSTEM_OS_ID_LIKE=%s"
- "&NETDATA_SYSTEM_OS_VERSION=%s"
- "&NETDATA_SYSTEM_OS_VERSION_ID=%s"
- "&NETDATA_SYSTEM_OS_DETECTION=%s"
- "&NETDATA_SYSTEM_KERNEL_NAME=%s"
- "&NETDATA_SYSTEM_KERNEL_VERSION=%s"
- "&NETDATA_SYSTEM_ARCHITECTURE=%s"
- "&NETDATA_SYSTEM_VIRTUALIZATION=%s"
- "&NETDATA_SYSTEM_VIRT_DETECTION=%s"
- "&NETDATA_SYSTEM_CONTAINER=%s"
- "&NETDATA_SYSTEM_CONTAINER_DETECTION=%s"
- " HTTP/1.1\r\n"
- "User-Agent: %s/%s\r\n"
- "Accept: */*\r\n\r\n"
- , host->rrdpush_send_api_key
- , host->hostname
- , host->registry_hostname
- , host->machine_guid
- , default_rrd_update_every
- , host->os
- , host->timezone
- , (host->tags) ? host->tags : ""
- , (host->system_info->os_name) ? host->system_info->os_name : ""
- , (host->system_info->os_id) ? host->system_info->os_id : ""
- , (host->system_info->os_id_like) ? host->system_info->os_id_like : ""
- , (host->system_info->os_version) ? host->system_info->os_version : ""
- , (host->system_info->os_version_id) ? host->system_info->os_version_id : ""
- , (host->system_info->os_detection) ? host->system_info->os_detection : ""
- , (host->system_info->kernel_name) ? host->system_info->kernel_name : ""
- , (host->system_info->kernel_version) ? host->system_info->kernel_version : ""
- , (host->system_info->architecture) ? host->system_info->architecture : ""
- , (host->system_info->virtualization) ? host->system_info->virtualization : ""
- , (host->system_info->virt_detection) ? host->system_info->virt_detection : ""
- , (host->system_info->container) ? host->system_info->container : ""
- , (host->system_info->container_detection) ? host->system_info->container_detection : ""
- , host->program_name
- , host->program_version
- );
- http[eol] = 0x00;
-
-#ifdef ENABLE_HTTPS
- if (!host->ssl.flags) {
- ERR_clear_error();
- SSL_set_connect_state(host->ssl.conn);
- int err = SSL_connect(host->ssl.conn);
- if (err != 1){
- err = SSL_get_error(host->ssl.conn, err);
- error("SSL cannot connect with the server: %s ",ERR_error_string((long)SSL_get_error(host->ssl.conn,err),NULL));
- if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
- rrdpush_sender_thread_close_socket(host);
- return 0;
- }else {
- host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
- }
- }
- else {
- if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
- if (netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE) {
- if ( security_test_certificate(host->ssl.conn)) {
- error("Closing the stream connection, because the server SSL certificate is not valid.");
- rrdpush_sender_thread_close_socket(host);
- return 0;
- }
- }
- }
- }
- }
- if(send_timeout(&host->ssl,host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) {
-#else
- if(send_timeout(host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) {
-#endif
- error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", host->hostname, connected_to);
- rrdpush_sender_thread_close_socket(host);
- return 0;
- }
-
- info("STREAM %s [send to %s]: waiting response from remote netdata...", host->hostname, connected_to);
-
-#ifdef ENABLE_HTTPS
- if(recv_timeout(&host->ssl,host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout) == -1) {
-#else
- if(recv_timeout(host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout) == -1) {
-#endif
- error("STREAM %s [send to %s]: remote netdata does not respond.", host->hostname, connected_to);
- rrdpush_sender_thread_close_socket(host);
- return 0;
- }
-
- if(strncmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT)) != 0) {
- error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", host->hostname, connected_to);
- rrdpush_sender_thread_close_socket(host);
- return 0;
- }
-
- info("STREAM %s [send to %s]: established communication - ready to send metrics...", host->hostname, connected_to);
-
- if(sock_setnonblock(host->rrdpush_sender_socket) < 0)
- error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", host->hostname, connected_to);
-
- if(sock_enlarge_out(host->rrdpush_sender_socket) < 0)
- error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", host->hostname, connected_to);
-
- debug(D_STREAM, "STREAM: Connected on fd %d...", host->rrdpush_sender_socket);
-
- return 1;
-}
-
-static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
- RRDHOST *host = (RRDHOST *)ptr;
-
- rrdpush_buffer_lock(host);
- rrdhost_wrlock(host);
-
- info("STREAM %s [send]: sending thread cleans up...", host->hostname);
-
- rrdpush_sender_thread_close_socket(host);
-
- // close the pipe
- if(host->rrdpush_sender_pipe[PIPE_READ] != -1) {
- close(host->rrdpush_sender_pipe[PIPE_READ]);
- host->rrdpush_sender_pipe[PIPE_READ] = -1;
- }
-
- if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1) {
- close(host->rrdpush_sender_pipe[PIPE_WRITE]);
- host->rrdpush_sender_pipe[PIPE_WRITE] = -1;
- }
-
- buffer_free(host->rrdpush_sender_buffer);
- host->rrdpush_sender_buffer = NULL;
-
- if(!host->rrdpush_sender_join) {
- info("STREAM %s [send]: sending thread detaches itself.", host->hostname);
- netdata_thread_detach(netdata_thread_self());
- }
-
- host->rrdpush_sender_spawn = 0;
-
- info("STREAM %s [send]: sending thread now exits.", host->hostname);
-
- rrdhost_unlock(host);
- rrdpush_buffer_unlock(host);
-}
-
-void *rrdpush_sender_thread(void *ptr) {
- RRDHOST *host = (RRDHOST *)ptr;
-
- if(!host->rrdpush_send_enabled || !host->rrdpush_send_destination || !*host->rrdpush_send_destination || !host->rrdpush_send_api_key || !*host->rrdpush_send_api_key) {
- error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.", host->hostname, gettid());
- return NULL;
- }
-
-#ifdef ENABLE_HTTPS
- if (netdata_use_ssl_on_stream & NETDATA_SSL_FORCE ){
- security_start_ssl(NETDATA_SSL_CONTEXT_STREAMING);
- security_location_for_context(netdata_client_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path);
- }
-#endif
-
- info("STREAM %s [send]: thread created (task id %d)", host->hostname, gettid());
-
- int timeout = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 60);
- int default_port = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "default port", 19999);
- size_t max_size = (size_t)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024);
- unsigned int reconnect_delay = (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5);
- remote_clock_resync_iterations = (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "initial clock resync iterations", remote_clock_resync_iterations);
- char connected_to[CONNECTED_TO_SIZE + 1] = "";
-
- // initialize rrdpush globals
- host->rrdpush_sender_buffer = buffer_create(1);
- host->rrdpush_sender_connected = 0;
- if(pipe(host->rrdpush_sender_pipe) == -1) fatal("STREAM %s [send]: cannot create required pipe.", host->hostname);
-
- // initialize local variables
- size_t begin = 0;
- size_t reconnects_counter = 0;
- size_t sent_bytes = 0;
- size_t sent_bytes_on_this_connection = 0;
- size_t send_attempts = 0;
-
-
- time_t last_sent_t = 0;
- struct pollfd fds[2], *ifd, *ofd;
- nfds_t fdmax;
-
- ifd = &fds[0];
- ofd = &fds[1];
-
- size_t not_connected_loops = 0;
-
- netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, host);
-
- for(; host->rrdpush_send_enabled && !netdata_exit ;) {
- // check for outstanding cancellation requests
- netdata_thread_testcancel();
-
- // if we don't have socket open, lets wait a bit
- if(unlikely(host->rrdpush_sender_socket == -1)) {
- send_attempts = 0;
-
- if(not_connected_loops == 0 && sent_bytes_on_this_connection > 0) {
- // fast re-connection on first disconnect
- sleep_usec(USEC_PER_MS * 500); // milliseconds
- }
- else {
- // slow re-connection on repeating errors
- sleep_usec(USEC_PER_SEC * reconnect_delay); // seconds
- }
-
- if(rrdpush_sender_thread_connect_to_master(host, default_port, timeout, &reconnects_counter, connected_to, CONNECTED_TO_SIZE)) {
- last_sent_t = now_monotonic_sec();
-
- // reset the buffer, to properly send charts and metrics
- rrdpush_sender_thread_data_flush(host);
-
- // send from the beginning
- begin = 0;
-
- // make sure the next reconnection will be immediate
- not_connected_loops = 0;
-
- // reset the bytes we have sent for this session
- sent_bytes_on_this_connection = 0;
-
- // let the data collection threads know we are ready
- host->rrdpush_sender_connected = 1;
- }
- else {
- // increase the failed connections counter
- not_connected_loops++;
-
- // reset the number of bytes sent
- sent_bytes_on_this_connection = 0;
- }
-
- // loop through
- continue;
- }
- else if(unlikely(now_monotonic_sec() - last_sent_t > timeout)) {
- error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", host->hostname, connected_to, timeout, sent_bytes_on_this_connection, send_attempts);
- rrdpush_sender_thread_close_socket(host);
- }
-
- ifd->fd = host->rrdpush_sender_pipe[PIPE_READ];
- ifd->events = POLLIN;
- ifd->revents = 0;
-
- ofd->fd = host->rrdpush_sender_socket;
- ofd->revents = 0;
- if(ofd->fd != -1 && begin < buffer_strlen(host->rrdpush_sender_buffer)) {
- debug(D_STREAM, "STREAM: Requesting data output on streaming socket %d...", ofd->fd);
- ofd->events = POLLOUT;
- fdmax = 2;
- send_attempts++;
- }
- else {
- debug(D_STREAM, "STREAM: Not requesting data output on streaming socket %d (nothing to send now)...", ofd->fd);
- ofd->events = 0;
- fdmax = 1;
- }
-
- debug(D_STREAM, "STREAM: Waiting for poll() events (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_sender_buffer));
- if(unlikely(netdata_exit)) break;
- int retval = poll(fds, fdmax, 1000);
- if(unlikely(netdata_exit)) break;
-
- if(unlikely(retval == -1)) {
- debug(D_STREAM, "STREAM: poll() failed (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_sender_buffer));
-
- if(errno == EAGAIN || errno == EINTR) {
- debug(D_STREAM, "STREAM: poll() failed with EAGAIN or EINTR...");
- }
- else {
- error("STREAM %s [send to %s]: failed to poll(). Closing socket.", host->hostname, connected_to);
- rrdpush_sender_thread_close_socket(host);
- }
-
- continue;
- }
- else if(likely(retval)) {
- if (ifd->revents & POLLIN || ifd->revents & POLLPRI) {
- debug(D_STREAM, "STREAM: Data added to send buffer (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_sender_buffer));
-
- char buffer[1000 + 1];
- if (read(host->rrdpush_sender_pipe[PIPE_READ], buffer, 1000) == -1)
- error("STREAM %s [send to %s]: cannot read from internal pipe.", host->hostname, connected_to);
- }
-
- if (ofd->revents & POLLOUT) {
- if (begin < buffer_strlen(host->rrdpush_sender_buffer)) {
- debug(D_STREAM, "STREAM: Sending data (current buffer length %zu bytes, begin = %zu)...", buffer_strlen(host->rrdpush_sender_buffer), begin);
-
- // BEGIN RRDPUSH LOCKED SESSION
-
- // during this session, data collectors
- // will not be able to append data to our buffer
- // but the socket is in non-blocking mode
- // so, we will not block at send()
-
- netdata_thread_disable_cancelability();
-
- debug(D_STREAM, "STREAM: Getting exclusive lock on host...");
- rrdpush_buffer_lock(host);
-
- debug(D_STREAM, "STREAM: Sending data, starting from %zu, size %zu...", begin, buffer_strlen(host->rrdpush_sender_buffer));
- ssize_t ret;
-#ifdef ENABLE_HTTPS
- SSL *conn = host->ssl.conn ;
- if(conn && !host->ssl.flags) {
- ret = SSL_write(conn,&host->rrdpush_sender_buffer->buffer[begin], buffer_strlen(host->rrdpush_sender_buffer) - begin);
- } else {
- ret = send(host->rrdpush_sender_socket, &host->rrdpush_sender_buffer->buffer[begin], buffer_strlen(host->rrdpush_sender_buffer) - begin, MSG_DONTWAIT);
- }
-#else
- ret = send(host->rrdpush_sender_socket, &host->rrdpush_sender_buffer->buffer[begin], buffer_strlen(host->rrdpush_sender_buffer) - begin, MSG_DONTWAIT);
-#endif
- if (unlikely(ret == -1)) {
- if (errno != EAGAIN && errno != EINTR && errno != EWOULDBLOCK) {
- debug(D_STREAM, "STREAM: Send failed - closing socket...");
- error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", host->hostname, connected_to, sent_bytes_on_this_connection);
- rrdpush_sender_thread_close_socket(host);
- }
- else {
- debug(D_STREAM, "STREAM: Send failed - will retry...");
- }
- }
- else if (likely(ret > 0)) {
- // DEBUG - dump the string to see it
- //char c = host->rrdpush_sender_buffer->buffer[begin + ret];
- //host->rrdpush_sender_buffer->buffer[begin + ret] = '\0';
- //debug(D_STREAM, "STREAM: sent from %zu to %zd:\n%s\n", begin, ret, &host->rrdpush_sender_buffer->buffer[begin]);
- //host->rrdpush_sender_buffer->buffer[begin + ret] = c;
-
- sent_bytes_on_this_connection += ret;
- sent_bytes += ret;
- begin += ret;
-
- if (begin == buffer_strlen(host->rrdpush_sender_buffer)) {
- // we send it all
-
- debug(D_STREAM, "STREAM: Sent %zd bytes (the whole buffer)...", ret);
- buffer_flush(host->rrdpush_sender_buffer);
- begin = 0;
- }
- else {
- debug(D_STREAM, "STREAM: Sent %zd bytes (part of the data buffer)...", ret);
- }
-
- last_sent_t = now_monotonic_sec();
- }
- else {
- debug(D_STREAM, "STREAM: send() returned %zd - closing the socket...", ret);
- error("STREAM %s [send to %s]: failed to send metrics (send() returned %zd) - closing connection - we have sent %zu bytes on this connection.",
- host->hostname, connected_to, ret, sent_bytes_on_this_connection);
- rrdpush_sender_thread_close_socket(host);
- }
-
- debug(D_STREAM, "STREAM: Releasing exclusive lock on host...");
- rrdpush_buffer_unlock(host);
-
- netdata_thread_enable_cancelability();
-
- // END RRDPUSH LOCKED SESSION
- }
- else {
- debug(D_STREAM, "STREAM: we have sent the entire buffer, but we received POLLOUT...");
- }
- }
-
- if(host->rrdpush_sender_socket != -1) {
- char *error = NULL;
-
- if (unlikely(ofd->revents & POLLERR))
- error = "socket reports errors (POLLERR)";
-
- else if (unlikely(ofd->revents & POLLHUP))
- error = "connection closed by remote end (POLLHUP)";
-
- else if (unlikely(ofd->revents & POLLNVAL))
- error = "connection is invalid (POLLNVAL)";
-
- if(unlikely(error)) {
- debug(D_STREAM, "STREAM: %s - closing socket...", error);
- error("STREAM %s [send to %s]: %s - reopening socket - we have sent %zu bytes on this connection.", host->hostname, connected_to, error, sent_bytes_on_this_connection);
- rrdpush_sender_thread_close_socket(host);
- }
- }
- }
- else {
- debug(D_STREAM, "STREAM: poll() timed out.");
- }
-
- // protection from overflow
- if(buffer_strlen(host->rrdpush_sender_buffer) > max_size) {
- debug(D_STREAM, "STREAM: Buffer is too big (%zu bytes), bigger than the max (%zu) - flushing it...", buffer_strlen(host->rrdpush_sender_buffer), max_size);
- errno = 0;
- error("STREAM %s [send to %s]: too many data pending - buffer is %zu bytes long, %zu unsent - we have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", host->hostname, connected_to, host->rrdpush_sender_buffer->len, host->rrdpush_sender_buffer->len - begin, sent_bytes, sent_bytes_on_this_connection);
- rrdpush_sender_thread_close_socket(host);
- }
- }
-
- netdata_thread_cleanup_pop(1);
- return NULL;
-}
-
// ----------------------------------------------------------------------------
// rrdpush receiver thread
-static void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg) {
+void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg) {
log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid);
}
-static RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY get_multiple_connections_strategy(struct config *c, const char *section, const char *name, RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY def) {
- char *value;
- switch(def) {
- default:
- case RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW:
- value = "allow";
- break;
-
- case RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW:
- value = "deny";
- break;
- }
-
- value = appconfig_get(c, section, name, value);
-
- RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY ret = def;
-
- if(strcasecmp(value, "allow") == 0 || strcasecmp(value, "permit") == 0 || strcasecmp(value, "accept") == 0)
- ret = RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW;
-
- else if(strcasecmp(value, "deny") == 0 || strcasecmp(value, "reject") == 0 || strcasecmp(value, "block") == 0)
- ret = RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW;
-
- else
- error("Invalid stream config value at section [%s], setting '%s', value '%s'", section, name, value);
-
- return ret;
-}
-
-static int rrdpush_receive(int fd
- , const char *key
- , const char *hostname
- , const char *registry_hostname
- , const char *machine_guid
- , const char *os
- , const char *timezone
- , const char *tags
- , const char *program_name
- , const char *program_version
- , struct rrdhost_system_info *system_info
- , int update_every
- , char *client_ip
- , char *client_port
-#ifdef ENABLE_HTTPS
- , struct netdata_ssl *ssl
-#endif
-) {
- RRDHOST *host;
- int history = default_rrd_history_entries;
- RRD_MEMORY_MODE mode = default_rrd_memory_mode;
- int health_enabled = default_health_enabled;
- int rrdpush_enabled = default_rrdpush_enabled;
- char *rrdpush_destination = default_rrdpush_destination;
- char *rrdpush_api_key = default_rrdpush_api_key;
- char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching;
- time_t alarms_delay = 60;
- RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY rrdpush_multiple_connections_strategy = RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW;
-
- update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every);
- if(update_every < 0) update_every = 1;
-
- history = (int)appconfig_get_number(&stream_config, key, "default history", history);
- history = (int)appconfig_get_number(&stream_config, machine_guid, "history", history);
- if(history < 5) history = 5;
-
- mode = rrd_memory_mode_id(appconfig_get(&stream_config, key, "default memory mode", rrd_memory_mode_name(mode)));
- mode = rrd_memory_mode_id(appconfig_get(&stream_config, machine_guid, "memory mode", rrd_memory_mode_name(mode)));
-
- health_enabled = appconfig_get_boolean_ondemand(&stream_config, key, "health enabled by default", health_enabled);
- health_enabled = appconfig_get_boolean_ondemand(&stream_config, machine_guid, "health enabled", health_enabled);
-
- alarms_delay = appconfig_get_number(&stream_config, key, "default postpone alarms on connect seconds", alarms_delay);
- alarms_delay = appconfig_get_number(&stream_config, machine_guid, "postpone alarms on connect seconds", alarms_delay);
-
- rrdpush_enabled = appconfig_get_boolean(&stream_config, key, "default proxy enabled", rrdpush_enabled);
- rrdpush_enabled = appconfig_get_boolean(&stream_config, machine_guid, "proxy enabled", rrdpush_enabled);
-
- rrdpush_destination = appconfig_get(&stream_config, key, "default proxy destination", rrdpush_destination);
- rrdpush_destination = appconfig_get(&stream_config, machine_guid, "proxy destination", rrdpush_destination);
-
- rrdpush_api_key = appconfig_get(&stream_config, key, "default proxy api key", rrdpush_api_key);
- rrdpush_api_key = appconfig_get(&stream_config, machine_guid, "proxy api key", rrdpush_api_key);
-
- rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, key, "multiple connections", rrdpush_multiple_connections_strategy);
- rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, machine_guid, "multiple connections", rrdpush_multiple_connections_strategy);
-
- rrdpush_send_charts_matching = appconfig_get(&stream_config, key, "default proxy send charts matching", rrdpush_send_charts_matching);
- rrdpush_send_charts_matching = appconfig_get(&stream_config, machine_guid, "proxy send charts matching", rrdpush_send_charts_matching);
-
- tags = appconfig_set_default(&stream_config, machine_guid, "host tags", (tags)?tags:"");
- if(tags && !*tags) tags = NULL;
-
- if (strcmp(machine_guid, localhost->machine_guid) == 0) {
- log_stream_connection(client_ip, client_port, key, machine_guid, hostname, "DENIED - ATTEMPT TO RECEIVE METRICS FROM MACHINE_GUID IDENTICAL TO MASTER");
- error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the master/proxy machine guid to a slave?", hostname, client_ip, client_port, machine_guid);
- close(fd);
- return 1;
- }
- else
- host = rrdhost_find_or_create(
- hostname
- , registry_hostname
- , machine_guid
- , os
- , timezone
- , tags
- , program_name
- , program_version
- , update_every
- , history
- , mode
- , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO)
- , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key)
- , rrdpush_destination
- , rrdpush_api_key
- , rrdpush_send_charts_matching
- , system_info
- );
-
- if(!host) {
- close(fd);
- log_stream_connection(client_ip, client_port, key, machine_guid, hostname, "FAILED - CANNOT ACQUIRE HOST");
- error("STREAM %s [receive from [%s]:%s]: failed to find/create host structure.", hostname, client_ip, client_port);
- return 1;
- }
-
-#ifdef NETDATA_INTERNAL_CHECKS
- info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s, tags '%s'"
- , hostname
- , client_ip
- , client_port
- , host->hostname
- , host->machine_guid
- , host->rrd_update_every
- , host->rrd_history_entries
- , rrd_memory_mode_name(host->rrd_memory_mode)
- , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
- , host->tags?host->tags:""
- );
-#endif // NETDATA_INTERNAL_CHECKS
-
- struct plugind cd = {
- .enabled = 1,
- .update_every = default_rrd_update_every,
- .pid = 0,
- .serial_failures = 0,
- .successful_collections = 0,
- .obsolete = 0,
- .started_t = now_realtime_sec(),
- .next = NULL,
- };
-
- // put the client IP and port into the buffers used by plugins.d
- snprintfz(cd.id, CONFIG_MAX_NAME, "%s:%s", client_ip, client_port);
- snprintfz(cd.filename, FILENAME_MAX, "%s:%s", client_ip, client_port);
- snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", client_ip, client_port);
- snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", client_ip, client_port);
-
- info("STREAM %s [receive from [%s]:%s]: initializing communication...", host->hostname, client_ip, client_port);
-#ifdef ENABLE_HTTPS
- host->stream_ssl.conn = ssl->conn;
- host->stream_ssl.flags = ssl->flags;
- if(send_timeout(ssl,fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) {
-#else
- if(send_timeout(fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) {
-#endif
- log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "FAILED - CANNOT REPLY");
- error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", host->hostname, client_ip, client_port);
- close(fd);
- return 0;
- }
-
- // remove the non-blocking flag from the socket
- if(sock_delnonblock(fd) < 0)
- error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", host->hostname, client_ip, client_port, fd);
-
- // convert the socket to a FILE *
- FILE *fp = fdopen(fd, "r");
- if(!fp) {
- log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "FAILED - SOCKET ERROR");
- error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", host->hostname, client_ip, client_port, fd);
- close(fd);
- return 0;
- }
-
- rrdhost_wrlock(host);
- if(host->connected_senders > 0) {
- switch(rrdpush_multiple_connections_strategy) {
- case RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW:
- info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. If multiple netdata are pushing metrics for the same charts, at the same time, the result is unexpected.", host->hostname, client_ip, client_port);
- break;
-
- case RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW:
- rrdhost_unlock(host);
- log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "REJECTED - ALREADY CONNECTED");
- info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", host->hostname, client_ip, client_port);
- fclose(fp);
- return 0;
- }
- }
-
- rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
- host->connected_senders++;
- host->senders_disconnected_time = 0;
- if(health_enabled != CONFIG_BOOLEAN_NO) {
- if(alarms_delay > 0) {
- host->health_delay_up_to = now_realtime_sec() + alarms_delay;
- info("Postponing health checks for %ld seconds, on host '%s', because it was just connected."
- , alarms_delay
- , host->hostname
- );
- }
- }
- rrdhost_unlock(host);
-
- // call the plugins.d processor to receive the metrics
- info("STREAM %s [receive from [%s]:%s]: receiving metrics...", host->hostname, client_ip, client_port);
- log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "CONNECTED");
-
- size_t count = pluginsd_process(host, &cd, fp, 1);
-
- log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "DISCONNECTED");
- error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", host->hostname, client_ip, client_port, count);
-
- rrdhost_wrlock(host);
- host->senders_disconnected_time = now_realtime_sec();
- host->connected_senders--;
- if(!host->connected_senders) {
- rrdhost_flag_set(host, RRDHOST_FLAG_ORPHAN);
- if(health_enabled == CONFIG_BOOLEAN_AUTO)
- host->health_enabled = 0;
- }
- rrdhost_unlock(host);
-
- if(host->connected_senders == 0)
- rrdpush_sender_thread_stop(host);
-
- // cleanup
- fclose(fp);
-
- return (int)count;
-}
-
-struct rrdpush_thread {
- int fd;
- char *key;
- char *hostname;
- char *registry_hostname;
- char *machine_guid;
- char *os;
- char *timezone;
- char *tags;
- char *client_ip;
- char *client_port;
- char *program_name;
- char *program_version;
- struct rrdhost_system_info *system_info;
- int update_every;
-#ifdef ENABLE_HTTPS
- struct netdata_ssl ssl;
-#endif
-};
-
-static void rrdpush_receiver_thread_cleanup(void *ptr) {
- static __thread int executed = 0;
- if(!executed) {
- executed = 1;
- struct rrdpush_thread *rpt = (struct rrdpush_thread *) ptr;
-
- info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
-
- freez(rpt->key);
- freez(rpt->hostname);
- freez(rpt->registry_hostname);
- freez(rpt->machine_guid);
- freez(rpt->os);
- freez(rpt->timezone);
- freez(rpt->tags);
- freez(rpt->client_ip);
- freez(rpt->client_port);
- freez(rpt->program_name);
- freez(rpt->program_version);
-#ifdef ENABLE_HTTPS
- if(rpt->ssl.conn){
- SSL_free(rpt->ssl.conn);
- }
-#endif
- freez(rpt);
-
- }
-}
-
-static void *rrdpush_receiver_thread(void *ptr) {
- netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr);
-
- struct rrdpush_thread *rpt = (struct rrdpush_thread *)ptr;
- info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
-
- rrdpush_receive(
- rpt->fd
- , rpt->key
- , rpt->hostname
- , rpt->registry_hostname
- , rpt->machine_guid
- , rpt->os
- , rpt->timezone
- , rpt->tags
- , rpt->program_name
- , rpt->program_version
- , rpt->system_info
- , rpt->update_every
- , rpt->client_ip
- , rpt->client_port
-#ifdef ENABLE_HTTPS
- , &rpt->ssl
-#endif
- );
-
- netdata_thread_cleanup_pop(1);
- return NULL;
-}
static void rrdpush_sender_thread_spawn(RRDHOST *host) {
- rrdhost_wrlock(host);
+ netdata_mutex_lock(&host->sender->mutex);
if(!host->rrdpush_sender_spawn) {
char tag[NETDATA_THREAD_TAG_MAX + 1];
snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", host->hostname);
- if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, rrdpush_sender_thread, (void *) host))
+ if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, rrdpush_sender_thread, (void *) host->sender))
error("STREAM %s [send]: failed to create new thread for client.", host->hostname);
else
host->rrdpush_sender_spawn = 1;
}
-
- rrdhost_unlock(host);
+ netdata_mutex_unlock(&host->sender->mutex);
}
int rrdpush_receiver_permission_denied(struct web_client *w) {
@@ -1273,13 +460,13 @@ int rrdpush_receiver_too_busy_now(struct web_client *w) {
return 503;
}
-int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url) {
- (void)host;
-
+void *rrdpush_receiver_thread(void *ptr);
+int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
info("clients wants to STREAM metrics.");
char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *tags = NULL;
int update_every = default_rrd_update_every;
+ uint32_t stream_version = UINT_MAX;
char buf[GUID_LEN + 1];
struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info));
@@ -1308,12 +495,36 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
timezone = value;
else if(!strcmp(name, "tags"))
tags = value;
- else
- if(unlikely(rrdhost_set_system_info_variable(system_info, name, value))) {
- info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.", w->client_ip, w->client_port, key, value);
+ else if(!strcmp(name, "ver"))
+ stream_version = MIN((uint32_t) strtoul(value, NULL, 0), STREAMING_PROTOCOL_CURRENT_VERSION);
+ else {
+ // An old Netdata child does not have a compatible streaming protocol, map to something sane.
+ if (!strcmp(name, "NETDATA_SYSTEM_OS_NAME"))
+ name = "NETDATA_HOST_OS_NAME";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID"))
+ name = "NETDATA_HOST_OS_ID";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID_LIKE"))
+ name = "NETDATA_HOST_OS_ID_LIKE";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION"))
+ name = "NETDATA_HOST_OS_VERSION";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION_ID"))
+ name = "NETDATA_HOST_OS_VERSION_ID";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION"))
+ name = "NETDATA_HOST_OS_DETECTION";
+ else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && stream_version == UINT_MAX) {
+ stream_version = 1;
}
+
+ if (unlikely(rrdhost_set_system_info_variable(system_info, name, value))) {
+ info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.",
+ w->client_ip, w->client_port, name, value);
+ }
+ }
}
+ if (stream_version == UINT_MAX)
+ stream_version = 0;
+
if(!key || !*key) {
rrdhost_system_info_free(system_info);
log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO KEY");
@@ -1412,7 +623,56 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
netdata_mutex_unlock(&stream_rate_mutex);
}
- struct rrdpush_thread *rpt = callocz(1, sizeof(struct rrdpush_thread));
+ /*
+ * Quick path for rejecting multiple connections. The lock taken is fine-grained - it only protects the receiver
+ * pointer within the host (if a host exists). This protects against multiple concurrent web requests hitting
+ * separate threads within the web-server and landing here. The lock guards the thread-shutdown sequence that
+ * detaches the receiver from the host. If the host is being created (first time-access) then we also use the
+ * lock to prevent race-hazard (two threads try to create the host concurrently, one wins and the other does a
+ * lookup to the now-attached structure).
+ */
+ struct receiver_state *rpt = callocz(1, sizeof(*rpt));
+
+ rrd_rdlock();
+ RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0);
+ if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */
+ host = NULL;
+ if (host) {
+ rrdhost_wrlock(host);
+ netdata_mutex_lock(&host->receiver_lock);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
+ host->senders_disconnected_time = 0;
+ if (host->receiver != NULL) {
+ time_t age = now_realtime_sec() - host->receiver->last_msg_t;
+ if (age > 30) {
+ host->receiver->shutdown = 1;
+ shutdown(host->receiver->fd, SHUT_RDWR);
+ host->receiver = NULL; // Thread holds reference to structure
+ info("STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - existing connection is dead (%ld sec), accepting new connection.", host->hostname, w->client_ip, w->client_port, age);
+ }
+ else {
+ netdata_mutex_unlock(&host->receiver_lock);
+ rrdhost_unlock(host);
+ rrd_unlock();
+ log_stream_connection(w->client_ip, w->client_port, key, host->machine_guid, host->hostname,
+ "REJECTED - ALREADY CONNECTED");
+ info("STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - existing connection is active (within last %ld sec), rejecting new connection.", host->hostname, w->client_ip, w->client_port, age);
+ // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up
+ buffer_flush(w->response.data);
+ buffer_strcat(w->response.data, "This GUID is already streaming to this server");
+ freez(rpt);
+ return 409;
+ }
+ }
+ host->receiver = rpt;
+ netdata_mutex_unlock(&host->receiver_lock);
+ rrdhost_unlock(host);
+ }
+ rrd_unlock();
+
+ rpt->last_msg_t = now_realtime_sec();
+
+ rpt->host = host;
rpt->fd = w->ifd;
rpt->key = strdupz(key);
rpt->hostname = strdupz(hostname);
@@ -1425,6 +685,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
rpt->client_port = strdupz(w->client_port);
rpt->update_every = update_every;
rpt->system_info = system_info;
+ rpt->stream_version = stream_version;
#ifdef ENABLE_HTTPS
rpt->ssl.conn = w->ssl.conn;
rpt->ssl.flags = w->ssl.flags;
@@ -1445,14 +706,13 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
}
- netdata_thread_t thread;
debug(D_SYSTEM, "starting STREAM receive thread.");
char tag[FILENAME_MAX + 1];
snprintfz(tag, FILENAME_MAX, "STREAM_RECEIVER[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port);
- if(netdata_thread_create(&thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt))
+ if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt))
error("Failed to create new STREAM receive thread for client.");
// prevent the caller from closing the streaming socket
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 7bf3db93a..225d0c299 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -3,23 +3,115 @@
#ifndef NETDATA_RRDPUSH_H
#define NETDATA_RRDPUSH_H 1
+#include "../database/rrd.h"
+#include "../libnetdata/libnetdata.h"
#include "web/server/web_client.h"
#include "daemon/common.h"
+#define CONNECTED_TO_SIZE 100
+
+// #define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)4 Gap-filling
+#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)3
+#define VERSION_GAP_FILLING 4
+#define STREAM_VERSION_CLAIM 3
+
+#define STREAMING_PROTOCOL_VERSION "1.1"
+#define START_STREAMING_PROMPT "Hit me baby, push them over..."
+#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
+#define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version="
+
+#define HTTP_HEADER_SIZE 8192
+
+typedef enum {
+ RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW,
+ RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW
+} RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY;
+
+typedef struct {
+ char *os_name;
+ char *os_id;
+ char *os_version;
+ char *kernel_name;
+ char *kernel_version;
+} stream_encoded_t;
+
+// Thread-local storage
+ // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
+
+struct sender_state {
+ RRDHOST *host;
+ pid_t task_id;
+ unsigned int overflow:1;
+ int timeout, default_port;
+ usec_t reconnect_delay;
+ char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
+ size_t begin;
+ size_t reconnects_counter;
+ size_t sent_bytes;
+ size_t sent_bytes_on_this_connection;
+ size_t send_attempts;
+ time_t last_sent_t;
+ size_t not_connected_loops;
+ // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
+ // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
+ netdata_mutex_t mutex;
+ struct circular_buffer *buffer;
+ BUFFER *build;
+ char read_buffer[512];
+ int read_len;
+ int32_t version;
+};
+
+struct receiver_state {
+ RRDHOST *host;
+ netdata_thread_t thread;
+ int fd;
+ char *key;
+ char *hostname;
+ char *registry_hostname;
+ char *machine_guid;
+ char *os;
+ char *timezone; // Unused?
+ char *tags;
+ char *client_ip; // Duplicated in pluginsd
+ char *client_port; // Duplicated in pluginsd
+ char *program_name; // Duplicated in pluginsd
+ char *program_version;
+ struct rrdhost_system_info *system_info;
+ int update_every;
+ uint32_t stream_version;
+ time_t last_msg_t;
+ char read_buffer[1024]; // Need to allow RRD_ID_LENGTH_MAX * 4 + the other fields
+ int read_len;
+#ifdef ENABLE_HTTPS
+ struct netdata_ssl ssl;
+#endif
+ unsigned int shutdown:1; // Tell the thread to exit
+ unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!)
+};
+
+
extern unsigned int default_rrdpush_enabled;
extern char *default_rrdpush_destination;
extern char *default_rrdpush_api_key;
extern char *default_rrdpush_send_charts_matching;
extern unsigned int remote_clock_resync_iterations;
+extern void sender_init(struct sender_state *s, RRDHOST *parent);
+void sender_start(struct sender_state *s);
+void sender_commit(struct sender_state *s);
extern int rrdpush_init();
+extern int configured_as_parent();
extern void rrdset_done_push(RRDSET *st);
extern void rrdset_push_chart_definition_now(RRDSET *st);
extern void *rrdpush_sender_thread(void *ptr);
+extern void rrdpush_send_labels(RRDHOST *host);
+extern void rrdpush_claimed_id(RRDHOST *host);
-extern int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url);
+extern int rrdpush_receiver_thread_spawn(struct web_client *w, char *url);
extern void rrdpush_sender_thread_stop(RRDHOST *host);
extern void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv);
+extern void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
#endif //NETDATA_RRDPUSH_H
diff --git a/streaming/sender.c b/streaming/sender.c
new file mode 100644
index 000000000..d55a420ab
--- /dev/null
+++ b/streaming/sender.c
@@ -0,0 +1,723 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "rrdpush.h"
+
+extern struct config stream_config;
+extern int netdata_use_ssl_on_stream;
+extern char *netdata_ssl_ca_path;
+extern char *netdata_ssl_ca_file;
+
+// Collector thread starting a transmission
+void sender_start(struct sender_state *s) {
+ netdata_mutex_lock(&s->mutex);
+ buffer_flush(s->build);
+}
+
+// Collector thread finishing a transmission
+void sender_commit(struct sender_state *s) {
+ if(cbuffer_add_unsafe(s->host->sender->buffer, buffer_tostring(s->host->sender->build),
+ s->host->sender->build->len))
+ s->overflow = 1;
+ buffer_flush(s->build);
+ netdata_mutex_unlock(&s->mutex);
+}
+
+
+static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
+ host->rrdpush_sender_connected = 0;
+
+ if(host->rrdpush_sender_socket != -1) {
+ close(host->rrdpush_sender_socket);
+ host->rrdpush_sender_socket = -1;
+ }
+}
+
+static inline void rrdpush_sender_add_host_variable_to_buffer_nolock(RRDHOST *host, RRDVAR *rv) {
+ calculated_number *value = (calculated_number *)rv->value;
+
+ buffer_sprintf(
+ host->sender->build
+ , "VARIABLE HOST %s = " CALCULATED_NUMBER_FORMAT "\n"
+ , rv->name
+ , *value
+ );
+
+ debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " CALCULATED_NUMBER_FORMAT, rv->name, *value);
+}
+
+void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv) {
+ if(host->rrdpush_send_enabled && host->rrdpush_sender_spawn && host->rrdpush_sender_connected) {
+ sender_start(host->sender);
+ rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv);
+ sender_commit(host->sender);
+ }
+}
+
+
+static int rrdpush_sender_thread_custom_host_variables_callback(void *rrdvar_ptr, void *host_ptr) {
+ RRDVAR *rv = (RRDVAR *)rrdvar_ptr;
+ RRDHOST *host = (RRDHOST *)host_ptr;
+
+ if(unlikely(rv->options & RRDVAR_OPTION_CUSTOM_HOST_VAR && rv->type == RRDVAR_TYPE_CALCULATED)) {
+ rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv);
+
+ // return 1, so that the traversal will return the number of variables sent
+ return 1;
+ }
+
+ // returning a negative number will break the traversal
+ return 0;
+}
+
+static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
+ sender_start(host->sender);
+ int ret = rrdvar_callback_for_all_host_variables(host, rrdpush_sender_thread_custom_host_variables_callback, host);
+ (void)ret;
+ sender_commit(host->sender);
+
+ debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
+}
+
+// resets all the chart, so that their definitions
+// will be resent to the central netdata
+static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
+ rrdhost_rdlock(host);
+
+ RRDSET *st;
+ rrdset_foreach_read(st, host) {
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+
+ st->upstream_resync_time = 0;
+
+ rrdset_rdlock(st);
+
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st)
+ rd->exposed = 0;
+
+ rrdset_unlock(st);
+ }
+
+ rrdhost_unlock(host);
+}
+
+static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
+ netdata_mutex_lock(&host->sender->mutex);
+
+ size_t len = cbuffer_next_unsafe(host->sender->buffer, NULL);
+ if (len)
+ error("STREAM %s [send]: discarding %zu bytes of metrics already in the buffer.", host->hostname, len);
+
+ cbuffer_remove_unsafe(host->sender->buffer, len);
+ netdata_mutex_unlock(&host->sender->mutex);
+
+ rrdpush_sender_thread_reset_all_charts(host);
+ rrdpush_sender_thread_send_custom_host_variables(host);
+}
+
+static inline void rrdpush_set_flags_to_newest_stream(RRDHOST *host) {
+ host->labels.labels_flag |= LABEL_FLAG_UPDATE_STREAM;
+ host->labels.labels_flag &= ~LABEL_FLAG_STOP_STREAM;
+}
+
+void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
+{
+ se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):"";
+ se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):"";
+ se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):"";
+ se->kernel_name = (host->system_info->kernel_name)?url_encode(host->system_info->kernel_name):"";
+ se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):"";
+}
+
+void rrdpush_clean_encoded(stream_encoded_t *se)
+{
+ if (se->os_name)
+ freez(se->os_name);
+
+ if (se->os_id)
+ freez(se->os_id);
+
+ if (se->os_version)
+ freez(se->os_version);
+
+ if (se->kernel_name)
+ freez(se->kernel_name);
+
+ if (se->kernel_version)
+ freez(se->kernel_version);
+}
+
+static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout,
+ struct sender_state *s) {
+
+ struct timeval tv = {
+ .tv_sec = timeout,
+ .tv_usec = 0
+ };
+
+ // make sure the socket is closed
+ rrdpush_sender_thread_close_socket(host);
+
+ debug(D_STREAM, "STREAM: Attempting to connect...");
+ info("STREAM %s [send to %s]: connecting...", host->hostname, host->rrdpush_send_destination);
+
+ host->rrdpush_sender_socket = connect_to_one_of(
+ host->rrdpush_send_destination
+ , default_port
+ , &tv
+ , &s->reconnects_counter
+ , s->connected_to
+ , sizeof(s->connected_to)-1
+ );
+
+ if(unlikely(host->rrdpush_sender_socket == -1)) {
+ error("STREAM %s [send to %s]: failed to connect", host->hostname, host->rrdpush_send_destination);
+ return 0;
+ }
+
+ info("STREAM %s [send to %s]: initializing communication...", host->hostname, s->connected_to);
+
+#ifdef ENABLE_HTTPS
+ if( netdata_client_ctx ){
+ host->ssl.flags = NETDATA_SSL_START;
+ if (!host->ssl.conn){
+ host->ssl.conn = SSL_new(netdata_client_ctx);
+ if(!host->ssl.conn){
+ error("Failed to allocate SSL structure.");
+ host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ }
+ }
+ else{
+ SSL_clear(host->ssl.conn);
+ }
+
+ if (host->ssl.conn)
+ {
+ if (SSL_set_fd(host->ssl.conn, host->rrdpush_sender_socket) != 1) {
+ error("Failed to set the socket to the SSL on socket fd %d.", host->rrdpush_sender_socket);
+ host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ } else{
+ host->ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE;
+ }
+ }
+ }
+ else {
+ host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ }
+#endif
+
+ /* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the
+ version negotiation resulted in a high enough version.
+ */
+ stream_encoded_t se;
+ rrdpush_encode_variable(&se, host);
+
+ char http[HTTP_HEADER_SIZE + 1];
+ int eol = snprintfz(http, HTTP_HEADER_SIZE,
+ "STREAM key=%s&hostname=%s&registry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&tags=%s&ver=%u"
+ "&NETDATA_SYSTEM_OS_NAME=%s"
+ "&NETDATA_SYSTEM_OS_ID=%s"
+ "&NETDATA_SYSTEM_OS_ID_LIKE=%s"
+ "&NETDATA_SYSTEM_OS_VERSION=%s"
+ "&NETDATA_SYSTEM_OS_VERSION_ID=%s"
+ "&NETDATA_SYSTEM_OS_DETECTION=%s"
+ "&NETDATA_HOST_IS_K8S_NODE=%s"
+ "&NETDATA_SYSTEM_KERNEL_NAME=%s"
+ "&NETDATA_SYSTEM_KERNEL_VERSION=%s"
+ "&NETDATA_SYSTEM_ARCHITECTURE=%s"
+ "&NETDATA_SYSTEM_VIRTUALIZATION=%s"
+ "&NETDATA_SYSTEM_VIRT_DETECTION=%s"
+ "&NETDATA_SYSTEM_CONTAINER=%s"
+ "&NETDATA_SYSTEM_CONTAINER_DETECTION=%s"
+ "&NETDATA_CONTAINER_OS_NAME=%s"
+ "&NETDATA_CONTAINER_OS_ID=%s"
+ "&NETDATA_CONTAINER_OS_ID_LIKE=%s"
+ "&NETDATA_CONTAINER_OS_VERSION=%s"
+ "&NETDATA_CONTAINER_OS_VERSION_ID=%s"
+ "&NETDATA_CONTAINER_OS_DETECTION=%s"
+ "&NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT=%s"
+ "&NETDATA_SYSTEM_CPU_FREQ=%s"
+ "&NETDATA_SYSTEM_TOTAL_RAM=%s"
+ "&NETDATA_SYSTEM_TOTAL_DISK_SIZE=%s"
+ "&NETDATA_PROTOCOL_VERSION=%s"
+ " HTTP/1.1\r\n"
+ "User-Agent: %s/%s\r\n"
+ "Accept: */*\r\n\r\n"
+ , host->rrdpush_send_api_key
+ , host->hostname
+ , host->registry_hostname
+ , host->machine_guid
+ , default_rrd_update_every
+ , host->os
+ , host->timezone
+ , (host->tags) ? host->tags : ""
+ , STREAMING_PROTOCOL_CURRENT_VERSION
+ , se.os_name
+ , se.os_id
+ , (host->system_info->host_os_id_like) ? host->system_info->host_os_id_like : ""
+ , se.os_version
+ , (host->system_info->host_os_version_id) ? host->system_info->host_os_version_id : ""
+ , (host->system_info->host_os_detection) ? host->system_info->host_os_detection : ""
+ , (host->system_info->is_k8s_node) ? host->system_info->is_k8s_node : ""
+ , se.kernel_name
+ , se.kernel_version
+ , (host->system_info->architecture) ? host->system_info->architecture : ""
+ , (host->system_info->virtualization) ? host->system_info->virtualization : ""
+ , (host->system_info->virt_detection) ? host->system_info->virt_detection : ""
+ , (host->system_info->container) ? host->system_info->container : ""
+ , (host->system_info->container_detection) ? host->system_info->container_detection : ""
+ , (host->system_info->container_os_name) ? host->system_info->container_os_name : ""
+ , (host->system_info->container_os_id) ? host->system_info->container_os_id : ""
+ , (host->system_info->container_os_id_like) ? host->system_info->container_os_id_like : ""
+ , (host->system_info->container_os_version) ? host->system_info->container_os_version : ""
+ , (host->system_info->container_os_version_id) ? host->system_info->container_os_version_id : ""
+ , (host->system_info->container_os_detection) ? host->system_info->container_os_detection : ""
+ , (host->system_info->host_cores) ? host->system_info->host_cores : ""
+ , (host->system_info->host_cpu_freq) ? host->system_info->host_cpu_freq : ""
+ , (host->system_info->host_ram_total) ? host->system_info->host_ram_total : ""
+ , (host->system_info->host_disk_space) ? host->system_info->host_disk_space : ""
+ , STREAMING_PROTOCOL_VERSION
+ , host->program_name
+ , host->program_version
+ );
+ http[eol] = 0x00;
+ rrdpush_clean_encoded(&se);
+
+#ifdef ENABLE_HTTPS
+ if (!host->ssl.flags) {
+ ERR_clear_error();
+ SSL_set_connect_state(host->ssl.conn);
+ int err = SSL_connect(host->ssl.conn);
+ if (err != 1){
+ err = SSL_get_error(host->ssl.conn, err);
+ error("SSL cannot connect with the server: %s ",ERR_error_string((long)SSL_get_error(host->ssl.conn,err),NULL));
+ if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
+ rrdpush_sender_thread_close_socket(host);
+ return 0;
+ }else {
+ host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ }
+ }
+ else {
+ if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
+ if (netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE) {
+ if ( security_test_certificate(host->ssl.conn)) {
+ error("Closing the stream connection, because the server SSL certificate is not valid.");
+ rrdpush_sender_thread_close_socket(host);
+ return 0;
+ }
+ }
+ }
+ }
+ }
+ if(send_timeout(&host->ssl,host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) {
+#else
+ if(send_timeout(host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) {
+#endif
+ error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", host->hostname, s->connected_to);
+ rrdpush_sender_thread_close_socket(host);
+ return 0;
+ }
+
+ info("STREAM %s [send to %s]: waiting response from remote netdata...", host->hostname, s->connected_to);
+
+ ssize_t received;
+#ifdef ENABLE_HTTPS
+ received = recv_timeout(&host->ssl,host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout);
+ if(received == -1) {
+#else
+ received = recv_timeout(host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout);
+ if(received == -1) {
+#endif
+ error("STREAM %s [send to %s]: remote netdata does not respond.", host->hostname, s->connected_to);
+ rrdpush_sender_thread_close_socket(host);
+ return 0;
+ }
+
+ http[received] = '\0';
+ debug(D_STREAM, "Response to sender from far end: %s", http);
+ int answer = -1;
+ char *version_start = strchr(http, '=');
+ int32_t version = -1;
+ if(version_start) {
+ version_start++;
+ version = (int32_t)strtol(version_start, NULL, 10);
+ answer = memcmp(http, START_STREAMING_PROMPT_VN, (size_t)(version_start - http));
+ if(!answer) {
+ rrdpush_set_flags_to_newest_stream(host);
+ }
+ } else {
+ answer = memcmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2));
+ if(!answer) {
+ version = 1;
+ rrdpush_set_flags_to_newest_stream(host);
+ }
+ else {
+ answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT));
+ if(!answer) {
+ version = 0;
+ host->labels.labels_flag |= LABEL_FLAG_STOP_STREAM;
+ host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
+ }
+ }
+ }
+
+ if(version == -1) {
+ error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", host->hostname, s->connected_to);
+ rrdpush_sender_thread_close_socket(host);
+ return 0;
+ }
+ s->version = version;
+
+ info("STREAM %s [send to %s]: established communication with a parent using protocol version %d - ready to send metrics..."
+ , host->hostname
+ , s->connected_to
+ , version);
+
+ if(sock_setnonblock(host->rrdpush_sender_socket) < 0)
+ error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", host->hostname, s->connected_to);
+
+ if(sock_enlarge_out(host->rrdpush_sender_socket) < 0)
+ error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", host->hostname, s->connected_to);
+
+ debug(D_STREAM, "STREAM: Connected on fd %d...", host->rrdpush_sender_socket);
+
+ return 1;
+}
+
+static void attempt_to_connect(struct sender_state *state)
+{
+ state->send_attempts = 0;
+
+ if(rrdpush_sender_thread_connect_to_parent(state->host, state->default_port, state->timeout, state)) {
+ state->last_sent_t = now_monotonic_sec();
+
+ // reset the buffer, to properly send charts and metrics
+ rrdpush_sender_thread_data_flush(state->host);
+
+ // send from the beginning
+ state->begin = 0;
+
+ // make sure the next reconnection will be immediate
+ state->not_connected_loops = 0;
+
+ // reset the bytes we have sent for this session
+ state->sent_bytes_on_this_connection = 0;
+
+ // let the data collection threads know we are ready
+ state->host->rrdpush_sender_connected = 1;
+ }
+ else {
+ // increase the failed connections counter
+ state->not_connected_loops++;
+
+ // reset the number of bytes sent
+ state->sent_bytes_on_this_connection = 0;
+
+ // slow re-connection on repeating errors
+ sleep_usec(USEC_PER_SEC * state->reconnect_delay); // seconds
+ }
+}
+
+// TCP window is open and we have data to transmit.
+void attempt_to_send(struct sender_state *s) {
+
+ rrdpush_send_labels(s->host);
+
+ struct circular_buffer *cb = s->buffer;
+
+ netdata_thread_disable_cancelability();
+ netdata_mutex_lock(&s->mutex);
+ char *chunk;
+ size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk);
+ debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
+ ssize_t ret;
+#ifdef ENABLE_HTTPS
+ SSL *conn = s->host->ssl.conn ;
+ if(conn && !s->host->ssl.flags) {
+ ret = SSL_write(conn, chunk, outstanding);
+ } else {
+ ret = send(s->host->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
+ }
+#else
+ ret = send(s->host->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
+#endif
+ if (likely(ret > 0)) {
+ cbuffer_remove_unsafe(s->buffer, ret);
+ s->sent_bytes_on_this_connection += ret;
+ s->sent_bytes += ret;
+ debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", s->host->hostname, s->connected_to, ret);
+ s->last_sent_t = now_monotonic_sec();
+ }
+ else if (ret == -1 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
+ debug(D_STREAM, "STREAM %s [send to %s]: unavailable aftering polling POLLOUT", s->host->hostname,
+ s->connected_to);
+ else if (ret == -1) {
+ debug(D_STREAM, "STREAM: Send failed - closing socket...");
+ error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", s->host->hostname, s->connected_to, s->sent_bytes_on_this_connection);
+ rrdpush_sender_thread_close_socket(s->host);
+ }
+ else {
+ debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
+ }
+
+ netdata_mutex_unlock(&s->mutex);
+ netdata_thread_enable_cancelability();
+}
+
+void attempt_read(struct sender_state *s) {
+int ret;
+#ifdef ENABLE_HTTPS
+ if (s->host->ssl.conn && !s->host->stream_ssl.flags) {
+ ERR_clear_error();
+ int desired = sizeof(s->read_buffer) - s->read_len - 1;
+ ret = SSL_read(s->host->ssl.conn, s->read_buffer, desired);
+ if (ret > 0 ) {
+ s->read_len += ret;
+ return;
+ }
+ int sslerrno = SSL_get_error(s->host->ssl.conn, desired);
+ if (sslerrno == SSL_ERROR_WANT_READ || sslerrno == SSL_ERROR_WANT_WRITE)
+ return;
+ u_long err;
+ char buf[256];
+ while ((err = ERR_get_error()) != 0) {
+ ERR_error_string_n(err, buf, sizeof(buf));
+ error("STREAM %s [send to %s] ssl error: %s", s->host->hostname, s->connected_to, buf);
+ }
+ error("Restarting connection");
+ rrdpush_sender_thread_close_socket(s->host);
+ return;
+ }
+#endif
+ ret = recv(s->host->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,
+ MSG_DONTWAIT);
+ if (ret>0) {
+ s->read_len += ret;
+ return;
+ }
+ debug(D_STREAM, "Socket was POLLIN, but req %zu bytes gave %d", sizeof(s->read_buffer) - s->read_len - 1, ret);
+ if (ret<0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
+ return;
+ if (ret==0)
+ error("STREAM %s [send to %s]: connection closed by far end. Restarting connection", s->host->hostname, s->connected_to);
+ else
+ error("STREAM %s [send to %s]: error during read (%d). Restarting connection", s->host->hostname, s->connected_to,
+ ret);
+ rrdpush_sender_thread_close_socket(s->host);
+}
+
+// This is just a placeholder until the gap filling state machine is inserted
+void execute_commands(struct sender_state *s) {
+ char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline;
+ *end = 0;
+ while( start<end && (newline=strchr(start, '\n')) ) {
+ *newline = 0;
+ info("STREAM %s [send to %s] received command over connection: %s", s->host->hostname, s->connected_to, start);
+ start = newline+1;
+ }
+ if (start<end) {
+ memmove(s->read_buffer, start, end-start);
+ s->read_len = end-start;
+ }
+}
+
+
+static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
+ RRDHOST *host = (RRDHOST *)ptr;
+
+ netdata_mutex_lock(&host->sender->mutex);
+
+ info("STREAM %s [send]: sending thread cleans up...", host->hostname);
+
+ rrdpush_sender_thread_close_socket(host);
+
+ // close the pipe
+ if(host->rrdpush_sender_pipe[PIPE_READ] != -1) {
+ close(host->rrdpush_sender_pipe[PIPE_READ]);
+ host->rrdpush_sender_pipe[PIPE_READ] = -1;
+ }
+
+ if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1) {
+ close(host->rrdpush_sender_pipe[PIPE_WRITE]);
+ host->rrdpush_sender_pipe[PIPE_WRITE] = -1;
+ }
+
+ if(!host->rrdpush_sender_join) {
+ info("STREAM %s [send]: sending thread detaches itself.", host->hostname);
+ netdata_thread_detach(netdata_thread_self());
+ }
+
+ host->rrdpush_sender_spawn = 0;
+
+ info("STREAM %s [send]: sending thread now exits.", host->hostname);
+
+ netdata_mutex_unlock(&host->sender->mutex);
+}
+
+void sender_init(struct sender_state *s, RRDHOST *parent) {
+ memset(s, 0, sizeof(*s));
+ s->host = parent;
+ s->buffer = cbuffer_new(1024, 1024*1024);
+ s->build = buffer_create(1);
+ netdata_mutex_init(&s->mutex);
+}
+
+void *rrdpush_sender_thread(void *ptr) {
+ struct sender_state *s = ptr;
+ s->task_id = gettid();
+
+ if(!s->host->rrdpush_send_enabled || !s->host->rrdpush_send_destination ||
+ !*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key ||
+ !*s->host->rrdpush_send_api_key) {
+ error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.",
+ s->host->hostname, s->task_id);
+ return NULL;
+ }
+
+#ifdef ENABLE_HTTPS
+ if (netdata_use_ssl_on_stream & NETDATA_SSL_FORCE ){
+ security_start_ssl(NETDATA_SSL_CONTEXT_STREAMING);
+ security_location_for_context(netdata_client_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path);
+ }
+#endif
+
+ info("STREAM %s [send]: thread created (task id %d)", s->host->hostname, s->task_id);
+
+ s->timeout = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 60);
+ s->default_port = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "default port", 19999);
+ s->buffer->max_size =
+ (size_t)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024);
+ s->reconnect_delay =
+ (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5);
+ remote_clock_resync_iterations = (unsigned int)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM,
+ "initial clock resync iterations",
+ remote_clock_resync_iterations); // TODO: REMOVE FOR SLEW / GAPFILLING
+
+ // initialize rrdpush globals
+ s->host->rrdpush_sender_connected = 0;
+ if(pipe(s->host->rrdpush_sender_pipe) == -1) {
+ error("STREAM %s [send]: cannot create required pipe. DISABLING STREAMING THREAD", s->host->hostname);
+ return NULL;
+ }
+
+ enum {
+ Collector,
+ Socket
+ };
+ struct pollfd fds[2];
+ fds[Collector].fd = s->host->rrdpush_sender_pipe[PIPE_READ];
+ fds[Collector].events = POLLIN;
+
+ netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, s->host);
+ for(; s->host->rrdpush_send_enabled && !netdata_exit ;) {
+ // check for outstanding cancellation requests
+ netdata_thread_testcancel();
+
+ // The connection attempt blocks (after which we use the socket in nonblocking)
+ if(unlikely(s->host->rrdpush_sender_socket == -1)) {
+ s->overflow = 0;
+ s->read_len = 0;
+ s->buffer->read = 0;
+ s->buffer->write = 0;
+ attempt_to_connect(s);
+ if (s->version >= VERSION_GAP_FILLING) {
+ time_t now = now_realtime_sec();
+ sender_start(s);
+ buffer_sprintf(s->build, "TIMESTAMP %ld", now);
+ sender_commit(s);
+ }
+ rrdpush_claimed_id(s->host);
+ continue;
+ }
+
+ // If the TCP window never opened then something is wrong, restart connection
+ if(unlikely(now_monotonic_sec() - s->last_sent_t > s->timeout)) {
+ error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", s->host->hostname, s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
+ rrdpush_sender_thread_close_socket(s->host);
+ continue;
+ }
+
+ // Wait until buffer opens in the socket or a rrdset_done_push wakes us
+ fds[Collector].revents = 0;
+ fds[Socket].revents = 0;
+ fds[Socket].fd = s->host->rrdpush_sender_socket;
+
+ netdata_mutex_lock(&s->mutex);
+ char *chunk;
+ size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, &chunk);
+ chunk = NULL; // Do not cache pointer outside of region - could be invalidated
+ netdata_mutex_unlock(&s->mutex);
+ if(outstanding) {
+ s->send_attempts++;
+ fds[Socket].events = POLLIN | POLLOUT;
+ }
+ else {
+ fds[Socket].events = POLLIN;
+ }
+
+ int retval = poll(fds, 2, 1000);
+ debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...",
+ fds[Collector].revents, fds[Socket].revents, outstanding);
+ if(unlikely(netdata_exit)) break;
+
+ // Spurious wake-ups without error - loop again
+ if (retval == 0 || ((retval == -1) && (errno == EAGAIN || errno == EINTR)))
+ {
+ debug(D_STREAM, "Spurious wakeup");
+ continue;
+ }
+ // Only errors from poll() are internal, but try restarting the connection
+ if(unlikely(retval == -1)) {
+ error("STREAM %s [send to %s]: failed to poll(). Closing socket.", s->host->hostname, s->connected_to);
+ rrdpush_sender_thread_close_socket(s->host);
+ continue;
+ }
+
+ // If the collector woke us up then empty the pipe to remove the signal
+ if (fds[Collector].revents & POLLIN || fds[Collector].revents & POLLPRI) {
+ debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding);
+
+ char buffer[1000 + 1];
+ if (read(s->host->rrdpush_sender_pipe[PIPE_READ], buffer, 1000) == -1)
+ error("STREAM %s [send to %s]: cannot read from internal pipe.", s->host->hostname, s->connected_to);
+ }
+
+ // Read as much as possible to fill the buffer, split into full lines for execution.
+ if (fds[Socket].revents & POLLIN)
+ attempt_read(s);
+ execute_commands(s);
+
+ // If we have data and have seen the TCP window open then try to close it by a transmission.
+ if (outstanding && fds[Socket].revents & POLLOUT)
+ attempt_to_send(s);
+
+ // TODO-GAPS - why do we only check this on the socket, not the pipe?
+ if (outstanding) {
+ char *error = NULL;
+ if (unlikely(fds[Socket].revents & POLLERR))
+ error = "socket reports errors (POLLERR)";
+ else if (unlikely(fds[Socket].revents & POLLHUP))
+ error = "connection closed by remote end (POLLHUP)";
+ else if (unlikely(fds[Socket].revents & POLLNVAL))
+ error = "connection is invalid (POLLNVAL)";
+ if(unlikely(error)) {
+ error("STREAM %s [send to %s]: restart stream because %s - %zu bytes transmitted.", s->host->hostname,
+ s->connected_to, error, s->sent_bytes_on_this_connection);
+ rrdpush_sender_thread_close_socket(s->host);
+ }
+ }
+
+ // protection from overflow
+ if (s->overflow) {
+ errno = 0;
+ error("STREAM %s [send to %s]: buffer full (%zu-bytes) after %zu bytes. Restarting connection",
+ s->host->hostname, s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection);
+ rrdpush_sender_thread_close_socket(s->host);
+ }
+ }
+
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}
diff --git a/streaming/stream.conf b/streaming/stream.conf
index fdff1f25f..b5142632d 100644
--- a/streaming/stream.conf
+++ b/streaming/stream.conf
@@ -8,10 +8,10 @@
# -----------------------------------------------------------------------------
-# 1. ON SLAVE NETDATA - THE ONE THAT WILL BE SENDING METRICS
+# 1. ON CHILD NETDATA - THE ONE THAT WILL BE SENDING METRICS
[stream]
- # Enable this on slaves, to have them send metrics.
+ # Enable this on child nodes, to have them send metrics.
enabled = no
# Where is the receiving netdata?
@@ -21,20 +21,20 @@
#
# If many are given, the first available will get the metrics.
#
- # PROTOCOL = tcp, udp, or unix (only tcp and unix are supported by masters)
+ # PROTOCOL = tcp, udp, or unix (only tcp and unix are supported by parent nodes)
# HOST = an IPv4, IPv6 IP, or a hostname, or a unix domain socket path.
# IPv6 IPs should be given with brackets [ip:address]
# INTERFACE = the network interface to use (only for IPv6)
# PORT = the port number or service name (/etc/services)
# SSL = when this word appear at the end of the destination string
- # the Netdata will do encrypt connection with the master.
+ # the Netdata will encrypt the connection with the parent.
#
# This communication is not HTTP (it cannot be proxied by web proxies).
destination =
# Skip Certificate verification?
#
- # The netdata slave is configurated to avoid invalid SSL/TLS certificate,
+ # The netdata child is configurated to avoid invalid SSL/TLS certificate,
# so certificates that are self-signed or expired will stop the streaming.
# Case the server certificate is not valid, you can enable the use of
# 'bad' certificates setting the next option as 'yes'.
@@ -51,7 +51,7 @@
# Certificate Authority file
#
- # When the Netdata master has certificate, that is not recognized as valid,
+ # When the Netdata parent has certificate, that is not recognized as valid,
# we can add this certificate in the list of known certificates in CApath
# and give for Netdata as argument.
#
@@ -87,16 +87,15 @@
# Sync the clock of the charts for that many iterations, when starting.
initial clock resync iterations = 60
-
# -----------------------------------------------------------------------------
-# 2. ON MASTER NETDATA - THE ONE THAT WILL BE RECEIVING METRICS
+# 2. ON PARENT NETDATA - THE ONE THAT WILL BE RECEIVING METRICS
-# You can have one API key per slave,
-# or the same API key for all slaves.
+# You can have one API key per child,
+# or the same API key for all child nodes.
#
# netdata searches for options in this order:
#
-# a) master netdata settings (netdata.conf)
+# a) parent netdata settings (netdata.conf)
# b) [stream] section (above)
# c) [API_KEY] section (below, settings for the API key)
# d) [MACHINE_GUID] section (below, settings for each machine)
@@ -141,8 +140,8 @@
# 3 possible values:
# yes enable alarms
# no do not enable alarms
- # auto enable alarms, only when the sending netdata is connected. For ephemeral slaves or slave system restarts,
- # ensure that the netdata process on the slave is gracefully stopped, to prevent invalid last_collected alarms
+ # auto enable alarms, only when the sending netdata is connected. For ephemeral child nodes or child system restarts,
+ # ensure that the netdata process on the child is gracefully stopped, to prevent invalid last_collected alarms
# You can also set it per host, below.
# The default is taken from [health].enabled of netdata.conf
health enabled by default = auto
@@ -150,13 +149,6 @@
# postpone alarms for a short period after the sender is connected
default postpone alarms on connect seconds = 60
- # allow or deny multiple connections for the same host?
- # If you are sure all your netdata have their own machine GUID,
- # set this to 'allow', since it allows faster reconnects.
- # When set to 'deny', new connections for a host will not be
- # accepted until an existing connection is cleared.
- multiple connections = allow
-
# need to route metrics differently? set these.
# the defaults are the ones at the [stream] section (above)
#default proxy enabled = yes | no
@@ -166,22 +158,22 @@
# -----------------------------------------------------------------------------
-# 3. PER SENDING HOST SETTINGS, ON MASTER NETDATA
+# 3. PER SENDING HOST SETTINGS, ON PARENT NETDATA
# THIS IS OPTIONAL - YOU DON'T HAVE TO CONFIGURE IT
-# This section exists to give you finer control of the master settings for each
-# slave host, when the same API key is used by many netdata slaves / proxies.
+# This section exists to give you finer control of the parent settings for each
+# child host, when the same API key is used by many netdata child nodes / proxies.
#
# Each netdata has a unique GUID - generated the first time netdata starts.
# You can find it at /var/lib/netdata/registry/netdata.public.unique.id
-# (at the slave).
+# (at the child).
#
# The host sending data will have one. If the host is not ephemeral,
# you can give settings for each sending host here.
[MACHINE_GUID]
# enable this host: yes | no
- # When disabled, the master will not receive metrics for this host.
+ # When disabled, the parent will not receive metrics for this host.
# THIS IS NOT A SECURITY MECHANISM - AN ATTACKER CAN SET ANY OTHER GUID.
# Use only the API key for security.
enabled = no
@@ -205,13 +197,6 @@
# postpone alarms when the sender connects
postpone alarms on connect seconds = 60
- # allow or deny multiple connections for the same host?
- # If you are sure all your netdata have their own machine GUID,
- # set this to 'allow', since it allows faster reconnects.
- # When set to 'deny', new connections for a host will not be
- # accepted until an existing connection is cleared.
- multiple connections = allow
-
# need to route metrics differently?
# the defaults are the ones at the [API KEY] section
#proxy enabled = yes | no