From be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 04:57:58 +0200 Subject: Adding upstream version 1.44.3. Signed-off-by: Daniel Baumann --- web/server/h2o/libh2o/lib/common/socketpool.c | 342 ++++++++++++++++++++++++++ 1 file changed, 342 insertions(+) create mode 100644 web/server/h2o/libh2o/lib/common/socketpool.c (limited to 'web/server/h2o/libh2o/lib/common/socketpool.c') diff --git a/web/server/h2o/libh2o/lib/common/socketpool.c b/web/server/h2o/libh2o/lib/common/socketpool.c new file mode 100644 index 00000000..da69933f --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socketpool.c @@ -0,0 +1,342 @@ +/* + * Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include "h2o/hostinfo.h" +#include "h2o/linklist.h" +#include "h2o/socketpool.h" +#include "h2o/string_.h" +#include "h2o/timeout.h" + +struct pool_entry_t { + h2o_socket_export_t sockinfo; + h2o_linklist_t link; + uint64_t added_at; +}; + +struct st_h2o_socketpool_connect_request_t { + void *data; + h2o_socketpool_connect_cb cb; + h2o_socketpool_t *pool; + h2o_loop_t *loop; + h2o_hostinfo_getaddr_req_t *getaddr_req; + h2o_socket_t *sock; +}; + +static void destroy_detached(struct pool_entry_t *entry) +{ + h2o_socket_dispose_export(&entry->sockinfo); + free(entry); +} + +static void destroy_attached(struct pool_entry_t *entry) +{ + h2o_linklist_unlink(&entry->link); + destroy_detached(entry); +} + +static void destroy_expired(h2o_socketpool_t *pool) +{ + /* caller should lock the mutex */ + uint64_t expire_before = h2o_now(pool->_interval_cb.loop) - pool->timeout; + while (!h2o_linklist_is_empty(&pool->_shared.sockets)) { + struct pool_entry_t *entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, link, pool->_shared.sockets.next); + if (entry->added_at > expire_before) + break; + destroy_attached(entry); + __sync_sub_and_fetch(&pool->_shared.count, 1); + } +} + +static void on_timeout(h2o_timeout_entry_t *timeout_entry) +{ + /* FIXME decrease the frequency of this function being called; the expiration + * check can be (should be) performed in the `connect` fuction as well + */ + h2o_socketpool_t *pool = H2O_STRUCT_FROM_MEMBER(h2o_socketpool_t, _interval_cb.entry, timeout_entry); + + if (pthread_mutex_trylock(&pool->_shared.mutex) == 0) { + destroy_expired(pool); + pthread_mutex_unlock(&pool->_shared.mutex); + } + + h2o_timeout_link(pool->_interval_cb.loop, &pool->_interval_cb.timeout, &pool->_interval_cb.entry); +} + +static void common_init(h2o_socketpool_t *pool, h2o_socketpool_type_t type, h2o_iovec_t host, int is_ssl, size_t capacity) +{ + memset(pool, 0, sizeof(*pool)); + + pool->type = type; + pool->peer.host = h2o_strdup(NULL, host.base, host.len); + pool->is_ssl = is_ssl; + pool->capacity = capacity; + pool->timeout = UINT64_MAX; + + pthread_mutex_init(&pool->_shared.mutex, NULL); + h2o_linklist_init_anchor(&pool->_shared.sockets); +} + +void h2o_socketpool_init_by_address(h2o_socketpool_t *pool, struct sockaddr *sa, socklen_t salen, int is_ssl, size_t capacity) +{ + char host[NI_MAXHOST]; + size_t host_len; + + assert(salen <= sizeof(pool->peer.sockaddr.bytes)); + + if ((host_len = h2o_socket_getnumerichost(sa, salen, host)) == SIZE_MAX) { + if (sa->sa_family != AF_UNIX) + h2o_fatal("failed to convert a non-unix socket address to a numerical representation"); + /* use the sockaddr_un::sun_path as the SNI indicator (is that the right thing to do?) */ + strcpy(host, ((struct sockaddr_un *)sa)->sun_path); + host_len = strlen(host); + } + + common_init(pool, H2O_SOCKETPOOL_TYPE_SOCKADDR, h2o_iovec_init(host, host_len), is_ssl, capacity); + memcpy(&pool->peer.sockaddr.bytes, sa, salen); + pool->peer.sockaddr.len = salen; +} + +void h2o_socketpool_init_by_hostport(h2o_socketpool_t *pool, h2o_iovec_t host, uint16_t port, int is_ssl, size_t capacity) +{ + struct sockaddr_in sin; + memset(&sin, 0, sizeof(sin)); + + if (h2o_hostinfo_aton(host, &sin.sin_addr) == 0) { + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + h2o_socketpool_init_by_address(pool, (void *)&sin, sizeof(sin), is_ssl, capacity); + return; + } + + common_init(pool, H2O_SOCKETPOOL_TYPE_NAMED, host, is_ssl, capacity); + pool->peer.named_serv.base = h2o_mem_alloc(sizeof(H2O_UINT16_LONGEST_STR)); + pool->peer.named_serv.len = sprintf(pool->peer.named_serv.base, "%u", (unsigned)port); +} + +void h2o_socketpool_dispose(h2o_socketpool_t *pool) +{ + pthread_mutex_lock(&pool->_shared.mutex); + while (!h2o_linklist_is_empty(&pool->_shared.sockets)) { + struct pool_entry_t *entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, link, pool->_shared.sockets.next); + destroy_attached(entry); + __sync_sub_and_fetch(&pool->_shared.count, 1); + } + pthread_mutex_unlock(&pool->_shared.mutex); + pthread_mutex_destroy(&pool->_shared.mutex); + + if (pool->_interval_cb.loop != NULL) { + h2o_timeout_unlink(&pool->_interval_cb.entry); + h2o_timeout_dispose(pool->_interval_cb.loop, &pool->_interval_cb.timeout); + } + free(pool->peer.host.base); + switch (pool->type) { + case H2O_SOCKETPOOL_TYPE_NAMED: + free(pool->peer.named_serv.base); + break; + case H2O_SOCKETPOOL_TYPE_SOCKADDR: + break; + } +} + +void h2o_socketpool_set_timeout(h2o_socketpool_t *pool, h2o_loop_t *loop, uint64_t msec) +{ + pool->timeout = msec; + + pool->_interval_cb.loop = loop; + h2o_timeout_init(loop, &pool->_interval_cb.timeout, 1000); + pool->_interval_cb.entry.cb = on_timeout; + + h2o_timeout_link(loop, &pool->_interval_cb.timeout, &pool->_interval_cb.entry); +} + +static void call_connect_cb(h2o_socketpool_connect_request_t *req, const char *errstr) +{ + h2o_socketpool_connect_cb cb = req->cb; + h2o_socket_t *sock = req->sock; + void *data = req->data; + + free(req); + cb(sock, errstr, data); +} + +static void on_connect(h2o_socket_t *sock, const char *err) +{ + h2o_socketpool_connect_request_t *req = sock->data; + const char *errstr = NULL; + + assert(req->sock == sock); + + if (err != NULL) { + h2o_socket_close(sock); + req->sock = NULL; + errstr = "connection failed"; + } + call_connect_cb(req, errstr); +} + +static void on_close(void *data) +{ + h2o_socketpool_t *pool = data; + __sync_sub_and_fetch(&pool->_shared.count, 1); +} + +static void start_connect(h2o_socketpool_connect_request_t *req, struct sockaddr *addr, socklen_t addrlen) +{ + req->sock = h2o_socket_connect(req->loop, addr, addrlen, on_connect); + if (req->sock == NULL) { + __sync_sub_and_fetch(&req->pool->_shared.count, 1); + call_connect_cb(req, "failed to connect to host"); + return; + } + req->sock->data = req; + req->sock->on_close.cb = on_close; + req->sock->on_close.data = req->pool; +} + +static void on_getaddr(h2o_hostinfo_getaddr_req_t *getaddr_req, const char *errstr, struct addrinfo *res, void *_req) +{ + h2o_socketpool_connect_request_t *req = _req; + + assert(getaddr_req == req->getaddr_req); + req->getaddr_req = NULL; + + if (errstr != NULL) { + __sync_sub_and_fetch(&req->pool->_shared.count, 1); + call_connect_cb(req, errstr); + return; + } + + struct addrinfo *selected = h2o_hostinfo_select_one(res); + start_connect(req, selected->ai_addr, selected->ai_addrlen); +} + +void h2o_socketpool_connect(h2o_socketpool_connect_request_t **_req, h2o_socketpool_t *pool, h2o_loop_t *loop, + h2o_multithread_receiver_t *getaddr_receiver, h2o_socketpool_connect_cb cb, void *data) +{ + struct pool_entry_t *entry = NULL; + + if (_req != NULL) + *_req = NULL; + + /* fetch an entry and return it */ + pthread_mutex_lock(&pool->_shared.mutex); + destroy_expired(pool); + while (1) { + if (h2o_linklist_is_empty(&pool->_shared.sockets)) + break; + entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, link, pool->_shared.sockets.next); + h2o_linklist_unlink(&entry->link); + pthread_mutex_unlock(&pool->_shared.mutex); + + /* test if the connection is still alive */ + char buf[1]; + ssize_t rret = recv(entry->sockinfo.fd, buf, 1, MSG_PEEK); + if (rret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + /* yes! return it */ + h2o_socket_t *sock = h2o_socket_import(loop, &entry->sockinfo); + free(entry); + sock->on_close.cb = on_close; + sock->on_close.data = pool; + cb(sock, NULL, data); + return; + } + + /* connection is dead, report, close, and retry */ + if (rret <= 0) { + static long counter = 0; + if (__sync_fetch_and_add(&counter, 1) == 0) + fprintf(stderr, "[WARN] detected close by upstream before the expected timeout (see issue #679)\n"); + } else { + static long counter = 0; + if (__sync_fetch_and_add(&counter, 1) == 0) + fprintf(stderr, "[WARN] unexpectedly received data to a pooled socket (see issue #679)\n"); + } + destroy_detached(entry); + pthread_mutex_lock(&pool->_shared.mutex); + } + pthread_mutex_unlock(&pool->_shared.mutex); + + /* FIXME repsect `capacity` */ + __sync_add_and_fetch(&pool->_shared.count, 1); + + /* prepare request object */ + h2o_socketpool_connect_request_t *req = h2o_mem_alloc(sizeof(*req)); + *req = (h2o_socketpool_connect_request_t){data, cb, pool, loop}; + if (_req != NULL) + *_req = req; + + switch (pool->type) { + case H2O_SOCKETPOOL_TYPE_NAMED: + /* resolve the name, and connect */ + req->getaddr_req = h2o_hostinfo_getaddr(getaddr_receiver, pool->peer.host, pool->peer.named_serv, AF_UNSPEC, SOCK_STREAM, + IPPROTO_TCP, AI_ADDRCONFIG | AI_NUMERICSERV, on_getaddr, req); + break; + case H2O_SOCKETPOOL_TYPE_SOCKADDR: + /* connect (using sockaddr_in) */ + start_connect(req, (void *)&pool->peer.sockaddr.bytes, pool->peer.sockaddr.len); + break; + } +} + +void h2o_socketpool_cancel_connect(h2o_socketpool_connect_request_t *req) +{ + if (req->getaddr_req != NULL) { + h2o_hostinfo_getaddr_cancel(req->getaddr_req); + req->getaddr_req = NULL; + } + if (req->sock != NULL) + h2o_socket_close(req->sock); + free(req); +} + +int h2o_socketpool_return(h2o_socketpool_t *pool, h2o_socket_t *sock) +{ + struct pool_entry_t *entry; + + /* reset the on_close callback */ + assert(sock->on_close.data == pool); + sock->on_close.cb = NULL; + sock->on_close.data = NULL; + + entry = h2o_mem_alloc(sizeof(*entry)); + if (h2o_socket_export(sock, &entry->sockinfo) != 0) { + free(entry); + __sync_sub_and_fetch(&pool->_shared.count, 1); + return -1; + } + memset(&entry->link, 0, sizeof(entry->link)); + entry->added_at = h2o_now(h2o_socket_get_loop(sock)); + + pthread_mutex_lock(&pool->_shared.mutex); + destroy_expired(pool); + h2o_linklist_insert(&pool->_shared.sockets, &entry->link); + pthread_mutex_unlock(&pool->_shared.mutex); + + return 0; +} -- cgit v1.2.3