summaryrefslogtreecommitdiffstats
path: root/lib/isc/netmgr/netmgr.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/isc/netmgr/netmgr.c')
-rw-r--r--lib/isc/netmgr/netmgr.c3396
1 files changed, 3396 insertions, 0 deletions
diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c
new file mode 100644
index 0000000..6f42ec9
--- /dev/null
+++ b/lib/isc/netmgr/netmgr.c
@@ -0,0 +1,3396 @@
+/*
+ * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+ *
+ * SPDX-License-Identifier: MPL-2.0
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, you can obtain one at https://mozilla.org/MPL/2.0/.
+ *
+ * See the COPYRIGHT file distributed with this work for additional
+ * information regarding copyright ownership.
+ */
+
+#include <inttypes.h>
+#include <unistd.h>
+#include <uv.h>
+#ifdef HAVE_LIBCTRACE
+#include <execinfo.h>
+#endif /* ifdef HAVE_LIBCTRACE */
+
+#include <isc/atomic.h>
+#include <isc/barrier.h>
+#include <isc/buffer.h>
+#include <isc/condition.h>
+#include <isc/errno.h>
+#include <isc/list.h>
+#include <isc/log.h>
+#include <isc/magic.h>
+#include <isc/mem.h>
+#include <isc/netmgr.h>
+#include <isc/print.h>
+#include <isc/quota.h>
+#include <isc/random.h>
+#include <isc/refcount.h>
+#include <isc/region.h>
+#include <isc/result.h>
+#include <isc/sockaddr.h>
+#include <isc/stats.h>
+#include <isc/strerr.h>
+#include <isc/task.h>
+#include <isc/thread.h>
+#include <isc/util.h>
+
+#include "netmgr-int.h"
+#include "netmgr_p.h"
+#include "openssl_shim.h"
+#include "trampoline_p.h"
+#include "uv-compat.h"
+
+/*%
+ * How many isc_nmhandles and isc_nm_uvreqs will we be
+ * caching for reuse in a socket.
+ */
+#define ISC_NM_HANDLES_STACK_SIZE 600
+#define ISC_NM_REQS_STACK_SIZE 600
+
+/*%
+ * Shortcut index arrays to get access to statistics counters.
+ */
+
+static const isc_statscounter_t udp4statsindex[] = {
+ isc_sockstatscounter_udp4open,
+ isc_sockstatscounter_udp4openfail,
+ isc_sockstatscounter_udp4close,
+ isc_sockstatscounter_udp4bindfail,
+ isc_sockstatscounter_udp4connectfail,
+ isc_sockstatscounter_udp4connect,
+ -1,
+ -1,
+ isc_sockstatscounter_udp4sendfail,
+ isc_sockstatscounter_udp4recvfail,
+ isc_sockstatscounter_udp4active
+};
+
+static const isc_statscounter_t udp6statsindex[] = {
+ isc_sockstatscounter_udp6open,
+ isc_sockstatscounter_udp6openfail,
+ isc_sockstatscounter_udp6close,
+ isc_sockstatscounter_udp6bindfail,
+ isc_sockstatscounter_udp6connectfail,
+ isc_sockstatscounter_udp6connect,
+ -1,
+ -1,
+ isc_sockstatscounter_udp6sendfail,
+ isc_sockstatscounter_udp6recvfail,
+ isc_sockstatscounter_udp6active
+};
+
+static const isc_statscounter_t tcp4statsindex[] = {
+ isc_sockstatscounter_tcp4open, isc_sockstatscounter_tcp4openfail,
+ isc_sockstatscounter_tcp4close, isc_sockstatscounter_tcp4bindfail,
+ isc_sockstatscounter_tcp4connectfail, isc_sockstatscounter_tcp4connect,
+ isc_sockstatscounter_tcp4acceptfail, isc_sockstatscounter_tcp4accept,
+ isc_sockstatscounter_tcp4sendfail, isc_sockstatscounter_tcp4recvfail,
+ isc_sockstatscounter_tcp4active
+};
+
+static const isc_statscounter_t tcp6statsindex[] = {
+ isc_sockstatscounter_tcp6open, isc_sockstatscounter_tcp6openfail,
+ isc_sockstatscounter_tcp6close, isc_sockstatscounter_tcp6bindfail,
+ isc_sockstatscounter_tcp6connectfail, isc_sockstatscounter_tcp6connect,
+ isc_sockstatscounter_tcp6acceptfail, isc_sockstatscounter_tcp6accept,
+ isc_sockstatscounter_tcp6sendfail, isc_sockstatscounter_tcp6recvfail,
+ isc_sockstatscounter_tcp6active
+};
+
+#if 0
+/* XXX: not currently used */
+static const isc_statscounter_t unixstatsindex[] = {
+ isc_sockstatscounter_unixopen,
+ isc_sockstatscounter_unixopenfail,
+ isc_sockstatscounter_unixclose,
+ isc_sockstatscounter_unixbindfail,
+ isc_sockstatscounter_unixconnectfail,
+ isc_sockstatscounter_unixconnect,
+ isc_sockstatscounter_unixacceptfail,
+ isc_sockstatscounter_unixaccept,
+ isc_sockstatscounter_unixsendfail,
+ isc_sockstatscounter_unixrecvfail,
+ isc_sockstatscounter_unixactive
+};
+#endif /* if 0 */
+
+/*
+ * libuv is not thread safe, but has mechanisms to pass messages
+ * between threads. Each socket is owned by a thread. For UDP
+ * sockets we have a set of sockets for each interface and we can
+ * choose a sibling and send the message directly. For TCP, or if
+ * we're calling from a non-networking thread, we need to pass the
+ * request using async_cb.
+ */
+
+#if defined(HAVE_THREAD_LOCAL)
+#include <threads.h>
+static thread_local int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN;
+#elif defined(HAVE___THREAD)
+static __thread int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN;
+#elif HAVE___DECLSPEC_THREAD
+__declspec(thread) int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN;
+#endif /* if defined(HAVE_THREAD_LOCAL) */
+
+static void
+nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG);
+static void
+nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle);
+static isc_threadresult_t
+nm_thread(isc_threadarg_t worker0);
+static void
+async_cb(uv_async_t *handle);
+
+static bool
+process_netievent(isc__networker_t *worker, isc__netievent_t *ievent);
+static isc_result_t
+process_queue(isc__networker_t *worker, netievent_type_t type);
+static void
+wait_for_priority_queue(isc__networker_t *worker);
+static void
+drain_queue(isc__networker_t *worker, netievent_type_t type);
+
+static void
+isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0);
+static void
+isc__nm_async_pause(isc__networker_t *worker, isc__netievent_t *ev0);
+static void
+isc__nm_async_resume(isc__networker_t *worker, isc__netievent_t *ev0);
+static void
+isc__nm_async_detach(isc__networker_t *worker, isc__netievent_t *ev0);
+static void
+isc__nm_async_close(isc__networker_t *worker, isc__netievent_t *ev0);
+
+static void
+isc__nm_threadpool_initialize(uint32_t workers);
+static void
+isc__nm_work_cb(uv_work_t *req);
+static void
+isc__nm_after_work_cb(uv_work_t *req, int status);
+
+void
+isc__nmsocket_reset(isc_nmsocket_t *sock);
+
+/*%<
+ * Issue a 'handle closed' callback on the socket.
+ */
+
+static void
+nmhandle_detach_cb(isc_nmhandle_t **handlep FLARG);
+
+int
+isc_nm_tid(void) {
+ return (isc__nm_tid_v);
+}
+
+bool
+isc__nm_in_netthread(void) {
+ return (isc__nm_tid_v >= 0);
+}
+
+#ifdef WIN32
+static void
+isc__nm_winsock_initialize(void) {
+ WORD wVersionRequested = MAKEWORD(2, 2);
+ WSADATA wsaData;
+ int result;
+
+ result = WSAStartup(wVersionRequested, &wsaData);
+ if (result != 0) {
+ char strbuf[ISC_STRERRORSIZE];
+ strerror_r(result, strbuf, sizeof(strbuf));
+ UNEXPECTED_ERROR(__FILE__, __LINE__,
+ "WSAStartup() failed with error code %lu: %s",
+ result, strbuf);
+ }
+
+ /*
+ * Confirm that the WinSock DLL supports version 2.2. Note that if the
+ * DLL supports versions greater than 2.2 in addition to 2.2, it will
+ * still return 2.2 in wVersion since that is the version we requested.
+ */
+ if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2) {
+ UNEXPECTED_ERROR(__FILE__, __LINE__,
+ "Unusable WinSock DLL version: %u.%u",
+ LOBYTE(wsaData.wVersion),
+ HIBYTE(wsaData.wVersion));
+ }
+}
+
+static void
+isc__nm_winsock_destroy(void) {
+ WSACleanup();
+}
+#endif /* WIN32 */
+
+static void
+isc__nm_threadpool_initialize(uint32_t workers) {
+ char buf[11];
+ int r = uv_os_getenv("UV_THREADPOOL_SIZE", buf,
+ &(size_t){ sizeof(buf) });
+ if (r == UV_ENOENT) {
+ snprintf(buf, sizeof(buf), "%" PRIu32, workers);
+ uv_os_setenv("UV_THREADPOOL_SIZE", buf);
+ }
+}
+
+#if HAVE_DECL_UV_UDP_MMSG_FREE
+#define MINIMAL_UV_VERSION UV_VERSION(1, 40, 0)
+#elif HAVE_DECL_UV_UDP_RECVMMSG
+#define MAXIMAL_UV_VERSION UV_VERSION(1, 39, 99)
+#define MINIMAL_UV_VERSION UV_VERSION(1, 37, 0)
+#elif _WIN32
+#define MINIMAL_UV_VERSION UV_VERSION(1, 0, 0)
+#else
+#define MAXIMAL_UV_VERSION UV_VERSION(1, 34, 99)
+#define MINIMAL_UV_VERSION UV_VERSION(1, 0, 0)
+#endif
+
+void
+isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
+ isc_nm_t *mgr = NULL;
+ char name[32];
+
+ REQUIRE(workers > 0);
+
+#ifdef MAXIMAL_UV_VERSION
+ if (uv_version() > MAXIMAL_UV_VERSION) {
+ isc_error_fatal(__FILE__, __LINE__,
+ "libuv version too new: running with libuv %s "
+ "when compiled with libuv %s will lead to "
+ "libuv failures",
+ uv_version_string(), UV_VERSION_STRING);
+ }
+#endif /* MAXIMAL_UV_VERSION */
+
+ if (uv_version() < MINIMAL_UV_VERSION) {
+ isc_error_fatal(__FILE__, __LINE__,
+ "libuv version too old: running with libuv %s "
+ "when compiled with libuv %s will lead to "
+ "libuv failures",
+ uv_version_string(), UV_VERSION_STRING);
+ }
+
+#ifdef WIN32
+ isc__nm_winsock_initialize();
+#endif /* WIN32 */
+
+ isc__nm_threadpool_initialize(workers);
+
+ mgr = isc_mem_get(mctx, sizeof(*mgr));
+ *mgr = (isc_nm_t){ .nworkers = workers };
+
+ isc_mem_attach(mctx, &mgr->mctx);
+ isc_mutex_init(&mgr->lock);
+ isc_condition_init(&mgr->wkstatecond);
+ isc_condition_init(&mgr->wkpausecond);
+ isc_refcount_init(&mgr->references, 1);
+ atomic_init(&mgr->maxudp, 0);
+ atomic_init(&mgr->interlocked, ISC_NETMGR_NON_INTERLOCKED);
+ atomic_init(&mgr->workers_paused, 0);
+ atomic_init(&mgr->paused, false);
+ atomic_init(&mgr->closing, false);
+#if HAVE_SO_REUSEPORT_LB
+ mgr->load_balance_sockets = true;
+#else
+ mgr->load_balance_sockets = false;
+#endif
+
+#ifdef NETMGR_TRACE
+ ISC_LIST_INIT(mgr->active_sockets);
+#endif
+
+ /*
+ * Default TCP timeout values.
+ * May be updated by isc_nm_tcptimeouts().
+ */
+ atomic_init(&mgr->init, 30000);
+ atomic_init(&mgr->idle, 30000);
+ atomic_init(&mgr->keepalive, 30000);
+ atomic_init(&mgr->advertised, 30000);
+
+ isc_barrier_init(&mgr->pausing, workers);
+ isc_barrier_init(&mgr->resuming, workers);
+
+ mgr->workers = isc_mem_get(mctx, workers * sizeof(isc__networker_t));
+ for (size_t i = 0; i < workers; i++) {
+ isc__networker_t *worker = &mgr->workers[i];
+ int r;
+
+ *worker = (isc__networker_t){
+ .mgr = mgr,
+ .id = i,
+ };
+
+ r = uv_loop_init(&worker->loop);
+ UV_RUNTIME_CHECK(uv_loop_init, r);
+
+ worker->loop.data = &mgr->workers[i];
+
+ r = uv_async_init(&worker->loop, &worker->async, async_cb);
+ UV_RUNTIME_CHECK(uv_async_init, r);
+
+ for (size_t type = 0; type < NETIEVENT_MAX; type++) {
+ isc_mutex_init(&worker->ievents[type].lock);
+ isc_condition_init(&worker->ievents[type].cond);
+ ISC_LIST_INIT(worker->ievents[type].list);
+ }
+
+ worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE);
+ worker->sendbuf = isc_mem_get(mctx, ISC_NETMGR_SENDBUF_SIZE);
+
+ /*
+ * We need to do this here and not in nm_thread to avoid a
+ * race - we could exit isc_nm_start, launch nm_destroy,
+ * and nm_thread would still not be up.
+ */
+ mgr->workers_running++;
+ isc_thread_create(nm_thread, &mgr->workers[i], &worker->thread);
+
+ snprintf(name, sizeof(name), "isc-net-%04zu", i);
+ isc_thread_setname(worker->thread, name);
+ }
+
+ mgr->magic = NM_MAGIC;
+ *netmgrp = mgr;
+}
+
+/*
+ * Free the resources of the network manager.
+ */
+static void
+nm_destroy(isc_nm_t **mgr0) {
+ REQUIRE(VALID_NM(*mgr0));
+ REQUIRE(!isc__nm_in_netthread());
+
+ isc_nm_t *mgr = *mgr0;
+ *mgr0 = NULL;
+
+ isc_refcount_destroy(&mgr->references);
+
+ mgr->magic = 0;
+
+ for (int i = 0; i < mgr->nworkers; i++) {
+ isc__networker_t *worker = &mgr->workers[i];
+ isc__netievent_t *event = isc__nm_get_netievent_stop(mgr);
+ isc__nm_enqueue_ievent(worker, event);
+ }
+
+ LOCK(&mgr->lock);
+ while (mgr->workers_running > 0) {
+ WAIT(&mgr->wkstatecond, &mgr->lock);
+ }
+ UNLOCK(&mgr->lock);
+
+ for (int i = 0; i < mgr->nworkers; i++) {
+ isc__networker_t *worker = &mgr->workers[i];
+ int r;
+
+ r = uv_loop_close(&worker->loop);
+ UV_RUNTIME_CHECK(uv_loop_close, r);
+
+ for (size_t type = 0; type < NETIEVENT_MAX; type++) {
+ INSIST(ISC_LIST_EMPTY(worker->ievents[type].list));
+ isc_condition_destroy(&worker->ievents[type].cond);
+ isc_mutex_destroy(&worker->ievents[type].lock);
+ }
+
+ isc_mem_put(mgr->mctx, worker->sendbuf,
+ ISC_NETMGR_SENDBUF_SIZE);
+ isc_mem_put(mgr->mctx, worker->recvbuf,
+ ISC_NETMGR_RECVBUF_SIZE);
+ isc_thread_join(worker->thread, NULL);
+ }
+
+ if (mgr->stats != NULL) {
+ isc_stats_detach(&mgr->stats);
+ }
+
+ isc_barrier_destroy(&mgr->resuming);
+ isc_barrier_destroy(&mgr->pausing);
+
+ isc_condition_destroy(&mgr->wkstatecond);
+ isc_condition_destroy(&mgr->wkpausecond);
+ isc_mutex_destroy(&mgr->lock);
+
+ isc_mem_put(mgr->mctx, mgr->workers,
+ mgr->nworkers * sizeof(isc__networker_t));
+ isc_mem_putanddetach(&mgr->mctx, mgr, sizeof(*mgr));
+
+#ifdef WIN32
+ isc__nm_winsock_destroy();
+#endif /* WIN32 */
+}
+
+static void
+enqueue_pause(isc__networker_t *worker) {
+ isc__netievent_pause_t *event =
+ isc__nm_get_netievent_pause(worker->mgr);
+ isc__nm_enqueue_ievent(worker, (isc__netievent_t *)event);
+}
+
+static void
+isc__nm_async_pause(isc__networker_t *worker, isc__netievent_t *ev0) {
+ UNUSED(ev0);
+ REQUIRE(worker->paused == false);
+
+ worker->paused = true;
+ uv_stop(&worker->loop);
+}
+
+void
+isc_nm_pause(isc_nm_t *mgr) {
+ REQUIRE(VALID_NM(mgr));
+ REQUIRE(!atomic_load(&mgr->paused));
+
+ isc__nm_acquire_interlocked_force(mgr);
+
+ if (isc__nm_in_netthread()) {
+ REQUIRE(isc_nm_tid() == 0);
+ }
+
+ for (int i = 0; i < mgr->nworkers; i++) {
+ isc__networker_t *worker = &mgr->workers[i];
+ if (i == isc_nm_tid()) {
+ isc__nm_async_pause(worker, NULL);
+ } else {
+ enqueue_pause(worker);
+ }
+ }
+
+ if (isc__nm_in_netthread()) {
+ atomic_fetch_add(&mgr->workers_paused, 1);
+ isc_barrier_wait(&mgr->pausing);
+ }
+
+ LOCK(&mgr->lock);
+ while (atomic_load(&mgr->workers_paused) != mgr->workers_running) {
+ WAIT(&mgr->wkstatecond, &mgr->lock);
+ }
+ UNLOCK(&mgr->lock);
+
+ REQUIRE(atomic_compare_exchange_strong(&mgr->paused, &(bool){ false },
+ true));
+}
+
+static void
+enqueue_resume(isc__networker_t *worker) {
+ isc__netievent_resume_t *event =
+ isc__nm_get_netievent_resume(worker->mgr);
+ isc__nm_enqueue_ievent(worker, (isc__netievent_t *)event);
+}
+
+static void
+isc__nm_async_resume(isc__networker_t *worker, isc__netievent_t *ev0) {
+ UNUSED(ev0);
+ REQUIRE(worker->paused == true);
+
+ worker->paused = false;
+}
+
+void
+isc_nm_resume(isc_nm_t *mgr) {
+ REQUIRE(VALID_NM(mgr));
+ REQUIRE(atomic_load(&mgr->paused));
+
+ if (isc__nm_in_netthread()) {
+ REQUIRE(isc_nm_tid() == 0);
+ drain_queue(&mgr->workers[isc_nm_tid()], NETIEVENT_PRIORITY);
+ }
+
+ for (int i = 0; i < mgr->nworkers; i++) {
+ isc__networker_t *worker = &mgr->workers[i];
+ if (i == isc_nm_tid()) {
+ isc__nm_async_resume(worker, NULL);
+ } else {
+ enqueue_resume(worker);
+ }
+ }
+
+ if (isc__nm_in_netthread()) {
+ drain_queue(&mgr->workers[isc_nm_tid()], NETIEVENT_PRIVILEGED);
+
+ atomic_fetch_sub(&mgr->workers_paused, 1);
+ isc_barrier_wait(&mgr->resuming);
+ }
+
+ LOCK(&mgr->lock);
+ while (atomic_load(&mgr->workers_paused) != 0) {
+ WAIT(&mgr->wkstatecond, &mgr->lock);
+ }
+ UNLOCK(&mgr->lock);
+
+ REQUIRE(atomic_compare_exchange_strong(&mgr->paused, &(bool){ true },
+ false));
+
+ isc__nm_drop_interlocked(mgr);
+}
+
+void
+isc_nm_attach(isc_nm_t *mgr, isc_nm_t **dst) {
+ REQUIRE(VALID_NM(mgr));
+ REQUIRE(dst != NULL && *dst == NULL);
+
+ isc_refcount_increment(&mgr->references);
+
+ *dst = mgr;
+}
+
+void
+isc_nm_detach(isc_nm_t **mgr0) {
+ isc_nm_t *mgr = NULL;
+
+ REQUIRE(mgr0 != NULL);
+ REQUIRE(VALID_NM(*mgr0));
+
+ mgr = *mgr0;
+ *mgr0 = NULL;
+
+ if (isc_refcount_decrement(&mgr->references) == 1) {
+ nm_destroy(&mgr);
+ }
+}
+
+void
+isc__netmgr_shutdown(isc_nm_t *mgr) {
+ REQUIRE(VALID_NM(mgr));
+
+ atomic_store(&mgr->closing, true);
+ for (int i = 0; i < mgr->nworkers; i++) {
+ isc__netievent_t *event = NULL;
+ event = isc__nm_get_netievent_shutdown(mgr);
+ isc__nm_enqueue_ievent(&mgr->workers[i], event);
+ }
+}
+
+void
+isc__netmgr_destroy(isc_nm_t **netmgrp) {
+ isc_nm_t *mgr = NULL;
+ int counter = 0;
+
+ REQUIRE(VALID_NM(*netmgrp));
+
+ mgr = *netmgrp;
+
+ /*
+ * Close active connections.
+ */
+ isc__netmgr_shutdown(mgr);
+
+ /*
+ * Wait for the manager to be dereferenced elsewhere.
+ */
+ while (isc_refcount_current(&mgr->references) > 1 && counter++ < 1000) {
+ uv_sleep(10);
+ }
+
+#ifdef NETMGR_TRACE
+ if (isc_refcount_current(&mgr->references) > 1) {
+ isc__nm_dump_active(mgr);
+ UNREACHABLE();
+ }
+#endif
+
+ /*
+ * Now just patiently wait
+ */
+ while (isc_refcount_current(&mgr->references) > 1) {
+ uv_sleep(10);
+ }
+
+ /*
+ * Detach final reference.
+ */
+ isc_nm_detach(netmgrp);
+}
+
+void
+isc_nm_maxudp(isc_nm_t *mgr, uint32_t maxudp) {
+ REQUIRE(VALID_NM(mgr));
+
+ atomic_store(&mgr->maxudp, maxudp);
+}
+
+void
+isc_nmhandle_setwritetimeout(isc_nmhandle_t *handle, uint64_t write_timeout) {
+ REQUIRE(VALID_NMHANDLE(handle));
+ REQUIRE(VALID_NMSOCK(handle->sock));
+
+ handle->sock->write_timeout = write_timeout;
+}
+
+void
+isc_nm_settimeouts(isc_nm_t *mgr, uint32_t init, uint32_t idle,
+ uint32_t keepalive, uint32_t advertised) {
+ REQUIRE(VALID_NM(mgr));
+
+ atomic_store(&mgr->init, init);
+ atomic_store(&mgr->idle, idle);
+ atomic_store(&mgr->keepalive, keepalive);
+ atomic_store(&mgr->advertised, advertised);
+}
+
+bool
+isc_nm_getloadbalancesockets(isc_nm_t *mgr) {
+ REQUIRE(VALID_NM(mgr));
+
+ return (mgr->load_balance_sockets);
+}
+
+void
+isc_nm_setloadbalancesockets(isc_nm_t *mgr, bool enabled) {
+ REQUIRE(VALID_NM(mgr));
+
+#if HAVE_SO_REUSEPORT_LB
+ mgr->load_balance_sockets = enabled;
+#else
+ UNUSED(enabled);
+#endif
+}
+
+void
+isc_nm_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle,
+ uint32_t *keepalive, uint32_t *advertised) {
+ REQUIRE(VALID_NM(mgr));
+
+ if (initial != NULL) {
+ *initial = atomic_load(&mgr->init);
+ }
+
+ if (idle != NULL) {
+ *idle = atomic_load(&mgr->idle);
+ }
+
+ if (keepalive != NULL) {
+ *keepalive = atomic_load(&mgr->keepalive);
+ }
+
+ if (advertised != NULL) {
+ *advertised = atomic_load(&mgr->advertised);
+ }
+}
+
+/*
+ * nm_thread is a single worker thread, that runs uv_run event loop
+ * until asked to stop.
+ *
+ * There are four queues for asynchronous events:
+ *
+ * 1. priority queue - netievents on the priority queue are run even when
+ * the taskmgr enters exclusive mode and the netmgr is paused. This
+ * is needed to properly start listening on the interfaces, free
+ * resources on shutdown, or resume from a pause.
+ *
+ * 2. privileged task queue - only privileged tasks are queued here and
+ * this is the first queue that gets processed when network manager
+ * is unpaused using isc_nm_resume(). All netmgr workers need to
+ * clean the privileged task queue before they all proceed to normal
+ * operation. Both task queues are processed when the workers are
+ * shutting down.
+ *
+ * 3. task queue - only (traditional) tasks are scheduled here, and this
+ * queue and the privileged task queue are both processed when the
+ * netmgr workers are finishing. This is needed to process the task
+ * shutdown events.
+ *
+ * 4. normal queue - this is the queue with netmgr events, e.g. reading,
+ * sending, callbacks, etc.
+ */
+
+static isc_threadresult_t
+nm_thread(isc_threadarg_t worker0) {
+ isc__networker_t *worker = (isc__networker_t *)worker0;
+ isc_nm_t *mgr = worker->mgr;
+
+ isc__nm_tid_v = worker->id;
+
+ while (true) {
+ /*
+ * uv_run() runs async_cb() in a loop, which processes
+ * all four event queues until a "pause" or "stop" event
+ * is encountered. On pause, we process only priority and
+ * privileged events until resuming.
+ */
+ int r = uv_run(&worker->loop, UV_RUN_DEFAULT);
+ INSIST(r > 0 || worker->finished);
+
+ if (worker->paused) {
+ INSIST(atomic_load(&mgr->interlocked) != isc_nm_tid());
+
+ atomic_fetch_add(&mgr->workers_paused, 1);
+ if (isc_barrier_wait(&mgr->pausing) != 0) {
+ LOCK(&mgr->lock);
+ SIGNAL(&mgr->wkstatecond);
+ UNLOCK(&mgr->lock);
+ }
+
+ while (worker->paused) {
+ wait_for_priority_queue(worker);
+ }
+
+ /*
+ * All workers must drain the privileged event
+ * queue before we resume from pause.
+ */
+ drain_queue(worker, NETIEVENT_PRIVILEGED);
+
+ atomic_fetch_sub(&mgr->workers_paused, 1);
+ if (isc_barrier_wait(&mgr->resuming) != 0) {
+ LOCK(&mgr->lock);
+ SIGNAL(&mgr->wkstatecond);
+ UNLOCK(&mgr->lock);
+ }
+ }
+
+ if (r == 0) {
+ INSIST(worker->finished);
+ break;
+ }
+
+ INSIST(!worker->finished);
+ }
+
+ /*
+ * We are shutting down. Drain the queues.
+ */
+ drain_queue(worker, NETIEVENT_PRIVILEGED);
+ drain_queue(worker, NETIEVENT_TASK);
+
+ for (size_t type = 0; type < NETIEVENT_MAX; type++) {
+ LOCK(&worker->ievents[type].lock);
+ INSIST(ISC_LIST_EMPTY(worker->ievents[type].list));
+ UNLOCK(&worker->ievents[type].lock);
+ }
+
+ LOCK(&mgr->lock);
+ mgr->workers_running--;
+ SIGNAL(&mgr->wkstatecond);
+ UNLOCK(&mgr->lock);
+
+ return ((isc_threadresult_t)0);
+}
+
+static bool
+process_all_queues(isc__networker_t *worker) {
+ bool reschedule = false;
+ /*
+ * The queue processing functions will return false when the
+ * system is pausing or stopping and we don't want to process
+ * the other queues in such case, but we need the async event
+ * to be rescheduled in the next uv_run().
+ */
+ for (size_t type = 0; type < NETIEVENT_MAX; type++) {
+ isc_result_t result = process_queue(worker, type);
+ switch (result) {
+ case ISC_R_SUSPEND:
+ reschedule = true;
+ break;
+ case ISC_R_EMPTY:
+ /* empty queue */
+ break;
+ case ISC_R_SUCCESS:
+ reschedule = true;
+ break;
+ default:
+ UNREACHABLE();
+ }
+ }
+
+ return (reschedule);
+}
+
+/*
+ * async_cb() is a universal callback for 'async' events sent to event loop.
+ * It's the only way to safely pass data to the libuv event loop. We use a
+ * single async event and a set of lockless queues of 'isc__netievent_t'
+ * structures passed from other threads.
+ */
+static void
+async_cb(uv_async_t *handle) {
+ isc__networker_t *worker = (isc__networker_t *)handle->loop->data;
+
+ if (process_all_queues(worker)) {
+ /*
+ * If we didn't process all the events, we need to enqueue
+ * async_cb to be run in the next iteration of the uv_loop
+ */
+ uv_async_send(handle);
+ }
+}
+
+static void
+isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0) {
+ UNUSED(ev0);
+ worker->finished = true;
+ /* Close the async handler */
+ uv_close((uv_handle_t *)&worker->async, NULL);
+}
+
+void
+isc_nm_task_enqueue(isc_nm_t *nm, isc_task_t *task, int threadid) {
+ isc__netievent_t *event = NULL;
+ int tid;
+ isc__networker_t *worker = NULL;
+
+ if (threadid == -1) {
+ tid = (int)isc_random_uniform(nm->nworkers);
+ } else {
+ tid = threadid % nm->nworkers;
+ }
+
+ worker = &nm->workers[tid];
+
+ if (isc_task_privileged(task)) {
+ event = (isc__netievent_t *)
+ isc__nm_get_netievent_privilegedtask(nm, task);
+ } else {
+ event = (isc__netievent_t *)isc__nm_get_netievent_task(nm,
+ task);
+ }
+
+ isc__nm_enqueue_ievent(worker, event);
+}
+
+#define isc__nm_async_privilegedtask(worker, ev0) \
+ isc__nm_async_task(worker, ev0)
+
+static void
+isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) {
+ isc__netievent_task_t *ievent = (isc__netievent_task_t *)ev0;
+ isc_result_t result;
+
+ UNUSED(worker);
+
+ result = isc_task_run(ievent->task);
+
+ switch (result) {
+ case ISC_R_QUOTA:
+ isc_task_ready(ievent->task);
+ return;
+ case ISC_R_SUCCESS:
+ return;
+ default:
+ UNREACHABLE();
+ }
+}
+
+static void
+wait_for_priority_queue(isc__networker_t *worker) {
+ isc_condition_t *cond = &worker->ievents[NETIEVENT_PRIORITY].cond;
+ isc_mutex_t *lock = &worker->ievents[NETIEVENT_PRIORITY].lock;
+ isc__netievent_list_t *list =
+ &(worker->ievents[NETIEVENT_PRIORITY].list);
+
+ LOCK(lock);
+ while (ISC_LIST_EMPTY(*list)) {
+ WAIT(cond, lock);
+ }
+ UNLOCK(lock);
+
+ drain_queue(worker, NETIEVENT_PRIORITY);
+}
+
+static void
+drain_queue(isc__networker_t *worker, netievent_type_t type) {
+ bool empty = false;
+ while (!empty) {
+ if (process_queue(worker, type) == ISC_R_EMPTY) {
+ LOCK(&worker->ievents[type].lock);
+ empty = ISC_LIST_EMPTY(worker->ievents[type].list);
+ UNLOCK(&worker->ievents[type].lock);
+ }
+ }
+}
+
+/*
+ * The two macros here generate the individual cases for the process_netievent()
+ * function. The NETIEVENT_CASE(type) macro is the common case, and
+ * NETIEVENT_CASE_NOMORE(type) is a macro that causes the loop in the
+ * process_queue() to stop, e.g. it's only used for the netievent that
+ * stops/pauses processing the enqueued netievents.
+ */
+#define NETIEVENT_CASE(type) \
+ case netievent_##type: { \
+ isc__nm_async_##type(worker, ievent); \
+ isc__nm_put_netievent_##type( \
+ worker->mgr, (isc__netievent_##type##_t *)ievent); \
+ return (true); \
+ }
+
+#define NETIEVENT_CASE_NOMORE(type) \
+ case netievent_##type: { \
+ isc__nm_async_##type(worker, ievent); \
+ isc__nm_put_netievent_##type(worker->mgr, ievent); \
+ return (false); \
+ }
+
+static bool
+process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) {
+ REQUIRE(worker->id == isc_nm_tid());
+
+ switch (ievent->type) {
+ /* Don't process more ievents when we are stopping */
+ NETIEVENT_CASE_NOMORE(stop);
+
+ NETIEVENT_CASE(privilegedtask);
+ NETIEVENT_CASE(task);
+
+ NETIEVENT_CASE(udpconnect);
+ NETIEVENT_CASE(udplisten);
+ NETIEVENT_CASE(udpstop);
+ NETIEVENT_CASE(udpsend);
+ NETIEVENT_CASE(udpread);
+ NETIEVENT_CASE(udpcancel);
+ NETIEVENT_CASE(udpclose);
+
+ NETIEVENT_CASE(tcpaccept);
+ NETIEVENT_CASE(tcpconnect);
+ NETIEVENT_CASE(tcplisten);
+ NETIEVENT_CASE(tcpstartread);
+ NETIEVENT_CASE(tcppauseread);
+ NETIEVENT_CASE(tcpsend);
+ NETIEVENT_CASE(tcpstop);
+ NETIEVENT_CASE(tcpcancel);
+ NETIEVENT_CASE(tcpclose);
+
+ NETIEVENT_CASE(tcpdnsaccept);
+ NETIEVENT_CASE(tcpdnslisten);
+ NETIEVENT_CASE(tcpdnsconnect);
+ NETIEVENT_CASE(tcpdnssend);
+ NETIEVENT_CASE(tcpdnscancel);
+ NETIEVENT_CASE(tcpdnsclose);
+ NETIEVENT_CASE(tcpdnsread);
+ NETIEVENT_CASE(tcpdnsstop);
+
+ NETIEVENT_CASE(connectcb);
+ NETIEVENT_CASE(readcb);
+ NETIEVENT_CASE(sendcb);
+
+ NETIEVENT_CASE(close);
+ NETIEVENT_CASE(detach);
+
+ NETIEVENT_CASE(shutdown);
+ NETIEVENT_CASE(resume);
+ NETIEVENT_CASE_NOMORE(pause);
+ default:
+ UNREACHABLE();
+ }
+ return (true);
+}
+
+static isc_result_t
+process_queue(isc__networker_t *worker, netievent_type_t type) {
+ isc__netievent_t *ievent = NULL;
+ isc__netievent_list_t list;
+
+ ISC_LIST_INIT(list);
+
+ LOCK(&worker->ievents[type].lock);
+ ISC_LIST_MOVE(list, worker->ievents[type].list);
+ UNLOCK(&worker->ievents[type].lock);
+
+ ievent = ISC_LIST_HEAD(list);
+ if (ievent == NULL) {
+ /* There's nothing scheduled */
+ return (ISC_R_EMPTY);
+ }
+
+ while (ievent != NULL) {
+ isc__netievent_t *next = ISC_LIST_NEXT(ievent, link);
+ ISC_LIST_DEQUEUE(list, ievent, link);
+
+ if (!process_netievent(worker, ievent)) {
+ /* The netievent told us to stop */
+ if (!ISC_LIST_EMPTY(list)) {
+ /*
+ * Reschedule the rest of the unprocessed
+ * events.
+ */
+ LOCK(&worker->ievents[type].lock);
+ ISC_LIST_PREPENDLIST(worker->ievents[type].list,
+ list, link);
+ UNLOCK(&worker->ievents[type].lock);
+ }
+ return (ISC_R_SUSPEND);
+ }
+
+ ievent = next;
+ }
+
+ /* We processed at least one */
+ return (ISC_R_SUCCESS);
+}
+
+void *
+isc__nm_get_netievent(isc_nm_t *mgr, isc__netievent_type type) {
+ isc__netievent_storage_t *event = isc_mem_get(mgr->mctx,
+ sizeof(*event));
+
+ *event = (isc__netievent_storage_t){ .ni.type = type };
+ ISC_LINK_INIT(&(event->ni), link);
+ return (event);
+}
+
+void
+isc__nm_put_netievent(isc_nm_t *mgr, void *ievent) {
+ isc_mem_put(mgr->mctx, ievent, sizeof(isc__netievent_storage_t));
+}
+
+NETIEVENT_SOCKET_DEF(tcpclose);
+NETIEVENT_SOCKET_DEF(tcplisten);
+NETIEVENT_SOCKET_DEF(tcppauseread);
+NETIEVENT_SOCKET_DEF(tcpstartread);
+NETIEVENT_SOCKET_DEF(tcpstop);
+NETIEVENT_SOCKET_DEF(udpclose);
+NETIEVENT_SOCKET_DEF(udplisten);
+NETIEVENT_SOCKET_DEF(udpread);
+NETIEVENT_SOCKET_DEF(udpsend);
+NETIEVENT_SOCKET_DEF(udpstop);
+
+NETIEVENT_SOCKET_DEF(tcpdnsclose);
+NETIEVENT_SOCKET_DEF(tcpdnsread);
+NETIEVENT_SOCKET_DEF(tcpdnsstop);
+NETIEVENT_SOCKET_DEF(tcpdnslisten);
+NETIEVENT_SOCKET_REQ_DEF(tcpdnsconnect);
+NETIEVENT_SOCKET_REQ_DEF(tcpdnssend);
+NETIEVENT_SOCKET_HANDLE_DEF(tcpdnscancel);
+NETIEVENT_SOCKET_QUOTA_DEF(tcpdnsaccept);
+
+NETIEVENT_SOCKET_REQ_DEF(tcpconnect);
+NETIEVENT_SOCKET_REQ_DEF(tcpsend);
+NETIEVENT_SOCKET_REQ_DEF(udpconnect);
+NETIEVENT_SOCKET_REQ_RESULT_DEF(connectcb);
+NETIEVENT_SOCKET_REQ_RESULT_DEF(readcb);
+NETIEVENT_SOCKET_REQ_RESULT_DEF(sendcb);
+
+NETIEVENT_SOCKET_DEF(detach);
+NETIEVENT_SOCKET_HANDLE_DEF(tcpcancel);
+NETIEVENT_SOCKET_HANDLE_DEF(udpcancel);
+
+NETIEVENT_SOCKET_QUOTA_DEF(tcpaccept);
+
+NETIEVENT_SOCKET_DEF(close);
+NETIEVENT_DEF(pause);
+NETIEVENT_DEF(resume);
+NETIEVENT_DEF(shutdown);
+NETIEVENT_DEF(stop);
+
+NETIEVENT_TASK_DEF(task);
+NETIEVENT_TASK_DEF(privilegedtask);
+
+void
+isc__nm_maybe_enqueue_ievent(isc__networker_t *worker,
+ isc__netievent_t *event) {
+ /*
+ * If we are already in the matching nmthread, process the ievent
+ * directly.
+ */
+ if (worker->id == isc_nm_tid()) {
+ process_netievent(worker, event);
+ return;
+ }
+
+ isc__nm_enqueue_ievent(worker, event);
+}
+
+void
+isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
+ netievent_type_t type;
+
+ if (event->type > netievent_prio) {
+ type = NETIEVENT_PRIORITY;
+ } else {
+ switch (event->type) {
+ case netievent_prio:
+ UNREACHABLE();
+ break;
+ case netievent_privilegedtask:
+ type = NETIEVENT_PRIVILEGED;
+ break;
+ case netievent_task:
+ type = NETIEVENT_TASK;
+ break;
+ default:
+ type = NETIEVENT_NORMAL;
+ break;
+ }
+ }
+
+ /*
+ * We need to make sure this signal will be delivered and
+ * the queue will be processed.
+ */
+ LOCK(&worker->ievents[type].lock);
+ ISC_LIST_ENQUEUE(worker->ievents[type].list, event, link);
+ if (type == NETIEVENT_PRIORITY) {
+ SIGNAL(&worker->ievents[type].cond);
+ }
+ UNLOCK(&worker->ievents[type].lock);
+
+ uv_async_send(&worker->async);
+}
+
+bool
+isc__nmsocket_active(isc_nmsocket_t *sock) {
+ REQUIRE(VALID_NMSOCK(sock));
+ if (sock->parent != NULL) {
+ return (atomic_load(&sock->parent->active));
+ }
+
+ return (atomic_load(&sock->active));
+}
+
+bool
+isc__nmsocket_deactivate(isc_nmsocket_t *sock) {
+ REQUIRE(VALID_NMSOCK(sock));
+
+ if (sock->parent != NULL) {
+ return (atomic_compare_exchange_strong(&sock->parent->active,
+ &(bool){ true }, false));
+ }
+
+ return (atomic_compare_exchange_strong(&sock->active, &(bool){ true },
+ false));
+}
+
+void
+isc___nmsocket_attach(isc_nmsocket_t *sock, isc_nmsocket_t **target FLARG) {
+ REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(target != NULL && *target == NULL);
+
+ isc_nmsocket_t *rsock = NULL;
+
+ if (sock->parent != NULL) {
+ rsock = sock->parent;
+ INSIST(rsock->parent == NULL); /* sanity check */
+ } else {
+ rsock = sock;
+ }
+
+ NETMGR_TRACE_LOG("isc__nmsocket_attach():%p->references = %" PRIuFAST32
+ "\n",
+ rsock, isc_refcount_current(&rsock->references) + 1);
+
+ isc_refcount_increment0(&rsock->references);
+
+ *target = sock;
+}
+
+/*
+ * Free all resources inside a socket (including its children if any).
+ */
+static void
+nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree FLARG) {
+ isc_nmhandle_t *handle = NULL;
+ isc__nm_uvreq_t *uvreq = NULL;
+
+ REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(!isc__nmsocket_active(sock));
+
+ NETMGR_TRACE_LOG("nmsocket_cleanup():%p->references = %" PRIuFAST32
+ "\n",
+ sock, isc_refcount_current(&sock->references));
+
+ atomic_store(&sock->destroying, true);
+
+ if (sock->parent == NULL && sock->children != NULL) {
+ /*
+ * We shouldn't be here unless there are no active handles,
+ * so we can clean up and free the children.
+ */
+ for (size_t i = 0; i < sock->nchildren; i++) {
+ if (!atomic_load(&sock->children[i].destroying)) {
+ nmsocket_cleanup(&sock->children[i],
+ false FLARG_PASS);
+ }
+ }
+
+ /*
+ * This was a parent socket: destroy the listening
+ * barriers that synchronized the children.
+ */
+ isc_barrier_destroy(&sock->startlistening);
+ isc_barrier_destroy(&sock->stoplistening);
+
+ /*
+ * Now free them.
+ */
+ isc_mem_put(sock->mgr->mctx, sock->children,
+ sock->nchildren * sizeof(*sock));
+ sock->children = NULL;
+ sock->nchildren = 0;
+ }
+ if (sock->statsindex != NULL) {
+ isc__nm_decstats(sock->mgr, sock->statsindex[STATID_ACTIVE]);
+ }
+
+ sock->statichandle = NULL;
+
+ if (sock->outerhandle != NULL) {
+ isc__nmhandle_detach(&sock->outerhandle FLARG_PASS);
+ }
+
+ if (sock->outer != NULL) {
+ isc___nmsocket_detach(&sock->outer FLARG_PASS);
+ }
+
+ while ((handle = isc_astack_pop(sock->inactivehandles)) != NULL) {
+ nmhandle_free(sock, handle);
+ }
+
+ if (sock->buf != NULL) {
+ isc_mem_free(sock->mgr->mctx, sock->buf);
+ }
+
+ if (sock->quota != NULL) {
+ isc_quota_detach(&sock->quota);
+ }
+
+ sock->pquota = NULL;
+
+ isc_astack_destroy(sock->inactivehandles);
+
+ while ((uvreq = isc_astack_pop(sock->inactivereqs)) != NULL) {
+ isc_mem_put(sock->mgr->mctx, uvreq, sizeof(*uvreq));
+ }
+
+ isc_astack_destroy(sock->inactivereqs);
+ sock->magic = 0;
+
+ isc_condition_destroy(&sock->scond);
+ isc_condition_destroy(&sock->cond);
+ isc_mutex_destroy(&sock->lock);
+#ifdef NETMGR_TRACE
+ LOCK(&sock->mgr->lock);
+ ISC_LIST_UNLINK(sock->mgr->active_sockets, sock, active_link);
+ UNLOCK(&sock->mgr->lock);
+#endif
+ if (dofree) {
+ isc_nm_t *mgr = sock->mgr;
+ isc_mem_put(mgr->mctx, sock, sizeof(*sock));
+ isc_nm_detach(&mgr);
+ } else {
+ isc_nm_detach(&sock->mgr);
+ }
+}
+
+static void
+nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG) {
+ int active_handles;
+ bool destroy = false;
+
+ NETMGR_TRACE_LOG("%s():%p->references = %" PRIuFAST32 "\n", __func__,
+ sock, isc_refcount_current(&sock->references));
+
+ if (sock->parent != NULL) {
+ /*
+ * This is a child socket and cannot be destroyed except
+ * as a side effect of destroying the parent, so let's go
+ * see if the parent is ready to be destroyed.
+ */
+ nmsocket_maybe_destroy(sock->parent FLARG_PASS);
+ return;
+ }
+
+ /*
+ * This is a parent socket (or a standalone). See whether the
+ * children have active handles before deciding whether to
+ * accept destruction.
+ */
+ LOCK(&sock->lock);
+ if (atomic_load(&sock->active) || atomic_load(&sock->destroying) ||
+ !atomic_load(&sock->closed) || atomic_load(&sock->references) != 0)
+ {
+ UNLOCK(&sock->lock);
+ return;
+ }
+
+ active_handles = atomic_load(&sock->ah);
+ if (sock->children != NULL) {
+ for (size_t i = 0; i < sock->nchildren; i++) {
+ LOCK(&sock->children[i].lock);
+ active_handles += atomic_load(&sock->children[i].ah);
+ UNLOCK(&sock->children[i].lock);
+ }
+ }
+
+ if (active_handles == 0 || sock->statichandle != NULL) {
+ destroy = true;
+ }
+
+ NETMGR_TRACE_LOG("%s:%p->active_handles = %d, .statichandle = %p\n",
+ __func__, sock, active_handles, sock->statichandle);
+
+ if (destroy) {
+ atomic_store(&sock->destroying, true);
+ UNLOCK(&sock->lock);
+ nmsocket_cleanup(sock, true FLARG_PASS);
+ } else {
+ UNLOCK(&sock->lock);
+ }
+}
+
+void
+isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) {
+ REQUIRE(sock->parent == NULL);
+
+ NETMGR_TRACE_LOG("isc___nmsocket_prep_destroy():%p->references = "
+ "%" PRIuFAST32 "\n",
+ sock, isc_refcount_current(&sock->references));
+
+ /*
+ * The final external reference to the socket is gone. We can try
+ * destroying the socket, but we have to wait for all the inflight
+ * handles to finish first.
+ */
+ atomic_store(&sock->active, false);
+
+ /*
+ * If the socket has children, they'll need to be marked inactive
+ * so they can be cleaned up too.
+ */
+ if (sock->children != NULL) {
+ for (size_t i = 0; i < sock->nchildren; i++) {
+ atomic_store(&sock->children[i].active, false);
+ }
+ }
+
+ /*
+ * If we're here then we already stopped listening; otherwise
+ * we'd have a hanging reference from the listening process.
+ *
+ * If it's a regular socket we may need to close it.
+ */
+ if (!atomic_load(&sock->closed)) {
+ switch (sock->type) {
+ case isc_nm_udpsocket:
+ isc__nm_udp_close(sock);
+ return;
+ case isc_nm_tcpsocket:
+ isc__nm_tcp_close(sock);
+ return;
+ case isc_nm_tcpdnssocket:
+ isc__nm_tcpdns_close(sock);
+ return;
+ default:
+ break;
+ }
+ }
+
+ nmsocket_maybe_destroy(sock FLARG_PASS);
+}
+
+void
+isc___nmsocket_detach(isc_nmsocket_t **sockp FLARG) {
+ REQUIRE(sockp != NULL && *sockp != NULL);
+ REQUIRE(VALID_NMSOCK(*sockp));
+
+ isc_nmsocket_t *sock = *sockp, *rsock = NULL;
+ *sockp = NULL;
+
+ /*
+ * If the socket is a part of a set (a child socket) we are
+ * counting references for the whole set at the parent.
+ */
+ if (sock->parent != NULL) {
+ rsock = sock->parent;
+ INSIST(rsock->parent == NULL); /* Sanity check */
+ } else {
+ rsock = sock;
+ }
+
+ NETMGR_TRACE_LOG("isc__nmsocket_detach():%p->references = %" PRIuFAST32
+ "\n",
+ rsock, isc_refcount_current(&rsock->references) - 1);
+
+ if (isc_refcount_decrement(&rsock->references) == 1) {
+ isc___nmsocket_prep_destroy(rsock FLARG_PASS);
+ }
+}
+
+void
+isc_nmsocket_close(isc_nmsocket_t **sockp) {
+ REQUIRE(sockp != NULL);
+ REQUIRE(VALID_NMSOCK(*sockp));
+ REQUIRE((*sockp)->type == isc_nm_udplistener ||
+ (*sockp)->type == isc_nm_tcplistener ||
+ (*sockp)->type == isc_nm_tcpdnslistener);
+
+ isc__nmsocket_detach(sockp);
+}
+
+void
+isc___nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type,
+ isc_sockaddr_t *iface FLARG) {
+ uint16_t family;
+
+ REQUIRE(sock != NULL);
+ REQUIRE(mgr != NULL);
+ REQUIRE(iface != NULL);
+
+ family = iface->type.sa.sa_family;
+
+ *sock = (isc_nmsocket_t){ .type = type,
+ .iface = *iface,
+ .fd = -1,
+ .inactivehandles = isc_astack_new(
+ mgr->mctx, ISC_NM_HANDLES_STACK_SIZE),
+ .inactivereqs = isc_astack_new(
+ mgr->mctx, ISC_NM_REQS_STACK_SIZE) };
+
+#if NETMGR_TRACE
+ sock->backtrace_size = backtrace(sock->backtrace, TRACE_SIZE);
+ ISC_LINK_INIT(sock, active_link);
+ ISC_LIST_INIT(sock->active_handles);
+ LOCK(&mgr->lock);
+ ISC_LIST_APPEND(mgr->active_sockets, sock, active_link);
+ UNLOCK(&mgr->lock);
+#endif
+
+ isc_nm_attach(mgr, &sock->mgr);
+ sock->uv_handle.handle.data = sock;
+
+ ISC_LINK_INIT(&sock->quotacb, link);
+
+ switch (type) {
+ case isc_nm_udpsocket:
+ case isc_nm_udplistener:
+ if (family == AF_INET) {
+ sock->statsindex = udp4statsindex;
+ } else {
+ sock->statsindex = udp6statsindex;
+ }
+ isc__nm_incstats(sock->mgr, sock->statsindex[STATID_ACTIVE]);
+ break;
+ case isc_nm_tcpsocket:
+ case isc_nm_tcplistener:
+ case isc_nm_tcpdnssocket:
+ case isc_nm_tcpdnslistener:
+ if (family == AF_INET) {
+ sock->statsindex = tcp4statsindex;
+ } else {
+ sock->statsindex = tcp6statsindex;
+ }
+ isc__nm_incstats(sock->mgr, sock->statsindex[STATID_ACTIVE]);
+ break;
+ default:
+ break;
+ }
+
+ isc_mutex_init(&sock->lock);
+ isc_condition_init(&sock->cond);
+ isc_condition_init(&sock->scond);
+ isc_refcount_init(&sock->references, 1);
+
+ NETMGR_TRACE_LOG("isc__nmsocket_init():%p->references = %" PRIuFAST32
+ "\n",
+ sock, isc_refcount_current(&sock->references));
+
+ atomic_init(&sock->active, true);
+ atomic_init(&sock->sequential, false);
+ atomic_init(&sock->readpaused, false);
+ atomic_init(&sock->closing, false);
+ atomic_init(&sock->listening, 0);
+ atomic_init(&sock->closed, 0);
+ atomic_init(&sock->destroying, 0);
+ atomic_init(&sock->ah, 0);
+ atomic_init(&sock->client, 0);
+ atomic_init(&sock->connecting, false);
+ atomic_init(&sock->keepalive, false);
+ atomic_init(&sock->connected, false);
+ atomic_init(&sock->timedout, false);
+
+ atomic_init(&sock->active_child_connections, 0);
+
+ sock->magic = NMSOCK_MAGIC;
+}
+
+void
+isc__nmsocket_clearcb(isc_nmsocket_t *sock) {
+ REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(!isc__nm_in_netthread() || sock->tid == isc_nm_tid());
+
+ sock->recv_cb = NULL;
+ sock->recv_cbarg = NULL;
+ sock->accept_cb = NULL;
+ sock->accept_cbarg = NULL;
+ sock->connect_cb = NULL;
+ sock->connect_cbarg = NULL;
+}
+
+void
+isc__nm_free_uvbuf(isc_nmsocket_t *sock, const uv_buf_t *buf) {
+ isc__networker_t *worker = NULL;
+
+ REQUIRE(VALID_NMSOCK(sock));
+
+ worker = &sock->mgr->workers[sock->tid];
+ REQUIRE(buf->base == worker->recvbuf);
+
+ worker->recvbuf_inuse = false;
+}
+
+static isc_nmhandle_t *
+alloc_handle(isc_nmsocket_t *sock) {
+ isc_nmhandle_t *handle =
+ isc_mem_get(sock->mgr->mctx,
+ sizeof(isc_nmhandle_t) + sock->extrahandlesize);
+
+ *handle = (isc_nmhandle_t){ .magic = NMHANDLE_MAGIC };
+#ifdef NETMGR_TRACE
+ ISC_LINK_INIT(handle, active_link);
+#endif
+ isc_refcount_init(&handle->references, 1);
+
+ return (handle);
+}
+
+isc_nmhandle_t *
+isc___nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
+ isc_sockaddr_t *local FLARG) {
+ isc_nmhandle_t *handle = NULL;
+
+ REQUIRE(VALID_NMSOCK(sock));
+
+ handle = isc_astack_pop(sock->inactivehandles);
+
+ if (handle == NULL) {
+ handle = alloc_handle(sock);
+ } else {
+ isc_refcount_init(&handle->references, 1);
+ INSIST(VALID_NMHANDLE(handle));
+ }
+
+ NETMGR_TRACE_LOG(
+ "isc__nmhandle_get():handle %p->references = %" PRIuFAST32 "\n",
+ handle, isc_refcount_current(&handle->references));
+
+ isc___nmsocket_attach(sock, &handle->sock FLARG_PASS);
+
+#if NETMGR_TRACE
+ handle->backtrace_size = backtrace(handle->backtrace, TRACE_SIZE);
+#endif
+
+ if (peer != NULL) {
+ handle->peer = *peer;
+ } else {
+ handle->peer = sock->peer;
+ }
+
+ if (local != NULL) {
+ handle->local = *local;
+ } else {
+ handle->local = sock->iface;
+ }
+
+ (void)atomic_fetch_add(&sock->ah, 1);
+
+#ifdef NETMGR_TRACE
+ LOCK(&sock->lock);
+ ISC_LIST_APPEND(sock->active_handles, handle, active_link);
+ UNLOCK(&sock->lock);
+#endif
+
+ switch (sock->type) {
+ case isc_nm_udpsocket:
+ case isc_nm_tcpdnssocket:
+ if (!atomic_load(&sock->client)) {
+ break;
+ }
+ FALLTHROUGH;
+ case isc_nm_tcpsocket:
+ INSIST(sock->statichandle == NULL);
+
+ /*
+ * statichandle must be assigned, not attached;
+ * otherwise, if a handle was detached elsewhere
+ * it could never reach 0 references, and the
+ * handle and socket would never be freed.
+ */
+ sock->statichandle = handle;
+ break;
+ default:
+ break;
+ }
+
+ return (handle);
+}
+
+void
+isc__nmhandle_attach(isc_nmhandle_t *handle, isc_nmhandle_t **handlep FLARG) {
+ REQUIRE(VALID_NMHANDLE(handle));
+ REQUIRE(handlep != NULL && *handlep == NULL);
+
+ NETMGR_TRACE_LOG("isc__nmhandle_attach():handle %p->references = "
+ "%" PRIuFAST32 "\n",
+ handle, isc_refcount_current(&handle->references) + 1);
+
+ isc_refcount_increment(&handle->references);
+ *handlep = handle;
+}
+
+bool
+isc_nmhandle_is_stream(isc_nmhandle_t *handle) {
+ REQUIRE(VALID_NMHANDLE(handle));
+
+ return (handle->sock->type == isc_nm_tcpsocket ||
+ handle->sock->type == isc_nm_tcpdnssocket);
+}
+
+static void
+nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
+ size_t extra = sock->extrahandlesize;
+
+ isc_refcount_destroy(&handle->references);
+
+ if (handle->dofree != NULL) {
+ handle->dofree(handle->opaque);
+ }
+
+ *handle = (isc_nmhandle_t){ .magic = 0 };
+
+ isc_mem_put(sock->mgr->mctx, handle, sizeof(isc_nmhandle_t) + extra);
+}
+
+static void
+nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
+ bool reuse = false;
+
+ /*
+ * We do all of this under lock to avoid races with socket
+ * destruction. We have to do this now, because at this point the
+ * socket is either unused or still attached to event->sock.
+ */
+ LOCK(&sock->lock);
+
+#ifdef NETMGR_TRACE
+ ISC_LIST_UNLINK(sock->active_handles, handle, active_link);
+#endif
+
+ INSIST(atomic_fetch_sub(&sock->ah, 1) > 0);
+
+#if !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__
+ if (atomic_load(&sock->active)) {
+ reuse = isc_astack_trypush(sock->inactivehandles, handle);
+ }
+#endif /* !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__ */
+ if (!reuse) {
+ nmhandle_free(sock, handle);
+ }
+ UNLOCK(&sock->lock);
+}
+
+void
+isc__nmhandle_detach(isc_nmhandle_t **handlep FLARG) {
+ isc_nmsocket_t *sock = NULL;
+ isc_nmhandle_t *handle = NULL;
+
+ REQUIRE(handlep != NULL);
+ REQUIRE(VALID_NMHANDLE(*handlep));
+
+ handle = *handlep;
+ *handlep = NULL;
+
+ /*
+ * If the closehandle_cb is set, it needs to run asynchronously to
+ * ensure correct ordering of the isc__nm_process_sock_buffer().
+ */
+ sock = handle->sock;
+ if (sock->tid == isc_nm_tid() && sock->closehandle_cb == NULL) {
+ nmhandle_detach_cb(&handle FLARG_PASS);
+ } else {
+ isc__netievent_detach_t *event =
+ isc__nm_get_netievent_detach(sock->mgr, sock);
+ /*
+ * we are using implicit "attach" as the last reference
+ * need to be destroyed explicitly in the async callback
+ */
+ event->handle = handle;
+ FLARG_IEVENT_PASS(event);
+ isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+ (isc__netievent_t *)event);
+ }
+}
+
+void
+isc__nmsocket_shutdown(isc_nmsocket_t *sock);
+
+static void
+nmhandle_detach_cb(isc_nmhandle_t **handlep FLARG) {
+ isc_nmsocket_t *sock = NULL;
+ isc_nmhandle_t *handle = NULL;
+
+ REQUIRE(handlep != NULL);
+ REQUIRE(VALID_NMHANDLE(*handlep));
+
+ handle = *handlep;
+ *handlep = NULL;
+
+ NETMGR_TRACE_LOG("isc__nmhandle_detach():%p->references = %" PRIuFAST32
+ "\n",
+ handle, isc_refcount_current(&handle->references) - 1);
+
+ if (isc_refcount_decrement(&handle->references) > 1) {
+ return;
+ }
+
+ /* We need an acquire memory barrier here */
+ (void)isc_refcount_current(&handle->references);
+
+ sock = handle->sock;
+ handle->sock = NULL;
+
+ if (handle->doreset != NULL) {
+ handle->doreset(handle->opaque);
+ }
+
+ nmhandle_deactivate(sock, handle);
+
+ /*
+ * The handle is gone now. If the socket has a callback configured
+ * for that (e.g., to perform cleanup after request processing),
+ * call it now, or schedule it to run asynchronously.
+ */
+ if (sock->closehandle_cb != NULL) {
+ if (sock->tid == isc_nm_tid()) {
+ sock->closehandle_cb(sock);
+ } else {
+ isc__netievent_close_t *event =
+ isc__nm_get_netievent_close(sock->mgr, sock);
+ isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+ (isc__netievent_t *)event);
+ }
+ }
+
+ if (handle == sock->statichandle) {
+ /* statichandle is assigned, not attached. */
+ sock->statichandle = NULL;
+ }
+
+ isc___nmsocket_detach(&sock FLARG_PASS);
+}
+
+void *
+isc_nmhandle_getdata(isc_nmhandle_t *handle) {
+ REQUIRE(VALID_NMHANDLE(handle));
+
+ return (handle->opaque);
+}
+
+void
+isc_nmhandle_setdata(isc_nmhandle_t *handle, void *arg,
+ isc_nm_opaquecb_t doreset, isc_nm_opaquecb_t dofree) {
+ REQUIRE(VALID_NMHANDLE(handle));
+
+ handle->opaque = arg;
+ handle->doreset = doreset;
+ handle->dofree = dofree;
+}
+
+void
+isc__nm_alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) {
+ REQUIRE(len <= NM_BIG_BUF);
+
+ if (sock->buf == NULL) {
+ /* We don't have the buffer at all */
+ size_t alloc_len = len < NM_REG_BUF ? NM_REG_BUF : NM_BIG_BUF;
+ sock->buf = isc_mem_allocate(sock->mgr->mctx, alloc_len);
+ sock->buf_size = alloc_len;
+ } else {
+ /* We have the buffer but it's too small */
+ sock->buf = isc_mem_reallocate(sock->mgr->mctx, sock->buf,
+ NM_BIG_BUF);
+ sock->buf_size = NM_BIG_BUF;
+ }
+}
+
+void
+isc__nm_failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
+ isc_result_t eresult) {
+ REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(VALID_UVREQ(req));
+
+ if (req->cb.send != NULL) {
+ isc__nm_sendcb(sock, req, eresult, true);
+ } else {
+ isc__nm_uvreq_put(&req, sock);
+ }
+}
+
+void
+isc__nm_failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
+ REQUIRE(sock->accepting);
+ REQUIRE(sock->server);
+
+ /*
+ * Detach the quota early to make room for other connections;
+ * otherwise it'd be detached later asynchronously, and clog
+ * the quota unnecessarily.
+ */
+ if (sock->quota != NULL) {
+ isc_quota_detach(&sock->quota);
+ }
+
+ isc__nmsocket_detach(&sock->server);
+
+ sock->accepting = false;
+
+ switch (eresult) {
+ case ISC_R_NOTCONNECTED:
+ /* IGNORE: The client disconnected before we could accept */
+ break;
+ default:
+ isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
+ ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
+ "Accepting TCP connection failed: %s",
+ isc_result_totext(eresult));
+ }
+}
+
+void
+isc__nm_failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
+ isc_result_t eresult, bool async) {
+ REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(VALID_UVREQ(req));
+ REQUIRE(sock->tid == isc_nm_tid());
+ REQUIRE(req->cb.connect != NULL);
+
+ isc__nmsocket_timer_stop(sock);
+ uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock);
+
+ INSIST(atomic_compare_exchange_strong(&sock->connecting,
+ &(bool){ true }, false));
+
+ isc__nmsocket_clearcb(sock);
+ isc__nm_connectcb(sock, req, eresult, async);
+
+ isc__nmsocket_prep_destroy(sock);
+}
+
+void
+isc__nm_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result, bool async) {
+ REQUIRE(VALID_NMSOCK(sock));
+ UNUSED(async);
+
+ switch (sock->type) {
+ case isc_nm_udpsocket:
+ isc__nm_udp_failed_read_cb(sock, result);
+ return;
+ case isc_nm_tcpsocket:
+ isc__nm_tcp_failed_read_cb(sock, result);
+ return;
+ case isc_nm_tcpdnssocket:
+ isc__nm_tcpdns_failed_read_cb(sock, result);
+ return;
+ default:
+ UNREACHABLE();
+ }
+}
+
+void
+isc__nmsocket_connecttimeout_cb(uv_timer_t *timer) {
+ uv_connect_t *uvreq = uv_handle_get_data((uv_handle_t *)timer);
+ isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
+ isc__nm_uvreq_t *req = uv_handle_get_data((uv_handle_t *)uvreq);
+
+ REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(sock->tid == isc_nm_tid());
+ REQUIRE(atomic_load(&sock->connecting));
+ REQUIRE(VALID_UVREQ(req));
+ REQUIRE(VALID_NMHANDLE(req->handle));
+
+ isc__nmsocket_timer_stop(sock);
+
+ /*
+ * Mark the connection as timed out and shutdown the socket.
+ */
+
+ INSIST(atomic_compare_exchange_strong(&sock->timedout, &(bool){ false },
+ true));
+ isc__nmsocket_clearcb(sock);
+ isc__nmsocket_shutdown(sock);
+}
+
+void
+isc__nm_accept_connection_log(isc_result_t result, bool can_log_quota) {
+ int level;
+
+ switch (result) {
+ case ISC_R_SUCCESS:
+ case ISC_R_NOCONN:
+ return;
+ case ISC_R_QUOTA:
+ case ISC_R_SOFTQUOTA:
+ if (!can_log_quota) {
+ return;
+ }
+ level = ISC_LOG_INFO;
+ break;
+ case ISC_R_NOTCONNECTED:
+ level = ISC_LOG_INFO;
+ break;
+ default:
+ level = ISC_LOG_ERROR;
+ }
+
+ isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR,
+ level, "Accepting TCP connection failed: %s",
+ isc_result_totext(result));
+}
+
+void
+isc__nmsocket_writetimeout_cb(void *data, isc_result_t eresult) {
+ isc__nm_uvreq_t *req = data;
+ isc_nmsocket_t *sock = NULL;
+
+ REQUIRE(eresult == ISC_R_TIMEDOUT);
+ REQUIRE(VALID_UVREQ(req));
+ REQUIRE(VALID_NMSOCK(req->sock));
+
+ sock = req->sock;
+
+ isc__nmsocket_reset(sock);
+}
+
+void
+isc__nmsocket_readtimeout_cb(uv_timer_t *timer) {
+ isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)timer);
+
+ REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(sock->tid == isc_nm_tid());
+ REQUIRE(sock->reading);
+
+ if (atomic_load(&sock->client)) {
+ uv_timer_stop(timer);
+
+ if (sock->recv_cb != NULL) {
+ isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL);
+ isc__nm_readcb(sock, req, ISC_R_TIMEDOUT);
+ }
+
+ if (!isc__nmsocket_timer_running(sock)) {
+ isc__nmsocket_clearcb(sock);
+ isc__nm_failed_read_cb(sock, ISC_R_CANCELED, false);
+ }
+ } else {
+ isc__nm_failed_read_cb(sock, ISC_R_TIMEDOUT, false);
+ }
+}
+
+void
+isc__nmsocket_timer_restart(isc_nmsocket_t *sock) {
+ REQUIRE(VALID_NMSOCK(sock));
+
+ if (atomic_load(&sock->connecting)) {
+ int r;
+
+ if (sock->connect_timeout == 0) {
+ return;
+ }
+
+ r = uv_timer_start(&sock->read_timer,
+ isc__nmsocket_connecttimeout_cb,
+ sock->connect_timeout + 10, 0);
+ UV_RUNTIME_CHECK(uv_timer_start, r);
+
+ } else {
+ int r;
+
+ if (sock->read_timeout == 0) {
+ return;
+ }
+
+ r = uv_timer_start(&sock->read_timer,
+ isc__nmsocket_readtimeout_cb,
+ sock->read_timeout, 0);
+ UV_RUNTIME_CHECK(uv_timer_start, r);
+ }
+}
+
+bool
+isc__nmsocket_timer_running(isc_nmsocket_t *sock) {
+ REQUIRE(VALID_NMSOCK(sock));
+
+ return (uv_is_active((uv_handle_t *)&sock->read_timer));
+}
+
+void
+isc__nmsocket_timer_start(isc_nmsocket_t *sock) {
+ REQUIRE(VALID_NMSOCK(sock));
+
+ if (isc__nmsocket_timer_running(sock)) {
+ return;
+ }
+
+ isc__nmsocket_timer_restart(sock);
+}
+
+void
+isc__nmsocket_timer_stop(isc_nmsocket_t *sock) {
+ int r;
+
+ REQUIRE(VALID_NMSOCK(sock));
+
+ /* uv_timer_stop() is idempotent, no need to check if running */
+
+ r = uv_timer_stop(&sock->read_timer);
+ UV_RUNTIME_CHECK(uv_timer_stop, r);
+}
+
+isc__nm_uvreq_t *
+isc__nm_get_read_req(isc_nmsocket_t *sock, isc_sockaddr_t *sockaddr) {
+ isc__nm_uvreq_t *req = NULL;
+
+ req = isc__nm_uvreq_get(sock->mgr, sock);
+ req->cb.recv = sock->recv_cb;
+ req->cbarg = sock->recv_cbarg;
+
+ switch (sock->type) {
+ case isc_nm_tcpsocket:
+ isc_nmhandle_attach(sock->statichandle, &req->handle);
+ break;
+ default:
+ if (atomic_load(&sock->client)) {
+ isc_nmhandle_attach(sock->statichandle, &req->handle);
+ } else {
+ req->handle = isc__nmhandle_get(sock, sockaddr, NULL);
+ }
+ break;
+ }
+
+ return (req);
+}
+
+/*%<
+ * Allocator callback for read operations.
+ *
+ * Note this doesn't actually allocate anything, it just assigns the
+ * worker's receive buffer to a socket, and marks it as "in use".
+ */
+void
+isc__nm_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
+ isc_nmsocket_t *sock = uv_handle_get_data(handle);
+ isc__networker_t *worker = NULL;
+
+ REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(isc__nm_in_netthread());
+ /*
+ * The size provided by libuv is only suggested size, and it always
+ * defaults to 64 * 1024 in the current versions of libuv (see
+ * src/unix/udp.c and src/unix/stream.c).
+ */
+ UNUSED(size);
+
+ worker = &sock->mgr->workers[sock->tid];
+ INSIST(!worker->recvbuf_inuse);
+ INSIST(worker->recvbuf != NULL);
+
+ switch (sock->type) {
+ case isc_nm_udpsocket:
+ buf->len = ISC_NETMGR_UDP_RECVBUF_SIZE;
+ break;
+ case isc_nm_tcpsocket:
+ case isc_nm_tcpdnssocket:
+ buf->len = ISC_NETMGR_TCP_RECVBUF_SIZE;
+ break;
+ default:
+ UNREACHABLE();
+ }
+
+ REQUIRE(buf->len <= ISC_NETMGR_RECVBUF_SIZE);
+ buf->base = worker->recvbuf;
+
+ worker->recvbuf_inuse = true;
+}
+
+isc_result_t
+isc__nm_start_reading(isc_nmsocket_t *sock) {
+ isc_result_t result = ISC_R_SUCCESS;
+ int r;
+
+ if (sock->reading) {
+ return (ISC_R_SUCCESS);
+ }
+
+ switch (sock->type) {
+ case isc_nm_udpsocket:
+ r = uv_udp_recv_start(&sock->uv_handle.udp, isc__nm_alloc_cb,
+ isc__nm_udp_read_cb);
+ break;
+ case isc_nm_tcpsocket:
+ r = uv_read_start(&sock->uv_handle.stream, isc__nm_alloc_cb,
+ isc__nm_tcp_read_cb);
+ break;
+ case isc_nm_tcpdnssocket:
+ r = uv_read_start(&sock->uv_handle.stream, isc__nm_alloc_cb,
+ isc__nm_tcpdns_read_cb);
+ break;
+ default:
+ UNREACHABLE();
+ }
+
+ if (r != 0) {
+ result = isc__nm_uverr2result(r);
+ } else {
+ sock->reading = true;
+ }
+
+ return (result);
+}
+
+void
+isc__nm_stop_reading(isc_nmsocket_t *sock) {
+ int r;
+
+ if (!sock->reading) {
+ return;
+ }
+
+ switch (sock->type) {
+ case isc_nm_udpsocket:
+ r = uv_udp_recv_stop(&sock->uv_handle.udp);
+ UV_RUNTIME_CHECK(uv_udp_recv_stop, r);
+ break;
+ case isc_nm_tcpsocket:
+ case isc_nm_tcpdnssocket:
+ r = uv_read_stop(&sock->uv_handle.stream);
+ UV_RUNTIME_CHECK(uv_read_stop, r);
+ break;
+ default:
+ UNREACHABLE();
+ }
+ sock->reading = false;
+}
+
+bool
+isc__nm_closing(isc_nmsocket_t *sock) {
+ return (atomic_load(&sock->mgr->closing));
+}
+
+bool
+isc__nmsocket_closing(isc_nmsocket_t *sock) {
+ return (!isc__nmsocket_active(sock) || atomic_load(&sock->closing) ||
+ atomic_load(&sock->mgr->closing) ||
+ (sock->server != NULL && !isc__nmsocket_active(sock->server)));
+}
+
+static isc_result_t
+processbuffer(isc_nmsocket_t *sock) {
+ switch (sock->type) {
+ case isc_nm_tcpdnssocket:
+ return (isc__nm_tcpdns_processbuffer(sock));
+ default:
+ UNREACHABLE();
+ }
+}
+
+/*
+ * Process a DNS message.
+ *
+ * If we only have an incomplete DNS message, we don't touch any
+ * timers. If we do have a full message, reset the timer.
+ *
+ * Stop reading if this is a client socket, or if the server socket
+ * has been set to sequential mode, or the number of queries we are
+ * processing simultaneously has reached the clients-per-connection
+ * limit. In this case we'll be called again by resume_processing()
+ * later.
+ */
+isc_result_t
+isc__nm_process_sock_buffer(isc_nmsocket_t *sock) {
+ for (;;) {
+ int_fast32_t ah = atomic_load(&sock->ah);
+ isc_result_t result = processbuffer(sock);
+ switch (result) {
+ case ISC_R_NOMORE:
+ /*
+ * Don't reset the timer until we have a
+ * full DNS message.
+ */
+ result = isc__nm_start_reading(sock);
+ if (result != ISC_R_SUCCESS) {
+ return (result);
+ }
+ /*
+ * Start the timer only if there are no externally used
+ * active handles, there's always one active handle
+ * attached internally to sock->recv_handle in
+ * accept_connection()
+ */
+ if (ah == 1) {
+ isc__nmsocket_timer_start(sock);
+ }
+ goto done;
+ case ISC_R_CANCELED:
+ isc__nmsocket_timer_stop(sock);
+ isc__nm_stop_reading(sock);
+ goto done;
+ case ISC_R_SUCCESS:
+ /*
+ * Stop the timer on the successful message read, this
+ * also allows to restart the timer when we have no more
+ * data.
+ */
+ isc__nmsocket_timer_stop(sock);
+
+ if (atomic_load(&sock->client) ||
+ atomic_load(&sock->sequential) ||
+ ah >= STREAM_CLIENTS_PER_CONN)
+ {
+ isc__nm_stop_reading(sock);
+ goto done;
+ }
+ break;
+ default:
+ UNREACHABLE();
+ }
+ }
+done:
+ return (ISC_R_SUCCESS);
+}
+
+void
+isc__nm_resume_processing(void *arg) {
+ isc_nmsocket_t *sock = (isc_nmsocket_t *)arg;
+
+ REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(sock->tid == isc_nm_tid());
+ REQUIRE(!atomic_load(&sock->client));
+
+ if (isc__nmsocket_closing(sock)) {
+ return;
+ }
+
+ isc__nm_process_sock_buffer(sock);
+}
+
+void
+isc_nmhandle_cleartimeout(isc_nmhandle_t *handle) {
+ REQUIRE(VALID_NMHANDLE(handle));
+ REQUIRE(VALID_NMSOCK(handle->sock));
+
+ switch (handle->sock->type) {
+ default:
+ handle->sock->read_timeout = 0;
+
+ if (uv_is_active((uv_handle_t *)&handle->sock->read_timer)) {
+ isc__nmsocket_timer_stop(handle->sock);
+ }
+ }
+}
+
+void
+isc_nmhandle_settimeout(isc_nmhandle_t *handle, uint32_t timeout) {
+ REQUIRE(VALID_NMHANDLE(handle));
+ REQUIRE(VALID_NMSOCK(handle->sock));
+
+ switch (handle->sock->type) {
+ default:
+ handle->sock->read_timeout = timeout;
+ isc__nmsocket_timer_restart(handle->sock);
+ }
+}
+
+void
+isc_nmhandle_keepalive(isc_nmhandle_t *handle, bool value) {
+ isc_nmsocket_t *sock = NULL;
+
+ REQUIRE(VALID_NMHANDLE(handle));
+ REQUIRE(VALID_NMSOCK(handle->sock));
+
+ sock = handle->sock;
+
+ switch (sock->type) {
+ case isc_nm_tcpsocket:
+ case isc_nm_tcpdnssocket:
+ atomic_store(&sock->keepalive, value);
+ sock->read_timeout = value ? atomic_load(&sock->mgr->keepalive)
+ : atomic_load(&sock->mgr->idle);
+ sock->write_timeout = value ? atomic_load(&sock->mgr->keepalive)
+ : atomic_load(&sock->mgr->idle);
+ break;
+ default:
+ /*
+ * For any other protocol, this is a no-op.
+ */
+ return;
+ }
+}
+
+void *
+isc_nmhandle_getextra(isc_nmhandle_t *handle) {
+ REQUIRE(VALID_NMHANDLE(handle));
+
+ return (handle->extra);
+}
+
+isc_sockaddr_t
+isc_nmhandle_peeraddr(isc_nmhandle_t *handle) {
+ REQUIRE(VALID_NMHANDLE(handle));
+
+ return (handle->peer);
+}
+
+isc_sockaddr_t
+isc_nmhandle_localaddr(isc_nmhandle_t *handle) {
+ REQUIRE(VALID_NMHANDLE(handle));
+
+ return (handle->local);
+}
+
+isc_nm_t *
+isc_nmhandle_netmgr(isc_nmhandle_t *handle) {
+ REQUIRE(VALID_NMHANDLE(handle));
+ REQUIRE(VALID_NMSOCK(handle->sock));
+
+ return (handle->sock->mgr);
+}
+
+isc__nm_uvreq_t *
+isc___nm_uvreq_get(isc_nm_t *mgr, isc_nmsocket_t *sock FLARG) {
+ isc__nm_uvreq_t *req = NULL;
+
+ REQUIRE(VALID_NM(mgr));
+ REQUIRE(VALID_NMSOCK(sock));
+
+ if (sock != NULL && isc__nmsocket_active(sock)) {
+ /* Try to reuse one */
+ req = isc_astack_pop(sock->inactivereqs);
+ }
+
+ if (req == NULL) {
+ req = isc_mem_get(mgr->mctx, sizeof(*req));
+ }
+
+ *req = (isc__nm_uvreq_t){ .magic = 0 };
+ ISC_LINK_INIT(req, link);
+ req->uv_req.req.data = req;
+ isc___nmsocket_attach(sock, &req->sock FLARG_PASS);
+ req->magic = UVREQ_MAGIC;
+
+ return (req);
+}
+
+void
+isc___nm_uvreq_put(isc__nm_uvreq_t **req0, isc_nmsocket_t *sock FLARG) {
+ isc__nm_uvreq_t *req = NULL;
+ isc_nmhandle_t *handle = NULL;
+
+ REQUIRE(req0 != NULL);
+ REQUIRE(VALID_UVREQ(*req0));
+
+ req = *req0;
+ *req0 = NULL;
+
+ INSIST(sock == req->sock);
+
+ req->magic = 0;
+
+ /*
+ * We need to save this first to make sure that handle,
+ * sock, and the netmgr won't all disappear.
+ */
+ handle = req->handle;
+ req->handle = NULL;
+
+#if !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__
+ if (!isc__nmsocket_active(sock) ||
+ !isc_astack_trypush(sock->inactivereqs, req))
+ {
+ isc_mem_put(sock->mgr->mctx, req, sizeof(*req));
+ }
+#else /* !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__ */
+ isc_mem_put(sock->mgr->mctx, req, sizeof(*req));
+#endif /* !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__ */
+
+ if (handle != NULL) {
+ isc__nmhandle_detach(&handle FLARG_PASS);
+ }
+
+ isc___nmsocket_detach(&sock FLARG_PASS);
+}
+
+void
+isc_nm_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
+ void *cbarg) {
+ REQUIRE(VALID_NMHANDLE(handle));
+
+ switch (handle->sock->type) {
+ case isc_nm_udpsocket:
+ case isc_nm_udplistener:
+ isc__nm_udp_send(handle, region, cb, cbarg);
+ break;
+ case isc_nm_tcpsocket:
+ isc__nm_tcp_send(handle, region, cb, cbarg);
+ break;
+ case isc_nm_tcpdnssocket:
+ isc__nm_tcpdns_send(handle, region, cb, cbarg);
+ break;
+ default:
+ UNREACHABLE();
+ }
+}
+
+void
+isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
+ REQUIRE(VALID_NMHANDLE(handle));
+
+ /*
+ * This is always called via callback (from accept or connect), and
+ * caller must attach to the handle, so the references always need to be
+ * at least 2.
+ */
+ REQUIRE(isc_refcount_current(&handle->references) >= 2);
+
+ switch (handle->sock->type) {
+ case isc_nm_udpsocket:
+ isc__nm_udp_read(handle, cb, cbarg);
+ break;
+ case isc_nm_tcpsocket:
+ isc__nm_tcp_read(handle, cb, cbarg);
+ break;
+ case isc_nm_tcpdnssocket:
+ isc__nm_tcpdns_read(handle, cb, cbarg);
+ break;
+ default:
+ UNREACHABLE();
+ }
+}
+
+void
+isc_nm_cancelread(isc_nmhandle_t *handle) {
+ REQUIRE(VALID_NMHANDLE(handle));
+
+ switch (handle->sock->type) {
+ case isc_nm_udpsocket:
+ isc__nm_udp_cancelread(handle);
+ break;
+ case isc_nm_tcpsocket:
+ isc__nm_tcp_cancelread(handle);
+ break;
+ case isc_nm_tcpdnssocket:
+ isc__nm_tcpdns_cancelread(handle);
+ break;
+ default:
+ UNREACHABLE();
+ }
+}
+
+void
+isc_nm_pauseread(isc_nmhandle_t *handle) {
+ REQUIRE(VALID_NMHANDLE(handle));
+
+ isc_nmsocket_t *sock = handle->sock;
+
+ switch (sock->type) {
+ case isc_nm_tcpsocket:
+ isc__nm_tcp_pauseread(handle);
+ break;
+ default:
+ UNREACHABLE();
+ }
+}
+
+void
+isc_nm_resumeread(isc_nmhandle_t *handle) {
+ REQUIRE(VALID_NMHANDLE(handle));
+
+ isc_nmsocket_t *sock = handle->sock;
+
+ switch (sock->type) {
+ case isc_nm_tcpsocket:
+ isc__nm_tcp_resumeread(handle);
+ break;
+ default:
+ UNREACHABLE();
+ }
+}
+
+void
+isc_nm_stoplistening(isc_nmsocket_t *sock) {
+ REQUIRE(VALID_NMSOCK(sock));
+
+ switch (sock->type) {
+ case isc_nm_udplistener:
+ isc__nm_udp_stoplistening(sock);
+ break;
+ case isc_nm_tcpdnslistener:
+ isc__nm_tcpdns_stoplistening(sock);
+ break;
+ case isc_nm_tcplistener:
+ isc__nm_tcp_stoplistening(sock);
+ break;
+ default:
+ UNREACHABLE();
+ }
+}
+
+void
+isc__nm_connectcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+ isc_result_t eresult, bool async) {
+ REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(VALID_UVREQ(uvreq));
+ REQUIRE(VALID_NMHANDLE(uvreq->handle));
+
+ if (!async) {
+ isc__netievent_connectcb_t ievent = { .sock = sock,
+ .req = uvreq,
+ .result = eresult };
+ isc__nm_async_connectcb(NULL, (isc__netievent_t *)&ievent);
+ } else {
+ isc__netievent_connectcb_t *ievent =
+ isc__nm_get_netievent_connectcb(sock->mgr, sock, uvreq,
+ eresult);
+ isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+ (isc__netievent_t *)ievent);
+ }
+}
+
+void
+isc__nm_async_connectcb(isc__networker_t *worker, isc__netievent_t *ev0) {
+ isc__netievent_connectcb_t *ievent = (isc__netievent_connectcb_t *)ev0;
+ isc_nmsocket_t *sock = ievent->sock;
+ isc__nm_uvreq_t *uvreq = ievent->req;
+ isc_result_t eresult = ievent->result;
+
+ UNUSED(worker);
+
+ REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(VALID_UVREQ(uvreq));
+ REQUIRE(VALID_NMHANDLE(uvreq->handle));
+ REQUIRE(ievent->sock->tid == isc_nm_tid());
+ REQUIRE(uvreq->cb.connect != NULL);
+
+ uvreq->cb.connect(uvreq->handle, eresult, uvreq->cbarg);
+
+ isc__nm_uvreq_put(&uvreq, sock);
+}
+
+void
+isc__nm_readcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+ isc_result_t eresult) {
+ REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(VALID_UVREQ(uvreq));
+ REQUIRE(VALID_NMHANDLE(uvreq->handle));
+
+ if (eresult == ISC_R_SUCCESS || eresult == ISC_R_TIMEDOUT) {
+ isc__netievent_readcb_t ievent = { .sock = sock,
+ .req = uvreq,
+ .result = eresult };
+
+ isc__nm_async_readcb(NULL, (isc__netievent_t *)&ievent);
+ } else {
+ isc__netievent_readcb_t *ievent = isc__nm_get_netievent_readcb(
+ sock->mgr, sock, uvreq, eresult);
+ isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+ (isc__netievent_t *)ievent);
+ }
+}
+
+void
+isc__nm_async_readcb(isc__networker_t *worker, isc__netievent_t *ev0) {
+ isc__netievent_readcb_t *ievent = (isc__netievent_readcb_t *)ev0;
+ isc_nmsocket_t *sock = ievent->sock;
+ isc__nm_uvreq_t *uvreq = ievent->req;
+ isc_result_t eresult = ievent->result;
+ isc_region_t region;
+
+ UNUSED(worker);
+
+ REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(VALID_UVREQ(uvreq));
+ REQUIRE(VALID_NMHANDLE(uvreq->handle));
+ REQUIRE(sock->tid == isc_nm_tid());
+
+ region.base = (unsigned char *)uvreq->uvbuf.base;
+ region.length = uvreq->uvbuf.len;
+
+ uvreq->cb.recv(uvreq->handle, eresult, &region, uvreq->cbarg);
+
+ isc__nm_uvreq_put(&uvreq, sock);
+}
+
+void
+isc__nm_sendcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+ isc_result_t eresult, bool async) {
+ REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(VALID_UVREQ(uvreq));
+ REQUIRE(VALID_NMHANDLE(uvreq->handle));
+
+ if (!async) {
+ isc__netievent_sendcb_t ievent = { .sock = sock,
+ .req = uvreq,
+ .result = eresult };
+ isc__nm_async_sendcb(NULL, (isc__netievent_t *)&ievent);
+ return;
+ }
+
+ isc__netievent_sendcb_t *ievent =
+ isc__nm_get_netievent_sendcb(sock->mgr, sock, uvreq, eresult);
+ isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+ (isc__netievent_t *)ievent);
+}
+
+void
+isc__nm_async_sendcb(isc__networker_t *worker, isc__netievent_t *ev0) {
+ isc__netievent_sendcb_t *ievent = (isc__netievent_sendcb_t *)ev0;
+ isc_nmsocket_t *sock = ievent->sock;
+ isc__nm_uvreq_t *uvreq = ievent->req;
+ isc_result_t eresult = ievent->result;
+
+ UNUSED(worker);
+
+ REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(VALID_UVREQ(uvreq));
+ REQUIRE(VALID_NMHANDLE(uvreq->handle));
+ REQUIRE(sock->tid == isc_nm_tid());
+
+ uvreq->cb.send(uvreq->handle, eresult, uvreq->cbarg);
+
+ isc__nm_uvreq_put(&uvreq, sock);
+}
+
+static void
+isc__nm_async_close(isc__networker_t *worker, isc__netievent_t *ev0) {
+ isc__netievent_close_t *ievent = (isc__netievent_close_t *)ev0;
+ isc_nmsocket_t *sock = ievent->sock;
+
+ REQUIRE(VALID_NMSOCK(ievent->sock));
+ REQUIRE(sock->tid == isc_nm_tid());
+ REQUIRE(sock->closehandle_cb != NULL);
+
+ UNUSED(worker);
+
+ ievent->sock->closehandle_cb(sock);
+}
+
+void
+isc__nm_async_detach(isc__networker_t *worker, isc__netievent_t *ev0) {
+ isc__netievent_detach_t *ievent = (isc__netievent_detach_t *)ev0;
+ FLARG_IEVENT(ievent);
+
+ REQUIRE(VALID_NMSOCK(ievent->sock));
+ REQUIRE(VALID_NMHANDLE(ievent->handle));
+ REQUIRE(ievent->sock->tid == isc_nm_tid());
+
+ UNUSED(worker);
+
+ nmhandle_detach_cb(&ievent->handle FLARG_PASS);
+}
+
+static void
+reset_shutdown(uv_handle_t *handle) {
+ isc_nmsocket_t *sock = uv_handle_get_data(handle);
+
+ isc__nmsocket_shutdown(sock);
+ isc__nmsocket_detach(&sock);
+}
+
+void
+isc__nmsocket_reset(isc_nmsocket_t *sock) {
+ REQUIRE(VALID_NMSOCK(sock));
+
+ switch (sock->type) {
+ case isc_nm_tcpsocket:
+ case isc_nm_tcpdnssocket:
+ /*
+ * This can be called from the TCP write timeout.
+ */
+ REQUIRE(sock->parent == NULL);
+ break;
+ default:
+ UNREACHABLE();
+ break;
+ }
+
+ if (!uv_is_closing(&sock->uv_handle.handle) &&
+ uv_is_active(&sock->uv_handle.handle))
+ {
+ /*
+ * The real shutdown will be handled in the respective
+ * close functions.
+ */
+ isc__nmsocket_attach(sock, &(isc_nmsocket_t *){ NULL });
+ int r = uv_tcp_close_reset(&sock->uv_handle.tcp,
+ reset_shutdown);
+ UV_RUNTIME_CHECK(uv_tcp_close_reset, r);
+ } else {
+ isc__nmsocket_shutdown(sock);
+ }
+}
+
+void
+isc__nmsocket_shutdown(isc_nmsocket_t *sock) {
+ REQUIRE(VALID_NMSOCK(sock));
+ switch (sock->type) {
+ case isc_nm_udpsocket:
+ isc__nm_udp_shutdown(sock);
+ break;
+ case isc_nm_tcpsocket:
+ isc__nm_tcp_shutdown(sock);
+ break;
+ case isc_nm_tcpdnssocket:
+ isc__nm_tcpdns_shutdown(sock);
+ break;
+ case isc_nm_udplistener:
+ case isc_nm_tcplistener:
+ case isc_nm_tcpdnslistener:
+ return;
+ default:
+ UNREACHABLE();
+ }
+}
+
+static void
+shutdown_walk_cb(uv_handle_t *handle, void *arg) {
+ isc_nmsocket_t *sock = uv_handle_get_data(handle);
+ UNUSED(arg);
+
+ if (uv_is_closing(handle)) {
+ return;
+ }
+
+ switch (handle->type) {
+ case UV_UDP:
+ isc__nmsocket_shutdown(sock);
+ return;
+ case UV_TCP:
+ switch (sock->type) {
+ case isc_nm_tcpsocket:
+ case isc_nm_tcpdnssocket:
+ if (sock->parent == NULL) {
+ /* Reset the TCP connections on shutdown */
+ isc__nmsocket_reset(sock);
+ return;
+ }
+ FALLTHROUGH;
+ default:
+ isc__nmsocket_shutdown(sock);
+ }
+
+ return;
+ default:
+ return;
+ }
+}
+
+void
+isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ev0) {
+ UNUSED(ev0);
+
+ uv_walk(&worker->loop, shutdown_walk_cb, NULL);
+}
+
+bool
+isc__nm_acquire_interlocked(isc_nm_t *mgr) {
+ if (!isc__nm_in_netthread()) {
+ return (false);
+ }
+
+ LOCK(&mgr->lock);
+ bool success = atomic_compare_exchange_strong(
+ &mgr->interlocked, &(int){ ISC_NETMGR_NON_INTERLOCKED },
+ isc_nm_tid());
+
+ UNLOCK(&mgr->lock);
+ return (success);
+}
+
+void
+isc__nm_drop_interlocked(isc_nm_t *mgr) {
+ if (!isc__nm_in_netthread()) {
+ return;
+ }
+
+ LOCK(&mgr->lock);
+ int tid = atomic_exchange(&mgr->interlocked,
+ ISC_NETMGR_NON_INTERLOCKED);
+ INSIST(tid != ISC_NETMGR_NON_INTERLOCKED);
+ BROADCAST(&mgr->wkstatecond);
+ UNLOCK(&mgr->lock);
+}
+
+void
+isc__nm_acquire_interlocked_force(isc_nm_t *mgr) {
+ if (!isc__nm_in_netthread()) {
+ return;
+ }
+
+ LOCK(&mgr->lock);
+ while (!atomic_compare_exchange_strong(
+ &mgr->interlocked, &(int){ ISC_NETMGR_NON_INTERLOCKED },
+ isc_nm_tid()))
+ {
+ WAIT(&mgr->wkstatecond, &mgr->lock);
+ }
+ UNLOCK(&mgr->lock);
+}
+
+void
+isc_nm_setstats(isc_nm_t *mgr, isc_stats_t *stats) {
+ REQUIRE(VALID_NM(mgr));
+ REQUIRE(mgr->stats == NULL);
+ REQUIRE(isc_stats_ncounters(stats) == isc_sockstatscounter_max);
+
+ isc_stats_attach(stats, &mgr->stats);
+}
+
+void
+isc__nm_incstats(isc_nm_t *mgr, isc_statscounter_t counterid) {
+ REQUIRE(VALID_NM(mgr));
+ REQUIRE(counterid != -1);
+
+ if (mgr->stats != NULL) {
+ isc_stats_increment(mgr->stats, counterid);
+ }
+}
+
+void
+isc__nm_decstats(isc_nm_t *mgr, isc_statscounter_t counterid) {
+ REQUIRE(VALID_NM(mgr));
+ REQUIRE(counterid != -1);
+
+ if (mgr->stats != NULL) {
+ isc_stats_decrement(mgr->stats, counterid);
+ }
+}
+
+isc_result_t
+isc__nm_socket(int domain, int type, int protocol, uv_os_sock_t *sockp) {
+#ifdef WIN32
+ SOCKET sock;
+ sock = socket(domain, type, protocol);
+ if (sock == INVALID_SOCKET) {
+ char strbuf[ISC_STRERRORSIZE];
+ DWORD socket_errno = WSAGetLastError();
+ switch (socket_errno) {
+ case WSAEMFILE:
+ case WSAENOBUFS:
+ return (ISC_R_NORESOURCES);
+
+ case WSAEPROTONOSUPPORT:
+ case WSAEPFNOSUPPORT:
+ case WSAEAFNOSUPPORT:
+ return (ISC_R_FAMILYNOSUPPORT);
+ default:
+ strerror_r(socket_errno, strbuf, sizeof(strbuf));
+ UNEXPECTED_ERROR(
+ __FILE__, __LINE__,
+ "socket() failed with error code %lu: %s",
+ socket_errno, strbuf);
+ return (ISC_R_UNEXPECTED);
+ }
+ }
+#else
+ int sock = socket(domain, type, protocol);
+ if (sock < 0) {
+ return (isc_errno_toresult(errno));
+ }
+#endif
+ *sockp = (uv_os_sock_t)sock;
+ return (ISC_R_SUCCESS);
+}
+
+void
+isc__nm_closesocket(uv_os_sock_t sock) {
+#ifdef WIN32
+ closesocket(sock);
+#else
+ close(sock);
+#endif
+}
+
+#define setsockopt_on(socket, level, name) \
+ setsockopt(socket, level, name, &(int){ 1 }, sizeof(int))
+
+#define setsockopt_off(socket, level, name) \
+ setsockopt(socket, level, name, &(int){ 0 }, sizeof(int))
+
+isc_result_t
+isc__nm_socket_freebind(uv_os_sock_t fd, sa_family_t sa_family) {
+ /*
+ * Set the IP_FREEBIND (or equivalent option) on the uv_handle.
+ */
+#ifdef IP_FREEBIND
+ UNUSED(sa_family);
+ if (setsockopt_on(fd, IPPROTO_IP, IP_FREEBIND) == -1) {
+ return (ISC_R_FAILURE);
+ }
+ return (ISC_R_SUCCESS);
+#elif defined(IP_BINDANY) || defined(IPV6_BINDANY)
+ if (sa_family == AF_INET) {
+#if defined(IP_BINDANY)
+ if (setsockopt_on(fd, IPPROTO_IP, IP_BINDANY) == -1) {
+ return (ISC_R_FAILURE);
+ }
+ return (ISC_R_SUCCESS);
+#endif
+ } else if (sa_family == AF_INET6) {
+#if defined(IPV6_BINDANY)
+ if (setsockopt_on(fd, IPPROTO_IPV6, IPV6_BINDANY) == -1) {
+ return (ISC_R_FAILURE);
+ }
+ return (ISC_R_SUCCESS);
+#endif
+ }
+ return (ISC_R_NOTIMPLEMENTED);
+#elif defined(SO_BINDANY)
+ UNUSED(sa_family);
+ if (setsockopt_on(fd, SOL_SOCKET, SO_BINDANY) == -1) {
+ return (ISC_R_FAILURE);
+ }
+ return (ISC_R_SUCCESS);
+#else
+ UNUSED(fd);
+ UNUSED(sa_family);
+ return (ISC_R_NOTIMPLEMENTED);
+#endif
+}
+
+isc_result_t
+isc__nm_socket_reuse(uv_os_sock_t fd) {
+ /*
+ * Generally, the SO_REUSEADDR socket option allows reuse of
+ * local addresses.
+ *
+ * On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some
+ * additional refinements for programs that use multicast.
+ *
+ * On Linux, SO_REUSEPORT has different semantics: it _shares_ the port
+ * rather than steal it from the current listener, so we don't use it
+ * here, but rather in isc__nm_socket_reuse_lb().
+ *
+ * On Windows, it also allows a socket to forcibly bind to a port in use
+ * by another socket.
+ */
+
+#if defined(SO_REUSEPORT) && !defined(__linux__)
+ if (setsockopt_on(fd, SOL_SOCKET, SO_REUSEPORT) == -1) {
+ return (ISC_R_FAILURE);
+ }
+ return (ISC_R_SUCCESS);
+#elif defined(SO_REUSEADDR)
+ if (setsockopt_on(fd, SOL_SOCKET, SO_REUSEADDR) == -1) {
+ return (ISC_R_FAILURE);
+ }
+ return (ISC_R_SUCCESS);
+#else
+ UNUSED(fd);
+ return (ISC_R_NOTIMPLEMENTED);
+#endif
+}
+
+isc_result_t
+isc__nm_socket_reuse_lb(uv_os_sock_t fd) {
+ /*
+ * On FreeBSD 12+, SO_REUSEPORT_LB socket option allows sockets to be
+ * bound to an identical socket address. For UDP sockets, the use of
+ * this option can provide better distribution of incoming datagrams to
+ * multiple processes (or threads) as compared to the traditional
+ * technique of having multiple processes compete to receive datagrams
+ * on the same socket.
+ *
+ * On Linux, the same thing is achieved simply with SO_REUSEPORT.
+ */
+#if defined(SO_REUSEPORT_LB)
+ if (setsockopt_on(fd, SOL_SOCKET, SO_REUSEPORT_LB) == -1) {
+ return (ISC_R_FAILURE);
+ } else {
+ return (ISC_R_SUCCESS);
+ }
+#elif defined(SO_REUSEPORT) && defined(__linux__)
+ if (setsockopt_on(fd, SOL_SOCKET, SO_REUSEPORT) == -1) {
+ return (ISC_R_FAILURE);
+ } else {
+ return (ISC_R_SUCCESS);
+ }
+#else
+ UNUSED(fd);
+ return (ISC_R_NOTIMPLEMENTED);
+#endif
+}
+
+isc_result_t
+isc__nm_socket_incoming_cpu(uv_os_sock_t fd) {
+#ifdef SO_INCOMING_CPU
+ if (setsockopt_on(fd, SOL_SOCKET, SO_INCOMING_CPU) == -1) {
+ return (ISC_R_FAILURE);
+ } else {
+ return (ISC_R_SUCCESS);
+ }
+#else
+ UNUSED(fd);
+#endif
+ return (ISC_R_NOTIMPLEMENTED);
+}
+
+isc_result_t
+isc__nm_socket_disable_pmtud(uv_os_sock_t fd, sa_family_t sa_family) {
+ /*
+ * Disable the Path MTU Discovery on IP packets
+ */
+ if (sa_family == AF_INET6) {
+#if defined(IPV6_DONTFRAG)
+ if (setsockopt_off(fd, IPPROTO_IPV6, IPV6_DONTFRAG) == -1) {
+ return (ISC_R_FAILURE);
+ } else {
+ return (ISC_R_SUCCESS);
+ }
+#elif defined(IPV6_MTU_DISCOVER) && defined(IP_PMTUDISC_OMIT)
+ if (setsockopt(fd, IPPROTO_IPV6, IPV6_MTU_DISCOVER,
+ &(int){ IP_PMTUDISC_OMIT }, sizeof(int)) == -1)
+ {
+ return (ISC_R_FAILURE);
+ } else {
+ return (ISC_R_SUCCESS);
+ }
+#else
+ UNUSED(fd);
+#endif
+ } else if (sa_family == AF_INET) {
+#if defined(IP_DONTFRAG)
+ if (setsockopt_off(fd, IPPROTO_IP, IP_DONTFRAG) == -1) {
+ return (ISC_R_FAILURE);
+ } else {
+ return (ISC_R_SUCCESS);
+ }
+#elif defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_OMIT)
+ if (setsockopt(fd, IPPROTO_IP, IP_MTU_DISCOVER,
+ &(int){ IP_PMTUDISC_OMIT }, sizeof(int)) == -1)
+ {
+ return (ISC_R_FAILURE);
+ } else {
+ return (ISC_R_SUCCESS);
+ }
+#else
+ UNUSED(fd);
+#endif
+ } else {
+ return (ISC_R_FAMILYNOSUPPORT);
+ }
+
+ return (ISC_R_NOTIMPLEMENTED);
+}
+
+#if defined(_WIN32)
+#define TIMEOUT_TYPE DWORD
+#define TIMEOUT_DIV 1000
+#define TIMEOUT_OPTNAME TCP_MAXRT
+#elif defined(TCP_CONNECTIONTIMEOUT)
+#define TIMEOUT_TYPE int
+#define TIMEOUT_DIV 1000
+#define TIMEOUT_OPTNAME TCP_CONNECTIONTIMEOUT
+#elif defined(TCP_RXT_CONNDROPTIME)
+#define TIMEOUT_TYPE int
+#define TIMEOUT_DIV 1000
+#define TIMEOUT_OPTNAME TCP_RXT_CONNDROPTIME
+#elif defined(TCP_USER_TIMEOUT)
+#define TIMEOUT_TYPE unsigned int
+#define TIMEOUT_DIV 1
+#define TIMEOUT_OPTNAME TCP_USER_TIMEOUT
+#elif defined(TCP_KEEPINIT)
+#define TIMEOUT_TYPE int
+#define TIMEOUT_DIV 1000
+#define TIMEOUT_OPTNAME TCP_KEEPINIT
+#endif
+
+isc_result_t
+isc__nm_socket_connectiontimeout(uv_os_sock_t fd, int timeout_ms) {
+#if defined(TIMEOUT_OPTNAME)
+ TIMEOUT_TYPE timeout = timeout_ms / TIMEOUT_DIV;
+
+ if (timeout == 0) {
+ timeout = 1;
+ }
+
+ if (setsockopt(fd, IPPROTO_TCP, TIMEOUT_OPTNAME, &timeout,
+ sizeof(timeout)) == -1)
+ {
+ return (ISC_R_FAILURE);
+ }
+
+ return (ISC_R_SUCCESS);
+#else
+ UNUSED(fd);
+ UNUSED(timeout_ms);
+
+ return (ISC_R_SUCCESS);
+#endif
+}
+
+isc_result_t
+isc__nm_socket_tcp_nodelay(uv_os_sock_t fd) {
+#ifdef TCP_NODELAY
+ if (setsockopt_on(fd, IPPROTO_TCP, TCP_NODELAY) == -1) {
+ return (ISC_R_FAILURE);
+ } else {
+ return (ISC_R_SUCCESS);
+ }
+#else
+ UNUSED(fd);
+ return (ISC_R_SUCCESS);
+#endif
+}
+
+static isc_threadresult_t
+isc__nm_work_run(isc_threadarg_t arg) {
+ isc__nm_work_t *work = (isc__nm_work_t *)arg;
+
+ work->cb(work->data);
+
+ return ((isc_threadresult_t)0);
+}
+
+static void
+isc__nm_work_cb(uv_work_t *req) {
+ isc__nm_work_t *work = uv_req_get_data((uv_req_t *)req);
+
+ if (isc_tid_v == SIZE_MAX) {
+ isc__trampoline_t *trampoline_arg =
+ isc__trampoline_get(isc__nm_work_run, work);
+ (void)isc__trampoline_run(trampoline_arg);
+ } else {
+ (void)isc__nm_work_run((isc_threadarg_t)work);
+ }
+}
+
+static void
+isc__nm_after_work_cb(uv_work_t *req, int status) {
+ isc_result_t result = ISC_R_SUCCESS;
+ isc__nm_work_t *work = uv_req_get_data((uv_req_t *)req);
+ isc_nm_t *netmgr = work->netmgr;
+
+ if (status != 0) {
+ result = isc__nm_uverr2result(status);
+ }
+
+ work->after_cb(work->data, result);
+
+ isc_mem_put(netmgr->mctx, work, sizeof(*work));
+
+ isc_nm_detach(&netmgr);
+}
+
+void
+isc_nm_work_offload(isc_nm_t *netmgr, isc_nm_workcb_t work_cb,
+ isc_nm_after_workcb_t after_work_cb, void *data) {
+ isc__networker_t *worker = NULL;
+ isc__nm_work_t *work = NULL;
+ int r;
+
+ REQUIRE(isc__nm_in_netthread());
+ REQUIRE(VALID_NM(netmgr));
+
+ worker = &netmgr->workers[isc_nm_tid()];
+
+ work = isc_mem_get(netmgr->mctx, sizeof(*work));
+ *work = (isc__nm_work_t){
+ .cb = work_cb,
+ .after_cb = after_work_cb,
+ .data = data,
+ };
+
+ isc_nm_attach(netmgr, &work->netmgr);
+
+ uv_req_set_data((uv_req_t *)&work->req, work);
+
+ r = uv_queue_work(&worker->loop, &work->req, isc__nm_work_cb,
+ isc__nm_after_work_cb);
+ UV_RUNTIME_CHECK(uv_queue_work, r);
+}
+
+void
+isc_nm_timer_create(isc_nmhandle_t *handle, isc_nm_timer_cb cb, void *cbarg,
+ isc_nm_timer_t **timerp) {
+ isc__networker_t *worker = NULL;
+ isc_nmsocket_t *sock = NULL;
+ isc_nm_timer_t *timer = NULL;
+ int r;
+
+ REQUIRE(isc__nm_in_netthread());
+ REQUIRE(VALID_NMHANDLE(handle));
+ REQUIRE(VALID_NMSOCK(handle->sock));
+
+ sock = handle->sock;
+ worker = &sock->mgr->workers[isc_nm_tid()];
+
+ timer = isc_mem_get(sock->mgr->mctx, sizeof(*timer));
+ *timer = (isc_nm_timer_t){ .cb = cb, .cbarg = cbarg };
+ isc_refcount_init(&timer->references, 1);
+ isc_nmhandle_attach(handle, &timer->handle);
+
+ r = uv_timer_init(&worker->loop, &timer->timer);
+ UV_RUNTIME_CHECK(uv_timer_init, r);
+
+ uv_handle_set_data((uv_handle_t *)&timer->timer, timer);
+
+ *timerp = timer;
+}
+
+void
+isc_nm_timer_attach(isc_nm_timer_t *timer, isc_nm_timer_t **timerp) {
+ REQUIRE(timer != NULL);
+ REQUIRE(timerp != NULL && *timerp == NULL);
+
+ isc_refcount_increment(&timer->references);
+ *timerp = timer;
+}
+
+static void
+timer_destroy(uv_handle_t *uvhandle) {
+ isc_nm_timer_t *timer = uv_handle_get_data(uvhandle);
+ isc_nmhandle_t *handle = timer->handle;
+ isc_mem_t *mctx = timer->handle->sock->mgr->mctx;
+
+ isc_mem_put(mctx, timer, sizeof(*timer));
+
+ isc_nmhandle_detach(&handle);
+}
+
+void
+isc_nm_timer_detach(isc_nm_timer_t **timerp) {
+ isc_nm_timer_t *timer = NULL;
+ isc_nmhandle_t *handle = NULL;
+
+ REQUIRE(timerp != NULL && *timerp != NULL);
+
+ timer = *timerp;
+ *timerp = NULL;
+
+ handle = timer->handle;
+
+ REQUIRE(isc__nm_in_netthread());
+ REQUIRE(VALID_NMHANDLE(handle));
+ REQUIRE(VALID_NMSOCK(handle->sock));
+
+ if (isc_refcount_decrement(&timer->references) == 1) {
+ int r = uv_timer_stop(&timer->timer);
+ UV_RUNTIME_CHECK(uv_timer_stop, r);
+ uv_close((uv_handle_t *)&timer->timer, timer_destroy);
+ }
+}
+
+static void
+timer_cb(uv_timer_t *uvtimer) {
+ isc_nm_timer_t *timer = uv_handle_get_data((uv_handle_t *)uvtimer);
+
+ REQUIRE(timer->cb != NULL);
+
+ timer->cb(timer->cbarg, ISC_R_TIMEDOUT);
+}
+
+void
+isc_nm_timer_start(isc_nm_timer_t *timer, uint64_t timeout) {
+ int r = uv_timer_start(&timer->timer, timer_cb, timeout, 0);
+ UV_RUNTIME_CHECK(uv_timer_start, r);
+}
+
+void
+isc_nm_timer_stop(isc_nm_timer_t *timer) {
+ int r = uv_timer_stop(&timer->timer);
+ UV_RUNTIME_CHECK(uv_timer_stop, r);
+}
+
+#ifdef NETMGR_TRACE
+/*
+ * Dump all active sockets in netmgr. We output to stderr
+ * as the logger might be already shut down.
+ */
+
+static const char *
+nmsocket_type_totext(isc_nmsocket_type type) {
+ switch (type) {
+ case isc_nm_udpsocket:
+ return ("isc_nm_udpsocket");
+ case isc_nm_udplistener:
+ return ("isc_nm_udplistener");
+ case isc_nm_tcpsocket:
+ return ("isc_nm_tcpsocket");
+ case isc_nm_tcplistener:
+ return ("isc_nm_tcplistener");
+ case isc_nm_tcpdnslistener:
+ return ("isc_nm_tcpdnslistener");
+ case isc_nm_tcpdnssocket:
+ return ("isc_nm_tcpdnssocket");
+ default:
+ UNREACHABLE();
+ }
+}
+
+static void
+nmhandle_dump(isc_nmhandle_t *handle) {
+ fprintf(stderr, "Active handle %p, refs %" PRIuFAST32 "\n", handle,
+ isc_refcount_current(&handle->references));
+ fprintf(stderr, "Created by:\n");
+ backtrace_symbols_fd(handle->backtrace, handle->backtrace_size,
+ STDERR_FILENO);
+ fprintf(stderr, "\n\n");
+}
+
+static void
+nmsocket_dump(isc_nmsocket_t *sock) {
+ isc_nmhandle_t *handle = NULL;
+
+ LOCK(&sock->lock);
+ fprintf(stderr, "\n=================\n");
+ fprintf(stderr, "Active %s socket %p, type %s, refs %" PRIuFAST32 "\n",
+ atomic_load(&sock->client) ? "client" : "server", sock,
+ nmsocket_type_totext(sock->type),
+ isc_refcount_current(&sock->references));
+ fprintf(stderr,
+ "Parent %p, listener %p, server %p, statichandle = "
+ "%p\n",
+ sock->parent, sock->listener, sock->server, sock->statichandle);
+ fprintf(stderr, "Flags:%s%s%s%s%s\n",
+ atomic_load(&sock->active) ? " active" : "",
+ atomic_load(&sock->closing) ? " closing" : "",
+ atomic_load(&sock->destroying) ? " destroying" : "",
+ atomic_load(&sock->connecting) ? " connecting" : "",
+ sock->accepting ? " accepting" : "");
+ fprintf(stderr, "Created by:\n");
+ backtrace_symbols_fd(sock->backtrace, sock->backtrace_size,
+ STDERR_FILENO);
+ fprintf(stderr, "\n");
+
+ for (handle = ISC_LIST_HEAD(sock->active_handles); handle != NULL;
+ handle = ISC_LIST_NEXT(handle, active_link))
+ {
+ static bool first = true;
+ if (first) {
+ fprintf(stderr, "Active handles:\n");
+ first = false;
+ }
+ nmhandle_dump(handle);
+ }
+
+ fprintf(stderr, "\n");
+ UNLOCK(&sock->lock);
+}
+
+void
+isc__nm_dump_active(isc_nm_t *nm) {
+ isc_nmsocket_t *sock = NULL;
+
+ REQUIRE(VALID_NM(nm));
+
+ LOCK(&nm->lock);
+ for (sock = ISC_LIST_HEAD(nm->active_sockets); sock != NULL;
+ sock = ISC_LIST_NEXT(sock, active_link))
+ {
+ static bool first = true;
+ if (first) {
+ fprintf(stderr, "Outstanding sockets\n");
+ first = false;
+ }
+ nmsocket_dump(sock);
+ }
+ UNLOCK(&nm->lock);
+}
+#endif