/* * Copyright (C) Internet Systems Consortium, Inc. ("ISC") * * SPDX-License-Identifier: MPL-2.0 * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, you can obtain one at https://mozilla.org/MPL/2.0/. * * See the COPYRIGHT file distributed with this work for additional * information regarding copyright ownership. */ #include #include #include #ifdef HAVE_LIBCTRACE #include #endif /* ifdef HAVE_LIBCTRACE */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "netmgr-int.h" #include "netmgr_p.h" #include "openssl_shim.h" #include "trampoline_p.h" #include "uv-compat.h" /*% * How many isc_nmhandles and isc_nm_uvreqs will we be * caching for reuse in a socket. */ #define ISC_NM_HANDLES_STACK_SIZE 600 #define ISC_NM_REQS_STACK_SIZE 600 /*% * Shortcut index arrays to get access to statistics counters. 
*/ static const isc_statscounter_t udp4statsindex[] = { isc_sockstatscounter_udp4open, isc_sockstatscounter_udp4openfail, isc_sockstatscounter_udp4close, isc_sockstatscounter_udp4bindfail, isc_sockstatscounter_udp4connectfail, isc_sockstatscounter_udp4connect, -1, -1, isc_sockstatscounter_udp4sendfail, isc_sockstatscounter_udp4recvfail, isc_sockstatscounter_udp4active }; static const isc_statscounter_t udp6statsindex[] = { isc_sockstatscounter_udp6open, isc_sockstatscounter_udp6openfail, isc_sockstatscounter_udp6close, isc_sockstatscounter_udp6bindfail, isc_sockstatscounter_udp6connectfail, isc_sockstatscounter_udp6connect, -1, -1, isc_sockstatscounter_udp6sendfail, isc_sockstatscounter_udp6recvfail, isc_sockstatscounter_udp6active }; static const isc_statscounter_t tcp4statsindex[] = { isc_sockstatscounter_tcp4open, isc_sockstatscounter_tcp4openfail, isc_sockstatscounter_tcp4close, isc_sockstatscounter_tcp4bindfail, isc_sockstatscounter_tcp4connectfail, isc_sockstatscounter_tcp4connect, isc_sockstatscounter_tcp4acceptfail, isc_sockstatscounter_tcp4accept, isc_sockstatscounter_tcp4sendfail, isc_sockstatscounter_tcp4recvfail, isc_sockstatscounter_tcp4active }; static const isc_statscounter_t tcp6statsindex[] = { isc_sockstatscounter_tcp6open, isc_sockstatscounter_tcp6openfail, isc_sockstatscounter_tcp6close, isc_sockstatscounter_tcp6bindfail, isc_sockstatscounter_tcp6connectfail, isc_sockstatscounter_tcp6connect, isc_sockstatscounter_tcp6acceptfail, isc_sockstatscounter_tcp6accept, isc_sockstatscounter_tcp6sendfail, isc_sockstatscounter_tcp6recvfail, isc_sockstatscounter_tcp6active }; #if 0 /* XXX: not currently used */ static const isc_statscounter_t unixstatsindex[] = { isc_sockstatscounter_unixopen, isc_sockstatscounter_unixopenfail, isc_sockstatscounter_unixclose, isc_sockstatscounter_unixbindfail, isc_sockstatscounter_unixconnectfail, isc_sockstatscounter_unixconnect, isc_sockstatscounter_unixacceptfail, isc_sockstatscounter_unixaccept, 
isc_sockstatscounter_unixsendfail, isc_sockstatscounter_unixrecvfail, isc_sockstatscounter_unixactive }; #endif /* if 0 */ /* * libuv is not thread safe, but has mechanisms to pass messages * between threads. Each socket is owned by a thread. For UDP * sockets we have a set of sockets for each interface and we can * choose a sibling and send the message directly. For TCP, or if * we're calling from a non-networking thread, we need to pass the * request using async_cb. */ #if defined(HAVE_THREAD_LOCAL) #include static thread_local int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN; #elif defined(HAVE___THREAD) static __thread int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN; #elif HAVE___DECLSPEC_THREAD __declspec(thread) int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN; #endif /* if defined(HAVE_THREAD_LOCAL) */ static void nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG); static void nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle); static isc_threadresult_t nm_thread(isc_threadarg_t worker0); static void async_cb(uv_async_t *handle); static bool process_netievent(isc__networker_t *worker, isc__netievent_t *ievent); static isc_result_t process_queue(isc__networker_t *worker, netievent_type_t type); static void wait_for_priority_queue(isc__networker_t *worker); static void drain_queue(isc__networker_t *worker, netievent_type_t type); static void isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0); static void isc__nm_async_pause(isc__networker_t *worker, isc__netievent_t *ev0); static void isc__nm_async_resume(isc__networker_t *worker, isc__netievent_t *ev0); static void isc__nm_async_detach(isc__networker_t *worker, isc__netievent_t *ev0); static void isc__nm_async_close(isc__networker_t *worker, isc__netievent_t *ev0); static void isc__nm_threadpool_initialize(uint32_t workers); static void isc__nm_work_cb(uv_work_t *req); static void isc__nm_after_work_cb(uv_work_t *req, int status); void isc__nmsocket_reset(isc_nmsocket_t *sock); /*%< * Issue a 'handle 
closed' callback on the socket. */ static void nmhandle_detach_cb(isc_nmhandle_t **handlep FLARG); int isc_nm_tid(void) { return (isc__nm_tid_v); } bool isc__nm_in_netthread(void) { return (isc__nm_tid_v >= 0); } #ifdef WIN32 static void isc__nm_winsock_initialize(void) { WORD wVersionRequested = MAKEWORD(2, 2); WSADATA wsaData; int result; result = WSAStartup(wVersionRequested, &wsaData); if (result != 0) { char strbuf[ISC_STRERRORSIZE]; strerror_r(result, strbuf, sizeof(strbuf)); UNEXPECTED_ERROR(__FILE__, __LINE__, "WSAStartup() failed with error code %lu: %s", result, strbuf); } /* * Confirm that the WinSock DLL supports version 2.2. Note that if the * DLL supports versions greater than 2.2 in addition to 2.2, it will * still return 2.2 in wVersion since that is the version we requested. */ if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2) { UNEXPECTED_ERROR(__FILE__, __LINE__, "Unusable WinSock DLL version: %u.%u", LOBYTE(wsaData.wVersion), HIBYTE(wsaData.wVersion)); } } static void isc__nm_winsock_destroy(void) { WSACleanup(); } #endif /* WIN32 */ static void isc__nm_threadpool_initialize(uint32_t nworkers) { char buf[11]; int r = uv_os_getenv("UV_THREADPOOL_SIZE", buf, &(size_t){ sizeof(buf) }); if (r == UV_ENOENT) { snprintf(buf, sizeof(buf), "%" PRIu32, nworkers); uv_os_setenv("UV_THREADPOOL_SIZE", buf); } } #if HAVE_DECL_UV_UDP_MMSG_FREE #define MINIMAL_UV_VERSION UV_VERSION(1, 40, 0) #elif HAVE_DECL_UV_UDP_RECVMMSG #define MAXIMAL_UV_VERSION UV_VERSION(1, 39, 99) #define MINIMAL_UV_VERSION UV_VERSION(1, 37, 0) #elif _WIN32 #define MINIMAL_UV_VERSION UV_VERSION(1, 0, 0) #else #define MAXIMAL_UV_VERSION UV_VERSION(1, 34, 99) #define MINIMAL_UV_VERSION UV_VERSION(1, 0, 0) #endif void isc__netmgr_create(isc_mem_t *mctx, uint32_t nworkers, isc_nm_t **netmgrp) { isc_nm_t *mgr = NULL; char name[32]; REQUIRE(nworkers > 0); #ifdef MAXIMAL_UV_VERSION if (uv_version() > MAXIMAL_UV_VERSION) { isc_error_fatal(__FILE__, __LINE__, "libuv version too 
new: running with libuv %s " "when compiled with libuv %s will lead to " "libuv failures", uv_version_string(), UV_VERSION_STRING); } #endif /* MAXIMAL_UV_VERSION */ if (uv_version() < MINIMAL_UV_VERSION) { isc_error_fatal(__FILE__, __LINE__, "libuv version too old: running with libuv %s " "when compiled with libuv %s will lead to " "libuv failures", uv_version_string(), UV_VERSION_STRING); } #ifdef WIN32 isc__nm_winsock_initialize(); #endif /* WIN32 */ isc__nm_threadpool_initialize(nworkers); mgr = isc_mem_get(mctx, sizeof(*mgr)); *mgr = (isc_nm_t){ .nworkers = nworkers * 2, .nlisteners = nworkers, }; isc_mem_attach(mctx, &mgr->mctx); isc_mutex_init(&mgr->lock); isc_condition_init(&mgr->wkstatecond); isc_condition_init(&mgr->wkpausecond); isc_refcount_init(&mgr->references, 1); atomic_init(&mgr->maxudp, 0); atomic_init(&mgr->interlocked, ISC_NETMGR_NON_INTERLOCKED); atomic_init(&mgr->workers_paused, 0); atomic_init(&mgr->paused, false); atomic_init(&mgr->closing, false); #if HAVE_SO_REUSEPORT_LB mgr->load_balance_sockets = true; #else mgr->load_balance_sockets = false; #endif #ifdef NETMGR_TRACE ISC_LIST_INIT(mgr->active_sockets); #endif /* * Default TCP timeout values. * May be updated by isc_nm_tcptimeouts(). 
*/ atomic_init(&mgr->init, 30000); atomic_init(&mgr->idle, 30000); atomic_init(&mgr->keepalive, 30000); atomic_init(&mgr->advertised, 30000); isc_barrier_init(&mgr->pausing, mgr->nworkers); isc_barrier_init(&mgr->resuming, mgr->nworkers); mgr->workers = isc_mem_get(mctx, mgr->nworkers * sizeof(isc__networker_t)); for (int i = 0; i < mgr->nworkers; i++) { isc__networker_t *worker = &mgr->workers[i]; int r; *worker = (isc__networker_t){ .mgr = mgr, .id = i, }; r = uv_loop_init(&worker->loop); UV_RUNTIME_CHECK(uv_loop_init, r); worker->loop.data = &mgr->workers[i]; r = uv_async_init(&worker->loop, &worker->async, async_cb); UV_RUNTIME_CHECK(uv_async_init, r); for (size_t type = 0; type < NETIEVENT_MAX; type++) { isc_mutex_init(&worker->ievents[type].lock); isc_condition_init(&worker->ievents[type].cond); ISC_LIST_INIT(worker->ievents[type].list); } worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE); worker->sendbuf = isc_mem_get(mctx, ISC_NETMGR_SENDBUF_SIZE); /* * We need to do this here and not in nm_thread to avoid a * race - we could exit isc_nm_start, launch nm_destroy, * and nm_thread would still not be up. */ mgr->workers_running++; isc_thread_create(nm_thread, &mgr->workers[i], &worker->thread); snprintf(name, sizeof(name), "isc-net-%04d", i); isc_thread_setname(worker->thread, name); } mgr->magic = NM_MAGIC; *netmgrp = mgr; } /* * Free the resources of the network manager. 
*/ static void nm_destroy(isc_nm_t **mgr0) { REQUIRE(VALID_NM(*mgr0)); REQUIRE(!isc__nm_in_netthread()); isc_nm_t *mgr = *mgr0; *mgr0 = NULL; isc_refcount_destroy(&mgr->references); mgr->magic = 0; for (int i = 0; i < mgr->nworkers; i++) { isc__networker_t *worker = &mgr->workers[i]; isc__netievent_t *event = isc__nm_get_netievent_stop(mgr); isc__nm_enqueue_ievent(worker, event); } LOCK(&mgr->lock); while (mgr->workers_running > 0) { WAIT(&mgr->wkstatecond, &mgr->lock); } UNLOCK(&mgr->lock); for (int i = 0; i < mgr->nworkers; i++) { isc__networker_t *worker = &mgr->workers[i]; int r; r = uv_loop_close(&worker->loop); UV_RUNTIME_CHECK(uv_loop_close, r); for (size_t type = 0; type < NETIEVENT_MAX; type++) { INSIST(ISC_LIST_EMPTY(worker->ievents[type].list)); isc_condition_destroy(&worker->ievents[type].cond); isc_mutex_destroy(&worker->ievents[type].lock); } isc_mem_put(mgr->mctx, worker->sendbuf, ISC_NETMGR_SENDBUF_SIZE); isc_mem_put(mgr->mctx, worker->recvbuf, ISC_NETMGR_RECVBUF_SIZE); isc_thread_join(worker->thread, NULL); } if (mgr->stats != NULL) { isc_stats_detach(&mgr->stats); } isc_barrier_destroy(&mgr->resuming); isc_barrier_destroy(&mgr->pausing); isc_condition_destroy(&mgr->wkstatecond); isc_condition_destroy(&mgr->wkpausecond); isc_mutex_destroy(&mgr->lock); isc_mem_put(mgr->mctx, mgr->workers, mgr->nworkers * sizeof(isc__networker_t)); isc_mem_putanddetach(&mgr->mctx, mgr, sizeof(*mgr)); #ifdef WIN32 isc__nm_winsock_destroy(); #endif /* WIN32 */ } static void enqueue_pause(isc__networker_t *worker) { isc__netievent_pause_t *event = isc__nm_get_netievent_pause(worker->mgr); isc__nm_enqueue_ievent(worker, (isc__netievent_t *)event); } static void isc__nm_async_pause(isc__networker_t *worker, isc__netievent_t *ev0) { UNUSED(ev0); REQUIRE(worker->paused == false); worker->paused = true; uv_stop(&worker->loop); } void isc_nm_pause(isc_nm_t *mgr) { REQUIRE(VALID_NM(mgr)); REQUIRE(!atomic_load(&mgr->paused)); isc__nm_acquire_interlocked_force(mgr); if 
(isc__nm_in_netthread()) { REQUIRE(isc_nm_tid() == 0); } for (int i = 0; i < mgr->nworkers; i++) { isc__networker_t *worker = &mgr->workers[i]; if (i == isc_nm_tid()) { isc__nm_async_pause(worker, NULL); } else { enqueue_pause(worker); } } if (isc__nm_in_netthread()) { atomic_fetch_add(&mgr->workers_paused, 1); isc_barrier_wait(&mgr->pausing); } LOCK(&mgr->lock); while (atomic_load(&mgr->workers_paused) != mgr->workers_running) { WAIT(&mgr->wkstatecond, &mgr->lock); } UNLOCK(&mgr->lock); REQUIRE(atomic_compare_exchange_strong(&mgr->paused, &(bool){ false }, true)); } static void enqueue_resume(isc__networker_t *worker) { isc__netievent_resume_t *event = isc__nm_get_netievent_resume(worker->mgr); isc__nm_enqueue_ievent(worker, (isc__netievent_t *)event); } static void isc__nm_async_resume(isc__networker_t *worker, isc__netievent_t *ev0) { UNUSED(ev0); REQUIRE(worker->paused == true); worker->paused = false; } void isc_nm_resume(isc_nm_t *mgr) { REQUIRE(VALID_NM(mgr)); REQUIRE(atomic_load(&mgr->paused)); if (isc__nm_in_netthread()) { REQUIRE(isc_nm_tid() == 0); drain_queue(&mgr->workers[isc_nm_tid()], NETIEVENT_PRIORITY); } for (int i = 0; i < mgr->nworkers; i++) { isc__networker_t *worker = &mgr->workers[i]; if (i == isc_nm_tid()) { isc__nm_async_resume(worker, NULL); } else { enqueue_resume(worker); } } if (isc__nm_in_netthread()) { drain_queue(&mgr->workers[isc_nm_tid()], NETIEVENT_PRIVILEGED); atomic_fetch_sub(&mgr->workers_paused, 1); isc_barrier_wait(&mgr->resuming); } LOCK(&mgr->lock); while (atomic_load(&mgr->workers_paused) != 0) { WAIT(&mgr->wkstatecond, &mgr->lock); } UNLOCK(&mgr->lock); REQUIRE(atomic_compare_exchange_strong(&mgr->paused, &(bool){ true }, false)); isc__nm_drop_interlocked(mgr); } void isc_nm_attach(isc_nm_t *mgr, isc_nm_t **dst) { REQUIRE(VALID_NM(mgr)); REQUIRE(dst != NULL && *dst == NULL); isc_refcount_increment(&mgr->references); *dst = mgr; } void isc_nm_detach(isc_nm_t **mgr0) { isc_nm_t *mgr = NULL; REQUIRE(mgr0 != NULL); 
REQUIRE(VALID_NM(*mgr0)); mgr = *mgr0; *mgr0 = NULL; if (isc_refcount_decrement(&mgr->references) == 1) { nm_destroy(&mgr); } } void isc__netmgr_shutdown(isc_nm_t *mgr) { REQUIRE(VALID_NM(mgr)); atomic_store(&mgr->closing, true); for (int i = 0; i < mgr->nworkers; i++) { isc__netievent_t *event = NULL; event = isc__nm_get_netievent_shutdown(mgr); isc__nm_enqueue_ievent(&mgr->workers[i], event); } } void isc__netmgr_destroy(isc_nm_t **netmgrp) { isc_nm_t *mgr = NULL; int counter = 0; REQUIRE(VALID_NM(*netmgrp)); mgr = *netmgrp; /* * Close active connections. */ isc__netmgr_shutdown(mgr); /* * Wait for the manager to be dereferenced elsewhere. */ while (isc_refcount_current(&mgr->references) > 1 && counter++ < 1000) { uv_sleep(10); } #ifdef NETMGR_TRACE if (isc_refcount_current(&mgr->references) > 1) { isc__nm_dump_active(mgr); UNREACHABLE(); } #endif /* * Now just patiently wait */ while (isc_refcount_current(&mgr->references) > 1) { uv_sleep(10); } /* * Detach final reference. */ isc_nm_detach(netmgrp); } void isc_nm_maxudp(isc_nm_t *mgr, uint32_t maxudp) { REQUIRE(VALID_NM(mgr)); atomic_store(&mgr->maxudp, maxudp); } void isc_nmhandle_setwritetimeout(isc_nmhandle_t *handle, uint64_t write_timeout) { REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMSOCK(handle->sock)); handle->sock->write_timeout = write_timeout; } void isc_nm_settimeouts(isc_nm_t *mgr, uint32_t init, uint32_t idle, uint32_t keepalive, uint32_t advertised) { REQUIRE(VALID_NM(mgr)); atomic_store(&mgr->init, init); atomic_store(&mgr->idle, idle); atomic_store(&mgr->keepalive, keepalive); atomic_store(&mgr->advertised, advertised); } bool isc_nm_getloadbalancesockets(isc_nm_t *mgr) { REQUIRE(VALID_NM(mgr)); return (mgr->load_balance_sockets); } void isc_nm_setloadbalancesockets(isc_nm_t *mgr, bool enabled) { REQUIRE(VALID_NM(mgr)); #if HAVE_SO_REUSEPORT_LB mgr->load_balance_sockets = enabled; #else UNUSED(enabled); #endif } void isc_nm_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle, 
uint32_t *keepalive, uint32_t *advertised) { REQUIRE(VALID_NM(mgr)); if (initial != NULL) { *initial = atomic_load(&mgr->init); } if (idle != NULL) { *idle = atomic_load(&mgr->idle); } if (keepalive != NULL) { *keepalive = atomic_load(&mgr->keepalive); } if (advertised != NULL) { *advertised = atomic_load(&mgr->advertised); } } /* * nm_thread is a single worker thread, that runs uv_run event loop * until asked to stop. * * There are four queues for asynchronous events: * * 1. priority queue - netievents on the priority queue are run even when * the taskmgr enters exclusive mode and the netmgr is paused. This * is needed to properly start listening on the interfaces, free * resources on shutdown, or resume from a pause. * * 2. privileged task queue - only privileged tasks are queued here and * this is the first queue that gets processed when network manager * is unpaused using isc_nm_resume(). All netmgr workers need to * clean the privileged task queue before they all proceed to normal * operation. Both task queues are processed when the workers are * shutting down. * * 3. task queue - only (traditional) tasks are scheduled here, and this * queue and the privileged task queue are both processed when the * netmgr workers are finishing. This is needed to process the task * shutdown events. * * 4. normal queue - this is the queue with netmgr events, e.g. reading, * sending, callbacks, etc. */ static isc_threadresult_t nm_thread(isc_threadarg_t worker0) { isc__networker_t *worker = (isc__networker_t *)worker0; isc_nm_t *mgr = worker->mgr; isc__nm_tid_v = worker->id; while (true) { /* * uv_run() runs async_cb() in a loop, which processes * all four event queues until a "pause" or "stop" event * is encountered. On pause, we process only priority and * privileged events until resuming. 
*/ int r = uv_run(&worker->loop, UV_RUN_DEFAULT); INSIST(r > 0 || worker->finished); if (worker->paused) { INSIST(atomic_load(&mgr->interlocked) != isc_nm_tid()); atomic_fetch_add(&mgr->workers_paused, 1); if (isc_barrier_wait(&mgr->pausing) != 0) { LOCK(&mgr->lock); SIGNAL(&mgr->wkstatecond); UNLOCK(&mgr->lock); } while (worker->paused) { wait_for_priority_queue(worker); } /* * All workers must drain the privileged event * queue before we resume from pause. */ drain_queue(worker, NETIEVENT_PRIVILEGED); atomic_fetch_sub(&mgr->workers_paused, 1); if (isc_barrier_wait(&mgr->resuming) != 0) { LOCK(&mgr->lock); SIGNAL(&mgr->wkstatecond); UNLOCK(&mgr->lock); } } if (r == 0) { INSIST(worker->finished); break; } INSIST(!worker->finished); } /* * We are shutting down. Drain the queues. */ drain_queue(worker, NETIEVENT_PRIVILEGED); drain_queue(worker, NETIEVENT_TASK); for (size_t type = 0; type < NETIEVENT_MAX; type++) { LOCK(&worker->ievents[type].lock); INSIST(ISC_LIST_EMPTY(worker->ievents[type].list)); UNLOCK(&worker->ievents[type].lock); } LOCK(&mgr->lock); mgr->workers_running--; SIGNAL(&mgr->wkstatecond); UNLOCK(&mgr->lock); return ((isc_threadresult_t)0); } static bool process_all_queues(isc__networker_t *worker) { bool reschedule = false; /* * The queue processing functions will return false when the * system is pausing or stopping and we don't want to process * the other queues in such case, but we need the async event * to be rescheduled in the next uv_run(). */ for (size_t type = 0; type < NETIEVENT_MAX; type++) { isc_result_t result = process_queue(worker, type); switch (result) { case ISC_R_SUSPEND: reschedule = true; break; case ISC_R_EMPTY: /* empty queue */ break; case ISC_R_SUCCESS: reschedule = true; break; default: UNREACHABLE(); } } return (reschedule); } /* * async_cb() is a universal callback for 'async' events sent to event loop. * It's the only way to safely pass data to the libuv event loop. 
We use a single async event and a set of mutex-protected queues of
 * 'isc__netievent_t' structures passed from other threads.
 */
