From be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 04:57:58 +0200 Subject: Adding upstream version 1.44.3. Signed-off-by: Daniel Baumann --- web/server/h2o/libh2o/lib/common/socket/evloop.c.h | 624 +++++++++++++++++++++ .../h2o/libh2o/lib/common/socket/evloop/epoll.c.h | 203 +++++++ .../h2o/libh2o/lib/common/socket/evloop/kqueue.c.h | 186 ++++++ .../h2o/libh2o/lib/common/socket/evloop/poll.c.h | 178 ++++++ .../h2o/libh2o/lib/common/socket/uv-binding.c.h | 283 ++++++++++ 5 files changed, 1474 insertions(+) create mode 100644 web/server/h2o/libh2o/lib/common/socket/evloop.c.h create mode 100644 web/server/h2o/libh2o/lib/common/socket/evloop/epoll.c.h create mode 100644 web/server/h2o/libh2o/lib/common/socket/evloop/kqueue.c.h create mode 100644 web/server/h2o/libh2o/lib/common/socket/evloop/poll.c.h create mode 100644 web/server/h2o/libh2o/lib/common/socket/uv-binding.c.h (limited to 'web/server/h2o/libh2o/lib/common/socket') diff --git a/web/server/h2o/libh2o/lib/common/socket/evloop.c.h b/web/server/h2o/libh2o/lib/common/socket/evloop.c.h new file mode 100644 index 00000000..754ed23b --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socket/evloop.c.h @@ -0,0 +1,624 @@ +/* + * Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku, Fastly, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include +#include +#include +#include +#include +#include +#include +#include "cloexec.h" +#include "h2o/linklist.h" + +#if !defined(H2O_USE_ACCEPT4) +#ifdef __linux__ +#define H2O_USE_ACCEPT4 1 +#elif __FreeBSD__ >= 10 +#define H2O_USE_ACCEPT4 1 +#else +#define H2O_USE_ACCEPT4 0 +#endif +#endif + +struct st_h2o_evloop_socket_t { + h2o_socket_t super; + int fd; + int _flags; + h2o_evloop_t *loop; + struct { + size_t cnt; + h2o_iovec_t *bufs; + union { + h2o_iovec_t *alloced_ptr; + h2o_iovec_t smallbufs[4]; + }; + } _wreq; + struct st_h2o_evloop_socket_t *_next_pending; + struct st_h2o_evloop_socket_t *_next_statechanged; +}; + +static void link_to_pending(struct st_h2o_evloop_socket_t *sock); +static void write_pending(struct st_h2o_evloop_socket_t *sock); +static h2o_evloop_t *create_evloop(size_t sz); +static void update_now(h2o_evloop_t *loop); +static int32_t adjust_max_wait(h2o_evloop_t *loop, int32_t max_wait); + +/* functions to be defined in the backends */ +static int evloop_do_proceed(h2o_evloop_t *loop, int32_t max_wait); +static void evloop_do_on_socket_create(struct st_h2o_evloop_socket_t *sock); +static void evloop_do_on_socket_close(struct st_h2o_evloop_socket_t *sock); +static void evloop_do_on_socket_export(struct st_h2o_evloop_socket_t *sock); + +#if H2O_USE_POLL || H2O_USE_EPOLL || H2O_USE_KQUEUE +/* explicitly specified */ +#else +#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__) +#define H2O_USE_KQUEUE 1 +#elif defined(__linux) +#define H2O_USE_EPOLL 1 +#else +#define H2O_USE_POLL 1 +#endif +#endif + +#if H2O_USE_POLL +#include "evloop/poll.c.h" +#elif H2O_USE_EPOLL +#include "evloop/epoll.c.h" +#elif H2O_USE_KQUEUE +#include "evloop/kqueue.c.h" +#else +#error "poller not specified" +#endif + +void link_to_pending(struct st_h2o_evloop_socket_t *sock) +{ + if (sock->_next_pending == sock) { + struct st_h2o_evloop_socket_t **slot = (sock->_flags & H2O_SOCKET_FLAG_IS_ACCEPTED_CONNECTION) != 0 + ? &sock->loop->_pending_as_server + : &sock->loop->_pending_as_client; + sock->_next_pending = *slot; + *slot = sock; + } +} + +static void link_to_statechanged(struct st_h2o_evloop_socket_t *sock) +{ + if (sock->_next_statechanged == sock) { + sock->_next_statechanged = NULL; + *sock->loop->_statechanged.tail_ref = sock; + sock->loop->_statechanged.tail_ref = &sock->_next_statechanged; + } +} + +static const char *on_read_core(int fd, h2o_buffer_t **input) +{ + int read_any = 0; + + while (1) { + ssize_t rret; + h2o_iovec_t buf = h2o_buffer_reserve(input, 4096); + if (buf.base == NULL) { + /* memory allocation failed */ + return h2o_socket_error_out_of_memory; + } + while ((rret = read(fd, buf.base, buf.len <= INT_MAX / 2 ? buf.len : INT_MAX / 2 + 1)) == -1 && errno == EINTR) + ; + if (rret == -1) { + if (errno == EAGAIN) + break; + else + return h2o_socket_error_io; + } else if (rret == 0) { + if (!read_any) + return h2o_socket_error_closed; /* TODO notify close */ + break; + } + (*input)->size += rret; + if (buf.len != rret) + break; + read_any = 1; + } + return NULL; +} + +static void wreq_free_buffer_if_allocated(struct st_h2o_evloop_socket_t *sock) +{ + if (sock->_wreq.smallbufs <= sock->_wreq.bufs && + sock->_wreq.bufs <= sock->_wreq.smallbufs + sizeof(sock->_wreq.smallbufs) / sizeof(sock->_wreq.smallbufs[0])) { + /* no need to free */ + } else { + free(sock->_wreq.alloced_ptr); + sock->_wreq.bufs = sock->_wreq.smallbufs; + } +} + +static int write_core(int fd, h2o_iovec_t **bufs, size_t *bufcnt) +{ + int iovcnt; + ssize_t wret; + + if (*bufcnt != 0) { + do { + /* write */ + iovcnt = IOV_MAX; + if (*bufcnt < iovcnt) + iovcnt = (int)*bufcnt; + while ((wret = writev(fd, (struct iovec *)*bufs, iovcnt)) == -1 && errno == EINTR) + ; + if (wret == -1) { + if (errno != EAGAIN) + return -1; + break; + } + /* adjust the buffer */ + while ((*bufs)->len < wret) { + wret -= (*bufs)->len; + ++*bufs; + --*bufcnt; + assert(*bufcnt != 0); + } + if (((*bufs)->len -= wret) == 0) { + ++*bufs; + --*bufcnt; + } else { + (*bufs)->base += wret; + } + } while (*bufcnt != 0 && iovcnt == IOV_MAX); + } + + return 0; +} + +void write_pending(struct st_h2o_evloop_socket_t *sock) +{ + assert(sock->super._cb.write != NULL); + + /* DONT_WRITE poll */ + if (sock->_wreq.cnt == 0) + goto Complete; + + /* write */ + if (write_core(sock->fd, &sock->_wreq.bufs, &sock->_wreq.cnt) == 0 && sock->_wreq.cnt != 0) { + /* partial write */ + return; + } + + /* either completed or failed */ + wreq_free_buffer_if_allocated(sock); + +Complete: + sock->_flags |= H2O_SOCKET_FLAG_IS_WRITE_NOTIFY; + link_to_pending(sock); + link_to_statechanged(sock); /* might need to disable the write polling */ +} + +static void read_on_ready(struct st_h2o_evloop_socket_t *sock) +{ + const char *err = 0; + size_t prev_bytes_read = sock->super.input->size; + + if ((sock->_flags & H2O_SOCKET_FLAG_DONT_READ) != 0) + goto Notify; + + if ((err = on_read_core(sock->fd, sock->super.ssl == NULL ? &sock->super.input : &sock->super.ssl->input.encrypted)) != NULL) + goto Notify; + + if (sock->super.ssl != NULL && sock->super.ssl->handshake.cb == NULL) + err = decode_ssl_input(&sock->super); + +Notify: + /* the application may get notified even if no new data is avaiable. The + * behavior is intentional; it is designed as such so that the applications + * can update their timeout counters when a partial SSL record arrives. + */ + sock->super.bytes_read = sock->super.input->size - prev_bytes_read; + sock->super._cb.read(&sock->super, err); +} + +void do_dispose_socket(h2o_socket_t *_sock) +{ + struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock; + + evloop_do_on_socket_close(sock); + wreq_free_buffer_if_allocated(sock); + if (sock->fd != -1) { + close(sock->fd); + sock->fd = -1; + } + sock->_flags = H2O_SOCKET_FLAG_IS_DISPOSED; + link_to_statechanged(sock); +} + +void do_write(h2o_socket_t *_sock, h2o_iovec_t *_bufs, size_t bufcnt, h2o_socket_cb cb) +{ + struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock; + h2o_iovec_t *bufs; + h2o_iovec_t *tofree = NULL; + + assert(sock->super._cb.write == NULL); + assert(sock->_wreq.cnt == 0); + sock->super._cb.write = cb; + + /* cap the number of buffers, since we're using alloca */ + if (bufcnt > 10000) + bufs = tofree = h2o_mem_alloc(sizeof(*bufs) * bufcnt); + else + bufs = alloca(sizeof(*bufs) * bufcnt); + + memcpy(bufs, _bufs, sizeof(*bufs) * bufcnt); + + /* try to write now */ + if (write_core(sock->fd, &bufs, &bufcnt) != 0) { + /* fill in _wreq.bufs with fake data to indicate error */ + sock->_wreq.bufs = sock->_wreq.smallbufs; + sock->_wreq.cnt = 1; + *sock->_wreq.bufs = h2o_iovec_init(H2O_STRLIT("deadbeef")); + sock->_flags |= H2O_SOCKET_FLAG_IS_WRITE_NOTIFY; + link_to_pending(sock); + goto Out; + } + if (bufcnt == 0) { + /* write complete, schedule the callback */ + sock->_flags |= H2O_SOCKET_FLAG_IS_WRITE_NOTIFY; + link_to_pending(sock); + goto Out; + } + + + /* setup the buffer to send pending data */ + if (bufcnt <= sizeof(sock->_wreq.smallbufs) / sizeof(sock->_wreq.smallbufs[0])) { + sock->_wreq.bufs = sock->_wreq.smallbufs; + } else { + sock->_wreq.bufs = h2o_mem_alloc(sizeof(h2o_iovec_t) * bufcnt); + sock->_wreq.alloced_ptr = sock->_wreq.bufs; + } + memcpy(sock->_wreq.bufs, bufs, sizeof(h2o_iovec_t) * bufcnt); + sock->_wreq.cnt = bufcnt; + + /* schedule the write */ + link_to_statechanged(sock); +Out: + free(tofree); +} + +int h2o_socket_get_fd(h2o_socket_t *_sock) +{ + struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock; + return sock->fd; +} + +void do_read_start(h2o_socket_t *_sock) +{ + struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock; + + link_to_statechanged(sock); +} + +void do_read_stop(h2o_socket_t *_sock) +{ + struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock; + + sock->_flags &= ~H2O_SOCKET_FLAG_IS_READ_READY; + link_to_statechanged(sock); +} + +void h2o_socket_dont_read(h2o_socket_t *_sock, int dont_read) +{ + struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock; + + if (dont_read) { + sock->_flags |= H2O_SOCKET_FLAG_DONT_READ; + } else { + sock->_flags &= ~H2O_SOCKET_FLAG_DONT_READ; + } +} + +int do_export(h2o_socket_t *_sock, h2o_socket_export_t *info) +{ + struct st_h2o_evloop_socket_t *sock = (void *)_sock; + + assert((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) == 0); + evloop_do_on_socket_export(sock); + sock->_flags = H2O_SOCKET_FLAG_IS_DISPOSED; + + info->fd = sock->fd; + sock->fd = -1; + + return 0; +} + +h2o_socket_t *do_import(h2o_loop_t *loop, h2o_socket_export_t *info) +{ + return h2o_evloop_socket_create(loop, info->fd, 0); +} + +h2o_loop_t *h2o_socket_get_loop(h2o_socket_t *_sock) +{ + struct st_h2o_evloop_socket_t *sock = (void *)_sock; + return sock->loop; +} + +socklen_t h2o_socket_getsockname(h2o_socket_t *_sock, struct sockaddr *sa) +{ + struct st_h2o_evloop_socket_t *sock = (void *)_sock; + socklen_t len = sizeof(struct sockaddr_storage); + if (getsockname(sock->fd, sa, &len) != 0) + return 0; + return len; +} + +socklen_t get_peername_uncached(h2o_socket_t *_sock, struct sockaddr *sa) +{ + struct st_h2o_evloop_socket_t *sock = (void *)_sock; + socklen_t len = sizeof(struct sockaddr_storage); + if (getpeername(sock->fd, sa, &len) != 0) + return 0; + return len; +} + +static struct st_h2o_evloop_socket_t *create_socket(h2o_evloop_t *loop, int fd, int flags) +{ + struct st_h2o_evloop_socket_t *sock; + + fcntl(fd, F_SETFL, O_NONBLOCK); + + sock = h2o_mem_alloc(sizeof(*sock)); + memset(sock, 0, sizeof(*sock)); + h2o_buffer_init(&sock->super.input, &h2o_socket_buffer_prototype); + sock->loop = loop; + sock->fd = fd; + sock->_flags = flags; + sock->_wreq.bufs = sock->_wreq.smallbufs; + sock->_next_pending = sock; + sock->_next_statechanged = sock; + + evloop_do_on_socket_create(sock); + + return sock; +} + +static struct st_h2o_evloop_socket_t *create_socket_set_nodelay(h2o_evloop_t *loop, int fd, int flags) +{ + int on = 1; + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)); + return create_socket(loop, fd, flags); +} + +h2o_socket_t *h2o_evloop_socket_create(h2o_evloop_t *loop, int fd, int flags) +{ + fcntl(fd, F_SETFL, O_NONBLOCK); + return &create_socket(loop, fd, flags)->super; +} + +h2o_socket_t *h2o_evloop_socket_accept(h2o_socket_t *_listener) +{ + struct st_h2o_evloop_socket_t *listener = (struct st_h2o_evloop_socket_t *)_listener; + int fd; + +#if H2O_USE_ACCEPT4 + if ((fd = accept4(listener->fd, NULL, NULL, SOCK_NONBLOCK | SOCK_CLOEXEC)) == -1) + return NULL; +#else + if ((fd = cloexec_accept(listener->fd, NULL, NULL)) == -1) + return NULL; + fcntl(fd, F_SETFL, O_NONBLOCK); +#endif + + return &create_socket_set_nodelay(listener->loop, fd, H2O_SOCKET_FLAG_IS_ACCEPTED_CONNECTION)->super; +} + +h2o_socket_t *h2o_socket_connect(h2o_loop_t *loop, struct sockaddr *addr, socklen_t addrlen, h2o_socket_cb cb) +{ + int fd; + struct st_h2o_evloop_socket_t *sock; + + if ((fd = cloexec_socket(addr->sa_family, SOCK_STREAM, 0)) == -1) + return NULL; + fcntl(fd, F_SETFL, O_NONBLOCK); + if (!(connect(fd, addr, addrlen) == 0 || errno == EINPROGRESS)) { + close(fd); + return NULL; + } + + sock = create_socket_set_nodelay(loop, fd, H2O_SOCKET_FLAG_IS_CONNECTING); + h2o_socket_notify_write(&sock->super, cb); + return &sock->super; +} + +h2o_evloop_t *create_evloop(size_t sz) +{ + h2o_evloop_t *loop = h2o_mem_alloc(sz); + + memset(loop, 0, sz); + loop->_statechanged.tail_ref = &loop->_statechanged.head; + h2o_linklist_init_anchor(&loop->_timeouts); + + update_now(loop); + + return loop; +} + +void update_now(h2o_evloop_t *loop) +{ + struct timeval tv; + gettimeofday(&tv, NULL); + loop->_now = (uint64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000; +} + +int32_t adjust_max_wait(h2o_evloop_t *loop, int32_t max_wait) +{ + uint64_t wake_at = h2o_timeout_get_wake_at(&loop->_timeouts); + + update_now(loop); + + if (wake_at <= loop->_now) { + max_wait = 0; + } else { + uint64_t delta = wake_at - loop->_now; + if (delta < max_wait) + max_wait = (int32_t)delta; + } + + return max_wait; +} + +void h2o_socket_notify_write(h2o_socket_t *_sock, h2o_socket_cb cb) +{ + struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock; + assert(sock->super._cb.write == NULL); + assert(sock->_wreq.cnt == 0); + + sock->super._cb.write = cb; + link_to_statechanged(sock); +} + +static void run_socket(struct st_h2o_evloop_socket_t *sock) +{ + if ((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0) { + /* is freed in updatestates phase */ + return; + } + + if ((sock->_flags & H2O_SOCKET_FLAG_IS_READ_READY) != 0) { + sock->_flags &= ~H2O_SOCKET_FLAG_IS_READ_READY; + read_on_ready(sock); + } + + if ((sock->_flags & H2O_SOCKET_FLAG_IS_WRITE_NOTIFY) != 0) { + const char *err = NULL; + assert(sock->super._cb.write != NULL); + sock->_flags &= ~H2O_SOCKET_FLAG_IS_WRITE_NOTIFY; + if (sock->_wreq.cnt != 0) { + /* error */ + err = h2o_socket_error_io; + sock->_wreq.cnt = 0; + } else if ((sock->_flags & H2O_SOCKET_FLAG_IS_CONNECTING) != 0) { + sock->_flags &= ~H2O_SOCKET_FLAG_IS_CONNECTING; + int so_err = 0; + socklen_t l = sizeof(so_err); + so_err = 0; + if (getsockopt(sock->fd, SOL_SOCKET, SO_ERROR, &so_err, &l) != 0 || so_err != 0) { + /* FIXME lookup the error table */ + err = h2o_socket_error_conn_fail; + } + } + on_write_complete(&sock->super, err); + } +} + +static void run_pending(h2o_evloop_t *loop) +{ + struct st_h2o_evloop_socket_t *sock; + + while (loop->_pending_as_server != NULL || loop->_pending_as_client != NULL) { + while ((sock = loop->_pending_as_client) != NULL) { + loop->_pending_as_client = sock->_next_pending; + sock->_next_pending = sock; + run_socket(sock); + } + if ((sock = loop->_pending_as_server) != NULL) { + loop->_pending_as_server = sock->_next_pending; + sock->_next_pending = sock; + run_socket(sock); + } + } +} + +void h2o_evloop_destroy(h2o_evloop_t *loop) +{ + struct st_h2o_evloop_socket_t *sock; + + /* timeouts are governed by the application and MUST be destroyed prior to destroying the loop */ + assert(h2o_linklist_is_empty(&loop->_timeouts)); + + /* dispose all socket */ + while ((sock = loop->_pending_as_client) != NULL) { + loop->_pending_as_client = sock->_next_pending; + sock->_next_pending = sock; + h2o_socket_close((h2o_socket_t *)sock); + } + while ((sock = loop->_pending_as_server) != NULL) { + loop->_pending_as_server = sock->_next_pending; + sock->_next_pending = sock; + h2o_socket_close((h2o_socket_t *)sock); + } + + /* now all socket are disposedand and placed in linked list statechanged + * we can freeing memory in cycle by next_statechanged, + */ + while ((sock = loop->_statechanged.head) != NULL) { + loop->_statechanged.head = sock->_next_statechanged; + free(sock); + } + + /* lastly we need to free loop memory */ + free(loop); +} + +int h2o_evloop_run(h2o_evloop_t *loop, int32_t max_wait) +{ + h2o_linklist_t *node; + + /* update socket states, poll, set readable flags, perform pending writes */ + if (evloop_do_proceed(loop, max_wait) != 0) + return -1; + + /* run the pending callbacks */ + run_pending(loop); + + /* run the timeouts */ + for (node = loop->_timeouts.next; node != &loop->_timeouts; node = node->next) { + h2o_timeout_t *timeout = H2O_STRUCT_FROM_MEMBER(h2o_timeout_t, _link, node); + h2o_timeout_run(loop, timeout, loop->_now); + } + /* assert h2o_timeout_run has called run_pending */ + assert(loop->_pending_as_client == NULL); + assert(loop->_pending_as_server == NULL); + + if (h2o_sliding_counter_is_running(&loop->exec_time_counter)) { + update_now(loop); + h2o_sliding_counter_stop(&loop->exec_time_counter, loop->_now); + } + + return 0; +} + +void h2o_timeout__do_init(h2o_evloop_t *loop, h2o_timeout_t *timeout) +{ + h2o_linklist_insert(&loop->_timeouts, &timeout->_link); +} + +void h2o_timeout__do_dispose(h2o_evloop_t *loop, h2o_timeout_t *timeout) +{ + h2o_linklist_unlink(&timeout->_link); +} + +void h2o_timeout__do_link(h2o_evloop_t *loop, h2o_timeout_t *timeout, h2o_timeout_entry_t *entry) +{ + /* nothing to do */ +} + +void h2o_timeout__do_post_callback(h2o_evloop_t *loop) +{ + run_pending(loop); +} diff --git a/web/server/h2o/libh2o/lib/common/socket/evloop/epoll.c.h b/web/server/h2o/libh2o/lib/common/socket/evloop/epoll.c.h new file mode 100644 index 00000000..247dac89 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socket/evloop/epoll.c.h @@ -0,0 +1,203 @@ +/* + * Copyright (c) 2014 DeNA Co., Ltd. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include +#include +#include +#include + +#if 0 +#define DEBUG_LOG(...) fprintf(stderr, __VA_ARGS__) +#else +#define DEBUG_LOG(...) +#endif + +struct st_h2o_evloop_epoll_t { + h2o_evloop_t super; + int ep; +}; + +static int update_status(struct st_h2o_evloop_epoll_t *loop) +{ + while (loop->super._statechanged.head != NULL) { + /* detach the top */ + struct st_h2o_evloop_socket_t *sock = loop->super._statechanged.head; + loop->super._statechanged.head = sock->_next_statechanged; + sock->_next_statechanged = sock; + /* update the state */ + if ((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0) { + free(sock); + } else { + int changed = 0, op, ret; + struct epoll_event ev; + ev.events = 0; + if (h2o_socket_is_reading(&sock->super)) { + ev.events |= EPOLLIN; + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) == 0) { + sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; + changed = 1; + } + } else { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) { + sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; + changed = 1; + } + } + if (h2o_socket_is_writing(&sock->super)) { + ev.events |= EPOLLOUT; + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) == 0) { + sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; + changed = 1; + } + } else { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) { + sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; + changed = 1; + } + } + if (changed) { + if ((sock->_flags & H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED) != 0) { + if (ev.events != 0) + op = EPOLL_CTL_MOD; + else + op = EPOLL_CTL_DEL; + } else { + assert(ev.events != 0); + op = EPOLL_CTL_ADD; + } + ev.data.ptr = sock; + while ((ret = epoll_ctl(loop->ep, op, sock->fd, &ev)) != 0 && errno == EINTR) + ; + if (ret != 0) + return -1; + if (op == EPOLL_CTL_DEL) + sock->_flags &= ~H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED; + else + sock->_flags |= H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED; + } + } + } + loop->super._statechanged.tail_ref = &loop->super._statechanged.head; + + return 0; +} + +int evloop_do_proceed(h2o_evloop_t *_loop, int32_t max_wait) +{ + struct st_h2o_evloop_epoll_t *loop = (struct st_h2o_evloop_epoll_t *)_loop; + struct epoll_event events[256]; + int nevents, i; + + /* collect (and update) status */ + if (update_status(loop) != 0) + return -1; + + /* poll */ + max_wait = adjust_max_wait(&loop->super, max_wait); + nevents = epoll_wait(loop->ep, events, sizeof(events) / sizeof(events[0]), max_wait); + update_now(&loop->super); + if (nevents == -1) + return -1; + + if (nevents != 0) + h2o_sliding_counter_start(&loop->super.exec_time_counter, loop->super._now); + + /* update readable flags, perform writes */ + for (i = 0; i != nevents; ++i) { + struct st_h2o_evloop_socket_t *sock = events[i].data.ptr; + int notified = 0; + if ((events[i].events & (EPOLLIN | EPOLLHUP | EPOLLERR)) != 0) { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) { + sock->_flags |= H2O_SOCKET_FLAG_IS_READ_READY; + link_to_pending(sock); + notified = 1; + } + } + if ((events[i].events & (EPOLLOUT | EPOLLHUP | EPOLLERR)) != 0) { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) { + write_pending(sock); + notified = 1; + } + } + if (!notified) { + static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; + static time_t last_reported = 0; + time_t now = time(NULL); + pthread_mutex_lock(&lock); + if (last_reported + 60 < now) { + last_reported = now; + fprintf(stderr, "ignoring epoll event (fd:%d,event:%x)\n", sock->fd, (int)events[i].events); + } + pthread_mutex_unlock(&lock); + } + } + + return 0; +} + +static void evloop_do_on_socket_create(struct st_h2o_evloop_socket_t *sock) +{ +} + +static void evloop_do_on_socket_close(struct st_h2o_evloop_socket_t *sock) +{ + struct st_h2o_evloop_epoll_t *loop = (void *)sock->loop; + int ret; + + if (sock->fd == -1) + return; + if ((sock->_flags & H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED) == 0) + return; + while ((ret = epoll_ctl(loop->ep, EPOLL_CTL_DEL, sock->fd, NULL)) != 0 && errno == EINTR) + ; + if (ret != 0) + fprintf(stderr, "socket_close: epoll(DEL) returned error %d (fd=%d)\n", errno, sock->fd); +} + +static void evloop_do_on_socket_export(struct st_h2o_evloop_socket_t *sock) +{ + struct st_h2o_evloop_epoll_t *loop = (void *)sock->loop; + int ret; + + if ((sock->_flags & H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED) == 0) + return; + while ((ret = epoll_ctl(loop->ep, EPOLL_CTL_DEL, sock->fd, NULL)) != 0 && errno == EINTR) + ; + if (ret != 0) + fprintf(stderr, "socket_export: epoll(DEL) returned error %d (fd=%d)\n", errno, sock->fd); +} + +h2o_evloop_t *h2o_evloop_create(void) +{ + struct st_h2o_evloop_epoll_t *loop = (struct st_h2o_evloop_epoll_t *)create_evloop(sizeof(*loop)); + + pthread_mutex_lock(&cloexec_mutex); + loop->ep = epoll_create(10); + while (fcntl(loop->ep, F_SETFD, FD_CLOEXEC) == -1) { + if (errno != EAGAIN) { + fprintf(stderr, "h2o_evloop_create: failed to set FD_CLOEXEC to the epoll fd (errno=%d)\n", errno); + abort(); + } + } + pthread_mutex_unlock(&cloexec_mutex); + + return &loop->super; +} diff --git a/web/server/h2o/libh2o/lib/common/socket/evloop/kqueue.c.h b/web/server/h2o/libh2o/lib/common/socket/evloop/kqueue.c.h new file mode 100644 index 00000000..21288ed7 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socket/evloop/kqueue.c.h @@ -0,0 +1,186 @@ +/* + * Copyright (c) 2014 DeNA Co., Ltd. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include +#include +#include +#include +#include + +#if 0 +#define DEBUG_LOG(...) fprintf(stderr, __VA_ARGS__) +#else +#define DEBUG_LOG(...) +#endif + +struct st_h2o_socket_loop_kqueue_t { + h2o_evloop_t super; + int kq; +}; + +static void ev_set(struct kevent *ev, int fd, int filter, int flags, struct st_h2o_evloop_socket_t *sock) +{ +#ifdef __NetBSD__ + EV_SET(ev, fd, filter, flags, 0, 0, (intptr_t)sock); +#else + EV_SET(ev, fd, filter, flags, 0, 0, sock); +#endif +} + +static int collect_status(struct st_h2o_socket_loop_kqueue_t *loop, struct kevent *changelist, int changelist_capacity) +{ + int change_index = 0; + +#define SET_AND_UPDATE(filter, flags) \ + do { \ + ev_set(changelist + change_index++, sock->fd, filter, flags, sock); \ + if (change_index == changelist_capacity) { \ + int ret; \ + while ((ret = kevent(loop->kq, changelist, change_index, NULL, 0, NULL)) != 0 && errno == EINTR) \ + ; \ + if (ret == -1) \ + return -1; \ + change_index = 0; \ + } \ + } while (0) + + while (loop->super._statechanged.head != NULL) { + /* detach the top */ + struct st_h2o_evloop_socket_t *sock = loop->super._statechanged.head; + loop->super._statechanged.head = sock->_next_statechanged; + sock->_next_statechanged = sock; + /* update the state */ + if ((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0) { + free(sock); + } else { + if (h2o_socket_is_reading(&sock->super)) { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) == 0) { + sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; + SET_AND_UPDATE(EVFILT_READ, EV_ADD); + } + } else { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) { + sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; + SET_AND_UPDATE(EVFILT_READ, EV_DELETE); + } + } + if (h2o_socket_is_writing(&sock->super)) { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) == 0) { + sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; + SET_AND_UPDATE(EVFILT_WRITE, EV_ADD); + } + } else { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) { + sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; + SET_AND_UPDATE(EVFILT_WRITE, EV_DELETE); + } + } + } + } + loop->super._statechanged.tail_ref = &loop->super._statechanged.head; + + return change_index; + +#undef SET_AND_UPDATE +} + +int evloop_do_proceed(h2o_evloop_t *_loop, int32_t max_wait) +{ + struct st_h2o_socket_loop_kqueue_t *loop = (struct st_h2o_socket_loop_kqueue_t *)_loop; + struct kevent changelist[64], events[128]; + int nchanges, nevents, i; + struct timespec ts; + + /* collect (and update) status */ + if ((nchanges = collect_status(loop, changelist, sizeof(changelist) / sizeof(changelist[0]))) == -1) + return -1; + + /* poll */ + max_wait = adjust_max_wait(&loop->super, max_wait); + ts.tv_sec = max_wait / 1000; + ts.tv_nsec = max_wait % 1000 * 1000 * 1000; + nevents = kevent(loop->kq, changelist, nchanges, events, sizeof(events) / sizeof(events[0]), &ts); + + update_now(&loop->super); + if (nevents == -1) + return -1; + + if (nevents != 0) + h2o_sliding_counter_start(&loop->super.exec_time_counter, loop->super._now); + + /* update readable flags, perform writes */ + for (i = 0; i != nevents; ++i) { + struct st_h2o_evloop_socket_t *sock = (void *)events[i].udata; + assert(sock->fd == events[i].ident); + switch (events[i].filter) { + case EVFILT_READ: + if (sock->_flags != H2O_SOCKET_FLAG_IS_DISPOSED) { + sock->_flags |= H2O_SOCKET_FLAG_IS_READ_READY; + link_to_pending(sock); + } + break; + case EVFILT_WRITE: + if (sock->_flags != H2O_SOCKET_FLAG_IS_DISPOSED) { + write_pending(sock); + } + break; + default: + break; /* ??? */ + } + } + + return 0; +} + +static void evloop_do_on_socket_create(struct st_h2o_evloop_socket_t *sock) +{ +} + +static void evloop_do_on_socket_close(struct st_h2o_evloop_socket_t *sock) +{ +} + +static void evloop_do_on_socket_export(struct st_h2o_evloop_socket_t *sock) +{ + struct st_h2o_socket_loop_kqueue_t *loop = (void *)sock->loop; + struct kevent changelist[2]; + int change_index = 0, ret; + + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) + ev_set(changelist + change_index++, sock->fd, EVFILT_READ, EV_DELETE, 0); + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) + ev_set(changelist + change_index++, sock->fd, EVFILT_WRITE, EV_DELETE, 0); + if (change_index == 0) + return; + while ((ret = kevent(loop->kq, changelist, change_index, NULL, 0, NULL)) != 0 && errno == EINTR) + ; + if (ret == -1) + fprintf(stderr, "kevent returned error %d (fd=%d)", errno, sock->fd); +} + +h2o_evloop_t *h2o_evloop_create(void) +{ + struct st_h2o_socket_loop_kqueue_t *loop = (struct st_h2o_socket_loop_kqueue_t *)create_evloop(sizeof(*loop)); + + loop->kq = kqueue(); + + return &loop->super; +} diff --git a/web/server/h2o/libh2o/lib/common/socket/evloop/poll.c.h b/web/server/h2o/libh2o/lib/common/socket/evloop/poll.c.h new file mode 100644 index 00000000..8b3f3d14 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socket/evloop/poll.c.h @@ -0,0 +1,178 @@ +/* + * Copyright (c) 2014,2015 DeNA Co., Ltd., Kazuho Oku + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include +#include + +#if 0 +#define DEBUG_LOG(...) fprintf(stderr, __VA_ARGS__) +#else +#define DEBUG_LOG(...) +#endif + +struct st_h2o_evloop_poll_t { + h2o_evloop_t super; + H2O_VECTOR(struct st_h2o_evloop_socket_t *) socks; +}; + +static void update_socks(struct st_h2o_evloop_poll_t *loop) +{ + /* update loop->socks */ + while (loop->super._statechanged.head != NULL) { + /* detach the top */ + struct st_h2o_evloop_socket_t *sock = loop->super._statechanged.head; + loop->super._statechanged.head = sock->_next_statechanged; + sock->_next_statechanged = sock; + /* update the state */ + if ((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0) { + assert(sock->fd == -1); + free(sock); + } else { + assert(sock->fd < loop->socks.size); + if (loop->socks.entries[sock->fd] == NULL) { + loop->socks.entries[sock->fd] = sock; + } else { + assert(loop->socks.entries[sock->fd] == sock); + } + if (h2o_socket_is_reading(&sock->super)) { + DEBUG_LOG("setting READ for fd: %d\n", sock->fd); + sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; + } else { + DEBUG_LOG("clearing READ for fd: %d\n", sock->fd); + sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; + } + if (h2o_socket_is_writing(&sock->super)) { + DEBUG_LOG("setting WRITE for fd: %d\n", sock->fd); + sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; + } else { + DEBUG_LOG("clearing WRITE for fd: %d\n", sock->fd); + sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; + } + } + } + loop->super._statechanged.tail_ref = &loop->super._statechanged.head; +} + +int evloop_do_proceed(h2o_evloop_t *_loop, int32_t max_wait) +{ + struct st_h2o_evloop_poll_t *loop = (struct st_h2o_evloop_poll_t *)_loop; + H2O_VECTOR(struct pollfd) pollfds = {NULL}; + int fd, ret; + + /* update status */ + update_socks(loop); + + /* build list of fds to be polled */ + for (fd = 0; fd != loop->socks.size; ++fd) { + struct st_h2o_evloop_socket_t *sock = loop->socks.entries[fd]; + if (sock == NULL) + continue; + assert(fd == sock->fd); + if ((sock->_flags & (H2O_SOCKET_FLAG_IS_POLLED_FOR_READ | H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE)) != 0) { + h2o_vector_reserve(NULL, &pollfds, pollfds.size + 1); + struct pollfd *slot = pollfds.entries + pollfds.size++; + slot->fd = fd; + slot->events = 0; + slot->revents = 0; + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) + slot->events |= POLLIN; + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) + slot->events |= POLLOUT; + } + } + + /* call */ + max_wait = adjust_max_wait(&loop->super, max_wait); + ret = poll(pollfds.entries, (nfds_t)pollfds.size, max_wait); + update_now(&loop->super); + if (ret == -1) + goto Exit; + DEBUG_LOG("poll returned: %d\n", ret); + + /* update readable flags, perform writes */ + if (ret > 0) { + size_t i; + h2o_sliding_counter_start(&loop->super.exec_time_counter, loop->super._now); + for (i = 0; i != pollfds.size; ++i) { + /* set read_ready flag before calling the write cb, since app. code invoked by the latter may close the socket, clearing + * the former flag */ + if ((pollfds.entries[i].revents & POLLIN) != 0) { + struct st_h2o_evloop_socket_t *sock = loop->socks.entries[pollfds.entries[i].fd]; + assert(sock != NULL); + assert(sock->fd == pollfds.entries[i].fd); + if (sock->_flags != H2O_SOCKET_FLAG_IS_DISPOSED) { + sock->_flags |= H2O_SOCKET_FLAG_IS_READ_READY; + link_to_pending(sock); + DEBUG_LOG("added fd %d as read_ready\n", sock->fd); + } + } + if ((pollfds.entries[i].revents & POLLOUT) != 0) { + struct st_h2o_evloop_socket_t *sock = loop->socks.entries[pollfds.entries[i].fd]; + assert(sock != NULL); + assert(sock->fd == pollfds.entries[i].fd); + if (sock->_flags != H2O_SOCKET_FLAG_IS_DISPOSED) { + DEBUG_LOG("handling pending writes on fd %d\n", fd); + write_pending(sock); + } + } + } + ret = 0; + } + +Exit: + free(pollfds.entries); + return ret; +} + +static void evloop_do_on_socket_create(struct st_h2o_evloop_socket_t *sock) +{ + struct st_h2o_evloop_poll_t *loop = (struct st_h2o_evloop_poll_t *)sock->loop; + + if (sock->fd >= loop->socks.size) { + h2o_vector_reserve(NULL, &loop->socks, sock->fd + 1); + memset(loop->socks.entries + loop->socks.size, 0, (sock->fd + 1 - loop->socks.size) * sizeof(loop->socks.entries[0])); + loop->socks.size = sock->fd + 1; + } + + if (loop->socks.entries[sock->fd] != NULL) + assert(loop->socks.entries[sock->fd]->_flags == H2O_SOCKET_FLAG_IS_DISPOSED); +} + +static void evloop_do_on_socket_close(struct st_h2o_evloop_socket_t *sock) +{ + struct st_h2o_evloop_poll_t *loop = (struct st_h2o_evloop_poll_t *)sock->loop; + + if (sock->fd != -1) + loop->socks.entries[sock->fd] = NULL; +} + +static void evloop_do_on_socket_export(struct st_h2o_evloop_socket_t *sock) +{ + struct st_h2o_evloop_poll_t *loop = (struct st_h2o_evloop_poll_t *)sock->loop; + evloop_do_on_socket_close(sock); + loop->socks.entries[sock->fd] = NULL; +} + +h2o_evloop_t *h2o_evloop_create(void) +{ + struct st_h2o_evloop_poll_t *loop = (struct st_h2o_evloop_poll_t *)create_evloop(sizeof(*loop)); + return &loop->super; +} diff --git a/web/server/h2o/libh2o/lib/common/socket/uv-binding.c.h b/web/server/h2o/libh2o/lib/common/socket/uv-binding.c.h new file mode 100644 index 00000000..44c71c16 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socket/uv-binding.c.h @@ -0,0 +1,283 @@ +/* + * Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku, Fastly, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +struct st_h2o_uv_socket_t { + h2o_socket_t super; + struct { + uv_stream_t *stream; + uv_close_cb close_cb; + } uv; + union { + uv_connect_t _creq; + uv_write_t _wreq; + }; +}; + +static void schedule_timer(h2o_timeout_t *timeout); + +static void alloc_inbuf_tcp(uv_handle_t *handle, size_t suggested_size, uv_buf_t *_buf) +{ + struct st_h2o_uv_socket_t *sock = handle->data; + + h2o_iovec_t buf = h2o_buffer_reserve(&sock->super.input, 4096); + memcpy(_buf, &buf, sizeof(buf)); +} + +static void alloc_inbuf_ssl(uv_handle_t *handle, size_t suggested_size, uv_buf_t *_buf) +{ + struct st_h2o_uv_socket_t *sock = handle->data; + + h2o_iovec_t buf = h2o_buffer_reserve(&sock->super.ssl->input.encrypted, 4096); + memcpy(_buf, &buf, sizeof(buf)); +} + +static void on_read_tcp(uv_stream_t *stream, ssize_t nread, const uv_buf_t *_unused) +{ + struct st_h2o_uv_socket_t *sock = stream->data; + + if (nread < 0) { + sock->super.bytes_read = 0; + sock->super._cb.read(&sock->super, h2o_socket_error_closed); + return; + } + + sock->super.input->size += nread; + sock->super.bytes_read = nread; + sock->super._cb.read(&sock->super, NULL); +} + +static void on_read_ssl(uv_stream_t *stream, ssize_t nread, const uv_buf_t *_unused) +{ + struct st_h2o_uv_socket_t *sock = stream->data; + size_t prev_bytes_read = sock->super.input->size; + const char *err = h2o_socket_error_io; + + if (nread > 0) { + sock->super.ssl->input.encrypted->size += nread; + if (sock->super.ssl->handshake.cb == NULL) + err = decode_ssl_input(&sock->super); + else + err = NULL; + } + sock->super.bytes_read = sock->super.input->size - prev_bytes_read; + sock->super._cb.read(&sock->super, err); +} + +static void on_do_write_complete(uv_write_t *wreq, int status) +{ + struct st_h2o_uv_socket_t *sock = H2O_STRUCT_FROM_MEMBER(struct st_h2o_uv_socket_t, _wreq, wreq); + if (sock->super._cb.write != NULL) + on_write_complete(&sock->super, status == 0 ? NULL : h2o_socket_error_io); +} + +static void free_sock(uv_handle_t *handle) +{ + struct st_h2o_uv_socket_t *sock = handle->data; + uv_close_cb cb = sock->uv.close_cb; + free(sock); + cb(handle); +} + +void do_dispose_socket(h2o_socket_t *_sock) +{ + struct st_h2o_uv_socket_t *sock = (struct st_h2o_uv_socket_t *)_sock; + sock->super._cb.write = NULL; /* avoid the write callback getting called when closing the socket (#1249) */ + uv_close((uv_handle_t *)sock->uv.stream, free_sock); +} + +int h2o_socket_get_fd(h2o_socket_t *_sock) +{ + int fd, ret; + struct st_h2o_uv_socket_t *sock = (struct st_h2o_uv_socket_t *)_sock; + + ret = uv_fileno((uv_handle_t *)sock->uv.stream, (uv_os_fd_t *)&fd); + if (ret) + return -1; + + return fd; +} + +void do_read_start(h2o_socket_t *_sock) +{ + struct st_h2o_uv_socket_t *sock = (struct st_h2o_uv_socket_t *)_sock; + + if (sock->super.ssl == NULL) + uv_read_start(sock->uv.stream, alloc_inbuf_tcp, on_read_tcp); + else + uv_read_start(sock->uv.stream, alloc_inbuf_ssl, on_read_ssl); +} + +void do_read_stop(h2o_socket_t *_sock) +{ + struct st_h2o_uv_socket_t *sock = (struct st_h2o_uv_socket_t *)_sock; + uv_read_stop(sock->uv.stream); +} + +void do_write(h2o_socket_t *_sock, h2o_iovec_t *bufs, size_t bufcnt, h2o_socket_cb cb) +{ + struct st_h2o_uv_socket_t *sock = (struct st_h2o_uv_socket_t *)_sock; + + assert(sock->super._cb.write == NULL); + sock->super._cb.write = cb; + + uv_write(&sock->_wreq, sock->uv.stream, (uv_buf_t *)bufs, (int)bufcnt, on_do_write_complete); +} + +static struct st_h2o_uv_socket_t *create_socket(h2o_loop_t *loop) +{ + uv_tcp_t *tcp = h2o_mem_alloc(sizeof(*tcp)); + + if (uv_tcp_init(loop, tcp) != 0) { + free(tcp); + return NULL; + } + return (void *)h2o_uv_socket_create((void *)tcp, (uv_close_cb)free); +} + +int do_export(h2o_socket_t *_sock, h2o_socket_export_t *info) +{ + struct st_h2o_uv_socket_t *sock = (void *)_sock; + uv_os_fd_t fd; + + if (uv_fileno((uv_handle_t *)sock->uv.stream, &fd) != 0) + return -1; + /* FIXME: consider how to overcome the epoll(2) problem; man says, + * "even after a file descriptor that is part of an epoll set has been closed, + * events may be reported for that file descriptor if other file descriptors + * referring to the same underlying file description remain open" + */ + if ((info->fd = dup(fd)) == -1) + return -1; + return 0; +} + +h2o_socket_t *do_import(h2o_loop_t *loop, h2o_socket_export_t *info) +{ + struct st_h2o_uv_socket_t *sock = create_socket(loop); + + if (sock == NULL) + return NULL; + if (uv_tcp_open((uv_tcp_t *)sock->uv.stream, info->fd) != 0) { + h2o_socket_close(&sock->super); + return NULL; + } + + return &sock->super; +} + +h2o_socket_t *h2o_uv_socket_create(uv_stream_t *stream, uv_close_cb close_cb) +{ + struct st_h2o_uv_socket_t *sock = h2o_mem_alloc(sizeof(*sock)); + + memset(sock, 0, sizeof(*sock)); + h2o_buffer_init(&sock->super.input, &h2o_socket_buffer_prototype); + sock->uv.stream = stream; + sock->uv.close_cb = close_cb; + stream->data = sock; + return &sock->super; +} + +static void on_connect(uv_connect_t *conn, int status) +{ + if (status == UV_ECANCELED) + return; + struct st_h2o_uv_socket_t *sock = H2O_STRUCT_FROM_MEMBER(struct st_h2o_uv_socket_t, _creq, conn); + h2o_socket_cb cb = sock->super._cb.write; + sock->super._cb.write = NULL; + cb(&sock->super, status == 0 ? NULL : h2o_socket_error_conn_fail); +} + +h2o_loop_t *h2o_socket_get_loop(h2o_socket_t *_sock) +{ + struct st_h2o_uv_socket_t *sock = (void *)_sock; + return sock->uv.stream->loop; +} + +h2o_socket_t *h2o_socket_connect(h2o_loop_t *loop, struct sockaddr *addr, socklen_t addrlen, h2o_socket_cb cb) +{ + struct st_h2o_uv_socket_t *sock = create_socket(loop); + + if (sock == NULL) + return NULL; + if (uv_tcp_connect(&sock->_creq, (void *)sock->uv.stream, addr, on_connect) != 0) { + h2o_socket_close(&sock->super); + return NULL; + } + sock->super._cb.write = cb; + return &sock->super; +} + +socklen_t h2o_socket_getsockname(h2o_socket_t *_sock, struct sockaddr *sa) +{ + struct st_h2o_uv_socket_t *sock = (void *)_sock; + int len = sizeof(struct sockaddr_storage); + if (uv_tcp_getsockname((void *)sock->uv.stream, sa, &len) != 0) + return 0; + return (socklen_t)len; +} + +socklen_t get_peername_uncached(h2o_socket_t *_sock, struct sockaddr *sa) +{ + struct st_h2o_uv_socket_t *sock = (void *)_sock; + int len = sizeof(struct sockaddr_storage); + if (uv_tcp_getpeername((void *)sock->uv.stream, sa, &len) != 0) + return 0; + return (socklen_t)len; +} + +static void on_timeout(uv_timer_t *timer) +{ + h2o_timeout_t *timeout = H2O_STRUCT_FROM_MEMBER(h2o_timeout_t, _backend.timer, timer); + + h2o_timeout_run(timer->loop, timeout, h2o_now(timer->loop)); + if (!h2o_linklist_is_empty(&timeout->_entries)) + schedule_timer(timeout); +} + +void schedule_timer(h2o_timeout_t *timeout) +{ + h2o_timeout_entry_t *entry = H2O_STRUCT_FROM_MEMBER(h2o_timeout_entry_t, _link, timeout->_entries.next); + uv_timer_start(&timeout->_backend.timer, on_timeout, + entry->registered_at + timeout->timeout - h2o_now(timeout->_backend.timer.loop), 0); +} + +void h2o_timeout__do_init(h2o_loop_t *loop, h2o_timeout_t *timeout) +{ + uv_timer_init(loop, &timeout->_backend.timer); +} + +void h2o_timeout__do_dispose(h2o_loop_t *loop, h2o_timeout_t *timeout) +{ + uv_close((uv_handle_t *)&timeout->_backend.timer, NULL); +} + +void h2o_timeout__do_link(h2o_loop_t *loop, h2o_timeout_t *timeout, h2o_timeout_entry_t *entry) +{ + /* register the timer if the entry just being added is the only entry */ + if (timeout->_entries.next == &entry->_link) + schedule_timer(timeout); +} + +void h2o_timeout__do_post_callback(h2o_loop_t *loop) +{ + /* nothing to do */ +} -- cgit v1.2.3