diff options
-rw-r--r-- | Makefile.am | 4 | ||||
-rw-r--r-- | Makefile.in | 22 | ||||
-rw-r--r-- | config.h.in | 4 | ||||
-rwxr-xr-x | configure | 179 | ||||
-rw-r--r-- | configure.ac | 2 | ||||
-rw-r--r-- | dns.cc | 15 | ||||
-rw-r--r-- | dnsdist-backend.cc | 3 | ||||
-rw-r--r-- | dnsdist-lua-bindings.cc | 7 | ||||
-rw-r--r-- | dnsdist-lua-ffi.cc | 2 | ||||
-rw-r--r-- | dnsdist-lua-hooks.cc | 23 | ||||
-rw-r--r-- | dnsdist-lua-hooks.hh | 2 | ||||
-rw-r--r-- | dnsdist-lua.cc | 11 | ||||
-rw-r--r-- | dnsdist-lua.hh | 1 | ||||
-rw-r--r-- | dnsdist-web.cc | 8 | ||||
-rw-r--r-- | dnsdist-xsk.cc | 26 | ||||
-rw-r--r-- | dnsdist.1 | 2 | ||||
-rw-r--r-- | dnsdist.cc | 6 | ||||
-rw-r--r-- | dnsdist.hh | 3 | ||||
-rw-r--r-- | doq.cc | 13 | ||||
-rw-r--r-- | libssl.cc | 23 | ||||
-rw-r--r-- | libssl.hh | 3 | ||||
-rw-r--r-- | lock.hh | 105 | ||||
-rw-r--r-- | m4/pdns_with_quiche.m4 | 11 | ||||
-rw-r--r-- | tcpiohandler.cc | 39 | ||||
-rw-r--r-- | tcpiohandler.hh | 18 | ||||
-rw-r--r-- | test-dnsdistlbpolicies_cc.cc | 2 | ||||
-rw-r--r-- | views.hh | 24 | ||||
-rw-r--r-- | xsk.cc | 164 | ||||
-rw-r--r-- | xsk.hh | 61 |
29 files changed, 599 insertions, 184 deletions
diff --git a/Makefile.am b/Makefile.am index faaeefd..59aad1a 100644 --- a/Makefile.am +++ b/Makefile.am @@ -439,14 +439,14 @@ endif if HAVE_DNS_OVER_TLS if HAVE_GNUTLS -dnsdist_LDADD += -lgnutls +dnsdist_LDADD += $(GNUTLS_LIBS) endif endif if HAVE_DNS_OVER_HTTPS if HAVE_GNUTLS -dnsdist_LDADD += -lgnutls +dnsdist_LDADD += $(GNUTLS_LIBS) endif if HAVE_LIBH2OEVLOOP diff --git a/Makefile.in b/Makefile.in index b21b7d0..7285ab1 100644 --- a/Makefile.in +++ b/Makefile.in @@ -118,8 +118,8 @@ bin_PROGRAMS = dnsdist$(EXEEXT) $(am__EXEEXT_3) @HAVE_LMDB_TRUE@am__append_22 = $(LMDB_LDFLAGS) $(LMDB_LIBS) @HAVE_LMDB_TRUE@am__append_23 = ext/lmdb-safe/lmdb-safe.cc ext/lmdb-safe/lmdb-safe.hh @HAVE_LMDB_TRUE@am__append_24 = ext/lmdb-safe/lmdb-safe.cc ext/lmdb-safe/lmdb-safe.hh -@HAVE_DNS_OVER_TLS_TRUE@@HAVE_GNUTLS_TRUE@am__append_25 = -lgnutls -@HAVE_DNS_OVER_HTTPS_TRUE@@HAVE_GNUTLS_TRUE@am__append_26 = -lgnutls +@HAVE_DNS_OVER_TLS_TRUE@@HAVE_GNUTLS_TRUE@am__append_25 = $(GNUTLS_LIBS) +@HAVE_DNS_OVER_HTTPS_TRUE@@HAVE_GNUTLS_TRUE@am__append_26 = $(GNUTLS_LIBS) @HAVE_DNS_OVER_HTTPS_TRUE@@HAVE_LIBH2OEVLOOP_TRUE@am__append_27 = doh.cc @HAVE_DNS_OVER_HTTPS_TRUE@@HAVE_LIBH2OEVLOOP_TRUE@am__append_28 = $(LIBH2OEVLOOP_LIBS) @HAVE_DNS_OVER_HTTPS_TRUE@@HAVE_NGHTTP2_TRUE@am__append_29 = dnsdist-nghttp2-in.cc \ @@ -379,9 +379,11 @@ am__DEPENDENCIES_1 = @HAVE_LIBCRYPTO_TRUE@am__DEPENDENCIES_5 = $(am__DEPENDENCIES_1) \ @HAVE_LIBCRYPTO_TRUE@ $(am__DEPENDENCIES_1) @HAVE_LMDB_TRUE@am__DEPENDENCIES_6 = $(am__DEPENDENCIES_1) -@HAVE_DNS_OVER_HTTPS_TRUE@@HAVE_LIBH2OEVLOOP_TRUE@am__DEPENDENCIES_7 = $(am__DEPENDENCIES_1) -@HAVE_DNS_OVER_HTTPS_TRUE@@HAVE_NGHTTP2_TRUE@am__DEPENDENCIES_8 = $(am__DEPENDENCIES_1) -@HAVE_QUICHE_TRUE@am__DEPENDENCIES_9 = $(am__DEPENDENCIES_1) +@HAVE_DNS_OVER_TLS_TRUE@@HAVE_GNUTLS_TRUE@am__DEPENDENCIES_7 = $(am__DEPENDENCIES_1) +@HAVE_DNS_OVER_HTTPS_TRUE@@HAVE_GNUTLS_TRUE@am__DEPENDENCIES_8 = $(am__DEPENDENCIES_1) +@HAVE_DNS_OVER_HTTPS_TRUE@@HAVE_LIBH2OEVLOOP_TRUE@am__DEPENDENCIES_9 = $(am__DEPENDENCIES_1) +@HAVE_DNS_OVER_HTTPS_TRUE@@HAVE_NGHTTP2_TRUE@am__DEPENDENCIES_10 = $(am__DEPENDENCIES_1) +@HAVE_QUICHE_TRUE@am__DEPENDENCIES_11 = $(am__DEPENDENCIES_1) dnsdist_DEPENDENCIES = $(am__DEPENDENCIES_1) $(am__DEPENDENCIES_1) \ $(am__DEPENDENCIES_1) $(am__DEPENDENCIES_1) \ $(am__DEPENDENCIES_1) $(am__DEPENDENCIES_1) \ @@ -390,9 +392,9 @@ dnsdist_DEPENDENCIES = $(am__DEPENDENCIES_1) $(am__DEPENDENCIES_1) \ $(am__DEPENDENCIES_1) $(am__DEPENDENCIES_2) \ $(am__DEPENDENCIES_3) $(am__DEPENDENCIES_4) \ $(am__DEPENDENCIES_1) $(am__DEPENDENCIES_5) \ - $(am__DEPENDENCIES_6) $(am__DEPENDENCIES_1) \ - $(am__DEPENDENCIES_1) $(am__DEPENDENCIES_7) \ - $(am__DEPENDENCIES_8) $(am__DEPENDENCIES_9) + $(am__DEPENDENCIES_6) $(am__DEPENDENCIES_7) \ + $(am__DEPENDENCIES_8) $(am__DEPENDENCIES_9) \ + $(am__DEPENDENCIES_10) $(am__DEPENDENCIES_11) AM_V_lt = $(am__v_lt_@AM_V@) am__v_lt_ = $(am__v_lt_@AM_DEFAULT_V@) am__v_lt_0 = --silent @@ -423,7 +425,7 @@ am__fuzz_target_dnsdistcache_SOURCES_DIST = channel.hh channel.cc \ @FUZZ_TARGETS_TRUE@ qtype.$(OBJEXT) svc-records.$(OBJEXT) fuzz_target_dnsdistcache_OBJECTS = \ $(am_fuzz_target_dnsdistcache_OBJECTS) -@FUZZ_TARGETS_TRUE@am__DEPENDENCIES_10 = $(am__DEPENDENCIES_1) \ +@FUZZ_TARGETS_TRUE@am__DEPENDENCIES_12 = $(am__DEPENDENCIES_1) \ @FUZZ_TARGETS_TRUE@ $(am__DEPENDENCIES_1) fuzz_target_dnsdistcache_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CXX \ $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=link $(CXXLD) \ @@ -562,7 +564,7 @@ testrunner_DEPENDENCIES = $(am__DEPENDENCIES_1) $(am__DEPENDENCIES_1) \ $(am__DEPENDENCIES_1) $(am__DEPENDENCIES_1) \ $(am__DEPENDENCIES_1) $(am__DEPENDENCIES_2) \ $(am__DEPENDENCIES_1) $(am__DEPENDENCIES_5) \ - $(am__DEPENDENCIES_6) $(am__DEPENDENCIES_8) + $(am__DEPENDENCIES_6) $(am__DEPENDENCIES_10) testrunner_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CXX $(AM_LIBTOOLFLAGS) \ $(LIBTOOLFLAGS) --mode=link $(CXXLD) $(AM_CXXFLAGS) \ $(CXXFLAGS) $(testrunner_LDFLAGS) $(LDFLAGS) -o $@ diff --git a/config.h.in b/config.h.in index bce954e..39bf050 100644 --- a/config.h.in +++ b/config.h.in @@ -243,6 +243,10 @@ /* Define to 1 if you have quiche */ #undef HAVE_QUICHE +/* Define to 1 if the Quiche API includes error code in + quiche_conn_stream_recv and quiche_conn_stream_send */ +#undef HAVE_QUICHE_STREAM_ERROR_CODES + /* Define to 1 if you have the `randombytes_stir' function. */ #undef HAVE_RANDOMBYTES_STIR @@ -1,6 +1,6 @@ #! /bin/sh # Guess values for system-dependent variables and create Makefiles. -# Generated by GNU Autoconf 2.71 for dnsdist 1.9.5. +# Generated by GNU Autoconf 2.71 for dnsdist 1.9.6. # # # Copyright (C) 1992-1996, 1998-2017, 2020-2021 Free Software Foundation, @@ -618,8 +618,8 @@ MAKEFLAGS= # Identity of this package. PACKAGE_NAME='dnsdist' PACKAGE_TARNAME='dnsdist' -PACKAGE_VERSION='1.9.5' -PACKAGE_STRING='dnsdist 1.9.5' +PACKAGE_VERSION='1.9.6' +PACKAGE_STRING='dnsdist 1.9.6' PACKAGE_BUGREPORT='' PACKAGE_URL='' @@ -1645,7 +1645,7 @@ if test "$ac_init_help" = "long"; then # Omit some internal or obsolete options to make the list less imposing. # This message is too long to be a string in the A/UX 3.1 sh. cat <<_ACEOF -\`configure' configures dnsdist 1.9.5 to adapt to many kinds of systems. +\`configure' configures dnsdist 1.9.6 to adapt to many kinds of systems. Usage: $0 [OPTION]... [VAR=VALUE]... @@ -1716,7 +1716,7 @@ fi if test -n "$ac_init_help"; then case $ac_init_help in - short | recursive ) echo "Configuration of dnsdist 1.9.5:";; + short | recursive ) echo "Configuration of dnsdist 1.9.6:";; esac cat <<\_ACEOF @@ -1951,7 +1951,7 @@ fi test -n "$ac_init_help" && exit $ac_status if $ac_init_version; then cat <<\_ACEOF -dnsdist configure 1.9.5 +dnsdist configure 1.9.6 generated by GNU Autoconf 2.71 Copyright (C) 2021 Free Software Foundation, Inc. @@ -2440,7 +2440,7 @@ cat >config.log <<_ACEOF This file contains any messages produced by compilers while running configure, to aid debugging if configure makes a mistake. -It was created by dnsdist $as_me 1.9.5, which was +It was created by dnsdist $as_me 1.9.6, which was generated by GNU Autoconf 2.71. Invocation command line was $ $0$ac_configure_args_raw @@ -3932,7 +3932,7 @@ fi # Define the identity of the package. PACKAGE='dnsdist' - VERSION='1.9.5' + VERSION='1.9.6' printf "%s\n" "#define PACKAGE \"$PACKAGE\"" >>confdefs.h @@ -17767,6 +17767,149 @@ then : pkg_failed=no +{ printf "%s\n" "$as_me:${as_lineno-$LINENO}: checking for quiche >= 0.22.0" >&5 +printf %s "checking for quiche >= 0.22.0... " >&6; } + +if test -n "$QUICHE_CFLAGS"; then + pkg_cv_QUICHE_CFLAGS="$QUICHE_CFLAGS" + elif test -n "$PKG_CONFIG"; then + if test -n "$PKG_CONFIG" && \ + { { printf "%s\n" "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"quiche >= 0.22.0\""; } >&5 + ($PKG_CONFIG --exists --print-errors "quiche >= 0.22.0") 2>&5 + ac_status=$? + printf "%s\n" "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5 + test $ac_status = 0; }; then + pkg_cv_QUICHE_CFLAGS=`$PKG_CONFIG --cflags "quiche >= 0.22.0" 2>/dev/null` + test "x$?" != "x0" && pkg_failed=yes +else + pkg_failed=yes +fi + else + pkg_failed=untried +fi +if test -n "$QUICHE_LIBS"; then + pkg_cv_QUICHE_LIBS="$QUICHE_LIBS" + elif test -n "$PKG_CONFIG"; then + if test -n "$PKG_CONFIG" && \ + { { printf "%s\n" "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"quiche >= 0.22.0\""; } >&5 + ($PKG_CONFIG --exists --print-errors "quiche >= 0.22.0") 2>&5 + ac_status=$? + printf "%s\n" "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5 + test $ac_status = 0; }; then + pkg_cv_QUICHE_LIBS=`$PKG_CONFIG --libs "quiche >= 0.22.0" 2>/dev/null` + test "x$?" != "x0" && pkg_failed=yes +else + pkg_failed=yes +fi + else + pkg_failed=untried +fi + + + +if test $pkg_failed = yes; then + { printf "%s\n" "$as_me:${as_lineno-$LINENO}: result: no" >&5 +printf "%s\n" "no" >&6; } + +if $PKG_CONFIG --atleast-pkgconfig-version 0.20; then + _pkg_short_errors_supported=yes +else + _pkg_short_errors_supported=no +fi + if test $_pkg_short_errors_supported = yes; then + QUICHE_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors --cflags --libs "quiche >= 0.22.0" 2>&1` + else + QUICHE_PKG_ERRORS=`$PKG_CONFIG --print-errors --cflags --libs "quiche >= 0.22.0" 2>&1` + fi + # Put the nasty error message in config.log where it belongs + echo "$QUICHE_PKG_ERRORS" >&5 + + + # Quiche is older than 0.22.0, or no Quiche at all + +pkg_failed=no +{ printf "%s\n" "$as_me:${as_lineno-$LINENO}: checking for quiche >= 0.15.0" >&5 +printf %s "checking for quiche >= 0.15.0... " >&6; } + +if test -n "$QUICHE_CFLAGS"; then + pkg_cv_QUICHE_CFLAGS="$QUICHE_CFLAGS" + elif test -n "$PKG_CONFIG"; then + if test -n "$PKG_CONFIG" && \ + { { printf "%s\n" "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"quiche >= 0.15.0\""; } >&5 + ($PKG_CONFIG --exists --print-errors "quiche >= 0.15.0") 2>&5 + ac_status=$? + printf "%s\n" "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5 + test $ac_status = 0; }; then + pkg_cv_QUICHE_CFLAGS=`$PKG_CONFIG --cflags "quiche >= 0.15.0" 2>/dev/null` + test "x$?" != "x0" && pkg_failed=yes +else + pkg_failed=yes +fi + else + pkg_failed=untried +fi +if test -n "$QUICHE_LIBS"; then + pkg_cv_QUICHE_LIBS="$QUICHE_LIBS" + elif test -n "$PKG_CONFIG"; then + if test -n "$PKG_CONFIG" && \ + { { printf "%s\n" "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"quiche >= 0.15.0\""; } >&5 + ($PKG_CONFIG --exists --print-errors "quiche >= 0.15.0") 2>&5 + ac_status=$? + printf "%s\n" "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5 + test $ac_status = 0; }; then + pkg_cv_QUICHE_LIBS=`$PKG_CONFIG --libs "quiche >= 0.15.0" 2>/dev/null` + test "x$?" != "x0" && pkg_failed=yes +else + pkg_failed=yes +fi + else + pkg_failed=untried +fi + + + +if test $pkg_failed = yes; then + { printf "%s\n" "$as_me:${as_lineno-$LINENO}: result: no" >&5 +printf "%s\n" "no" >&6; } + +if $PKG_CONFIG --atleast-pkgconfig-version 0.20; then + _pkg_short_errors_supported=yes +else + _pkg_short_errors_supported=no +fi + if test $_pkg_short_errors_supported = yes; then + QUICHE_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors --cflags --libs "quiche >= 0.15.0" 2>&1` + else + QUICHE_PKG_ERRORS=`$PKG_CONFIG --print-errors --cflags --libs "quiche >= 0.15.0" 2>&1` + fi + # Put the nasty error message in config.log where it belongs + echo "$QUICHE_PKG_ERRORS" >&5 + + : +elif test $pkg_failed = untried; then + { printf "%s\n" "$as_me:${as_lineno-$LINENO}: result: no" >&5 +printf "%s\n" "no" >&6; } + : +else + QUICHE_CFLAGS=$pkg_cv_QUICHE_CFLAGS + QUICHE_LIBS=$pkg_cv_QUICHE_LIBS + { printf "%s\n" "$as_me:${as_lineno-$LINENO}: result: yes" >&5 +printf "%s\n" "yes" >&6; } + + HAVE_QUICHE=1 + +printf "%s\n" "#define HAVE_QUICHE 1" >>confdefs.h + + +fi + +elif test $pkg_failed = untried; then + { printf "%s\n" "$as_me:${as_lineno-$LINENO}: result: no" >&5 +printf "%s\n" "no" >&6; } + + # Quiche is older than 0.22.0, or no Quiche at all + +pkg_failed=no { printf "%s\n" "$as_me:${as_lineno-$LINENO}: checking for quiche >= 0.15.0" >&5 printf %s "checking for quiche >= 0.15.0... " >&6; } @@ -17835,11 +17978,27 @@ else { printf "%s\n" "$as_me:${as_lineno-$LINENO}: result: yes" >&5 printf "%s\n" "yes" >&6; } + HAVE_QUICHE=1 + +printf "%s\n" "#define HAVE_QUICHE 1" >>confdefs.h + + +fi + +else + QUICHE_CFLAGS=$pkg_cv_QUICHE_CFLAGS + QUICHE_LIBS=$pkg_cv_QUICHE_LIBS + { printf "%s\n" "$as_me:${as_lineno-$LINENO}: result: yes" >&5 +printf "%s\n" "yes" >&6; } + HAVE_QUICHE=1 printf "%s\n" "#define HAVE_QUICHE 1" >>confdefs.h +printf "%s\n" "#define HAVE_QUICHE_STREAM_ERROR_CODES 1" >>confdefs.h + + fi fi @@ -28149,7 +28308,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1 # report actual input values of CONFIG_FILES etc. instead of their # values after options handling. ac_log=" -This file was extended by dnsdist $as_me 1.9.5, which was +This file was extended by dnsdist $as_me 1.9.6, which was generated by GNU Autoconf 2.71. Invocation command line was CONFIG_FILES = $CONFIG_FILES @@ -28217,7 +28376,7 @@ ac_cs_config_escaped=`printf "%s\n" "$ac_cs_config" | sed "s/^ //; s/'/'\\\\\\\\ cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1 ac_cs_config='$ac_cs_config_escaped' ac_cs_version="\\ -dnsdist config.status 1.9.5 +dnsdist config.status 1.9.6 configured by $0, generated by GNU Autoconf 2.71, with options \\"\$ac_cs_config\\" diff --git a/configure.ac b/configure.ac index eb34476..b2d4206 100644 --- a/configure.ac +++ b/configure.ac @@ -1,6 +1,6 @@ AC_PREREQ([2.69]) -AC_INIT([dnsdist], [1.9.5]) +AC_INIT([dnsdist], [1.9.6]) AM_INIT_AUTOMAKE([foreign tar-ustar dist-bzip2 no-dist-gzip parallel-tests 1.11 subdir-objects]) AM_SILENT_RULES([yes]) AC_CONFIG_MACRO_DIR([m4]) @@ -24,6 +24,7 @@ #endif #include "dns.hh" #include "misc.hh" +#include "views.hh" #include <stdexcept> #include <iostream> #include <boost/algorithm/string.hpp> @@ -102,27 +103,27 @@ std::string Opcode::to_s(uint8_t opcode) { } // goal is to hash based purely on the question name, and turn error into 'default' -uint32_t hashQuestion(const uint8_t* packet, uint16_t packet_len, uint32_t init, bool& ok) +uint32_t hashQuestion(const uint8_t* packet, uint16_t packet_len, uint32_t init, bool& wasOK) { if (packet_len < sizeof(dnsheader)) { - ok = false; + wasOK = false; return init; } - // C++ 17 does not have std::u8string_view - std::basic_string_view<uint8_t> name(packet + sizeof(dnsheader), packet_len - sizeof(dnsheader)); - std::basic_string_view<uint8_t>::size_type len = 0; + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + pdns::views::UnsignedCharView name(packet + sizeof(dnsheader), packet_len - sizeof(dnsheader)); + pdns::views::UnsignedCharView::size_type len = 0; while (len < name.length()) { uint8_t labellen = name[len++]; if (labellen == 0) { - ok = true; + wasOK = true; // len is name.length() at max as it was < before the increment return burtleCI(name.data(), len, init); } len += labellen; } // We've encountered a label that is too long - ok = false; + wasOK = false; return init; } diff --git a/dnsdist-backend.cc b/dnsdist-backend.cc index 7f56034..d59a5df 100644 --- a/dnsdist-backend.cc +++ b/dnsdist-backend.cc @@ -905,10 +905,9 @@ void DownstreamState::registerXsk(std::vector<std::shared_ptr<XskSocket>>& xsks) d_config.sourceMACAddr = d_xskSockets.at(0)->getSourceMACAddress(); for (auto& xsk : d_xskSockets) { - auto xskInfo = XskWorker::create(); + auto xskInfo = XskWorker::create(XskWorker::Type::Bidirectional, xsk->sharedEmptyFrameOffset); d_xskInfos.push_back(xskInfo); xsk->addWorker(xskInfo); - xskInfo->sharedEmptyFrameOffset = xsk->sharedEmptyFrameOffset; } reconnect(false); } diff --git a/dnsdist-lua-bindings.cc b/dnsdist-lua-bindings.cc index 3f5d6e2..ffed8f0 100644 --- a/dnsdist-lua-bindings.cc +++ b/dnsdist-lua-bindings.cc @@ -31,8 +31,7 @@ #include "dolog.hh" #include "xsk.hh" -// NOLINTNEXTLINE(readability-function-cognitive-complexity): this function declares Lua bindings, even with a good refactoring it will likely blow up the threshold -void setupLuaBindings(LuaContext& luaCtx, bool client, bool configCheck) +void setupLuaBindingsLogging(LuaContext& luaCtx) { luaCtx.writeFunction("vinfolog", [](const string& arg) { vinfolog("%s", arg); @@ -50,7 +49,11 @@ void setupLuaBindings(LuaContext& luaCtx, bool client, bool configCheck) g_outputBuffer+=arg; g_outputBuffer+="\n"; }); +} +// NOLINTNEXTLINE(readability-function-cognitive-complexity): this function declares Lua bindings, even with a good refactoring it will likely blow up the threshold +void setupLuaBindings(LuaContext& luaCtx, bool client, bool configCheck) +{ /* Exceptions */ luaCtx.registerFunction<string(std::exception_ptr::*)()const>("__tostring", [](const std::exception_ptr& eptr) -> std::string { try { diff --git a/dnsdist-lua-ffi.cc b/dnsdist-lua-ffi.cc index 8ab36f5..118d0ce 100644 --- a/dnsdist-lua-ffi.cc +++ b/dnsdist-lua-ffi.cc @@ -1012,6 +1012,7 @@ void setupLuaLoadBalancingContext(LuaContext& luaCtx) { setupLuaBindings(luaCtx, true, false); setupLuaBindingsDNSQuestion(luaCtx); + setupLuaBindingsLogging(luaCtx); setupLuaBindingsKVS(luaCtx, true); setupLuaVars(luaCtx); @@ -1023,6 +1024,7 @@ void setupLuaLoadBalancingContext(LuaContext& luaCtx) void setupLuaFFIPerThreadContext(LuaContext& luaCtx) { setupLuaVars(luaCtx); + setupLuaBindingsLogging(luaCtx); #ifdef LUAJIT_VERSION luaCtx.executeCode(getLuaFFIWrappers()); diff --git a/dnsdist-lua-hooks.cc b/dnsdist-lua-hooks.cc index c5ccb48..2904cd3 100644 --- a/dnsdist-lua-hooks.cc +++ b/dnsdist-lua-hooks.cc @@ -2,9 +2,13 @@ #include "dnsdist-lua-hooks.hh" #include "dnsdist-lua.hh" #include "lock.hh" +#include "tcpiohandler.hh" namespace dnsdist::lua::hooks { +using MaintenanceCallback = std::function<void()>; +using TicketsKeyAddedHook = std::function<void(const char*, size_t)>; + static LockGuarded<std::vector<MaintenanceCallback>> s_maintenanceHooks; void runMaintenanceHooks(const LuaContext& context) @@ -15,7 +19,7 @@ void runMaintenanceHooks(const LuaContext& context) } } -void addMaintenanceCallback(const LuaContext& context, MaintenanceCallback callback) +static void addMaintenanceCallback(const LuaContext& context, MaintenanceCallback callback) { (void)context; s_maintenanceHooks.lock()->push_back(std::move(callback)); @@ -26,12 +30,29 @@ void clearMaintenanceHooks() s_maintenanceHooks.lock()->clear(); } +static void setTicketsKeyAddedHook(const LuaContext& context, const TicketsKeyAddedHook& hook) +{ + TLSCtx::setTicketsKeyAddedHook([hook](const std::string& key) { + try { + auto lua = g_lua.lock(); + hook(key.c_str(), key.size()); + } + catch (const std::exception& exp) { + warnlog("Error calling the Lua hook after new tickets key has been added: %s", exp.what()); + } + }); +} + void setupLuaHooks(LuaContext& luaCtx) { luaCtx.writeFunction("addMaintenanceCallback", [&luaCtx](const MaintenanceCallback& callback) { setLuaSideEffect(); addMaintenanceCallback(luaCtx, callback); }); + luaCtx.writeFunction("setTicketsKeyAddedHook", [&luaCtx](const TicketsKeyAddedHook& hook) { + setLuaSideEffect(); + setTicketsKeyAddedHook(luaCtx, hook); + }); } } diff --git a/dnsdist-lua-hooks.hh b/dnsdist-lua-hooks.hh index 11a9084..e35c0f1 100644 --- a/dnsdist-lua-hooks.hh +++ b/dnsdist-lua-hooks.hh @@ -27,9 +27,7 @@ class LuaContext; namespace dnsdist::lua::hooks { -using MaintenanceCallback = std::function<void()>; void runMaintenanceHooks(const LuaContext& context); -void addMaintenanceCallback(const LuaContext& context, MaintenanceCallback callback); void clearMaintenanceHooks(); void setupLuaHooks(LuaContext& luaCtx); } diff --git a/dnsdist-lua.cc b/dnsdist-lua.cc index 73a8567..4e0400d 100644 --- a/dnsdist-lua.cc +++ b/dnsdist-lua.cc @@ -809,10 +809,11 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck) std::shared_ptr<XskSocket> socket; parseXskVars(vars, socket); if (socket) { - udpCS->xskInfo = XskWorker::create(); - udpCS->xskInfo->sharedEmptyFrameOffset = socket->sharedEmptyFrameOffset; + udpCS->xskInfo = XskWorker::create(XskWorker::Type::Bidirectional, socket->sharedEmptyFrameOffset); socket->addWorker(udpCS->xskInfo); socket->addWorkerRoute(udpCS->xskInfo, loc); + udpCS->xskInfoResponder = XskWorker::create(XskWorker::Type::OutgoingOnly, socket->sharedEmptyFrameOffset); + socket->addWorker(udpCS->xskInfoResponder); vinfolog("Enabling XSK in %s mode for incoming UDP packets to %s", socket->getXDPMode(), loc.toStringWithPort()); } #endif /* HAVE_XSK */ @@ -863,10 +864,11 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck) std::shared_ptr<XskSocket> socket; parseXskVars(vars, socket); if (socket) { - udpCS->xskInfo = XskWorker::create(); - udpCS->xskInfo->sharedEmptyFrameOffset = socket->sharedEmptyFrameOffset; + udpCS->xskInfo = XskWorker::create(XskWorker::Type::Bidirectional, socket->sharedEmptyFrameOffset); socket->addWorker(udpCS->xskInfo); socket->addWorkerRoute(udpCS->xskInfo, loc); + udpCS->xskInfoResponder = XskWorker::create(XskWorker::Type::OutgoingOnly, socket->sharedEmptyFrameOffset); + socket->addWorker(udpCS->xskInfoResponder); vinfolog("Enabling XSK in %s mode for incoming UDP packets to %s", socket->getXDPMode(), loc.toStringWithPort()); } #endif /* HAVE_XSK */ @@ -3482,6 +3484,7 @@ vector<std::function<void(void)>> setupLua(LuaContext& luaCtx, bool client, bool setupLuaBindingsDNSParser(luaCtx); setupLuaBindingsDNSQuestion(luaCtx); setupLuaBindingsKVS(luaCtx, client); + setupLuaBindingsLogging(luaCtx); setupLuaBindingsNetwork(luaCtx, client); setupLuaBindingsPacketCache(luaCtx, client); setupLuaBindingsProtoBuf(luaCtx, client, configCheck); diff --git a/dnsdist-lua.hh b/dnsdist-lua.hh index fed468c..fc99e20 100644 --- a/dnsdist-lua.hh +++ b/dnsdist-lua.hh @@ -169,6 +169,7 @@ void setupLuaBindingsDNSCrypt(LuaContext& luaCtx, bool client); void setupLuaBindingsDNSParser(LuaContext& luaCtx); void setupLuaBindingsDNSQuestion(LuaContext& luaCtx); void setupLuaBindingsKVS(LuaContext& luaCtx, bool client); +void setupLuaBindingsLogging(LuaContext& luaCtx); void setupLuaBindingsNetwork(LuaContext& luaCtx, bool client); void setupLuaBindingsPacketCache(LuaContext& luaCtx, bool client); void setupLuaBindingsProtoBuf(LuaContext& luaCtx, bool client, bool configCheck); diff --git a/dnsdist-web.cc b/dnsdist-web.cc index 84ea079..000579e 100644 --- a/dnsdist-web.cc +++ b/dnsdist-web.cc @@ -474,6 +474,7 @@ static void handlePrometheus(const YaHTTP::Request& req, YaHTTP::Response& resp) static const std::set<std::string> metricBlacklist = { "special-memory-usage", "latency-count", "latency-sum" }; { auto entries = dnsdist::metrics::g_stats.entries.read_lock(); + std::unordered_set<std::string> helpAndTypeSent; for (const auto& entry : *entries) { const auto& metricName = entry.d_name; @@ -505,8 +506,11 @@ static void handlePrometheus(const YaHTTP::Request& req, YaHTTP::Response& resp) // for these we have the help and types encoded in the sources // but we need to be careful about labels in custom metrics std::string helpName = prometheusMetricName.substr(0, prometheusMetricName.find('{')); - output << "# HELP " << helpName << " " << metricDetails.description << "\n"; - output << "# TYPE " << helpName << " " << prometheusTypeName << "\n"; + if (helpAndTypeSent.count(helpName) == 0) { + helpAndTypeSent.insert(helpName); + output << "# HELP " << helpName << " " << metricDetails.description << "\n"; + output << "# TYPE " << helpName << " " << prometheusTypeName << "\n"; + } output << prometheusMetricName << " "; if (const auto& val = std::get_if<pdns::stat_t*>(&entry.d_value)) { diff --git a/dnsdist-xsk.cc b/dnsdist-xsk.cc index 058e381..7e83c51 100644 --- a/dnsdist-xsk.cc +++ b/dnsdist-xsk.cc @@ -48,11 +48,7 @@ void XskResponderThread(std::shared_ptr<DownstreamState> dss, std::shared_ptr<Xs if ((pollfds[0].revents & POLLIN) != 0) { needNotify = true; xskInfo->cleanSocketNotification(); -#if defined(__SANITIZE_THREAD__) - xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) { -#else - xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) { -#endif + xskInfo->processIncomingFrames([&](XskPacket& packet) { if (packet.getDataLen() < sizeof(dnsheader)) { xskInfo->markAsFree(packet); return; @@ -77,7 +73,7 @@ void XskResponderThread(std::shared_ptr<DownstreamState> dss, std::shared_ptr<Xs } if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) { xskInfo->markAsFree(packet); - infolog("XSK packet pushed to queue because processResponderPacket failed"); + vinfolog("XSK packet dropped because processResponderPacket failed"); return; } if (response.size() > packet.getCapacity()) { @@ -171,11 +167,7 @@ void XskRouter(std::shared_ptr<XskSocket> xsk) if ((fds.at(fdIndex).revents & POLLIN) != 0) { ready--; const auto& info = xsk->getWorkerByDescriptor(fds.at(fdIndex).fd); -#if defined(__SANITIZE_THREAD__) - info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket& packet) { -#else - info->outgoingPacketsQueue.consume_all([&](XskPacket& packet) { -#endif + info->processOutgoingFrames([&](XskPacket& packet) { if ((packet.getFlags() & XskPacket::UPDATE) == 0) { xsk->markAsFree(packet); return; @@ -207,18 +199,10 @@ void XskClientThread(ClientState* clientState) LocalHolders holders; for (;;) { -#if defined(__SANITIZE_THREAD__) - while (xskInfo->incomingPacketsQueue.lock()->read_available() == 0U) { -#else - while (xskInfo->incomingPacketsQueue.read_available() == 0U) { -#endif + while (!xskInfo->hasIncomingFrames()) { xskInfo->waitForXskSocket(); } -#if defined(__SANITIZE_THREAD__) - xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) { -#else - xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) { -#endif + xskInfo->processIncomingFrames([&](XskPacket& packet) { if (XskProcessQuery(*clientState, holders, packet)) { packet.updatePacket(); xskInfo->pushToSendQueue(packet); @@ -27,7 +27,7 @@ level margin: \\n[rst2man-indent\\n[rst2man-indent-level]] .\" new: \\n[rst2man-indent\\n[rst2man-indent-level]] .in \\n[rst2man-indent\\n[rst2man-indent-level]]u .. -.TH "DNSDIST" "1" "Jun 20, 2024" "" "dnsdist" +.TH "DNSDIST" "1" "Jul 15, 2024" "" "dnsdist" .SH NAME dnsdist \- A DNS and DoS aware, scriptable loadbalancer .SH SYNOPSIS @@ -861,9 +861,9 @@ void responderThread(std::shared_ptr<DownstreamState> dss) continue; } - if (processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids)) && ids->isXSK() && ids->cs->xskInfo) { + if (processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids)) && ids->isXSK() && ids->cs->xskInfoResponder) { #ifdef HAVE_XSK - auto& xskInfo = ids->cs->xskInfo; + auto& xskInfo = ids->cs->xskInfoResponder; auto xskPacket = xskInfo->getEmptyFrame(); if (!xskPacket) { continue; @@ -897,7 +897,7 @@ catch (...) { } } -LockGuarded<LuaContext> g_lua{LuaContext()}; +RecursiveLockGuarded<LuaContext> g_lua{LuaContext()}; ComboAddress g_serverControl{"127.0.0.1:5199"}; @@ -516,6 +516,7 @@ struct ClientState std::shared_ptr<DOH3Frontend> doh3Frontend{nullptr}; std::shared_ptr<BPFFilter> d_filter{nullptr}; std::shared_ptr<XskWorker> xskInfo{nullptr}; + std::shared_ptr<XskWorker> xskInfoResponder{nullptr}; size_t d_maxInFlightQueriesPerConn{1}; size_t d_tcpConcurrentConnectionsLimit{0}; int udpFD{-1}; @@ -1026,7 +1027,7 @@ public: using servers_t = vector<std::shared_ptr<DownstreamState>>; void responderThread(std::shared_ptr<DownstreamState> state); -extern LockGuarded<LuaContext> g_lua; +extern RecursiveLockGuarded<LuaContext> g_lua; extern std::string g_outputBuffer; // locking for this is ok, as locked by g_luamutex class DNSRule @@ -267,7 +267,12 @@ static bool tryWriteResponse(Connection& conn, const uint64_t streamID, PacketBu { size_t pos = 0; while (pos < response.size()) { +#ifdef HAVE_QUICHE_STREAM_ERROR_CODES + uint64_t quicheErrorCode{0}; + auto res = quiche_conn_stream_send(conn.d_conn.get(), streamID, &response.at(pos), response.size() - pos, true, &quicheErrorCode); +#else auto res = quiche_conn_stream_send(conn.d_conn.get(), streamID, &response.at(pos), response.size() - pos, true); +#endif if (res == QUICHE_ERR_DONE) { response.erase(response.begin(), response.begin() + static_cast<ssize_t>(pos)); return false; @@ -606,9 +611,17 @@ static void handleReadableStream(DOQFrontend& frontend, ClientState& clientState bool fin = false; auto existingLength = streamBuffer.size(); streamBuffer.resize(existingLength + 512); +#ifdef HAVE_QUICHE_STREAM_ERROR_CODES + uint64_t quicheErrorCode{0}; + auto received = quiche_conn_stream_recv(conn.d_conn.get(), streamID, + &streamBuffer.at(existingLength), 512, + &fin, + &quicheErrorCode); +#else auto received = quiche_conn_stream_recv(conn.d_conn.get(), streamID, &streamBuffer.at(existingLength), 512, &fin); +#endif if (received == 0 || received == QUICHE_ERR_DONE) { streamBuffer.resize(existingLength); return; @@ -12,9 +12,11 @@ #include <pthread.h> #include <openssl/conf.h> +#if OPENSSL_VERSION_MAJOR < 3 || !defined(HAVE_TLS_PROVIDERS) #ifndef OPENSSL_NO_ENGINE #include <openssl/engine.h> #endif +#endif #include <openssl/err.h> #ifndef DISABLE_OCSP_STAPLING #include <openssl/ocsp.h> @@ -42,6 +44,7 @@ #undef CERT #include "misc.hh" +#include "tcpiohandler.hh" #if (OPENSSL_VERSION_NUMBER < 0x1010000fL || (defined LIBRESSL_VERSION_NUMBER) && LIBRESSL_VERSION_NUMBER < 0x2090100fL) /* OpenSSL < 1.1.0 needs support for threading/locking in the calling application. */ @@ -631,6 +634,13 @@ OpenSSLTLSTicketKeysRing::~OpenSSLTLSTicketKeysRing() = default; void OpenSSLTLSTicketKeysRing::addKey(std::shared_ptr<OpenSSLTLSTicketKey>&& newKey) { d_ticketKeys.write_lock()->push_front(std::move(newKey)); + if (TLSCtx::hasTicketsKeyAddedHook()) { + auto key = d_ticketKeys.read_lock()->front(); + auto keyContent = key->content(); + TLSCtx::getTicketsKeyAddedHook()(keyContent); + // fills mem with 0's + OPENSSL_cleanse(keyContent.data(), keyContent.size()); + } } std::shared_ptr<OpenSSLTLSTicketKey> OpenSSLTLSTicketKeysRing::getEncryptionKey() @@ -737,6 +747,19 @@ bool OpenSSLTLSTicketKey::nameMatches(const unsigned char name[TLS_TICKETS_KEY_N return (memcmp(d_name, name, sizeof(d_name)) == 0); } +std::string OpenSSLTLSTicketKey::content() const +{ + std::string result{}; + result.reserve(TLS_TICKETS_KEY_NAME_SIZE + TLS_TICKETS_CIPHER_KEY_SIZE + TLS_TICKETS_MAC_KEY_SIZE); + // NOLINTBEGIN(cppcoreguidelines-pro-type-reinterpret-cast) + result.append(reinterpret_cast<const char*>(d_name), TLS_TICKETS_KEY_NAME_SIZE); + result.append(reinterpret_cast<const char*>(d_cipherKey), TLS_TICKETS_CIPHER_KEY_SIZE); + result.append(reinterpret_cast<const char*>(d_hmacKey), TLS_TICKETS_MAC_KEY_SIZE); + // NOLINTEND(cppcoreguidelines-pro-type-reinterpret-cast) + + return result; +} + #if OPENSSL_VERSION_MAJOR >= 3 static const std::string sha256KeyName{"sha256"}; #endif @@ -105,6 +105,8 @@ public: bool decrypt(const unsigned char* iv, EVP_CIPHER_CTX* ectx, HMAC_CTX* hctx) const; #endif + [[nodiscard]] std::string content() const; + private: unsigned char d_name[TLS_TICKETS_KEY_NAME_SIZE]; unsigned char d_cipherKey[TLS_TICKETS_CIPHER_KEY_SIZE]; @@ -124,7 +126,6 @@ public: private: void addKey(std::shared_ptr<OpenSSLTLSTicketKey>&& newKey); - SharedLockGuarded<boost::circular_buffer<std::shared_ptr<OpenSSLTLSTicketKey> > > d_ticketKeys; }; @@ -298,6 +298,111 @@ private: }; template <typename T> +class RecursiveLockGuardedHolder +{ +public: + explicit RecursiveLockGuardedHolder(T& value, std::recursive_mutex& mutex) : + d_lock(mutex), d_value(value) + { + } + + T& operator*() const noexcept + { + return d_value; + } + + T* operator->() const noexcept + { + return &d_value; + } + +private: + std::lock_guard<std::recursive_mutex> d_lock; + T& d_value; +}; + +template <typename T> +class RecursiveLockGuardedTryHolder +{ +public: + explicit RecursiveLockGuardedTryHolder(T& value, std::recursive_mutex& mutex) : + d_lock(mutex, std::try_to_lock), d_value(value) + { + } + + T& operator*() const + { + if (!owns_lock()) { + throw std::runtime_error("Trying to access data protected by a mutex while the lock has not been acquired"); + } + return d_value; + } + + T* operator->() const + { + if (!owns_lock()) { + throw std::runtime_error("Trying to access data protected by a mutex while the lock has not been acquired"); + } + return &d_value; + } + + operator bool() const noexcept + { + return d_lock.owns_lock(); + } + + [[nodiscard]] bool owns_lock() const noexcept + { + return d_lock.owns_lock(); + } + + void lock() + { + d_lock.lock(); + } + +private: + std::unique_lock<std::recursive_mutex> d_lock; + T& d_value; +}; + +template <typename T> +class RecursiveLockGuarded +{ +public: + explicit RecursiveLockGuarded(const T& value) : + d_value(value) + { + } + + explicit RecursiveLockGuarded(T&& value) : + d_value(std::move(value)) + { + } + + explicit RecursiveLockGuarded() = default; + + RecursiveLockGuardedTryHolder<T> try_lock() + { + return RecursiveLockGuardedTryHolder<T>(d_value, d_mutex); + } + + RecursiveLockGuardedHolder<T> lock() + { + return RecursiveLockGuardedHolder<T>(d_value, d_mutex); + } + + RecursiveLockGuardedHolder<const T> read_only_lock() + { + return RecursiveLockGuardedHolder<const T>(d_value, d_mutex); + } + +private: + std::recursive_mutex d_mutex; + T d_value; +}; + +template <typename T> class SharedLockGuardedHolder { public: diff --git a/m4/pdns_with_quiche.m4 b/m4/pdns_with_quiche.m4 index 5c3297b..672fe0f 100644 --- a/m4/pdns_with_quiche.m4 +++ b/m4/pdns_with_quiche.m4 @@ -10,10 +10,17 @@ AC_DEFUN([PDNS_WITH_QUICHE], [ AS_IF([test "x$with_quiche" != "xno"], [ AS_IF([test "x$with_quiche" = "xyes" -o "x$with_quiche" = "xauto"], [ - PKG_CHECK_MODULES([QUICHE], [quiche >= 0.15.0], [ + PKG_CHECK_MODULES([QUICHE], [quiche >= 0.22.0], [ [HAVE_QUICHE=1] AC_DEFINE([HAVE_QUICHE], [1], [Define to 1 if you have quiche]) - ], [ : ]) + AC_DEFINE([HAVE_QUICHE_STREAM_ERROR_CODES], [1], [Define to 1 if the Quiche API includes error code in quiche_conn_stream_recv and quiche_conn_stream_send]) + ], [ + # Quiche is older than 0.22.0, or no Quiche at all + PKG_CHECK_MODULES([QUICHE], [quiche >= 0.15.0], [ + [HAVE_QUICHE=1] + AC_DEFINE([HAVE_QUICHE], [1], [Define to 1 if you have quiche]) + ], [ : ]) + ]) ]) ]) AM_CONDITIONAL([HAVE_QUICHE], [test "x$QUICHE_LIBS" != "x"]) diff --git a/tcpiohandler.cc b/tcpiohandler.cc index cf82471..edee311 100644 --- a/tcpiohandler.cc +++ b/tcpiohandler.cc @@ -11,6 +11,8 @@ const bool TCPIOHandler::s_disableConnectForUnitTests = false; #include <sodium.h> #endif /* HAVE_LIBSODIUM */ +TLSCtx::tickets_key_added_hook TLSCtx::s_ticketsKeyAddedHook{nullptr}; + #if defined(HAVE_DNS_OVER_TLS) || defined(HAVE_DNS_OVER_HTTPS) #ifdef HAVE_LIBSSL @@ -987,6 +989,16 @@ public: throw; } } + [[nodiscard]] std::string content() const + { + std::string result{}; + if (d_key.data != nullptr && d_key.size > 0) { + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + result.append(reinterpret_cast<const char*>(d_key.data), d_key.size); + safe_memory_lock(result.data(), result.size()); + } + return result; + } ~GnuTLSTicketsKey() { @@ -1730,14 +1742,12 @@ public: return connection; } - void rotateTicketsKey(time_t now) override + void addTicketsKey(time_t now, std::shared_ptr<GnuTLSTicketsKey>&& newKey) { if (!d_enableTickets) { return; } - auto newKey = std::make_shared<GnuTLSTicketsKey>(); - { *(d_ticketsKey.write_lock()) = std::move(newKey); } @@ -1745,8 +1755,23 @@ public: if (d_ticketsKeyRotationDelay > 0) { d_ticketsKeyNextRotation = now + d_ticketsKeyRotationDelay; } + + if (TLSCtx::hasTicketsKeyAddedHook()) { + auto ticketsKey = *(d_ticketsKey.read_lock()); + auto content = ticketsKey->content(); + TLSCtx::getTicketsKeyAddedHook()(content); + safe_memory_release(content.data(), content.size()); + } } + void rotateTicketsKey(time_t now) override + { + if (!d_enableTickets) { + return; + } + auto newKey = std::make_shared<GnuTLSTicketsKey>(); + addTicketsKey(now, std::move(newKey)); + } void loadTicketsKeys(const std::string& file) final { if (!d_enableTickets) { @@ -1754,13 +1779,7 @@ public: } auto newKey = std::make_shared<GnuTLSTicketsKey>(file); - { - *(d_ticketsKey.write_lock()) = std::move(newKey); - } - - if (d_ticketsKeyRotationDelay > 0) { - d_ticketsKeyNextRotation = time(nullptr) + d_ticketsKeyRotationDelay; - } + addTicketsKey(time(nullptr), std::move(newKey)); } size_t getTicketsKeysCount() override diff --git a/tcpiohandler.hh b/tcpiohandler.hh index 058d104..8420529 100644 --- a/tcpiohandler.hh +++ b/tcpiohandler.hh @@ -81,7 +81,6 @@ public: { throw std::runtime_error("This TLS backend does not have the capability to load a tickets key from a file"); } - void handleTicketsKeyRotation(time_t now) { if (d_ticketsKeyRotationDelay != 0 && now > d_ticketsKeyNextRotation) { @@ -124,10 +123,27 @@ public: return false; } + using tickets_key_added_hook = std::function<void(const std::string& key)>; + + static void setTicketsKeyAddedHook(const tickets_key_added_hook& hook) + { + TLSCtx::s_ticketsKeyAddedHook = hook; + } + static const tickets_key_added_hook& getTicketsKeyAddedHook() + { + return TLSCtx::s_ticketsKeyAddedHook; + } + static bool hasTicketsKeyAddedHook() + { + return TLSCtx::s_ticketsKeyAddedHook != nullptr; + } protected: std::atomic_flag d_rotatingTicketsKey; std::atomic<time_t> d_ticketsKeyNextRotation{0}; time_t d_ticketsKeyRotationDelay{0}; + +private: + static tickets_key_added_hook s_ticketsKeyAddedHook; }; class TLSFrontend diff --git a/test-dnsdistlbpolicies_cc.cc b/test-dnsdistlbpolicies_cc.cc index 401108f..4fc5e7b 100644 --- a/test-dnsdistlbpolicies_cc.cc +++ b/test-dnsdistlbpolicies_cc.cc @@ -15,7 +15,7 @@ uint16_t g_maxOutstanding{std::numeric_limits<uint16_t>::max()}; #include "ext/luawrapper/include/LuaContext.hpp" -LockGuarded<LuaContext> g_lua{LuaContext()}; +RecursiveLockGuarded<LuaContext> g_lua{LuaContext()}; bool g_snmpEnabled{false}; bool g_snmpTrapsEnabled{false}; @@ -33,21 +33,39 @@ public: view(data_, size_) { } - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast): No unsigned char view in C++17 + // NOLINTBEGIN(cppcoreguidelines-pro-type-reinterpret-cast): No unsigned char view in C++17 UnsignedCharView(const unsigned char* data_, size_t size_) : view(reinterpret_cast<const char*>(data_), size_) { } - const unsigned char& at(std::string_view::size_type pos) const + using size_type = std::string_view::size_type; + + [[nodiscard]] const unsigned char& at(size_type pos) const { return reinterpret_cast<const unsigned char&>(view.at(pos)); } - size_t size() const + [[nodiscard]] const unsigned char& operator[](size_type pos) const + { + return reinterpret_cast<const unsigned char&>(view[pos]); + } + + [[nodiscard]] const unsigned char* data() const + { + return reinterpret_cast<const unsigned char*>(view.data()); + } + // NOLINTEND(cppcoreguidelines-pro-type-reinterpret-cast): No unsigned char view in C++17 + + [[nodiscard]] size_t size() const { return view.size(); } + [[nodiscard]] size_t length() const + { + return view.length(); + } + private: std::string_view view; }; @@ -82,20 +82,21 @@ struct UmemEntryStatus Status status{Status::Free}; }; -LockGuarded<std::unordered_map<uint64_t, UmemEntryStatus>> s_umems; +LockGuarded<std::map<std::pair<void*, uint64_t>, UmemEntryStatus>> s_umems; -void checkUmemIntegrity(const char* function, int line, uint64_t offset, const std::set<UmemEntryStatus::Status>& validStatuses, UmemEntryStatus::Status newStatus) +void checkUmemIntegrity(const char* function, int line, std::shared_ptr<LockGuarded<vector<uint64_t>>> vect, uint64_t offset, const std::set<UmemEntryStatus::Status>& validStatuses, UmemEntryStatus::Status newStatus) { auto umems = s_umems.lock(); - if (validStatuses.count(umems->at(offset).status) == 0) { - std::cerr << "UMEM integrity check failed at " << function << ": " << line << ": status is " << static_cast<int>(umems->at(offset).status) << ", expected: "; + auto& umemState = umems->at({vect.get(), offset}); + if (validStatuses.count(umemState.status) == 0) { + std::cerr << "UMEM integrity check failed at " << function << ": " << line << ": status of " << (void*)vect.get() << ", " << offset << " is " << static_cast<int>(umemState.status) << ", expected: "; for (const auto status : validStatuses) { std::cerr << static_cast<int>(status) << " "; } std::cerr << std::endl; abort(); } - (*umems)[offset].status = newStatus; + umemState.status = newStatus; } } #endif /* DEBUG_UMEM */ @@ -164,7 +165,7 @@ XskSocket::XskSocket(size_t frameNum_, std::string ifName_, uint32_t queue_id, c #ifdef DEBUG_UMEM { auto umems = s_umems.lock(); - (*umems)[idx * frameSize + XDP_PACKET_HEADROOM] = UmemEntryStatus(); + (*umems)[{sharedEmptyFrameOffset.get(), idx * frameSize + XDP_PACKET_HEADROOM}] = UmemEntryStatus(); } #endif /* DEBUG_UMEM */ } @@ -275,7 +276,16 @@ void XskSocket::removeDestinationAddress(const std::string& mapPath, const Combo void XskSocket::fillFq(uint32_t fillSize) noexcept { - { + if (uniqueEmptyFrameOffset.size() < fillSize) { + auto frames = sharedEmptyFrameOffset->lock(); + const auto moveSize = std::min(static_cast<size_t>(fillSize), frames->size()); + if (moveSize > 0) { + // NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions) + uniqueEmptyFrameOffset.insert(uniqueEmptyFrameOffset.end(), std::make_move_iterator(frames->end() - moveSize), std::make_move_iterator(frames->end())); + frames->resize(frames->size() - moveSize); + } + } + else if (uniqueEmptyFrameOffset.size() > (10 * fillSize)) { // if we have less than holdThreshold frames in the shared queue (which might be an issue // when the XskWorker needs empty frames), move frames from the unique container into the // shared one. This might not be optimal right now. @@ -290,7 +300,9 @@ void XskSocket::fillFq(uint32_t fillSize) noexcept } } - if (uniqueEmptyFrameOffset.size() < fillSize) { + fillSize = std::min(fillSize, static_cast<uint32_t>(uniqueEmptyFrameOffset.size())); + if (fillSize == 0) { + auto frames = sharedEmptyFrameOffset->lock(); return; } @@ -303,7 +315,7 @@ void XskSocket::fillFq(uint32_t fillSize) noexcept for (; processed < toFill; processed++) { *xsk_ring_prod__fill_addr(&fq, idx++) = uniqueEmptyFrameOffset.back(); #ifdef DEBUG_UMEM - checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, uniqueEmptyFrameOffset.back(), {UmemEntryStatus::Status::Free}, UmemEntryStatus::Status::FillQueue); + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, sharedEmptyFrameOffset, uniqueEmptyFrameOffset.back(), {UmemEntryStatus::Status::Free}, UmemEntryStatus::Status::FillQueue); #endif /* DEBUG_UMEM */ uniqueEmptyFrameOffset.pop_back(); } @@ -351,7 +363,7 @@ void XskSocket::send(std::vector<XskPacket>& packets) .len = packet.getFrameLen(), .options = 0}; #ifdef DEBUG_UMEM - checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::Received}, UmemEntryStatus::Status::TXQueue); + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, sharedEmptyFrameOffset, frameOffset(packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::Received}, UmemEntryStatus::Status::TXQueue); #endif /* DEBUG_UMEM */ queued++; } @@ -381,7 +393,7 @@ std::vector<XskPacket> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCou // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast,performance-no-int-to-ptr) XskPacket packet = XskPacket(reinterpret_cast<uint8_t*>(desc->addr + baseAddr), desc->len, frameSize); #ifdef DEBUG_UMEM - checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::FillQueue}, UmemEntryStatus::Status::Received); + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, sharedEmptyFrameOffset, frameOffset(packet), {UmemEntryStatus::Status::FillQueue}, UmemEntryStatus::Status::Received); #endif /* DEBUG_UMEM */ if (!packet.parse(false)) { @@ -393,11 +405,13 @@ std::vector<XskPacket> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCou } } catch (const std::exception& exp) { - std::cerr << "Exception while processing the XSK RX queue: " << exp.what() << std::endl; + ++failed; + ++processed; break; } catch (...) { - std::cerr << "Exception while processing the XSK RX queue" << std::endl; + ++failed; + ++processed; break; } } @@ -437,7 +451,7 @@ void XskSocket::recycle(size_t size) noexcept for (; processed < completeSize; ++processed) { uniqueEmptyFrameOffset.push_back(*xsk_ring_cons__comp_addr(&cq, idx++)); #ifdef DEBUG_UMEM - checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, uniqueEmptyFrameOffset.back(), {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free); + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, sharedEmptyFrameOffset, uniqueEmptyFrameOffset.back(), {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free); #endif /* DEBUG_UMEM */ } xsk_ring_cons__release(&cq, processed); @@ -510,9 +524,8 @@ void XskSocket::markAsFree(const XskPacket& packet) { auto offset = frameOffset(packet); #ifdef DEBUG_UMEM - checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free); + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, sharedEmptyFrameOffset, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free); #endif /* DEBUG_UMEM */ - uniqueEmptyFrameOffset.push_back(offset); } @@ -717,7 +730,7 @@ void XskPacket::changeDirectAndUpdateChecksum() noexcept // IPV6 auto ipv6 = getIPv6Header(); std::swap(ipv6.daddr, ipv6.saddr); - assert(ipv6.nexthdr == IPPROTO_UDP); + ipv6.nexthdr = IPPROTO_UDP; auto udp = getUDPHeader(); std::swap(udp.dest, udp.source); @@ -726,16 +739,18 @@ void XskPacket::changeDirectAndUpdateChecksum() noexcept /* needed to get the correct checksum */ setIPv6Header(ipv6); setUDPHeader(udp); - udp.check = tcp_udp_v6_checksum(&ipv6); + // do not bother setting the UDP checksum: 0 is a valid value and most AF_XDP + // implementations do the same + // udp.check = tcp_udp_v6_checksum(&ipv6); rewriteIpv6Header(&ipv6, getFrameLen()); setIPv6Header(ipv6); setUDPHeader(udp); } - else { + else if (ethHeader.h_proto == htons(ETH_P_IP)) { // IPV4 auto ipv4 = getIPv4Header(); std::swap(ipv4.daddr, ipv4.saddr); - assert(ipv4.protocol == IPPROTO_UDP); + ipv4.protocol = IPPROTO_UDP; auto udp = getUDPHeader(); std::swap(udp.dest, udp.source); @@ -744,7 +759,9 @@ void XskPacket::changeDirectAndUpdateChecksum() noexcept /* needed to get the correct checksum */ setIPv4Header(ipv4); setUDPHeader(udp); - udp.check = tcp_udp_v4_checksum(&ipv4); + // do not bother setting the UDP checksum: 0 is a valid value and most AF_XDP + // implementations do the same + // udp.check = tcp_udp_v4_checksum(&ipv4); rewriteIpv4Header(&ipv4, getFrameLen()); setIPv4Header(ipv4); setUDPHeader(udp); @@ -844,29 +861,24 @@ void XskWorker::notify(int desc) } } -XskWorker::XskWorker() : - workerWaker(createEventfd()), xskSocketWaker(createEventfd()) +XskWorker::XskWorker(XskWorker::Type type, const std::shared_ptr<LockGuarded<std::vector<uint64_t>>>& frames) : + d_sharedEmptyFrameOffset(frames), d_type(type), workerWaker(createEventfd()), xskSocketWaker(createEventfd()) { } void XskWorker::pushToProcessingQueue(XskPacket& packet) { -#if defined(__SANITIZE_THREAD__) - if (!incomingPacketsQueue.lock()->push(packet)) { -#else - if (!incomingPacketsQueue.push(packet)) { -#endif + if (d_type == Type::OutgoingOnly) { + throw std::runtime_error("Trying to push an incoming packet into an outgoing-only XSK Worker"); + } + if (!d_incomingPacketsQueue.push(packet)) { markAsFree(packet); } } void XskWorker::pushToSendQueue(XskPacket& packet) { -#if defined(__SANITIZE_THREAD__) - if (!outgoingPacketsQueue.lock()->push(packet)) { -#else - if (!outgoingPacketsQueue.push(packet)) { -#endif + if (!d_outgoingPacketsQueue.push(packet)) { markAsFree(packet); } } @@ -901,9 +913,9 @@ void XskPacket::rewrite() noexcept auto ipHeader = getIPv4Header(); ipHeader.daddr = to.sin4.sin_addr.s_addr; ipHeader.saddr = from.sin4.sin_addr.s_addr; + ipHeader.protocol = IPPROTO_UDP; auto udpHeader = getUDPHeader(); - ipHeader.protocol = IPPROTO_UDP; udpHeader.source = from.sin4.sin_port; udpHeader.dest = to.sin4.sin_port; udpHeader.len = htons(getDataSize() + sizeof(udpHeader)); @@ -911,7 +923,9 @@ void XskPacket::rewrite() noexcept /* needed to get the correct checksum */ setIPv4Header(ipHeader); setUDPHeader(udpHeader); - udpHeader.check = tcp_udp_v4_checksum(&ipHeader); + // do not bother setting the UDP checksum: 0 is a valid value and most AF_XDP + // implementations do the same + // udpHeader.check = tcp_udp_v4_checksum(&ipHeader); rewriteIpv4Header(&ipHeader, getFrameLen()); setIPv4Header(ipHeader); setUDPHeader(udpHeader); @@ -922,9 +936,9 @@ void XskPacket::rewrite() noexcept auto ipHeader = getIPv6Header(); memcpy(&ipHeader.daddr, &to.sin6.sin6_addr, sizeof(ipHeader.daddr)); memcpy(&ipHeader.saddr, &from.sin6.sin6_addr, sizeof(ipHeader.saddr)); + ipHeader.nexthdr = IPPROTO_UDP; auto udpHeader = getUDPHeader(); - ipHeader.nexthdr = IPPROTO_UDP; udpHeader.source = from.sin6.sin6_port; udpHeader.dest = to.sin6.sin6_port; udpHeader.len = htons(getDataSize() + sizeof(udpHeader)); @@ -932,7 +946,9 @@ void XskPacket::rewrite() noexcept /* needed to get the correct checksum */ setIPv6Header(ipHeader); setUDPHeader(udpHeader); - udpHeader.check = tcp_udp_v6_checksum(&ipHeader); + // do not bother setting the UDP checksum: 0 is a valid value and most AF_XDP + // implementations do the same + // udpHeader.check = tcp_udp_v6_checksum(&ipHeader); setIPv6Header(ipHeader); setUDPHeader(udpHeader); } @@ -1119,15 +1135,15 @@ void XskWorker::notifyXskSocket() const notify(xskSocketWaker); } -std::shared_ptr<XskWorker> XskWorker::create() +std::shared_ptr<XskWorker> XskWorker::create(Type type, const std::shared_ptr<LockGuarded<std::vector<uint64_t>>>& frames) { - return std::make_shared<XskWorker>(); + return std::make_shared<XskWorker>(type, frames); } void XskSocket::addWorker(std::shared_ptr<XskWorker> worker) { const auto socketWaker = worker->xskSocketWaker.getHandle(); - worker->umemBufBase = umem.bufBase; + worker->setUmemBufBase(umem.bufBase); d_workers.insert({socketWaker, std::move(worker)}); fds.push_back(pollfd{ .fd = socketWaker, @@ -1145,9 +1161,14 @@ void XskSocket::removeWorkerRoute(const ComboAddress& dest) d_workerRoutes.lock()->erase(dest); } +void XskWorker::setUmemBufBase(uint8_t* base) +{ + d_umemBufBase = base; +} + uint64_t XskWorker::frameOffset(const XskPacket& packet) const noexcept { - return packet.getFrameOffsetFrom(umemBufBase); + return packet.getFrameOffsetFrom(d_umemBufBase); } void XskWorker::notifyWorker() const @@ -1155,6 +1176,29 @@ void XskWorker::notifyWorker() const notify(workerWaker); } +bool XskWorker::hasIncomingFrames() +{ + if (d_type == Type::OutgoingOnly) { + throw std::runtime_error("Looking for incoming packets in an outgoing-only XSK Worker"); + } + + return d_incomingPacketsQueue.read_available() != 0U; +} + +void XskWorker::processIncomingFrames(const std::function<void(XskPacket& packet)>& callback) +{ + if (d_type == Type::OutgoingOnly) { + throw std::runtime_error("Looking for incoming packets in an outgoing-only XSK Worker"); + } + + d_incomingPacketsQueue.consume_all(callback); +} + +void XskWorker::processOutgoingFrames(const std::function<void(XskPacket& packet)>& callback) +{ + d_outgoingPacketsQueue.consume_all(callback); +} + void XskSocket::getMACFromIfName() { ifreq ifr{}; @@ -1213,42 +1257,28 @@ std::vector<pollfd> getPollFdsForWorker(XskWorker& info) return fds; } -void XskWorker::fillUniqueEmptyOffset() -{ - auto frames = sharedEmptyFrameOffset->lock(); - const auto moveSize = std::min(static_cast<size_t>(32), frames->size()); - if (moveSize > 0) { - // NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions) - uniqueEmptyFrameOffset.insert(uniqueEmptyFrameOffset.end(), std::make_move_iterator(frames->end() - moveSize), std::make_move_iterator(frames->end())); - frames->resize(frames->size() - moveSize); - } -} - std::optional<XskPacket> XskWorker::getEmptyFrame() { - if (!uniqueEmptyFrameOffset.empty()) { - auto offset = uniqueEmptyFrameOffset.back(); - uniqueEmptyFrameOffset.pop_back(); - // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) - return XskPacket(offset + umemBufBase, 0, frameSize); + auto frames = d_sharedEmptyFrameOffset->lock(); + if (frames->empty()) { + return std::nullopt; } - fillUniqueEmptyOffset(); - if (!uniqueEmptyFrameOffset.empty()) { - auto offset = uniqueEmptyFrameOffset.back(); - uniqueEmptyFrameOffset.pop_back(); - // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) - return XskPacket(offset + umemBufBase, 0, frameSize); - } - return std::nullopt; + auto offset = frames->back(); + frames->pop_back(); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + return XskPacket(offset + d_umemBufBase, 0, d_frameSize); } void XskWorker::markAsFree(const XskPacket& packet) { auto offset = frameOffset(packet); #ifdef DEBUG_UMEM - checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free); + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, d_sharedEmptyFrameOffset, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free); #endif /* DEBUG_UMEM */ - uniqueEmptyFrameOffset.push_back(offset); + { + auto frames = d_sharedEmptyFrameOffset->lock(); + frames->push_back(offset); + } } uint32_t XskPacket::getFlags() const noexcept @@ -58,7 +58,7 @@ using MACAddr = std::array<uint8_t, 6>; // We allocate frames that are placed into the descriptors in the fill queue, allowing the kernel to put incoming packets into the frames and place descriptors into the rx queue. // Once we have read the descriptors from the rx queue we release them, but we own the frames. // After we are done with the frame, we place them into descriptors of either the fill queue (empty frames) or tx queues (packets to be sent). -// Once the kernel is done, it places descriptors referencing these frames into the cq where we can recycle them (packets destined to the tx queue or empty frame to the fill queue queue). +// Once the kernel is done, it places descriptors referencing these frames into the cq where we can recycle them (packets destined to the tx queue or empty frame to the fill queue). // XskSocket routes packets to multiple worker threads registered on XskSocket via XskSocket::addWorker based on the destination port number of the packet. // The kernel and the worker thread holding XskWorker will wake up the XskSocket through XskFd and the Eventfd corresponding to each worker thread, respectively. @@ -192,8 +192,10 @@ class XskPacket public: enum Flags : uint32_t { + /* whether the payload has been modified */ UPDATE = 1 << 0, DELAY = 1 << 1, + /* whether the headers have already been updated */ REWRITE = 1 << 2 }; @@ -234,6 +236,7 @@ private: void setIPv6Header(const ipv6hdr& ipv6Header) noexcept; [[nodiscard]] udphdr getUDPHeader() const noexcept; void setUDPHeader(const udphdr& udpHeader) noexcept; + /* exchange the source and destination addresses (ethernet and IP) */ void changeDirectAndUpdateChecksum() noexcept; constexpr static uint8_t DefaultTTL = 64; @@ -250,10 +253,13 @@ public: [[nodiscard]] PacketBuffer cloneHeaderToPacketBuffer() const; void setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC) noexcept; bool setPayload(const PacketBuffer& buf); + /* rewrite the headers, usually after setAddr() and setPayload() have been called */ void rewrite() noexcept; void setHeader(PacketBuffer& buf); XskPacket(uint8_t* frame, size_t dataSize, size_t frameSize); void addDelay(int relativeMilliseconds) noexcept; + /* if the payload have been updated, and the headers have not been rewritten, exchange the source + and destination addresses (ethernet and IP) and rewrite the headers */ void updatePacket() noexcept; // parse IP and UDP payloads bool parse(bool fromSetHeader); @@ -269,47 +275,45 @@ public: }; bool operator<(const XskPacket& lhs, const XskPacket& rhs) noexcept; -/* g++ defines __SANITIZE_THREAD__ - clang++ supports the nice __has_feature(thread_sanitizer), - let's merge them */ -#if defined(__has_feature) -#if __has_feature(thread_sanitizer) -#define __SANITIZE_THREAD__ 1 -#endif -#endif - // XskWorker obtains XskPackets of specific ports in the NIC from XskSocket through cq. // After finishing processing the packet, XskWorker puts the packet into sq so that XskSocket decides whether to send it through the network card according to XskPacket::flags. // XskWorker wakes up XskSocket via xskSocketWaker after putting the packets in sq. class XskWorker { -#if defined(__SANITIZE_THREAD__) - using XskPacketRing = LockGuarded<boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>>; -#else - using XskPacketRing = boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>; -#endif - public: + enum class Type : uint8_t + { + OutgoingOnly, + Bidirectional + }; + +private: + using XskPacketRing = boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>; // queue of packets to be processed by this worker - XskPacketRing incomingPacketsQueue; + XskPacketRing d_incomingPacketsQueue; // queue of packets processed by this worker (to be sent, or discarded) - XskPacketRing outgoingPacketsQueue; - - uint8_t* umemBufBase{nullptr}; + XskPacketRing d_outgoingPacketsQueue; // list of frames that are shared with the XskRouter - std::shared_ptr<LockGuarded<vector<uint64_t>>> sharedEmptyFrameOffset; - // list of frames that we own, used to generate new packets (health-check) - vector<uint64_t> uniqueEmptyFrameOffset; - const size_t frameSize{XskSocket::getFrameSize()}; + std::shared_ptr<LockGuarded<vector<uint64_t>>> d_sharedEmptyFrameOffset; + uint8_t* d_umemBufBase{nullptr}; + const size_t d_frameSize{XskSocket::getFrameSize()}; + Type d_type; + +public: FDWrapper workerWaker; FDWrapper xskSocketWaker; - XskWorker(); static int createEventfd(); static void notify(int desc); - static std::shared_ptr<XskWorker> create(); + static std::shared_ptr<XskWorker> create(Type type, const std::shared_ptr<LockGuarded<std::vector<uint64_t>>>& frames); + + XskWorker(Type type, const std::shared_ptr<LockGuarded<std::vector<uint64_t>>>& frames); + void setUmemBufBase(uint8_t* base); void pushToProcessingQueue(XskPacket& packet); void pushToSendQueue(XskPacket& packet); + bool hasIncomingFrames(); + void processIncomingFrames(const std::function<void(XskPacket& packet)>& callback); + void processOutgoingFrames(const std::function<void(XskPacket& packet)>& callback); void markAsFree(const XskPacket& packet); // notify worker that at least one packet is available for processing void notifyWorker() const; @@ -319,10 +323,7 @@ public: void cleanWorkerNotification() const noexcept; void cleanSocketNotification() const noexcept; [[nodiscard]] uint64_t frameOffset(const XskPacket& packet) const noexcept; - // reap empty umem entry from sharedEmptyFrameOffset into uniqueEmptyFrameOffset - void fillUniqueEmptyOffset(); - // look for an empty umem entry in uniqueEmptyFrameOffset - // then sharedEmptyFrameOffset if needed + // get an empty umem entry from sharedEmptyFrameOffset std::optional<XskPacket> getEmptyFrame(); }; std::vector<pollfd> getPollFdsForWorker(XskWorker& info); |