static void
async_cb(uv_async_t *handle) {
	/* isc__netmgr_create() stored the worker in the loop's data. */
	isc__networker_t *worker = (isc__networker_t *)handle->loop->data;

	if (process_all_queues(worker)) {
		/*
		 * If we didn't process all the events, we need to enqueue
		 * async_cb to be run in the next iteration of the uv_loop
		 */
		uv_async_send(handle);
	}
}

/*
 * Handler for the 'stop' event: flag the worker as finished and close
 * its async handle.
 */
static void
isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0) {
	UNUSED(ev0);
	worker->finished = true;
	/* Close the async handler */
	uv_close((uv_handle_t *)&worker->async, NULL);
}

/*
 * Queue 'task' to be run by one of the workers of 'nm'.
 *
 * 'threadid' selects the worker:
 *  - -1: a worker picked with isc_random_uniform() among the first
 *    'nlisteners' workers;
 *  - ISC_NM_TASK_SLOW_OFFSET: a worker picked with isc_random_uniform()
 *    among the workers following the first 'nlisteners';
 *  - below ISC_NM_TASK_SLOW_OFFSET: ISC_NM_TASK_SLOW(threadid) folded
 *    (modulo) onto the workers following the first 'nlisteners';
 *  - anything else: 'threadid' folded (modulo) onto the first
 *    'nlisteners' workers.
 *
 * Privileged tasks (per isc_task_privileged()) are wrapped in a
 * 'privilegedtask' event, all others in a 'task' event.
 */
void
isc_nm_task_enqueue(isc_nm_t *nm, isc_task_t *task, int threadid) {
	isc__netievent_t *event = NULL;
	int tid;
	isc__networker_t *worker = NULL;

	if (threadid == -1) {
		tid = (int)isc_random_uniform(nm->nlisteners);
	} else if (threadid == ISC_NM_TASK_SLOW_OFFSET) {
		tid = nm->nlisteners +
		      (int)isc_random_uniform(nm->nworkers - nm->nlisteners);
	} else if (threadid < ISC_NM_TASK_SLOW_OFFSET) {
		tid = nm->nlisteners + (ISC_NM_TASK_SLOW(threadid) %
					(nm->nworkers - nm->nlisteners));
	} else {
		tid = threadid % nm->nlisteners;
	}

	worker = &nm->workers[tid];

	if (isc_task_privileged(task)) {
		event = (isc__netievent_t *)
			isc__nm_get_netievent_privilegedtask(nm, task);
	} else {
		event = (isc__netievent_t *)isc__nm_get_netievent_task(nm,
								      task);
	}

	isc__nm_enqueue_ievent(worker, event);
}

/* Privileged task events are handled exactly like plain task events. */
#define isc__nm_async_privilegedtask(worker, ev0) \
	isc__nm_async_task(worker, ev0)

/*
 * Handler for 'task' and 'privilegedtask' events: run the task once and,
 * when isc_task_run() reports ISC_R_QUOTA, hand the task to
 * isc_task_ready().  Any result other than ISC_R_QUOTA or ISC_R_SUCCESS
 * is treated as impossible.
 */
static void
isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_task_t *ievent = (isc__netievent_task_t *)ev0;
	isc_result_t result;

	UNUSED(worker);

	result = isc_task_run(ievent->task);

	switch (result) {
	case ISC_R_QUOTA:
		isc_task_ready(ievent->task);
		return;
	case ISC_R_SUCCESS:
		return;
	default:
		UNREACHABLE();
	}
}

/*
 * Block until the worker's priority queue is non-empty, then process
 * that queue until it is empty.  Used by nm_thread() while the worker is
 * paused.
 */
static void
wait_for_priority_queue(isc__networker_t *worker) {
	isc_condition_t *cond = &worker->ievents[NETIEVENT_PRIORITY].cond;
	isc_mutex_t *lock = &worker->ievents[NETIEVENT_PRIORITY].lock;
	isc__netievent_list_t *list =
		&(worker->ievents[NETIEVENT_PRIORITY].list);

	/* isc__nm_enqueue_ievent() signals 'cond' for priority events. */
	LOCK(lock);
	while (ISC_LIST_EMPTY(*list)) {
		WAIT(cond, lock);
	}
	UNLOCK(lock);

	drain_queue(worker, NETIEVENT_PRIORITY);
}

/*
 * Keep processing the queue of the given type until process_queue()
 * reports it empty and a re-check under the queue lock confirms that
 * nothing was added in the meantime.
 */
static void
drain_queue(isc__networker_t *worker, netievent_type_t type) {
	bool empty = false;
	while (!empty) {
		if (process_queue(worker, type) == ISC_R_EMPTY) {
			LOCK(&worker->ievents[type].lock);
			empty = ISC_LIST_EMPTY(worker->ievents[type].list);
			UNLOCK(&worker->ievents[type].lock);
		}
	}
}

/*
 * The two macros here generate the individual cases for the process_netievent()
 * function.  The NETIEVENT_CASE(type) macro is the common case, and
 * NETIEVENT_CASE_NOMORE(type) is a macro that causes the loop in the
 * process_queue() to stop, e.g. it's only used for the netievent that
 * stops/pauses processing the enqueued netievents.
 */
#define NETIEVENT_CASE(type)                                               \
	case netievent_##type: {                                           \
		isc__nm_async_##type(worker, ievent);                      \
		isc__nm_put_netievent_##type(                              \
			worker->mgr, (isc__netievent_##type##_t *)ievent); \
		return (true);                                             \
	}

#define NETIEVENT_CASE_NOMORE(type)                                \
	case netievent_##type: {                                   \
		isc__nm_async_##type(worker, ievent);              \
		isc__nm_put_netievent_##type(worker->mgr, ievent); \
		return (false);                                    \
	}

/*
 * Dispatch one event to its isc__nm_async_<type>() handler and release
 * the event afterwards.  Must run on the worker's own thread.
 *
 * Returns false for 'stop' and 'pause' events, telling the caller to
 * stop processing further events, and true for everything else.
 */
static bool
process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) {
	REQUIRE(worker->id == isc_nm_tid());

	switch (ievent->type) {
		/* Don't process more ievents when we are stopping */
		NETIEVENT_CASE_NOMORE(stop);

		NETIEVENT_CASE(privilegedtask);
		NETIEVENT_CASE(task);

		NETIEVENT_CASE(udpconnect);
		NETIEVENT_CASE(udplisten);
		NETIEVENT_CASE(udpstop);
		NETIEVENT_CASE(udpsend);
		NETIEVENT_CASE(udpread);
		NETIEVENT_CASE(udpcancel);
		NETIEVENT_CASE(udpclose);

		NETIEVENT_CASE(tcpaccept);
		NETIEVENT_CASE(tcpconnect);
		NETIEVENT_CASE(tcplisten);
		NETIEVENT_CASE(tcpstartread);
		NETIEVENT_CASE(tcppauseread);
		NETIEVENT_CASE(tcpsend);
		NETIEVENT_CASE(tcpstop);
		NETIEVENT_CASE(tcpcancel);
		NETIEVENT_CASE(tcpclose);

		NETIEVENT_CASE(tcpdnsaccept);
		NETIEVENT_CASE(tcpdnslisten);
		NETIEVENT_CASE(tcpdnsconnect);
		NETIEVENT_CASE(tcpdnssend);
		NETIEVENT_CASE(tcpdnscancel);
		NETIEVENT_CASE(tcpdnsclose);
		NETIEVENT_CASE(tcpdnsread);
		NETIEVENT_CASE(tcpdnsstop);

		NETIEVENT_CASE(connectcb);
		NETIEVENT_CASE(readcb);
		NETIEVENT_CASE(sendcb);

		NETIEVENT_CASE(close);
		NETIEVENT_CASE(detach);

		NETIEVENT_CASE(shutdown);
		NETIEVENT_CASE(resume);
		NETIEVENT_CASE_NOMORE(pause);
	default:
		UNREACHABLE();
	}
	return (true);
}

/*
 * Process the events currently queued on the worker's queue of the
 * given type.
 *
 * The whole queue is moved to a private list under the queue lock and
 * processed without holding the lock.
 *
 * Returns:
 *  - ISC_R_EMPTY when nothing was queued;
 *  - ISC_R_SUSPEND when an event asked to stop processing; events not
 *    yet processed are put back at the front of the queue;
 *  - ISC_R_SUCCESS when all moved events were processed.
 */
static isc_result_t
process_queue(isc__networker_t *worker, netievent_type_t type) {
	isc__netievent_t *ievent = NULL;
	isc__netievent_list_t list;

	ISC_LIST_INIT(list);

	LOCK(&worker->ievents[type].lock);
	ISC_LIST_MOVE(list, worker->ievents[type].list);
	UNLOCK(&worker->ievents[type].lock);

	ievent = ISC_LIST_HEAD(list);
	if (ievent == NULL) {
		/* There's nothing scheduled */
		return (ISC_R_EMPTY);
	}

	while (ievent != NULL) {
		/* Fetch the successor first: the event is released below. */
		isc__netievent_t *next = ISC_LIST_NEXT(ievent, link);
		ISC_LIST_DEQUEUE(list, ievent, link);

		if (!process_netievent(worker, ievent)) {
			/* The netievent told us to stop */
			if (!ISC_LIST_EMPTY(list)) {
				/*
				 * Reschedule the rest of the unprocessed
				 * events.
				 */
				LOCK(&worker->ievents[type].lock);
				ISC_LIST_PREPENDLIST(worker->ievents[type].list,
						     list, link);
				UNLOCK(&worker->ievents[type].lock);
			}
			return (ISC_R_SUSPEND);
		}

		ievent = next;
	}

	/* We processed at least one */
	return (ISC_R_SUCCESS);
}

/*
 * Allocate an event of the given type from the manager's memory
 * context.  Every event is allocated with the size of
 * isc__netievent_storage_t; all members other than the type are
 * zero-initialized and the list link is initialized.
 */
void *
isc__nm_get_netievent(isc_nm_t *mgr, isc__netievent_type type) {
	isc__netievent_storage_t *event = isc_mem_get(mgr->mctx,
						      sizeof(*event));

	*event = (isc__netievent_storage_t){ .ni.type = type };
	ISC_LINK_INIT(&(event->ni), link);
	return (event);
}

/*
 * Return an event obtained from isc__nm_get_netievent() to the
 * manager's memory context.
 */
void
isc__nm_put_netievent(isc_nm_t *mgr, void *ievent) {
	isc_mem_put(mgr->mctx, ievent, sizeof(isc__netievent_storage_t));
}

/*
 * Per-type event helpers generated by the NETIEVENT_*_DEF macros, which
 * are defined outside this part of the file.  NOTE(review): presumably
 * these expand to the isc__nm_get_netievent_<type>() and
 * isc__nm_put_netievent_<type>() functions used above - confirm against
 * the macro definitions.
 */
NETIEVENT_SOCKET_DEF(tcpclose);
NETIEVENT_SOCKET_DEF(tcplisten);
NETIEVENT_SOCKET_DEF(tcppauseread);
NETIEVENT_SOCKET_DEF(tcpstartread);
NETIEVENT_SOCKET_DEF(tcpstop);
NETIEVENT_SOCKET_DEF(udpclose);
NETIEVENT_SOCKET_DEF(udplisten);
NETIEVENT_SOCKET_DEF(udpread);
NETIEVENT_SOCKET_DEF(udpsend);
NETIEVENT_SOCKET_DEF(udpstop);

NETIEVENT_SOCKET_DEF(tcpdnsclose);
NETIEVENT_SOCKET_DEF(tcpdnsread);
NETIEVENT_SOCKET_DEF(tcpdnsstop);
NETIEVENT_SOCKET_DEF(tcpdnslisten);
NETIEVENT_SOCKET_REQ_DEF(tcpdnsconnect);
NETIEVENT_SOCKET_REQ_DEF(tcpdnssend);
NETIEVENT_SOCKET_HANDLE_DEF(tcpdnscancel);
NETIEVENT_SOCKET_QUOTA_DEF(tcpdnsaccept);

NETIEVENT_SOCKET_REQ_DEF(tcpconnect);
NETIEVENT_SOCKET_REQ_DEF(tcpsend);
NETIEVENT_SOCKET_REQ_DEF(udpconnect);
NETIEVENT_SOCKET_REQ_RESULT_DEF(connectcb);
NETIEVENT_SOCKET_REQ_RESULT_DEF(readcb);
NETIEVENT_SOCKET_REQ_RESULT_DEF(sendcb);

NETIEVENT_SOCKET_DEF(detach);
NETIEVENT_SOCKET_HANDLE_DEF(tcpcancel);
NETIEVENT_SOCKET_HANDLE_DEF(udpcancel);

NETIEVENT_SOCKET_QUOTA_DEF(tcpaccept);

NETIEVENT_SOCKET_DEF(close);
NETIEVENT_DEF(pause);
NETIEVENT_DEF(resume);
NETIEVENT_DEF(shutdown);
NETIEVENT_DEF(stop);

NETIEVENT_TASK_DEF(task);
NETIEVENT_TASK_DEF(privilegedtask);

/*
 * Process 'event' immediately when the caller is already running on
 * 'worker's thread; otherwise queue it for that worker.
 */
void
isc__nm_maybe_enqueue_ievent(isc__networker_t *worker,
			     isc__netievent_t *event) {
	/*
	 * If we are already in the matching nmthread, process the ievent
	 * directly.
	 */
	if (worker->id == isc_nm_tid()) {
		process_netievent(worker, event);
		return;
	}

	isc__nm_enqueue_ievent(worker, event);
}

/*
 * Append 'event' to the matching queue of 'worker' and wake the
 * worker's loop up with uv_async_send().
 *
 * Event types greater than netievent_prio go to the priority queue,
 * 'privilegedtask' and 'task' events to their dedicated queues, and all
 * remaining types to the normal queue.
 */
void
isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
	netievent_type_t type;

	if (event->type > netievent_prio) {
		type = NETIEVENT_PRIORITY;
	} else {
		switch (event->type) {
		case netievent_prio:
			UNREACHABLE();
			break;
		case netievent_privilegedtask:
			type = NETIEVENT_PRIVILEGED;
			break;
		case netievent_task:
			type = NETIEVENT_TASK;
			break;
		default:
			type = NETIEVENT_NORMAL;
			break;
		}
	}

	/*
	 * We need to make sure this signal will be delivered and
	 * the queue will be processed.
	 */
	LOCK(&worker->ievents[type].lock);
	ISC_LIST_ENQUEUE(worker->ievents[type].list, event, link);
	if (type == NETIEVENT_PRIORITY) {
		/* A paused worker sleeps in wait_for_priority_queue(). */
		SIGNAL(&worker->ievents[type].cond);
	}
	UNLOCK(&worker->ievents[type].lock);

	uv_async_send(&worker->async);
}

/*
 * Return the 'active' flag of the socket; for a child socket the
 * parent's flag is returned instead.
 */
bool
isc__nmsocket_active(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	if (sock->parent != NULL) {
		return (atomic_load(&sock->parent->active));
	}

	return (atomic_load(&sock->active));
}

/*
 * Atomically switch the 'active' flag (the parent's flag for a child
 * socket) from true to false.  Returns true when this call made the
 * change and false when the flag was already false.
 */
bool
isc__nmsocket_deactivate(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));

	if (sock->parent != NULL) {
		return (atomic_compare_exchange_strong(&sock->parent->active,
						       &(bool){ true }, false));
	}

	return (atomic_compare_exchange_strong(&sock->active, &(bool){ true },
					       false));
}

/*
 * Attach '*target' (which must be NULL on entry) to 'sock'.  The
 * reference is counted on the parent when 'sock' is a child socket, but
 * '*target' is still set to 'sock' itself.
 */
void
isc___nmsocket_attach(isc_nmsocket_t *sock, isc_nmsocket_t **target FLARG) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(target != NULL && *target == NULL);

	isc_nmsocket_t *rsock = NULL;

	if (sock->parent != NULL) {
		rsock = sock->parent;
		INSIST(rsock->parent == NULL); /* sanity check */
	} else {
		rsock = sock;
	}

	NETMGR_TRACE_LOG("isc__nmsocket_attach():%p->references = %" PRIuFAST32
			 "\n",
			 rsock, isc_refcount_current(&rsock->references) + 1);

	isc_refcount_increment0(&rsock->references);

	*target = sock;
}

/*
 * Free all resources inside a socket (including its children if any).
*/ static void nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree FLARG) { isc_nmhandle_t *handle = NULL; isc__nm_uvreq_t *uvreq = NULL; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(!isc__nmsocket_active(sock)); NETMGR_TRACE_LOG("nmsocket_cleanup():%p->references = %" PRIuFAST32 "\n", sock, isc_refcount_current(&sock->references)); atomic_store(&sock->destroying, true); if (sock->parent == NULL && sock->children != NULL) { /* * We shouldn't be here unless there are no active handles, * so we can clean up and free the children. */ for (size_t i = 0; i < sock->nchildren; i++) { if (!atomic_load(&sock->children[i].destroying)) { nmsocket_cleanup(&sock->children[i], false FLARG_PASS); } } /* * This was a parent socket: destroy the listening * barriers that synchronized the children. */ isc_barrier_destroy(&sock->startlistening); isc_barrier_destroy(&sock->stoplistening); /* * Now free them. */ isc_mem_put(sock->mgr->mctx, sock->children, sock->nchildren * sizeof(*sock)); sock->children = NULL; sock->nchildren = 0; } if (sock->statsindex != NULL) { isc__nm_decstats(sock->mgr, sock->statsindex[STATID_ACTIVE]); } sock->statichandle = NULL; if (sock->outerhandle != NULL) { isc__nmhandle_detach(&sock->outerhandle FLARG_PASS); } if (sock->outer != NULL) { isc___nmsocket_detach(&sock->outer FLARG_PASS); } while ((handle = isc_astack_pop(sock->inactivehandles)) != NULL) { nmhandle_free(sock, handle); } if (sock->buf != NULL) { isc_mem_free(sock->mgr->mctx, sock->buf); } if (sock->quota != NULL) { isc_quota_detach(&sock->quota); } sock->pquota = NULL; isc_astack_destroy(sock->inactivehandles); while ((uvreq = isc_astack_pop(sock->inactivereqs)) != NULL) { isc_mem_put(sock->mgr->mctx, uvreq, sizeof(*uvreq)); } isc_astack_destroy(sock->inactivereqs); sock->magic = 0; isc_condition_destroy(&sock->scond); isc_condition_destroy(&sock->cond); isc_mutex_destroy(&sock->lock); #ifdef NETMGR_TRACE LOCK(&sock->mgr->lock); ISC_LIST_UNLINK(sock->mgr->active_sockets, sock, active_link); 
UNLOCK(&sock->mgr->lock); #endif if (dofree) { isc_nm_t *mgr = sock->mgr; isc_mem_put(mgr->mctx, sock, sizeof(*sock)); isc_nm_detach(&mgr); } else { isc_nm_detach(&sock->mgr); } } static void nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG) { int active_handles; bool destroy = false; NETMGR_TRACE_LOG("%s():%p->references = %" PRIuFAST32 "\n", __func__, sock, isc_refcount_current(&sock->references)); if (sock->parent != NULL) { /* * This is a child socket and cannot be destroyed except * as a side effect of destroying the parent, so let's go * see if the parent is ready to be destroyed. */ nmsocket_maybe_destroy(sock->parent FLARG_PASS); return; } /* * This is a parent socket (or a standalone). See whether the * children have active handles before deciding whether to * accept destruction. */ LOCK(&sock->lock); if (atomic_load(&sock->active) || atomic_load(&sock->destroying) || !atomic_load(&sock->closed) || atomic_load(&sock->references) != 0) { UNLOCK(&sock->lock); return; } active_handles = atomic_load(&sock->ah); if (sock->children != NULL) { for (size_t i = 0; i < sock->nchildren; i++) { LOCK(&sock->children[i].lock); active_handles += atomic_load(&sock->children[i].ah); UNLOCK(&sock->children[i].lock); } } if (active_handles == 0 || sock->statichandle != NULL) { destroy = true; } NETMGR_TRACE_LOG("%s:%p->active_handles = %d, .statichandle = %p\n", __func__, sock, active_handles, sock->statichandle); if (destroy) { atomic_store(&sock->destroying, true); UNLOCK(&sock->lock); nmsocket_cleanup(sock, true FLARG_PASS); } else { UNLOCK(&sock->lock); } } void isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) { REQUIRE(sock->parent == NULL); NETMGR_TRACE_LOG("isc___nmsocket_prep_destroy():%p->references = " "%" PRIuFAST32 "\n", sock, isc_refcount_current(&sock->references)); /* * The final external reference to the socket is gone. We can try * destroying the socket, but we have to wait for all the inflight * handles to finish first. 
*/
	atomic_store(&sock->active, false);

	/*
	 * If the socket has children, they'll need to be marked inactive
	 * so they can be cleaned up too.
	 */
	if (sock->children != NULL) {
		for (size_t i = 0; i < sock->nchildren; i++) {
			atomic_store(&sock->children[i].active, false);
		}
	}

	/*
	 * If we're here then we already stopped listening; otherwise
	 * we'd have a hanging reference from the listening process.
	 *
	 * If it's a regular socket we may need to close it.
	 */
	if (!atomic_load(&sock->closed)) {
		/*
		 * For these types the close routine takes over and we
		 * return without calling nmsocket_maybe_destroy() here.
		 * NOTE(review): presumably the close path re-enters
		 * destruction once the socket is closed - confirm.
		 */
		switch (sock->type) {
		case isc_nm_udpsocket:
			isc__nm_udp_close(sock);
			return;
		case isc_nm_tcpsocket:
			isc__nm_tcp_close(sock);
			return;
		case isc_nm_tcpdnssocket:
			isc__nm_tcpdns_close(sock);
			return;
		default:
			break;
		}
	}

	nmsocket_maybe_destroy(sock FLARG_PASS);
}

/*
 * Drop one reference to the socket '*sockp' and set '*sockp' to NULL.
 * References are counted on the parent for child sockets; when the
 * count drops to zero, teardown starts via
 * isc___nmsocket_prep_destroy().
 */
void
isc___nmsocket_detach(isc_nmsocket_t **sockp FLARG) {
	REQUIRE(sockp != NULL && *sockp != NULL);
	REQUIRE(VALID_NMSOCK(*sockp));

	isc_nmsocket_t *sock = *sockp, *rsock = NULL;
	*sockp = NULL;

	/*
	 * If the socket is a part of a set (a child socket) we are
	 * counting references for the whole set at the parent.
*/
	if (sock->parent != NULL) {
		rsock = sock->parent;
		INSIST(rsock->parent == NULL); /* Sanity check */
	} else {
		rsock = sock;
	}

	NETMGR_TRACE_LOG("isc__nmsocket_detach():%p->references = %" PRIuFAST32
			 "\n",
			 rsock, isc_refcount_current(&rsock->references) - 1);

	/*
	 * A result of 1 is treated as "this was the last reference"
	 * (presumably isc_refcount_decrement() returns the value held
	 * before the decrement - confirm against isc/refcount.h).
	 */
	if (isc_refcount_decrement(&rsock->references) == 1) {
		isc___nmsocket_prep_destroy(rsock FLARG_PASS);
	}
}

/*
 * Close a listener socket: '*sockp' must be a UDP, TCP or TCPDNS
 * listener.  This simply detaches the caller's reference.
 */
void
isc_nmsocket_close(isc_nmsocket_t **sockp) {
	REQUIRE(sockp != NULL);
	REQUIRE(VALID_NMSOCK(*sockp));
	REQUIRE((*sockp)->type == isc_nm_udplistener ||
		(*sockp)->type == isc_nm_tcplistener ||
		(*sockp)->type == isc_nm_tcpdnslistener);

	isc__nmsocket_detach(sockp);
}

/*
 * Initialize the caller-provided 'sock' as a socket of the given 'type'
 * bound to interface address 'iface' and attached to 'mgr'.  The socket
 * starts out active with a reference count of 1.
 */
void
isc___nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type,
		    isc_sockaddr_t *iface FLARG) {
	uint16_t family;

	REQUIRE(sock != NULL);
	REQUIRE(mgr != NULL);
	REQUIRE(iface != NULL);

	family = iface->type.sa.sa_family;

	/* Every field not named here is zero-initialized. */
	*sock = (isc_nmsocket_t){ .type = type,
				  .iface = *iface,
				  .fd = -1,
				  .inactivehandles = isc_astack_new(
					  mgr->mctx, ISC_NM_HANDLES_STACK_SIZE),
				  .inactivereqs = isc_astack_new(
					  mgr->mctx, ISC_NM_REQS_STACK_SIZE) };

#if NETMGR_TRACE
	sock->backtrace_size = backtrace(sock->backtrace, TRACE_SIZE);
	ISC_LINK_INIT(sock, active_link);
	ISC_LIST_INIT(sock->active_handles);
	LOCK(&mgr->lock);
	ISC_LIST_APPEND(mgr->active_sockets, sock, active_link);
	UNLOCK(&mgr->lock);
#endif

	isc_nm_attach(mgr, &sock->mgr);
	sock->uv_handle.handle.data = sock;

	ISC_LINK_INIT(&sock->quotacb, link);

	/*
	 * Pick the statistics counter table by transport and address
	 * family (anything other than AF_INET uses the IPv6 table) and
	 * count the socket as active.
	 */
	switch (type) {
	case isc_nm_udpsocket:
	case isc_nm_udplistener:
		if (family == AF_INET) {
			sock->statsindex = udp4statsindex;
		} else {
			sock->statsindex = udp6statsindex;
		}
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_ACTIVE]);
		break;
	case isc_nm_tcpsocket:
	case isc_nm_tcplistener:
	case isc_nm_tcpdnssocket:
	case isc_nm_tcpdnslistener:
		if (family == AF_INET) {
			sock->statsindex = tcp4statsindex;
		} else {
			sock->statsindex = tcp6statsindex;
		}
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_ACTIVE]);
		break;
	default:
		break;
	}
	isc_mutex_init(&sock->lock);
	isc_condition_init(&sock->cond);
	isc_condition_init(&sock->scond);
	isc_refcount_init(&sock->references, 1);

	NETMGR_TRACE_LOG("isc__nmsocket_init():%p->references = %" PRIuFAST32
			 "\n",
			 sock, isc_refcount_current(&sock->references));

	atomic_init(&sock->active, true);
	atomic_init(&sock->sequential, false);
	atomic_init(&sock->readpaused, false);
	atomic_init(&sock->closing, false);
	atomic_init(&sock->listening, 0);
	atomic_init(&sock->closed, 0);
	atomic_init(&sock->destroying, 0);
	atomic_init(&sock->ah, 0);
	atomic_init(&sock->client, 0);
	atomic_init(&sock->connecting, false);
	atomic_init(&sock->keepalive, false);
	atomic_init(&sock->connected, false);
	atomic_init(&sock->timedout, false);

	atomic_init(&sock->active_child_connections, 0);

	/* Set the magic last, once the socket is fully initialized. */
	sock->magic = NMSOCK_MAGIC;
}

/*
 * Forget the receive, accept and connect callbacks (and their
 * arguments) registered on 'sock'.  When called from a network thread
 * it must be the thread that owns the socket.
 */
void
isc__nmsocket_clearcb(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(!isc__nm_in_netthread() || sock->tid == isc_nm_tid());

	sock->recv_cb = NULL;
	sock->recv_cbarg = NULL;
	sock->accept_cb = NULL;
	sock->accept_cbarg = NULL;
	sock->connect_cb = NULL;
	sock->connect_cbarg = NULL;
}

/*
 * Mark the receive buffer of the worker that owns 'sock' as no longer
 * in use.  'buf' must be that worker's receive buffer; nothing is
 * actually freed.
 */
void
isc__nm_free_uvbuf(isc_nmsocket_t *sock, const uv_buf_t *buf) {
	isc__networker_t *worker = NULL;

	REQUIRE(VALID_NMSOCK(sock));

	worker = &sock->mgr->workers[sock->tid];
	REQUIRE(buf->base == worker->recvbuf);

	worker->recvbuf_inuse = false;
}

/*
 * Allocate a new handle for 'sock', with sock->extrahandlesize extra
 * bytes after the handle structure, and give it one reference.
 */
static isc_nmhandle_t *
alloc_handle(isc_nmsocket_t *sock) {
	isc_nmhandle_t *handle =
		isc_mem_get(sock->mgr->mctx,
			    sizeof(isc_nmhandle_t) + sock->extrahandlesize);

	*handle = (isc_nmhandle_t){ .magic = NMHANDLE_MAGIC };
#ifdef NETMGR_TRACE
	ISC_LINK_INIT(handle, active_link);
#endif
	isc_refcount_init(&handle->references, 1);

	return (handle);
}

/*
 * Get a handle for 'sock', reusing a cached inactive handle when one
 * is available and allocating a new one otherwise.  'peer' and 'local'
 * may be NULL, in which case the socket's own peer and interface
 * addresses are used.  The returned handle holds a reference to the
 * socket.
 */
isc_nmhandle_t *
isc___nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
		   isc_sockaddr_t *local FLARG) {
	isc_nmhandle_t *handle = NULL;

	REQUIRE(VALID_NMSOCK(sock));

	handle = isc_astack_pop(sock->inactivehandles);

	if (handle == NULL) {
		handle = alloc_handle(sock);
	} else {
		isc_refcount_init(&handle->references,
1);
		INSIST(VALID_NMHANDLE(handle));
	}

	NETMGR_TRACE_LOG(
		"isc__nmhandle_get():handle %p->references = %" PRIuFAST32
		"\n",
		handle, isc_refcount_current(&handle->references));

	/* The handle keeps the socket alive for as long as it exists. */
	isc___nmsocket_attach(sock, &handle->sock FLARG_PASS);

#if NETMGR_TRACE
	handle->backtrace_size = backtrace(handle->backtrace, TRACE_SIZE);
#endif

	/* Fall back to the socket's addresses when none are supplied. */
	if (peer != NULL) {
		handle->peer = *peer;
	} else {
		handle->peer = sock->peer;
	}

	if (local != NULL) {
		handle->local = *local;
	} else {
		handle->local = sock->iface;
	}

	/* Count the handle as active on this socket. */
	(void)atomic_fetch_add(&sock->ah, 1);

#ifdef NETMGR_TRACE
	LOCK(&sock->lock);
	ISC_LIST_APPEND(sock->active_handles, handle, active_link);
	UNLOCK(&sock->lock);
#endif

	/*
	 * TCP sockets, and client-side UDP/TCPDNS sockets, record the
	 * handle as the socket's static handle.
	 */
	switch (sock->type) {
	case isc_nm_udpsocket:
	case isc_nm_tcpdnssocket:
		if (!atomic_load(&sock->client)) {
			break;
		}
		FALLTHROUGH;
	case isc_nm_tcpsocket:
		INSIST(sock->statichandle == NULL);

		/*
		 * statichandle must be assigned, not attached;
		 * otherwise, if a handle was detached elsewhere
		 * it could never reach 0 references, and the
		 * handle and socket would never be freed.
*/
		sock->statichandle = handle;
		break;
	default:
		break;
	}

	return (handle);
}

/*
 * Take an additional reference to 'handle' and store it in '*handlep',
 * which must be NULL on entry.
 */
void
isc__nmhandle_attach(isc_nmhandle_t *handle, isc_nmhandle_t **handlep FLARG) {
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(handlep != NULL && *handlep == NULL);

	NETMGR_TRACE_LOG("isc__nmhandle_attach():handle %p->references = "
			 "%" PRIuFAST32 "\n",
			 handle, isc_refcount_current(&handle->references) + 1);

	isc_refcount_increment(&handle->references);
	*handlep = handle;
}

/*
 * Return true if the socket behind 'handle' is a TCP or TCPDNS socket.
 */
bool
isc_nmhandle_is_stream(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));

	return (handle->sock->type == isc_nm_tcpsocket ||
		handle->sock->type == isc_nm_tcpdnssocket);
}

/*
 * Release 'handle': run the user's 'dofree' callback on the opaque
 * data if one is set, wipe the handle, and return its memory
 * (including the socket's extra handle bytes) to the memory context.
 */
static void
nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
	size_t extra = sock->extrahandlesize;

	isc_refcount_destroy(&handle->references);

	if (handle->dofree != NULL) {
		handle->dofree(handle->opaque);
	}

	/* Clearing the magic makes stale uses fail VALID_NMHANDLE(). */
	*handle = (isc_nmhandle_t){ .magic = 0 };

	isc_mem_put(sock->mgr->mctx, handle, sizeof(isc_nmhandle_t) + extra);
}

/*
 * Remove 'handle' from the active handles of 'sock'.  While the socket
 * is active the handle is cached for reuse if the inactive stack has
 * room (never in sanitizer builds); otherwise it is freed.
 */
static void
nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
	bool reuse = false;

	/*
	 * We do all of this under lock to avoid races with socket
	 * destruction.  We have to do this now, because at this point the
	 * socket is either unused or still attached to event->sock.
*/ LOCK(&sock->lock); #ifdef NETMGR_TRACE ISC_LIST_UNLINK(sock->active_handles, handle, active_link); #endif INSIST(atomic_fetch_sub(&sock->ah, 1) > 0); #if !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__ if (atomic_load(&sock->active)) { reuse = isc_astack_trypush(sock->inactivehandles, handle); } #endif /* !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__ */ if (!reuse) { nmhandle_free(sock, handle); } UNLOCK(&sock->lock); } void isc__nmhandle_detach(isc_nmhandle_t **handlep FLARG) { isc_nmsocket_t *sock = NULL; isc_nmhandle_t *handle = NULL; REQUIRE(handlep != NULL); REQUIRE(VALID_NMHANDLE(*handlep)); handle = *handlep; *handlep = NULL; /* * If the closehandle_cb is set, it needs to run asynchronously to * ensure correct ordering of the isc__nm_process_sock_buffer(). */ sock = handle->sock; if (sock->tid == isc_nm_tid() && sock->closehandle_cb == NULL) { nmhandle_detach_cb(&handle FLARG_PASS); } else { isc__netievent_detach_t *event = isc__nm_get_netievent_detach(sock->mgr, sock); /* * we are using implicit "attach" as the last reference * need to be destroyed explicitly in the async callback */ event->handle = handle; FLARG_IEVENT_PASS(event); isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *)event); } } void isc__nmsocket_shutdown(isc_nmsocket_t *sock); static void nmhandle_detach_cb(isc_nmhandle_t **handlep FLARG) { isc_nmsocket_t *sock = NULL; isc_nmhandle_t *handle = NULL; REQUIRE(handlep != NULL); REQUIRE(VALID_NMHANDLE(*handlep)); handle = *handlep; *handlep = NULL; NETMGR_TRACE_LOG("isc__nmhandle_detach():%p->references = %" PRIuFAST32 "\n", handle, isc_refcount_current(&handle->references) - 1); if (isc_refcount_decrement(&handle->references) > 1) { return; } /* We need an acquire memory barrier here */ (void)isc_refcount_current(&handle->references); sock = handle->sock; handle->sock = NULL; if (handle->doreset != NULL) { handle->doreset(handle->opaque); } nmhandle_deactivate(sock, handle); /* * The handle is gone now. 
If the socket has a callback configured
	 * for that (e.g., to perform cleanup after request processing),
	 * call it now, or schedule it to run asynchronously.
	 */
	if (sock->closehandle_cb != NULL) {
		if (sock->tid == isc_nm_tid()) {
			sock->closehandle_cb(sock);
		} else {
			isc__netievent_close_t *event =
				isc__nm_get_netievent_close(sock->mgr, sock);
			isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
					       (isc__netievent_t *)event);
		}
	}

	/* Only the pointer value is compared; it is not dereferenced. */
	if (handle == sock->statichandle) {
		/* statichandle is assigned, not attached. */
		sock->statichandle = NULL;
	}

	isc___nmsocket_detach(&sock FLARG_PASS);
}

/*
 * Return the opaque user data pointer stored in 'handle'.
 */
void *
isc_nmhandle_getdata(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));

	return (handle->opaque);
}

/*
 * Store the opaque user data 'arg' in 'handle', together with the
 * optional callbacks run on that data: 'doreset' when the last
 * reference to the handle is dropped, and 'dofree' when the handle's
 * memory is released.
 */
void
isc_nmhandle_setdata(isc_nmhandle_t *handle, void *arg,
		     isc_nm_opaquecb_t doreset, isc_nm_opaquecb_t dofree) {
	REQUIRE(VALID_NMHANDLE(handle));

	handle->opaque = arg;
	handle->doreset = doreset;
	handle->dofree = dofree;
}

/*
 * Make sure 'sock' has a DNS message buffer ('len' must not exceed
 * NM_BIG_BUF).  With no buffer yet, allocate NM_REG_BUF bytes when
 * 'len' is smaller than that and NM_BIG_BUF bytes otherwise; with an
 * existing buffer, grow it to NM_BIG_BUF.
 */
void
isc__nm_alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) {
	REQUIRE(len <= NM_BIG_BUF);

	if (sock->buf == NULL) {
		/* We don't have the buffer at all */
		size_t alloc_len = len < NM_REG_BUF ? NM_REG_BUF : NM_BIG_BUF;
		sock->buf = isc_mem_allocate(sock->mgr->mctx, alloc_len);
		sock->buf_size = alloc_len;
	} else {
		/* We have the buffer but it's too small */
		sock->buf = isc_mem_reallocate(sock->mgr->mctx, sock->buf,
					       NM_BIG_BUF);
		sock->buf_size = NM_BIG_BUF;
	}
}

/*
 * Report a failed send: invoke the send callback with 'eresult' if the
 * request has one, otherwise just release the request.
 */
void
isc__nm_failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
		       isc_result_t eresult) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(req));

	if (req->cb.send != NULL) {
		isc__nm_sendcb(sock, req, eresult, true);
	} else {
		isc__nm_uvreq_put(&req, sock);
	}
}

/*
 * Clean up after a failed accept on 'sock' (which must be accepting
 * and have a server socket): release the quota, detach from the
 * server socket and log the failure unless it is
 * ISC_R_NOTCONNECTED.
 */
void
isc__nm_failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
	REQUIRE(sock->accepting);
	REQUIRE(sock->server);

	/*
	 * Detach the quota early to make room for other connections;
	 * otherwise it'd be detached later asynchronously, and clog
	 * the quota unnecessarily.
*/ if (sock->quota != NULL) { isc_quota_detach(&sock->quota); } isc__nmsocket_detach(&sock->server); sock->accepting = false; switch (eresult) { case ISC_R_NOTCONNECTED: /* IGNORE: The client disconnected before we could accept */ break; default: isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR, "Accepting TCP connection failed: %s", isc_result_totext(eresult)); } } void isc__nm_failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, isc_result_t eresult, bool async) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_UVREQ(req)); REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(req->cb.connect != NULL); isc__nmsocket_timer_stop(sock); uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock); INSIST(atomic_compare_exchange_strong(&sock->connecting, &(bool){ true }, false)); isc__nmsocket_clearcb(sock); isc__nm_connectcb(sock, req, eresult, async); isc__nmsocket_prep_destroy(sock); } void isc__nm_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result, bool async) { REQUIRE(VALID_NMSOCK(sock)); UNUSED(async); switch (sock->type) { case isc_nm_udpsocket: isc__nm_udp_failed_read_cb(sock, result); return; case isc_nm_tcpsocket: isc__nm_tcp_failed_read_cb(sock, result); return; case isc_nm_tcpdnssocket: isc__nm_tcpdns_failed_read_cb(sock, result); return; default: UNREACHABLE(); } } void isc__nmsocket_connecttimeout_cb(uv_timer_t *timer) { uv_connect_t *uvreq = uv_handle_get_data((uv_handle_t *)timer); isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)uvreq->handle); isc__nm_uvreq_t *req = uv_handle_get_data((uv_handle_t *)uvreq); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(atomic_load(&sock->connecting)); REQUIRE(VALID_UVREQ(req)); REQUIRE(VALID_NMHANDLE(req->handle)); isc__nmsocket_timer_stop(sock); /* * Mark the connection as timed out and shutdown the socket. 
*/ INSIST(atomic_compare_exchange_strong(&sock->timedout, &(bool){ false }, true)); isc__nmsocket_clearcb(sock); isc__nmsocket_shutdown(sock); } void isc__nm_accept_connection_log(isc_result_t result, bool can_log_quota) { int level; switch (result) { case ISC_R_SUCCESS: case ISC_R_NOCONN: return; case ISC_R_QUOTA: case ISC_R_SOFTQUOTA: if (!can_log_quota) { return; } level = ISC_LOG_INFO; break; case ISC_R_NOTCONNECTED: level = ISC_LOG_INFO; break; default: level = ISC_LOG_ERROR; } isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR, level, "Accepting TCP connection failed: %s", isc_result_totext(result)); } void isc__nmsocket_writetimeout_cb(void *data, isc_result_t eresult) { isc__nm_uvreq_t *req = data; isc_nmsocket_t *sock = NULL; REQUIRE(eresult == ISC_R_TIMEDOUT); REQUIRE(VALID_UVREQ(req)); REQUIRE(VALID_NMSOCK(req->sock)); sock = req->sock; isc__nmsocket_reset(sock); } void isc__nmsocket_readtimeout_cb(uv_timer_t *timer) { isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)timer); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->reading); if (atomic_load(&sock->client)) { uv_timer_stop(timer); if (sock->recv_cb != NULL) { isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL); isc__nm_readcb(sock, req, ISC_R_TIMEDOUT); } if (!isc__nmsocket_timer_running(sock)) { isc__nmsocket_clearcb(sock); isc__nm_failed_read_cb(sock, ISC_R_CANCELED, false); } } else { isc__nm_failed_read_cb(sock, ISC_R_TIMEDOUT, false); } } void isc__nmsocket_timer_restart(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); if (atomic_load(&sock->connecting)) { int r; if (sock->connect_timeout == 0) { return; } r = uv_timer_start(&sock->read_timer, isc__nmsocket_connecttimeout_cb, sock->connect_timeout + 10, 0); UV_RUNTIME_CHECK(uv_timer_start, r); } else { int r; if (sock->read_timeout == 0) { return; } r = uv_timer_start(&sock->read_timer, isc__nmsocket_readtimeout_cb, sock->read_timeout, 0); UV_RUNTIME_CHECK(uv_timer_start, r); } 
}

/*
 * Return true if the socket's timer is currently active.
 */
bool
isc__nmsocket_timer_running(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));

	return (uv_is_active((uv_handle_t *)&sock->read_timer));
}

/*
 * Start the socket's timer unless it is already running.
 */
void
isc__nmsocket_timer_start(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));

	if (isc__nmsocket_timer_running(sock)) {
		return;
	}

	isc__nmsocket_timer_restart(sock);
}

/*
 * Stop the socket's timer.
 */
void
isc__nmsocket_timer_stop(isc_nmsocket_t *sock) {
	int r;

	REQUIRE(VALID_NMSOCK(sock));

	/* uv_timer_stop() is idempotent, no need to check if running */

	r = uv_timer_stop(&sock->read_timer);
	UV_RUNTIME_CHECK(uv_timer_stop, r);
}

/*
 * Build a request for delivering a read result on 'sock', carrying the
 * socket's receive callback and argument.  TCP sockets and client
 * sockets attach the request to the socket's static handle; any other
 * socket gets a handle for the peer address 'sockaddr'.
 */
isc__nm_uvreq_t *
isc__nm_get_read_req(isc_nmsocket_t *sock, isc_sockaddr_t *sockaddr) {
	isc__nm_uvreq_t *req = NULL;

	req = isc__nm_uvreq_get(sock->mgr, sock);
	req->cb.recv = sock->recv_cb;
	req->cbarg = sock->recv_cbarg;

	switch (sock->type) {
	case isc_nm_tcpsocket:
		isc_nmhandle_attach(sock->statichandle, &req->handle);
		break;
	default:
		if (atomic_load(&sock->client)) {
			isc_nmhandle_attach(sock->statichandle, &req->handle);
		} else {
			req->handle = isc__nmhandle_get(sock, sockaddr, NULL);
		}
		break;
	}

	return (req);
}

/*%<
 * Allocator callback for read operations.
 *
 * Note this doesn't actually allocate anything, it just assigns the
 * worker's receive buffer to a socket, and marks it as "in use".
 */
void
isc__nm_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	isc__networker_t *worker = NULL;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(isc__nm_in_netthread());
	/*
	 * The size provided by libuv is only suggested size, and it always
	 * defaults to 64 * 1024 in the current versions of libuv (see
	 * src/unix/udp.c and src/unix/stream.c).
*/
	UNUSED(size);

	worker = &sock->mgr->workers[sock->tid];
	/* The worker's single receive buffer must be free right now. */
	INSIST(!worker->recvbuf_inuse);
	INSIST(worker->recvbuf != NULL);

	switch (sock->type) {
	case isc_nm_udpsocket:
		buf->len = ISC_NETMGR_UDP_RECVBUF_SIZE;
		break;
	case isc_nm_tcpsocket:
	case isc_nm_tcpdnssocket:
		buf->len = ISC_NETMGR_TCP_RECVBUF_SIZE;
		break;
	default:
		UNREACHABLE();
	}

	REQUIRE(buf->len <= ISC_NETMGR_RECVBUF_SIZE);
	buf->base = worker->recvbuf;
	worker->recvbuf_inuse = true;
}

/*
 * Start reading on 'sock' using the libuv call and read callback that
 * match the socket type.  Does nothing if the socket is already
 * reading.  Returns ISC_R_SUCCESS, or the libuv error converted to an
 * isc_result_t.
 */
isc_result_t
isc__nm_start_reading(isc_nmsocket_t *sock) {
	isc_result_t result = ISC_R_SUCCESS;
	int r;

	if (sock->reading) {
		return (ISC_R_SUCCESS);
	}

	switch (sock->type) {
	case isc_nm_udpsocket:
		r = uv_udp_recv_start(&sock->uv_handle.udp, isc__nm_alloc_cb,
				      isc__nm_udp_read_cb);
		break;
	case isc_nm_tcpsocket:
		r = uv_read_start(&sock->uv_handle.stream, isc__nm_alloc_cb,
				  isc__nm_tcp_read_cb);
		break;
	case isc_nm_tcpdnssocket:
		r = uv_read_start(&sock->uv_handle.stream, isc__nm_alloc_cb,
				  isc__nm_tcpdns_read_cb);
		break;
	default:
		UNREACHABLE();
	}
	/* 'reading' is only set when libuv accepted the request. */
	if (r != 0) {
		result = isc__nm_uverr2result(r);
	} else {
		sock->reading = true;
	}

	return (result);
}

/*
 * Stop reading on 'sock'.  Does nothing if the socket is not reading.
 */
void
isc__nm_stop_reading(isc_nmsocket_t *sock) {
	int r;

	if (!sock->reading) {
		return;
	}

	switch (sock->type) {
	case isc_nm_udpsocket:
		r = uv_udp_recv_stop(&sock->uv_handle.udp);
		UV_RUNTIME_CHECK(uv_udp_recv_stop, r);
		break;
	case isc_nm_tcpsocket:
	case isc_nm_tcpdnssocket:
		r = uv_read_stop(&sock->uv_handle.stream);
		UV_RUNTIME_CHECK(uv_read_stop, r);
		break;
	default:
		UNREACHABLE();
	}
	sock->reading = false;
}

/*
 * Return true if the network manager that owns 'sock' is closing.
 */
bool
isc__nm_closing(isc_nmsocket_t *sock) {
	return (atomic_load(&sock->mgr->closing));
}

/*
 * Return true if 'sock' should be treated as closing: the socket is
 * inactive or marked closing, its network manager is closing, or it
 * has a server socket that is no longer active.
 */
bool
isc__nmsocket_closing(isc_nmsocket_t *sock) {
	return (!isc__nmsocket_active(sock) || atomic_load(&sock->closing) ||
		atomic_load(&sock->mgr->closing) ||
		(sock->server != NULL && !isc__nmsocket_active(sock->server)));
}

/*
 * Run the protocol-specific buffer processing for 'sock'.  Only TCPDNS
 * sockets are supported.
 */
static isc_result_t
processbuffer(isc_nmsocket_t *sock) {
	switch (sock->type) {
	case isc_nm_tcpdnssocket:
		return (isc__nm_tcpdns_processbuffer(sock));
	default:
		UNREACHABLE();
	}
}

/*
 *
Process a DNS message.
 *
 * If we only have an incomplete DNS message, we don't touch any
 * timers. If we do have a full message, reset the timer.
 *
 * Stop reading if this is a client socket, or if the server socket
 * has been set to sequential mode, or the number of queries we are
 * processing simultaneously has reached the clients-per-connection
 * limit. In this case we'll be called again by resume_processing()
 * later.
 */
isc_result_t
isc__nm_process_sock_buffer(isc_nmsocket_t *sock) {
	for (;;) {
		/*
		 * Sample the active-handle count before processing, so
		 * the decisions below are based on the state from before
		 * this message was handled.
		 */
		int_fast32_t ah = atomic_load(&sock->ah);
		isc_result_t result = processbuffer(sock);
		switch (result) {
		case ISC_R_NOMORE:
			/*
			 * Don't reset the timer until we have a
			 * full DNS message.
			 */
			result = isc__nm_start_reading(sock);
			if (result != ISC_R_SUCCESS) {
				return (result);
			}
			/*
			 * Start the timer only if there are no externally used
			 * active handles, there's always one active handle
			 * attached internally to sock->recv_handle in
			 * accept_connection()
			 */
			if (ah == 1) {
				isc__nmsocket_timer_start(sock);
			}
			goto done;
		case ISC_R_CANCELED:
			isc__nmsocket_timer_stop(sock);
			isc__nm_stop_reading(sock);
			goto done;
		case ISC_R_SUCCESS:
			/*
			 * Stop the timer on the successful message read, this
			 * also allows to restart the timer when we have no more
			 * data.
*/
			isc__nmsocket_timer_stop(sock);

			if (atomic_load(&sock->client) ||
			    atomic_load(&sock->sequential) ||
			    ah >= STREAM_CLIENTS_PER_CONN)
			{
				isc__nm_stop_reading(sock);
				goto done;
			}
			/* Otherwise loop to process the next message. */
			break;
		default:
			UNREACHABLE();
		}
	}
done:
	return (ISC_R_SUCCESS);
}

/*
 * Resume processing buffered data on the server socket 'arg'.  Must
 * run on the socket's own thread; does nothing if the socket is
 * closing.
 */
void
isc__nm_resume_processing(void *arg) {
	isc_nmsocket_t *sock = (isc_nmsocket_t *)arg;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(!atomic_load(&sock->client));

	if (isc__nmsocket_closing(sock)) {
		return;
	}

	isc__nm_process_sock_buffer(sock);
}

/*
 * Clear the read timeout of the socket behind 'handle' and stop its
 * timer if it is running.
 */
void
isc_nmhandle_cleartimeout(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	/* Every socket type currently takes the same path. */
	switch (handle->sock->type) {
	default:
		handle->sock->read_timeout = 0;

		if (uv_is_active((uv_handle_t *)&handle->sock->read_timer)) {
			isc__nmsocket_timer_stop(handle->sock);
		}
	}
}

/*
 * Set the read timeout of the socket behind 'handle' to 'timeout' and
 * restart its timer.
 */
void
isc_nmhandle_settimeout(isc_nmhandle_t *handle, uint32_t timeout) {
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	/* Every socket type currently takes the same path. */
	switch (handle->sock->type) {
	default:
		handle->sock->read_timeout = timeout;
		isc__nmsocket_timer_restart(handle->sock);
	}
}

/*
 * Turn keepalive on or off for the TCP or TCPDNS socket behind
 * 'handle'.  The socket's read and write timeouts are set to the
 * manager's keepalive value when 'value' is true and to its idle value
 * otherwise.
 */
void
isc_nmhandle_keepalive(isc_nmhandle_t *handle, bool value) {
	isc_nmsocket_t *sock = NULL;

	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	sock = handle->sock;

	switch (sock->type) {
	case isc_nm_tcpsocket:
	case isc_nm_tcpdnssocket:
		atomic_store(&sock->keepalive, value);
		sock->read_timeout = value ? atomic_load(&sock->mgr->keepalive)
					   : atomic_load(&sock->mgr->idle);
		sock->write_timeout = value ? atomic_load(&sock->mgr->keepalive)
					    : atomic_load(&sock->mgr->idle);
		break;
	default:
		/*
		 * For any other protocol, this is a no-op.
*/
		return;
	}
}

/*
 * Return a pointer to the extra space of 'handle'.
 */
void *
isc_nmhandle_getextra(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));

	return (handle->extra);
}

/*
 * Return the peer address recorded in 'handle'.
 */
isc_sockaddr_t
isc_nmhandle_peeraddr(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));

	return (handle->peer);
}

/*
 * Return the local address recorded in 'handle'.
 */
isc_sockaddr_t
isc_nmhandle_localaddr(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));

	return (handle->local);
}

/*
 * Return the network manager that owns the socket behind 'handle'.
 */
isc_nm_t *
isc_nmhandle_netmgr(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	return (handle->sock->mgr);
}

/*
 * Get a uv request for 'sock', reusing a cached one while the socket
 * is active and allocating from 'mgr' otherwise.  The request is
 * zeroed and holds a reference to the socket.
 */
isc__nm_uvreq_t *
isc___nm_uvreq_get(isc_nm_t *mgr, isc_nmsocket_t *sock FLARG) {
	isc__nm_uvreq_t *req = NULL;

	REQUIRE(VALID_NM(mgr));
	REQUIRE(VALID_NMSOCK(sock));

	if (sock != NULL && isc__nmsocket_active(sock)) {
		/* Try to reuse one */
		req = isc_astack_pop(sock->inactivereqs);
	}

	if (req == NULL) {
		req = isc_mem_get(mgr->mctx, sizeof(*req));
	}

	/* The magic is set only once the request is fully set up. */
	*req = (isc__nm_uvreq_t){ .magic = 0 };
	ISC_LINK_INIT(req, link);
	req->uv_req.req.data = req;
	isc___nmsocket_attach(sock, &req->sock FLARG_PASS);
	req->magic = UVREQ_MAGIC;

	return (req);
}

/*
 * Release the uv request '*req0', which must belong to 'sock', and set
 * '*req0' to NULL.  The request is cached for reuse or freed, and its
 * handle (if any) and its socket reference are detached.
 */
void
isc___nm_uvreq_put(isc__nm_uvreq_t **req0, isc_nmsocket_t *sock FLARG) {
	isc__nm_uvreq_t *req = NULL;
	isc_nmhandle_t *handle = NULL;

	REQUIRE(req0 != NULL);
	REQUIRE(VALID_UVREQ(*req0));

	req = *req0;
	*req0 = NULL;

	INSIST(sock == req->sock);

	req->magic = 0;

	/*
	 * We need to save this first to make sure that handle,
	 * sock, and the netmgr won't all disappear.
*/
	handle = req->handle;
	req->handle = NULL;

#if !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__
	/*
	 * Cache the request for reuse while the socket is active and
	 * the stack has room; free it otherwise.
	 */
	if (!isc__nmsocket_active(sock) ||
	    !isc_astack_trypush(sock->inactivereqs, req))
	{
		isc_mem_put(sock->mgr->mctx, req, sizeof(*req));
	}
#else  /* !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__ */
	isc_mem_put(sock->mgr->mctx, req, sizeof(*req));
#endif /* !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__ */

	if (handle != NULL) {
		isc__nmhandle_detach(&handle FLARG_PASS);
	}

	isc___nmsocket_detach(&sock FLARG_PASS);
}

/*
 * Send 'region' through 'handle', calling 'cb' with 'cbarg' when done.
 * Dispatches to the UDP, TCP or TCPDNS implementation by socket type.
 */
void
isc_nm_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
	    void *cbarg) {
	REQUIRE(VALID_NMHANDLE(handle));

	switch (handle->sock->type) {
	case isc_nm_udpsocket:
	case isc_nm_udplistener:
		isc__nm_udp_send(handle, region, cb, cbarg);
		break;
	case isc_nm_tcpsocket:
		isc__nm_tcp_send(handle, region, cb, cbarg);
		break;
	case isc_nm_tcpdnssocket:
		isc__nm_tcpdns_send(handle, region, cb, cbarg);
		break;
	default:
		UNREACHABLE();
	}
}

/*
 * Start reading on 'handle', delivering data to 'cb' with 'cbarg'.
 * Dispatches to the UDP, TCP or TCPDNS implementation by socket type.
 */
void
isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
	REQUIRE(VALID_NMHANDLE(handle));

	/*
	 * This is always called via callback (from accept or connect), and
	 * caller must attach to the handle, so the references always need to be
	 * at least 2.
*/
	REQUIRE(isc_refcount_current(&handle->references) >= 2);

	switch (handle->sock->type) {
	case isc_nm_udpsocket:
		isc__nm_udp_read(handle, cb, cbarg);
		break;
	case isc_nm_tcpsocket:
		isc__nm_tcp_read(handle, cb, cbarg);
		break;
	case isc_nm_tcpdnssocket:
		isc__nm_tcpdns_read(handle, cb, cbarg);
		break;
	default:
		UNREACHABLE();
	}
}

/*
 * Cancel a pending read on 'handle'.  Dispatches to the UDP, TCP or
 * TCPDNS implementation by socket type.
 */
void
isc_nm_cancelread(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));

	switch (handle->sock->type) {
	case isc_nm_udpsocket:
		isc__nm_udp_cancelread(handle);
		break;
	case isc_nm_tcpsocket:
		isc__nm_tcp_cancelread(handle);
		break;
	case isc_nm_tcpdnssocket:
		isc__nm_tcpdns_cancelread(handle);
		break;
	default:
		UNREACHABLE();
	}
}

/*
 * Pause reading on 'handle'.  Only TCP sockets are supported.
 */
void
isc_nm_pauseread(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));

	isc_nmsocket_t *sock = handle->sock;

	switch (sock->type) {
	case isc_nm_tcpsocket:
		isc__nm_tcp_pauseread(handle);
		break;
	default:
		UNREACHABLE();
	}
}

/*
 * Resume reading on 'handle'.  Only TCP sockets are supported.
 */
void
isc_nm_resumeread(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));

	isc_nmsocket_t *sock = handle->sock;

	switch (sock->type) {
	case isc_nm_tcpsocket:
		isc__nm_tcp_resumeread(handle);
		break;
	default:
		UNREACHABLE();
	}
}

/*
 * Stop listening on the listener socket 'sock'.  Dispatches to the
 * UDP, TCPDNS or TCP implementation by socket type.
 */
void
isc_nm_stoplistening(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));

	switch (sock->type) {
	case isc_nm_udplistener:
		isc__nm_udp_stoplistening(sock);
		break;
	case isc_nm_tcpdnslistener:
		isc__nm_tcpdns_stoplistening(sock);
		break;
	case isc_nm_tcplistener:
		isc__nm_tcp_stoplistening(sock);
		break;
	default:
		UNREACHABLE();
	}
}

/*
 * Deliver the connect result 'eresult' for 'uvreq' on 'sock': run the
 * callback right away when 'async' is false, or queue it to the
 * socket's worker when it is true.
 */
void
isc__nm_connectcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
		  isc_result_t eresult, bool async) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(uvreq));
	REQUIRE(VALID_NMHANDLE(uvreq->handle));

	if (!async) {
		/* A stack event is enough: it is consumed before we return. */
		isc__netievent_connectcb_t ievent = { .sock = sock,
						      .req = uvreq,
						      .result = eresult };
		isc__nm_async_connectcb(NULL, (isc__netievent_t *)&ievent);
	} else {
		isc__netievent_connectcb_t *ievent =
			isc__nm_get_netievent_connectcb(sock->mgr, sock, uvreq,
							eresult);
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t
*)ievent); } } void isc__nm_async_connectcb(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_connectcb_t *ievent = (isc__netievent_connectcb_t *)ev0; isc_nmsocket_t *sock = ievent->sock; isc__nm_uvreq_t *uvreq = ievent->req; isc_result_t eresult = ievent->result; UNUSED(worker); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_UVREQ(uvreq)); REQUIRE(VALID_NMHANDLE(uvreq->handle)); REQUIRE(ievent->sock->tid == isc_nm_tid()); REQUIRE(uvreq->cb.connect != NULL); uvreq->cb.connect(uvreq->handle, eresult, uvreq->cbarg); isc__nm_uvreq_put(&uvreq, sock); } void isc__nm_readcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, isc_result_t eresult) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_UVREQ(uvreq)); REQUIRE(VALID_NMHANDLE(uvreq->handle)); if (eresult == ISC_R_SUCCESS || eresult == ISC_R_TIMEDOUT) { isc__netievent_readcb_t ievent = { .sock = sock, .req = uvreq, .result = eresult }; isc__nm_async_readcb(NULL, (isc__netievent_t *)&ievent); } else { isc__netievent_readcb_t *ievent = isc__nm_get_netievent_readcb( sock->mgr, sock, uvreq, eresult); isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *)ievent); } } void isc__nm_async_readcb(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_readcb_t *ievent = (isc__netievent_readcb_t *)ev0; isc_nmsocket_t *sock = ievent->sock; isc__nm_uvreq_t *uvreq = ievent->req; isc_result_t eresult = ievent->result; isc_region_t region; UNUSED(worker); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_UVREQ(uvreq)); REQUIRE(VALID_NMHANDLE(uvreq->handle)); REQUIRE(sock->tid == isc_nm_tid()); region.base = (unsigned char *)uvreq->uvbuf.base; region.length = uvreq->uvbuf.len; uvreq->cb.recv(uvreq->handle, eresult, ®ion, uvreq->cbarg); isc__nm_uvreq_put(&uvreq, sock); } void isc__nm_sendcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, isc_result_t eresult, bool async) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_UVREQ(uvreq)); REQUIRE(VALID_NMHANDLE(uvreq->handle)); if (!async) { isc__netievent_sendcb_t 
ievent = { .sock = sock,
						   .req = uvreq,
						   .result = eresult };
		isc__nm_async_sendcb(NULL, (isc__netievent_t *)&ievent);
		return;
	}

	isc__netievent_sendcb_t *ievent = isc__nm_get_netievent_sendcb(
		sock->mgr, sock, uvreq, eresult);
	isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
			       (isc__netievent_t *)ievent);
}

/*
 * Run the send callback carried by the event 'ev0' on the socket's own
 * thread, then release the request.  'worker' is not used.
 */
void
isc__nm_async_sendcb(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_sendcb_t *ievent = (isc__netievent_sendcb_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc__nm_uvreq_t *uvreq = ievent->req;
	isc_result_t eresult = ievent->result;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(uvreq));
	REQUIRE(VALID_NMHANDLE(uvreq->handle));
	REQUIRE(sock->tid == isc_nm_tid());

	uvreq->cb.send(uvreq->handle, eresult, uvreq->cbarg);

	isc__nm_uvreq_put(&uvreq, sock);
}

/*
 * Run the close-handle callback of the socket carried by the event
 * 'ev0' on the socket's own thread.  'worker' is not used.
 */
static void
isc__nm_async_close(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_close_t *ievent = (isc__netievent_close_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	REQUIRE(VALID_NMSOCK(ievent->sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->closehandle_cb != NULL);

	UNUSED(worker);

	ievent->sock->closehandle_cb(sock);
}

/*
 * Detach the handle carried by the event 'ev0' on the socket's own
 * thread.  'worker' is not used.
 */
void
isc__nm_async_detach(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_detach_t *ievent = (isc__netievent_detach_t *)ev0;
	FLARG_IEVENT(ievent);

	REQUIRE(VALID_NMSOCK(ievent->sock));
	REQUIRE(VALID_NMHANDLE(ievent->handle));
	REQUIRE(ievent->sock->tid == isc_nm_tid());

	UNUSED(worker);

	nmhandle_detach_cb(&ievent->handle FLARG_PASS);
}

/*
 * Close callback passed to uv_tcp_close_reset() by
 * isc__nmsocket_reset(): shut the socket down and drop the reference
 * taken before the reset was requested.
 */
static void
reset_shutdown(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);

	isc__nmsocket_shutdown(sock);
	isc__nmsocket_detach(&sock);
}

/*
 * Reset the connection on the TCP or TCPDNS socket 'sock' (which must
 * not be a child socket).  If the libuv handle is active and not
 * already closing, it is closed with a reset; otherwise the socket is
 * simply shut down.
 */
void
isc__nmsocket_reset(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));

	switch (sock->type) {
	case isc_nm_tcpsocket:
	case isc_nm_tcpdnssocket:
		/*
		 * This can be called from the TCP write timeout.
*/
		REQUIRE(sock->parent == NULL);
		break;
	default:
		UNREACHABLE();
		break;
	}

	if (!uv_is_closing(&sock->uv_handle.handle) &&
	    uv_is_active(&sock->uv_handle.handle))
	{
		/*
		 * The real shutdown will be handled in the respective
		 * close functions.
		 */
		/* This reference is released again in reset_shutdown(). */
		isc__nmsocket_attach(sock, &(isc_nmsocket_t *){ NULL });
		int r = uv_tcp_close_reset(&sock->uv_handle.tcp,
					   reset_shutdown);
		UV_RUNTIME_CHECK(uv_tcp_close_reset, r);
	} else {
		isc__nmsocket_shutdown(sock);
	}
}

/*
 * Shut down 'sock' using the UDP, TCP or TCPDNS implementation that
 * matches its type.  Listener sockets are left alone.
 */
void
isc__nmsocket_shutdown(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	switch (sock->type) {
	case isc_nm_udpsocket:
		isc__nm_udp_shutdown(sock);
		break;
	case isc_nm_tcpsocket:
		isc__nm_tcp_shutdown(sock);
		break;
	case isc_nm_tcpdnssocket:
		isc__nm_tcpdns_shutdown(sock);
		break;
	case isc_nm_udplistener:
	case isc_nm_tcplistener:
	case isc_nm_tcpdnslistener:
		return;
	default:
		UNREACHABLE();
	}
}

/*
 * uv_walk() callback used at shutdown.  Handles that are already
 * closing, or are neither UDP nor TCP, are skipped.  UDP sockets are
 * shut down; TCP and TCPDNS sockets without a parent are reset; any
 * other socket on a TCP handle is shut down.  'arg' is not used.
 */
static void
shutdown_walk_cb(uv_handle_t *handle, void *arg) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	UNUSED(arg);

	if (uv_is_closing(handle)) {
		return;
	}

	switch (handle->type) {
	case UV_UDP:
		isc__nmsocket_shutdown(sock);
		return;
	case UV_TCP:
		switch (sock->type) {
		case isc_nm_tcpsocket:
		case isc_nm_tcpdnssocket:
			if (sock->parent == NULL) {
				/* Reset the TCP connections on shutdown */
				isc__nmsocket_reset(sock);
				return;
			}
			FALLTHROUGH;
		default:
			isc__nmsocket_shutdown(sock);
		}

		return;
	default:
		return;
	}
}

/*
 * Shutdown event handler: visit every handle on the worker's loop with
 * shutdown_walk_cb().  'ev0' is not used.
 */
void
isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ev0) {
	UNUSED(ev0);
	uv_walk(&worker->loop, shutdown_walk_cb, NULL);
}

/*
 * Try to take the manager's interlock for the calling network thread
 * without waiting.  Returns true on success; returns false if the
 * interlock is already held or the caller is not a network thread.
 */
bool
isc__nm_acquire_interlocked(isc_nm_t *mgr) {
	if (!isc__nm_in_netthread()) {
		return (false);
	}

	LOCK(&mgr->lock);
	bool success = atomic_compare_exchange_strong(
		&mgr->interlocked, &(int){ ISC_NETMGR_NON_INTERLOCKED },
		isc_nm_tid());

	UNLOCK(&mgr->lock);
	return (success);
}

/*
 * Release the manager's interlock, which must currently be held, and
 * wake up the threads waiting for it.  Does nothing when called from
 * outside a network thread.
 */
void
isc__nm_drop_interlocked(isc_nm_t *mgr) {
	if (!isc__nm_in_netthread()) {
		return;
	}

	LOCK(&mgr->lock);
	int tid = atomic_exchange(&mgr->interlocked,
				  ISC_NETMGR_NON_INTERLOCKED);
	INSIST(tid !=
ISC_NETMGR_NON_INTERLOCKED); BROADCAST(&mgr->wkstatecond); UNLOCK(&mgr->lock); } void isc__nm_acquire_interlocked_force(isc_nm_t *mgr) { if (!isc__nm_in_netthread()) { return; } LOCK(&mgr->lock); while (!atomic_compare_exchange_strong( &mgr->interlocked, &(int){ ISC_NETMGR_NON_INTERLOCKED }, isc_nm_tid())) { WAIT(&mgr->wkstatecond, &mgr->lock); } UNLOCK(&mgr->lock); } void isc_nm_setstats(isc_nm_t *mgr, isc_stats_t *stats) { REQUIRE(VALID_NM(mgr)); REQUIRE(mgr->stats == NULL); REQUIRE(isc_stats_ncounters(stats) == isc_sockstatscounter_max); isc_stats_attach(stats, &mgr->stats); } void isc__nm_incstats(isc_nm_t *mgr, isc_statscounter_t counterid) { REQUIRE(VALID_NM(mgr)); REQUIRE(counterid != -1); if (mgr->stats != NULL) { isc_stats_increment(mgr->stats, counterid); } } void isc__nm_decstats(isc_nm_t *mgr, isc_statscounter_t counterid) { REQUIRE(VALID_NM(mgr)); REQUIRE(counterid != -1); if (mgr->stats != NULL) { isc_stats_decrement(mgr->stats, counterid); } } isc_result_t isc__nm_socket(int domain, int type, int protocol, uv_os_sock_t *sockp) { #ifdef WIN32 SOCKET sock; sock = socket(domain, type, protocol); if (sock == INVALID_SOCKET) { char strbuf[ISC_STRERRORSIZE]; DWORD socket_errno = WSAGetLastError(); switch (socket_errno) { case WSAEMFILE: case WSAENOBUFS: return (ISC_R_NORESOURCES); case WSAEPROTONOSUPPORT: case WSAEPFNOSUPPORT: case WSAEAFNOSUPPORT: return (ISC_R_FAMILYNOSUPPORT); default: strerror_r(socket_errno, strbuf, sizeof(strbuf)); UNEXPECTED_ERROR( __FILE__, __LINE__, "socket() failed with error code %lu: %s", socket_errno, strbuf); return (ISC_R_UNEXPECTED); } } #else int sock = socket(domain, type, protocol); if (sock < 0) { return (isc_errno_toresult(errno)); } #endif *sockp = (uv_os_sock_t)sock; return (ISC_R_SUCCESS); } void isc__nm_closesocket(uv_os_sock_t sock) { #ifdef WIN32 closesocket(sock); #else close(sock); #endif } #define setsockopt_on(socket, level, name) \ setsockopt(socket, level, name, &(int){ 1 }, sizeof(int)) #define 
setsockopt_off(socket, level, name) \ setsockopt(socket, level, name, &(int){ 0 }, sizeof(int)) isc_result_t isc__nm_socket_freebind(uv_os_sock_t fd, sa_family_t sa_family) { /* * Set the IP_FREEBIND (or equivalent option) on the uv_handle. */ #ifdef IP_FREEBIND UNUSED(sa_family); if (setsockopt_on(fd, IPPROTO_IP, IP_FREEBIND) == -1) { return (ISC_R_FAILURE); } return (ISC_R_SUCCESS); #elif defined(IP_BINDANY) || defined(IPV6_BINDANY) if (sa_family == AF_INET) { #if defined(IP_BINDANY) if (setsockopt_on(fd, IPPROTO_IP, IP_BINDANY) == -1) { return (ISC_R_FAILURE); } return (ISC_R_SUCCESS); #endif } else if (sa_family == AF_INET6) { #if defined(IPV6_BINDANY) if (setsockopt_on(fd, IPPROTO_IPV6, IPV6_BINDANY) == -1) { return (ISC_R_FAILURE); } return (ISC_R_SUCCESS); #endif } return (ISC_R_NOTIMPLEMENTED); #elif defined(SO_BINDANY) UNUSED(sa_family); if (setsockopt_on(fd, SOL_SOCKET, SO_BINDANY) == -1) { return (ISC_R_FAILURE); } return (ISC_R_SUCCESS); #else UNUSED(fd); UNUSED(sa_family); return (ISC_R_NOTIMPLEMENTED); #endif } isc_result_t isc__nm_socket_reuse(uv_os_sock_t fd) { /* * Generally, the SO_REUSEADDR socket option allows reuse of * local addresses. * * On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some * additional refinements for programs that use multicast. * * On Linux, SO_REUSEPORT has different semantics: it _shares_ the port * rather than steal it from the current listener, so we don't use it * here, but rather in isc__nm_socket_reuse_lb(). * * On Windows, it also allows a socket to forcibly bind to a port in use * by another socket. 
*/ #if defined(SO_REUSEPORT) && !defined(__linux__) if (setsockopt_on(fd, SOL_SOCKET, SO_REUSEPORT) == -1) { return (ISC_R_FAILURE); } return (ISC_R_SUCCESS); #elif defined(SO_REUSEADDR) if (setsockopt_on(fd, SOL_SOCKET, SO_REUSEADDR) == -1) { return (ISC_R_FAILURE); } return (ISC_R_SUCCESS); #else UNUSED(fd); return (ISC_R_NOTIMPLEMENTED); #endif } isc_result_t isc__nm_socket_reuse_lb(uv_os_sock_t fd) { /* * On FreeBSD 12+, SO_REUSEPORT_LB socket option allows sockets to be * bound to an identical socket address. For UDP sockets, the use of * this option can provide better distribution of incoming datagrams to * multiple processes (or threads) as compared to the traditional * technique of having multiple processes compete to receive datagrams * on the same socket. * * On Linux, the same thing is achieved simply with SO_REUSEPORT. */ #if defined(SO_REUSEPORT_LB) if (setsockopt_on(fd, SOL_SOCKET, SO_REUSEPORT_LB) == -1) { return (ISC_R_FAILURE); } else { return (ISC_R_SUCCESS); } #elif defined(SO_REUSEPORT) && defined(__linux__) if (setsockopt_on(fd, SOL_SOCKET, SO_REUSEPORT) == -1) { return (ISC_R_FAILURE); } else { return (ISC_R_SUCCESS); } #else UNUSED(fd); return (ISC_R_NOTIMPLEMENTED); #endif } isc_result_t isc__nm_socket_incoming_cpu(uv_os_sock_t fd) { #ifdef SO_INCOMING_CPU if (setsockopt_on(fd, SOL_SOCKET, SO_INCOMING_CPU) == -1) { return (ISC_R_FAILURE); } else { return (ISC_R_SUCCESS); } #else UNUSED(fd); #endif return (ISC_R_NOTIMPLEMENTED); } isc_result_t isc__nm_socket_disable_pmtud(uv_os_sock_t fd, sa_family_t sa_family) { /* * Disable the Path MTU Discovery on IP packets */ if (sa_family == AF_INET6) { #if defined(IPV6_DONTFRAG) if (setsockopt_off(fd, IPPROTO_IPV6, IPV6_DONTFRAG) == -1) { return (ISC_R_FAILURE); } else { return (ISC_R_SUCCESS); } #elif defined(IPV6_MTU_DISCOVER) && defined(IP_PMTUDISC_OMIT) if (setsockopt(fd, IPPROTO_IPV6, IPV6_MTU_DISCOVER, &(int){ IP_PMTUDISC_OMIT }, sizeof(int)) == -1) { return (ISC_R_FAILURE); } else { return 
(ISC_R_SUCCESS); } #else UNUSED(fd); #endif } else if (sa_family == AF_INET) { #if defined(IP_DONTFRAG) if (setsockopt_off(fd, IPPROTO_IP, IP_DONTFRAG) == -1) { return (ISC_R_FAILURE); } else { return (ISC_R_SUCCESS); } #elif defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_OMIT) if (setsockopt(fd, IPPROTO_IP, IP_MTU_DISCOVER, &(int){ IP_PMTUDISC_OMIT }, sizeof(int)) == -1) { return (ISC_R_FAILURE); } else { return (ISC_R_SUCCESS); } #else UNUSED(fd); #endif } else { return (ISC_R_FAMILYNOSUPPORT); } return (ISC_R_NOTIMPLEMENTED); } #if defined(_WIN32) #define TIMEOUT_TYPE DWORD #define TIMEOUT_DIV 1000 #define TIMEOUT_OPTNAME TCP_MAXRT #elif defined(TCP_CONNECTIONTIMEOUT) #define TIMEOUT_TYPE int #define TIMEOUT_DIV 1000 #define TIMEOUT_OPTNAME TCP_CONNECTIONTIMEOUT #elif defined(TCP_RXT_CONNDROPTIME) #define TIMEOUT_TYPE int #define TIMEOUT_DIV 1000 #define TIMEOUT_OPTNAME TCP_RXT_CONNDROPTIME #elif defined(TCP_USER_TIMEOUT) #define TIMEOUT_TYPE unsigned int #define TIMEOUT_DIV 1 #define TIMEOUT_OPTNAME TCP_USER_TIMEOUT #elif defined(TCP_KEEPINIT) #define TIMEOUT_TYPE int #define TIMEOUT_DIV 1000 #define TIMEOUT_OPTNAME TCP_KEEPINIT #endif isc_result_t isc__nm_socket_connectiontimeout(uv_os_sock_t fd, int timeout_ms) { #if defined(TIMEOUT_OPTNAME) TIMEOUT_TYPE timeout = timeout_ms / TIMEOUT_DIV; if (timeout == 0) { timeout = 1; } if (setsockopt(fd, IPPROTO_TCP, TIMEOUT_OPTNAME, &timeout, sizeof(timeout)) == -1) { return (ISC_R_FAILURE); } return (ISC_R_SUCCESS); #else UNUSED(fd); UNUSED(timeout_ms); return (ISC_R_SUCCESS); #endif } isc_result_t isc__nm_socket_tcp_nodelay(uv_os_sock_t fd) { #ifdef TCP_NODELAY if (setsockopt_on(fd, IPPROTO_TCP, TCP_NODELAY) == -1) { return (ISC_R_FAILURE); } else { return (ISC_R_SUCCESS); } #else UNUSED(fd); return (ISC_R_SUCCESS); #endif } static isc_threadresult_t isc__nm_work_run(isc_threadarg_t arg) { isc__nm_work_t *work = (isc__nm_work_t *)arg; work->cb(work->data); return ((isc_threadresult_t)0); } static void 
isc__nm_work_cb(uv_work_t *req) { isc__nm_work_t *work = uv_req_get_data((uv_req_t *)req); if (isc_tid_v == SIZE_MAX) { isc__trampoline_t *trampoline_arg = isc__trampoline_get(isc__nm_work_run, work); (void)isc__trampoline_run(trampoline_arg); } else { (void)isc__nm_work_run((isc_threadarg_t)work); } } static void isc__nm_after_work_cb(uv_work_t *req, int status) { isc_result_t result = ISC_R_SUCCESS; isc__nm_work_t *work = uv_req_get_data((uv_req_t *)req); isc_nm_t *netmgr = work->netmgr; if (status != 0) { result = isc__nm_uverr2result(status); } work->after_cb(work->data, result); isc_mem_put(netmgr->mctx, work, sizeof(*work)); isc_nm_detach(&netmgr); } void isc_nm_work_offload(isc_nm_t *netmgr, isc_nm_workcb_t work_cb, isc_nm_after_workcb_t after_work_cb, void *data) { isc__networker_t *worker = NULL; isc__nm_work_t *work = NULL; int r; REQUIRE(isc__nm_in_netthread()); REQUIRE(VALID_NM(netmgr)); worker = &netmgr->workers[isc_nm_tid()]; work = isc_mem_get(netmgr->mctx, sizeof(*work)); *work = (isc__nm_work_t){ .cb = work_cb, .after_cb = after_work_cb, .data = data, }; isc_nm_attach(netmgr, &work->netmgr); uv_req_set_data((uv_req_t *)&work->req, work); r = uv_queue_work(&worker->loop, &work->req, isc__nm_work_cb, isc__nm_after_work_cb); UV_RUNTIME_CHECK(uv_queue_work, r); } void isc_nm_timer_create(isc_nmhandle_t *handle, isc_nm_timer_cb cb, void *cbarg, isc_nm_timer_t **timerp) { isc__networker_t *worker = NULL; isc_nmsocket_t *sock = NULL; isc_nm_timer_t *timer = NULL; int r; REQUIRE(isc__nm_in_netthread()); REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMSOCK(handle->sock)); sock = handle->sock; worker = &sock->mgr->workers[isc_nm_tid()]; timer = isc_mem_get(sock->mgr->mctx, sizeof(*timer)); *timer = (isc_nm_timer_t){ .cb = cb, .cbarg = cbarg }; isc_refcount_init(&timer->references, 1); isc_nmhandle_attach(handle, &timer->handle); r = uv_timer_init(&worker->loop, &timer->timer); UV_RUNTIME_CHECK(uv_timer_init, r); uv_handle_set_data((uv_handle_t 
*)&timer->timer, timer); *timerp = timer; } void isc_nm_timer_attach(isc_nm_timer_t *timer, isc_nm_timer_t **timerp) { REQUIRE(timer != NULL); REQUIRE(timerp != NULL && *timerp == NULL); isc_refcount_increment(&timer->references); *timerp = timer; } static void timer_destroy(uv_handle_t *uvhandle) { isc_nm_timer_t *timer = uv_handle_get_data(uvhandle); isc_nmhandle_t *handle = timer->handle; isc_mem_t *mctx = timer->handle->sock->mgr->mctx; isc_mem_put(mctx, timer, sizeof(*timer)); isc_nmhandle_detach(&handle); } void isc_nm_timer_detach(isc_nm_timer_t **timerp) { isc_nm_timer_t *timer = NULL; isc_nmhandle_t *handle = NULL; REQUIRE(timerp != NULL && *timerp != NULL); timer = *timerp; *timerp = NULL; handle = timer->handle; REQUIRE(isc__nm_in_netthread()); REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMSOCK(handle->sock)); if (isc_refcount_decrement(&timer->references) == 1) { int r = uv_timer_stop(&timer->timer); UV_RUNTIME_CHECK(uv_timer_stop, r); uv_close((uv_handle_t *)&timer->timer, timer_destroy); } } static void timer_cb(uv_timer_t *uvtimer) { isc_nm_timer_t *timer = uv_handle_get_data((uv_handle_t *)uvtimer); REQUIRE(timer->cb != NULL); timer->cb(timer->cbarg, ISC_R_TIMEDOUT); } void isc_nm_timer_start(isc_nm_timer_t *timer, uint64_t timeout) { int r = uv_timer_start(&timer->timer, timer_cb, timeout, 0); UV_RUNTIME_CHECK(uv_timer_start, r); } void isc_nm_timer_stop(isc_nm_timer_t *timer) { int r = uv_timer_stop(&timer->timer); UV_RUNTIME_CHECK(uv_timer_stop, r); } #ifdef NETMGR_TRACE /* * Dump all active sockets in netmgr. We output to stderr * as the logger might be already shut down. 
*/ static const char * nmsocket_type_totext(isc_nmsocket_type type) { switch (type) { case isc_nm_udpsocket: return ("isc_nm_udpsocket"); case isc_nm_udplistener: return ("isc_nm_udplistener"); case isc_nm_tcpsocket: return ("isc_nm_tcpsocket"); case isc_nm_tcplistener: return ("isc_nm_tcplistener"); case isc_nm_tcpdnslistener: return ("isc_nm_tcpdnslistener"); case isc_nm_tcpdnssocket: return ("isc_nm_tcpdnssocket"); default: UNREACHABLE(); } } static void nmhandle_dump(isc_nmhandle_t *handle) { fprintf(stderr, "Active handle %p, refs %" PRIuFAST32 "\n", handle, isc_refcount_current(&handle->references)); fprintf(stderr, "Created by:\n"); backtrace_symbols_fd(handle->backtrace, handle->backtrace_size, STDERR_FILENO); fprintf(stderr, "\n\n"); } static void nmsocket_dump(isc_nmsocket_t *sock) { isc_nmhandle_t *handle = NULL; LOCK(&sock->lock); fprintf(stderr, "\n=================\n"); fprintf(stderr, "Active %s socket %p, type %s, refs %" PRIuFAST32 "\n", atomic_load(&sock->client) ? "client" : "server", sock, nmsocket_type_totext(sock->type), isc_refcount_current(&sock->references)); fprintf(stderr, "Parent %p, listener %p, server %p, statichandle = " "%p\n", sock->parent, sock->listener, sock->server, sock->statichandle); fprintf(stderr, "Flags:%s%s%s%s%s\n", atomic_load(&sock->active) ? " active" : "", atomic_load(&sock->closing) ? " closing" : "", atomic_load(&sock->destroying) ? " destroying" : "", atomic_load(&sock->connecting) ? " connecting" : "", sock->accepting ? 
" accepting" : ""); fprintf(stderr, "Created by:\n"); backtrace_symbols_fd(sock->backtrace, sock->backtrace_size, STDERR_FILENO); fprintf(stderr, "\n"); for (handle = ISC_LIST_HEAD(sock->active_handles); handle != NULL; handle = ISC_LIST_NEXT(handle, active_link)) { static bool first = true; if (first) { fprintf(stderr, "Active handles:\n"); first = false; } nmhandle_dump(handle); } fprintf(stderr, "\n"); UNLOCK(&sock->lock); } void isc__nm_dump_active(isc_nm_t *nm) { isc_nmsocket_t *sock = NULL; REQUIRE(VALID_NM(nm)); LOCK(&nm->lock); for (sock = ISC_LIST_HEAD(nm->active_sockets); sock != NULL; sock = ISC_LIST_NEXT(sock, active_link)) { static bool first = true; if (first) { fprintf(stderr, "Outstanding sockets\n"); first = false; } nmsocket_dump(sock); } UNLOCK(&nm->lock); } #endif