diff options
Diffstat (limited to '')
-rw-r--r-- | web/server/h2o/libh2o/lib/common/socket.c | 1433 | ||||
-rw-r--r-- | web/server/h2o/libh2o/lib/common/socket/evloop.c.h | 624 | ||||
-rw-r--r-- | web/server/h2o/libh2o/lib/common/socket/evloop/epoll.c.h | 203 | ||||
-rw-r--r-- | web/server/h2o/libh2o/lib/common/socket/evloop/kqueue.c.h | 186 | ||||
-rw-r--r-- | web/server/h2o/libh2o/lib/common/socket/evloop/poll.c.h | 178 | ||||
-rw-r--r-- | web/server/h2o/libh2o/lib/common/socket/uv-binding.c.h | 283 | ||||
-rw-r--r-- | web/server/h2o/libh2o/lib/common/socketpool.c | 342 |
7 files changed, 3249 insertions, 0 deletions
diff --git a/web/server/h2o/libh2o/lib/common/socket.c b/web/server/h2o/libh2o/lib/common/socket.c new file mode 100644 index 00000000..5b1c37e0 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socket.c @@ -0,0 +1,1433 @@ +/* + * Copyright (c) 2015 DeNA Co., Ltd., Kazuho Oku, Justin Zhu + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
 */
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <limits.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <string.h>
#include <sys/un.h>
#include <unistd.h>
#include <openssl/err.h>
#if defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
#include <sys/ioctl.h>
#endif
#if H2O_USE_PICOTLS
#include "picotls.h"
#endif
#include "h2o/socket.h"
#include "h2o/timeout.h"

#if defined(__APPLE__) && defined(__clang__)
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#endif

#ifndef IOV_MAX
#define IOV_MAX UIO_MAXIOV
#endif

/* kernel-headers bundled with Ubuntu 14.04 does not have the constant defined in netinet/tcp.h */
#if defined(__linux__) && !defined(TCP_NOTSENT_LOWAT)
#define TCP_NOTSENT_LOWAT 25
#endif

#define OPENSSL_HOSTNAME_VALIDATION_LINKAGE static
#include "../../deps/ssl-conservatory/openssl/openssl_hostname_validation.c"

/**
 * Per-socket TLS state. Either `ossl` (OpenSSL) or `ptls` (picotls, TLS 1.3)
 * is in use at a time; `ptls` takes precedence when non-NULL.
 */
struct st_h2o_socket_ssl_t {
    SSL_CTX *ssl_ctx;
    SSL *ossl;
#if H2O_USE_PICOTLS
    ptls_t *ptls;
#endif
    int *did_write_in_read; /* used for detecting and closing the connection upon renegotiation (FIXME implement renegotiation) */
    /* bytes added per TLS record (header + IV + tag); set once the handshake completes */
    size_t record_overhead;
    struct {
        /* user callback invoked when the handshake completes (NULL once the connection is in steady state) */
        h2o_socket_cb cb;
        union {
            struct {
                struct {
                    enum {
                        ASYNC_RESUMPTION_STATE_COMPLETE = 0, /* just pass thru */
                        ASYNC_RESUMPTION_STATE_RECORD,       /* record first input, restore SSL state if it changes to REQUEST_SENT
                                                              */
                        ASYNC_RESUMPTION_STATE_REQUEST_SENT  /* async request has been sent, and is waiting for response */
                    } state;
                    SSL_SESSION *session_data;
                } async_resumption;
            } server;
            struct {
                char *server_name;
                h2o_cache_t *session_cache;
                h2o_iovec_t session_cache_key;
                h2o_cache_hashcode_t session_cache_key_hash;
            } client;
        };
    } handshake;
    struct {
        /* ciphertext received from the peer, pending decryption */
        h2o_buffer_t *encrypted;
    } input;
    struct {
        /* encrypted records queued for write; backing memory comes from `pool` */
        H2O_VECTOR(h2o_iovec_t) bufs;
        h2o_mem_pool_t pool; /* placed at the last */
    } output;
};

struct st_h2o_ssl_context_t {
    SSL_CTX *ctx;
    const h2o_iovec_t *protocols;
    h2o_iovec_t _npn_list_of_protocols;
};

/* backend functions (implemented by the event-loop binding included below) */
static void do_dispose_socket(h2o_socket_t *sock);
static void do_write(h2o_socket_t *sock, h2o_iovec_t *bufs, size_t bufcnt, h2o_socket_cb cb);
static void do_read_start(h2o_socket_t *sock);
static void do_read_stop(h2o_socket_t *sock);
static int do_export(h2o_socket_t *_sock, h2o_socket_export_t *info);
static h2o_socket_t *do_import(h2o_loop_t *loop, h2o_socket_export_t *info);
static socklen_t get_peername_uncached(h2o_socket_t *sock, struct sockaddr *sa);

/* internal functions called from the backend */
static const char *decode_ssl_input(h2o_socket_t *sock);
static void on_write_complete(h2o_socket_t *sock, const char *err);

#if H2O_USE_LIBUV
#include "socket/uv-binding.c.h"
#else
#include "socket/evloop.c.h"
#endif

h2o_buffer_mmap_settings_t h2o_socket_buffer_mmap_settings = {
    32 * 1024 * 1024, /* 32MB, should better be greater than max frame size of HTTP2 for performance reasons */
    "/tmp/h2o.b.XXXXXX"};

__thread h2o_buffer_prototype_t h2o_socket_buffer_prototype = {
    {16},                                       /* keep 16 recently used chunks */
    {H2O_SOCKET_INITIAL_INPUT_BUFFER_SIZE * 2}, /* minimum initial capacity */
    &h2o_socket_buffer_mmap_settings};

const char *h2o_socket_error_out_of_memory = "out of memory";
const char *h2o_socket_error_io = "I/O error";
const char *h2o_socket_error_closed = "socket closed by peer";
const char *h2o_socket_error_conn_fail = "connection failure";
const char *h2o_socket_error_ssl_no_cert = "no certificate";
const char *h2o_socket_error_ssl_cert_invalid = "invalid certificate";
const char *h2o_socket_error_ssl_cert_name_mismatch = "certificate name mismatch";
const char *h2o_socket_error_ssl_decode = "SSL decode error";

/* hooks installed by the application for asynchronous TLS session resumption */
static void (*resumption_get_async)(h2o_socket_t *sock, h2o_iovec_t session_id);
static void (*resumption_new)(h2o_iovec_t session_id, h2o_iovec_t session_data);

static int
read_bio(BIO *b, char *out, int len)
{
    /* BIO read callback: hands OpenSSL the ciphertext buffered in ssl->input.encrypted.
     * Returns -1 with retry-read set when no data is buffered. */
    h2o_socket_t *sock = BIO_get_data(b);

    if (len == 0)
        return 0;

    if (sock->ssl->input.encrypted->size == 0) {
        BIO_set_retry_read(b);
        return -1;
    }

    if (sock->ssl->input.encrypted->size < len) {
        len = (int)sock->ssl->input.encrypted->size;
    }
    memcpy(out, sock->ssl->input.encrypted->bytes, len);
    h2o_buffer_consume(&sock->ssl->input.encrypted, len);

    return len;
}

/* copies `len` bytes into the SSL output pool and appends the copy to output.bufs */
static void write_ssl_bytes(h2o_socket_t *sock, const void *in, size_t len)
{
    if (len != 0) {
        void *bytes_alloced = h2o_mem_alloc_pool(&sock->ssl->output.pool, len);
        memcpy(bytes_alloced, in, len);
        h2o_vector_reserve(&sock->ssl->output.pool, &sock->ssl->output.bufs, sock->ssl->output.bufs.size + 1);
        sock->ssl->output.bufs.entries[sock->ssl->output.bufs.size++] = h2o_iovec_init(bytes_alloced, len);
    }
}

/* BIO write callback: queues ciphertext produced by OpenSSL instead of writing to a fd */
static int write_bio(BIO *b, const char *in, int len)
{
    h2o_socket_t *sock = BIO_get_data(b);

    /* FIXME no support for SSL renegotiation (yet) */
    if (sock->ssl->did_write_in_read != NULL) {
        *sock->ssl->did_write_in_read = 1;
        return -1;
    }

    write_ssl_bytes(sock, in, len);
    return len;
}

static int puts_bio(BIO *b, const char *str)
{
    return write_bio(b, str, (int)strlen(str));
}

/* minimal BIO ctrl: only close-flag get/set and flush (a no-op here) are meaningful */
static long ctrl_bio(BIO *b, int cmd, long num, void *ptr)
{
    switch (cmd) {
    case BIO_CTRL_GET_CLOSE:
        return BIO_get_shutdown(b);
    case BIO_CTRL_SET_CLOSE:
        BIO_set_shutdown(b, (int)num);
        return 1;
    case BIO_CTRL_FLUSH:
        return 1;
    default:
        return 0;
    }
}

/* attaches a memory-backed BIO pair (read_bio/write_bio above) to sock->ssl->ossl;
 * the BIO method table is built once, guarded by a mutex + double-checked load */
static void setup_bio(h2o_socket_t *sock)
{
    static BIO_METHOD *bio_methods = NULL;
    if (bio_methods == NULL) {
        static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER;
        pthread_mutex_lock(&init_lock);
        if (bio_methods == NULL) {
            BIO_METHOD *biom = BIO_meth_new(BIO_TYPE_FD, "h2o_socket");
            BIO_meth_set_write(biom, write_bio);
            BIO_meth_set_read(biom, read_bio);
            BIO_meth_set_puts(biom, puts_bio);
            BIO_meth_set_ctrl(biom, ctrl_bio);
            /* publish the fully-initialized method table before other threads can observe the pointer */
            __sync_synchronize();
            bio_methods = biom;
        }
        pthread_mutex_unlock(&init_lock);
    }

    BIO *bio = BIO_new(bio_methods);
    if (bio == NULL)
        h2o_fatal("no memory");
    BIO_set_data(bio, sock);
    BIO_set_init(bio, 1);
    SSL_set_bio(sock->ssl->ossl, bio, bio);
}

/**
 * Decrypts whatever is buffered in ssl->input.encrypted, appending plaintext to
 * sock->input. Returns NULL on success (including "need more data") or an error
 * string. Must not be called while a handshake is in progress.
 */
const char *decode_ssl_input(h2o_socket_t *sock)
{
    assert(sock->ssl != NULL);
    assert(sock->ssl->handshake.cb == NULL);

#if H2O_USE_PICOTLS
    if (sock->ssl->ptls != NULL) {
        if (sock->ssl->input.encrypted->size != 0) {
            const char *src = sock->ssl->input.encrypted->bytes, *src_end = src + sock->ssl->input.encrypted->size;
            h2o_iovec_t reserved;
            ptls_buffer_t rbuf;
            int ret;
            /* decrypt directly into sock->input when the plaintext fits the reservation */
            if ((reserved = h2o_buffer_reserve(&sock->input, sock->ssl->input.encrypted->size)).base == NULL)
                return h2o_socket_error_out_of_memory;
            ptls_buffer_init(&rbuf, reserved.base, reserved.len);
            do {
                size_t consumed = src_end - src;
                if ((ret = ptls_receive(sock->ssl->ptls, &rbuf, src, &consumed)) != 0)
                    break;
                src += consumed;
            } while (src != src_end);
            h2o_buffer_consume(&sock->ssl->input.encrypted, sock->ssl->input.encrypted->size - (src_end - src));
            if (rbuf.is_allocated) {
                /* picotls spilled to its own allocation; copy the plaintext back into sock->input */
                if ((reserved = h2o_buffer_reserve(&sock->input, rbuf.off)).base == NULL)
                    return h2o_socket_error_out_of_memory;
                memcpy(reserved.base, rbuf.base, rbuf.off);
                sock->input->size += rbuf.off;
                ptls_buffer_dispose(&rbuf);
            } else {
                sock->input->size += rbuf.off;
            }
            if (!(ret == 0 || ret == PTLS_ERROR_IN_PROGRESS))
                return h2o_socket_error_ssl_decode;
        }
        return NULL;
    }
#endif

    /* OpenSSL path: loop until both the ciphertext buffer and SSL's internal record buffer are drained */
    while (sock->ssl->input.encrypted->size != 0 || SSL_pending(sock->ssl->ossl)) {
        int rlen;
        h2o_iovec_t buf = h2o_buffer_reserve(&sock->input, 4096);
        if (buf.base == NULL)
            return h2o_socket_error_out_of_memory;
        { /* call SSL_read (while detecting SSL renegotiation and reporting it as error) */
            int did_write_in_read = 0;
            sock->ssl->did_write_in_read = &did_write_in_read;
            ERR_clear_error();
            rlen = SSL_read(sock->ssl->ossl, buf.base, (int)buf.len);
            sock->ssl->did_write_in_read = NULL;
            if (did_write_in_read)
                return "ssl renegotiation not supported";
        }
        if (rlen == -1) {
            if (SSL_get_error(sock->ssl->ossl, rlen) != SSL_ERROR_WANT_READ) {
                return h2o_socket_error_ssl_decode;
            }
            break;
        } else if (rlen == 0) {
            break;
        } else {
            sock->input->size += rlen;
        }
    }

    return 0;
}

/* submits the queued encrypted records (output.bufs) to the backend writer */
static void flush_pending_ssl(h2o_socket_t *sock, h2o_socket_cb cb)
{
    do_write(sock, sock->ssl->output.bufs.entries, sock->ssl->output.bufs.size, cb);
}

/* resets output.bufs and releases all pool memory backing the queued records */
static void clear_output_buffer(struct st_h2o_socket_ssl_t *ssl)
{
    memset(&ssl->output.bufs, 0, sizeof(ssl->output.bufs));
    h2o_mem_clear_pool(&ssl->output.pool);
}

/* frees all TLS state; client-side name/cache-key allocations are owned by this struct */
static void destroy_ssl(struct st_h2o_socket_ssl_t *ssl)
{
#if H2O_USE_PICOTLS
    if (ssl->ptls != NULL) {
        ptls_free(ssl->ptls);
        ssl->ptls = NULL;
    }
#endif
    if (ssl->ossl != NULL) {
        if (!SSL_is_server(ssl->ossl)) {
            free(ssl->handshake.client.server_name);
            free(ssl->handshake.client.session_cache_key.base);
        }
        SSL_free(ssl->ossl);
        ssl->ossl = NULL;
    }
    h2o_buffer_dispose(&ssl->input.encrypted);
    clear_output_buffer(ssl);
    free(ssl);
}

/* tears down the socket; `err` is unused but the signature matches h2o_socket_cb so the
 * function can be passed as a write-completion callback (see shutdown_ssl) */
static void dispose_socket(h2o_socket_t *sock, const char *err)
{
    void (*close_cb)(void *data);
    void *close_cb_data;

    if (sock->ssl != NULL) {
        destroy_ssl(sock->ssl);
        sock->ssl = NULL;
    }
    h2o_buffer_dispose(&sock->input);
    if (sock->_peername != NULL) {
        free(sock->_peername);
        sock->_peername = NULL;
    }

    close_cb = sock->on_close.cb;
    close_cb_data = sock->on_close.data;

    do_dispose_socket(sock);

    if (close_cb != NULL)
        close_cb(close_cb_data);
}

/* attempts a graceful TLS shutdown (close_notify) before disposing of the socket;
 * also used as its own write/read callback while the shutdown is in flight */
static void shutdown_ssl(h2o_socket_t *sock, const char *err)
{
    int ret;

    if (err != NULL)
        goto Close;

    if (sock->_cb.write != NULL) {
        /* note: libuv calls the write callback after the socket is closed by uv_close (with status set to 0 if the write succeeded)
         */
        sock->_cb.write = NULL;
        goto Close;
    }

#if H2O_USE_PICOTLS
    if (sock->ssl->ptls != NULL) {
        ptls_buffer_t wbuf;
        uint8_t wbuf_small[32];
        ptls_buffer_init(&wbuf, wbuf_small, sizeof(wbuf_small));
        if ((ret = ptls_send_alert(sock->ssl->ptls, &wbuf, PTLS_ALERT_LEVEL_WARNING, PTLS_ALERT_CLOSE_NOTIFY)) != 0)
            goto Close;
        write_ssl_bytes(sock, wbuf.base, wbuf.off);
        ptls_buffer_dispose(&wbuf);
        ret = 1; /* close the socket after sending close_notify */
    } else
#endif
        if (sock->ssl->ossl != NULL) {
        ERR_clear_error();
        if ((ret = SSL_shutdown(sock->ssl->ossl)) == -1)
            goto Close;
    } else {
        goto Close;
    }

    if (sock->ssl->output.bufs.size != 0) {
        h2o_socket_read_stop(sock);
        flush_pending_ssl(sock, ret == 1 ? dispose_socket : shutdown_ssl);
    } else if (ret == 2 && SSL_get_error(sock->ssl->ossl, ret) == SSL_ERROR_WANT_READ) {
        /* NOTE(review): SSL_shutdown is documented to return -1/0/1 only, so this `ret == 2`
         * branch looks unreachable — confirm against the OpenSSL version in use */
        h2o_socket_read_start(sock, shutdown_ssl);
    } else {
        goto Close;
    }

    return;
Close:
    dispose_socket(sock, err);
}

/* releases all resources held by an exported (detached) socket */
void h2o_socket_dispose_export(h2o_socket_export_t *info)
{
    assert(info->fd != -1);
    if (info->ssl != NULL) {
        destroy_ssl(info->ssl);
        info->ssl = NULL;
    }
    h2o_buffer_dispose(&info->input);
    close(info->fd);
    info->fd = -1;
}

/**
 * Detaches the fd, TLS state and buffered input from the socket into `info`
 * (e.g. for handing the connection to another thread/loop), then closes the
 * h2o_socket_t shell. Buffers are switched to a non-pooling prototype so they
 * remain valid outside this thread's buffer pool. Returns 0 on success.
 */
int h2o_socket_export(h2o_socket_t *sock, h2o_socket_export_t *info)
{
    static h2o_buffer_prototype_t nonpooling_prototype;

    assert(!h2o_socket_is_writing(sock));

    if (do_export(sock, info) == -1)
        return -1;

    if ((info->ssl = sock->ssl) != NULL) {
        sock->ssl = NULL;
        h2o_buffer_set_prototype(&info->ssl->input.encrypted, &nonpooling_prototype);
    }
    info->input = sock->input;
    h2o_buffer_set_prototype(&info->input, &nonpooling_prototype);
    h2o_buffer_init(&sock->input, &h2o_socket_buffer_prototype);

    h2o_socket_close(sock);

    return 0;
}

/* re-attaches a previously exported connection to `loop`, restoring the pooled
 * buffer prototypes; ownership of everything in `info` transfers to the new socket */
h2o_socket_t *h2o_socket_import(h2o_loop_t *loop, h2o_socket_export_t *info)
{
    h2o_socket_t *sock;

    assert(info->fd != -1);

    sock = do_import(loop, info);
    info->fd = -1; /* just in case */
    if ((sock->ssl = info->ssl) != NULL) {
        /* rebuild the BIO pair so OpenSSL reads/writes through the new socket */
        setup_bio(sock);
        h2o_buffer_set_prototype(&sock->ssl->input.encrypted, &h2o_socket_buffer_prototype);
    }
    sock->input = info->input;
    h2o_buffer_set_prototype(&sock->input, &h2o_socket_buffer_prototype);
    return sock;
}

/* closes the socket, sending TLS close_notify first when the connection is encrypted */
void h2o_socket_close(h2o_socket_t *sock)
{
    if (sock->ssl == NULL) {
        dispose_socket(sock, 0);
    } else {
        shutdown_ssl(sock, 0);
    }
}

/* subtracts the per-record TLS overhead from the suggested record size so the
 * resulting plaintext payload fits the target wire size */
static uint16_t calc_suggested_tls_payload_size(h2o_socket_t *sock, uint16_t suggested_tls_record_size)
{
    uint16_t ps = suggested_tls_record_size;
    if (sock->ssl != NULL && sock->ssl->record_overhead < ps)
        ps -= sock->ssl->record_overhead;
    return ps;
}

/* reverts TCP_NOTSENT_LOWAT and resets the latency-optimization state to defaults
 * (full 16KB TLS records, unlimited write size) */
static void disable_latency_optimized_write(h2o_socket_t *sock, int (*adjust_notsent_lowat)(h2o_socket_t *, unsigned))
{
    if (sock->_latency_optimization.notsent_is_minimized) {
        adjust_notsent_lowat(sock, 0);
        sock->_latency_optimization.notsent_is_minimized = 0;
    }
    sock->_latency_optimization.state = H2O_SOCKET_LATENCY_OPTIMIZATION_STATE_DISABLED;
    sock->_latency_optimization.suggested_tls_payload_size = 16384;
    sock->_latency_optimization.suggested_write_size = SIZE_MAX;
}

/**
 * Decides whether latency-optimized (MSS-aligned, small-record) writes should be
 * used for this socket based on measured RTT, MSS and congestion-window state,
 * and updates sock->_latency_optimization accordingly. rtt is in microseconds,
 * loop_time in milliseconds (see the comparisons below).
 */
static inline void prepare_for_latency_optimized_write(h2o_socket_t *sock,
                                                       const h2o_socket_latency_optimization_conditions_t *conditions, uint32_t rtt,
                                                       uint32_t mss, uint32_t cwnd_size, uint32_t cwnd_avail, uint64_t loop_time,
                                                       int (*adjust_notsent_lowat)(h2o_socket_t *, unsigned))
{
    /* check RTT */
    if (rtt < conditions->min_rtt * (uint64_t)1000)
        goto Disable;
    if (rtt * conditions->max_additional_delay < loop_time * 1000 * 100)
        goto Disable;

    /* latency-optimization is enabled */
    sock->_latency_optimization.state = H2O_SOCKET_LATENCY_OPTIMIZATION_STATE_DETERMINED;

    /* no need to:
     * 1) adjust the write size if single_write_size << cwnd_size
     * 2) align TLS record boundary to TCP packet boundary if packet loss-rate is low and BW isn't small (implied by cwnd size)
     */
    if (mss * cwnd_size < conditions->max_cwnd) {
        if (!sock->_latency_optimization.notsent_is_minimized) {
            if (adjust_notsent_lowat(sock, 1 /* cannot be set to zero on Linux */) != 0)
                goto Disable;
            sock->_latency_optimization.notsent_is_minimized = 1;
        }
        sock->_latency_optimization.suggested_tls_payload_size = calc_suggested_tls_payload_size(sock, mss);
        sock->_latency_optimization.suggested_write_size =
            cwnd_avail * (size_t)sock->_latency_optimization.suggested_tls_payload_size;
    } else {
        if (sock->_latency_optimization.notsent_is_minimized) {
            if (adjust_notsent_lowat(sock, 0) != 0)
                goto Disable;
            sock->_latency_optimization.notsent_is_minimized = 0;
        }
        sock->_latency_optimization.suggested_tls_payload_size = 16384;
        sock->_latency_optimization.suggested_write_size = SIZE_MAX;
    }
    return;

Disable:
    disable_latency_optimized_write(sock, adjust_notsent_lowat);
}

/**
 * Obtains RTT, MSS, size of CWND (in the number of packets).
 * Also writes to cwnd_avail minimum number of packets (of MSS size) sufficient to shut up poll-for-write under the precondition
 * that TCP_NOTSENT_LOWAT is set to 1.
 */
static int obtain_tcp_info(int fd, uint32_t *rtt, uint32_t *mss, uint32_t *cwnd_size, uint32_t *cwnd_avail)
{
#define CALC_CWND_PAIR_FROM_BYTE_UNITS(cwnd_bytes, inflight_bytes)                                                                 \
    do {                                                                                                                           \
        *cwnd_size = (cwnd_bytes + *mss / 2) / *mss;                                                                               \
        *cwnd_avail = cwnd_bytes > inflight_bytes ? (cwnd_bytes - inflight_bytes) / *mss + 2 : 2;                                  \
    } while (0)

#if defined(__linux__) && defined(TCP_INFO)

    /* Linux reports cwnd in packets and rtt in microseconds directly */
    struct tcp_info tcpi;
    socklen_t tcpisz = sizeof(tcpi);
    if (getsockopt(fd, IPPROTO_TCP, TCP_INFO, &tcpi, &tcpisz) != 0)
        return -1;
    *rtt = tcpi.tcpi_rtt;
    *mss = tcpi.tcpi_snd_mss;
    *cwnd_size = tcpi.tcpi_snd_cwnd;
    *cwnd_avail = tcpi.tcpi_snd_cwnd > tcpi.tcpi_unacked ?
        tcpi.tcpi_snd_cwnd - tcpi.tcpi_unacked + 2 : 2;
    return 0;

#elif defined(__FreeBSD__) && defined(TCP_INFO) && 0 /* disabled since we wouldn't use it anyways; OS lacks TCP_NOTSENT_LOWAT */

    struct tcp_info tcpi;
    socklen_t tcpisz = sizeof(tcpi);
    int bytes_inflight;
    if (getsockopt(fd, IPPROTO_TCP, TCP_INFO, &tcpi, &tcpisz) != 0 || ioctl(fd, FIONWRITE, &bytes_inflight) == -1)
        return -1;
    *rtt = tcpi.tcpi_rtt;
    *mss = tcpi.tcpi_snd_mss;
    CALC_CWND_PAIR_FROM_BYTE_UNITS(tcpi.tcpi_snd_cwnd, bytes_inflight);
    return 0;

#elif defined(__APPLE__) && defined(TCP_CONNECTION_INFO)

    /* Darwin reports srtt in milliseconds; convert to microseconds to match the Linux path */
    struct tcp_connection_info tcpi;
    socklen_t tcpisz = sizeof(tcpi);
    if (getsockopt(fd, IPPROTO_TCP, TCP_CONNECTION_INFO, &tcpi, &tcpisz) != 0 || tcpi.tcpi_maxseg == 0)
        return -1;
    *rtt = tcpi.tcpi_srtt * 1000;
    *mss = tcpi.tcpi_maxseg;
    CALC_CWND_PAIR_FROM_BYTE_UNITS(tcpi.tcpi_snd_cwnd, tcpi.tcpi_snd_sbbytes);
    return 0;

#else
    /* TODO add support for NetBSD; note that the OS returns the number of packets for tcpi_snd_cwnd; see
     * http://twitter.com/n_soda/status/740719125878575105
     */
    return -1;
#endif

#undef CALC_CWND_PAIR_FROM_BYTE_UNITS
}

#ifdef TCP_NOTSENT_LOWAT
/* sets TCP_NOTSENT_LOWAT on the socket; returns the setsockopt result (0 on success) */
static int adjust_notsent_lowat(h2o_socket_t *sock, unsigned notsent_lowat)
{
    return setsockopt(h2o_socket_get_fd(sock), IPPROTO_TCP, TCP_NOTSENT_LOWAT, &notsent_lowat, sizeof(notsent_lowat));
}
#else
#define adjust_notsent_lowat NULL
#endif

/**
 * Queries TCP state and (re)computes the latency-optimization parameters for the
 * socket. Returns the suggested maximum number of bytes to write at once
 * (SIZE_MAX when the optimization is disabled or unsupported on this platform).
 */
size_t h2o_socket_do_prepare_for_latency_optimized_write(h2o_socket_t *sock,
                                                         const h2o_socket_latency_optimization_conditions_t *conditions)
{
    uint32_t rtt = 0, mss = 0, cwnd_size = 0, cwnd_avail = 0;
    uint64_t loop_time = UINT64_MAX;
    int can_prepare = 1;

#if !defined(TCP_NOTSENT_LOWAT)
    /* the feature cannot be setup unless TCP_NOTSENT_LOWAT is available */
    can_prepare = 0;
#endif

#if H2O_USE_LIBUV
    /* poll-then-write is impossible with libuv */
    can_prepare = 0;
#else
    if (can_prepare)
        loop_time = h2o_evloop_get_execution_time(h2o_socket_get_loop(sock));
#endif

    /* obtain TCP states */
    if (can_prepare && obtain_tcp_info(h2o_socket_get_fd(sock), &rtt, &mss, &cwnd_size, &cwnd_avail) != 0)
        can_prepare = 0;

    /* determine suggested_write_size, suggested_tls_record_size and adjust TCP_NOTSENT_LOWAT based on the obtained information */
    if (can_prepare) {
        prepare_for_latency_optimized_write(sock, conditions, rtt, mss, cwnd_size, cwnd_avail, loop_time, adjust_notsent_lowat);
    } else {
        disable_latency_optimized_write(sock, adjust_notsent_lowat);
    }

    return sock->_latency_optimization.suggested_write_size;

#undef CALC_CWND_PAIR_FROM_BYTE_UNITS
}

/**
 * Writes the given buffers, encrypting them first when the socket carries TLS.
 * `cb` is invoked on completion (or failure) of the backend write. For TLS
 * sockets the data is split into records of a size chosen by the
 * latency-optimization state (small records early in the connection, 16KB
 * later).
 */
void h2o_socket_write(h2o_socket_t *sock, h2o_iovec_t *bufs, size_t bufcnt, h2o_socket_cb cb)
{
    size_t i, prev_bytes_written = sock->bytes_written;

    for (i = 0; i != bufcnt; ++i) {
        sock->bytes_written += bufs[i].len;
#if H2O_SOCKET_DUMP_WRITE
        fprintf(stderr, "writing %zu bytes to fd:%d\n", bufs[i].len, h2o_socket_get_fd(sock));
        h2o_dump_memory(stderr, bufs[i].base, bufs[i].len);
#endif
    }

    if (sock->ssl == NULL) {
        do_write(sock, bufs, bufcnt, cb);
    } else {
        assert(sock->ssl->output.bufs.size == 0);
        /* fill in the data */
        size_t ssl_record_size;
        switch (sock->_latency_optimization.state) {
        case H2O_SOCKET_LATENCY_OPTIMIZATION_STATE_TBD:
        case H2O_SOCKET_LATENCY_OPTIMIZATION_STATE_DISABLED:
            /* until ~200KB has been sent, use MTU-sized records to minimize time-to-first-decoded-byte */
            ssl_record_size = prev_bytes_written < 200 * 1024 ?
            calc_suggested_tls_payload_size(sock, 1400) : 16384;
            break;
        case H2O_SOCKET_LATENCY_OPTIMIZATION_STATE_DETERMINED:
            sock->_latency_optimization.state = H2O_SOCKET_LATENCY_OPTIMIZATION_STATE_NEEDS_UPDATE;
            /* fallthru */
        default:
            ssl_record_size = sock->_latency_optimization.suggested_tls_payload_size;
            break;
        }
        /* encrypt each buffer into records of at most ssl_record_size plaintext bytes */
        for (; bufcnt != 0; ++bufs, --bufcnt) {
            size_t off = 0;
            while (off != bufs[0].len) {
                int ret;
                size_t sz = bufs[0].len - off;
                if (sz > ssl_record_size)
                    sz = ssl_record_size;
#if H2O_USE_PICOTLS
                if (sock->ssl->ptls != NULL) {
                    /* encrypt into pool-allocated space sized for exactly one record; ptls_send must not spill */
                    size_t dst_size = sz + ptls_get_record_overhead(sock->ssl->ptls);
                    void *dst = h2o_mem_alloc_pool(&sock->ssl->output.pool, dst_size);
                    ptls_buffer_t wbuf;
                    ptls_buffer_init(&wbuf, dst, dst_size);
                    ret = ptls_send(sock->ssl->ptls, &wbuf, bufs[0].base + off, sz);
                    assert(ret == 0);
                    assert(!wbuf.is_allocated);
                    h2o_vector_reserve(&sock->ssl->output.pool, &sock->ssl->output.bufs, sock->ssl->output.bufs.size + 1);
                    sock->ssl->output.bufs.entries[sock->ssl->output.bufs.size++] = h2o_iovec_init(dst, wbuf.off);
                } else
#endif
                {
                    ret = SSL_write(sock->ssl->ossl, bufs[0].base + off, (int)sz);
                    if (ret != sz) {
                        /* The error happens if SSL_write is called after SSL_read returns a fatal error (e.g. due to corrupt TCP
                         * packet being received). We need to take care of this since some protocol implementations send data after
                         * the read-side of the connection gets closed (note that protocol implementations are (yet) incapable of
                         * distinguishing a normal shutdown and close due to an error using the `status` value of the read
                         * callback).
                         */
                        clear_output_buffer(sock->ssl);
                        flush_pending_ssl(sock, cb);
#ifndef H2O_USE_LIBUV
                        ((struct st_h2o_evloop_socket_t *)sock)->_flags |= H2O_SOCKET_FLAG_IS_WRITE_ERROR;
#endif
                        return;
                    }
                }
                off += sz;
            }
        }
        flush_pending_ssl(sock, cb);
    }
}

/* backend write-completion hook: releases encrypted output (if any) and invokes the user callback */
void on_write_complete(h2o_socket_t *sock, const char *err)
{
    h2o_socket_cb cb;

    if (sock->ssl != NULL)
        clear_output_buffer(sock->ssl);

    cb = sock->_cb.write;
    sock->_cb.write = NULL;
    cb(sock, err);
}

/* registers `cb` to be invoked when data becomes available, and arms the backend poller */
void h2o_socket_read_start(h2o_socket_t *sock, h2o_socket_cb cb)
{
    sock->_cb.read = cb;
    do_read_start(sock);
}

void h2o_socket_read_stop(h2o_socket_t *sock)
{
    sock->_cb.read = NULL;
    do_read_stop(sock);
}

/* caches the peer address on the socket (replacing any previous cache entry) */
void h2o_socket_setpeername(h2o_socket_t *sock, struct sockaddr *sa, socklen_t len)
{
    if (sock->_peername != NULL)
        free(sock->_peername);
    sock->_peername = h2o_mem_alloc(offsetof(struct st_h2o_socket_peername_t, addr) + len);
    sock->_peername->len = len;
    memcpy(&sock->_peername->addr, sa, len);
}

/* returns the peer address, consulting the cache before issuing getpeername(2) */
socklen_t h2o_socket_getpeername(h2o_socket_t *sock, struct sockaddr *sa)
{
    /* return cached, if exists */
    if (sock->_peername != NULL) {
        memcpy(sa, &sock->_peername->addr, sock->_peername->len);
        return sock->_peername->len;
    }
    /* call, copy to cache, and return */
    socklen_t len = get_peername_uncached(sock, sa);
    h2o_socket_setpeername(sock, sa, len);
    return len;
}

/* returns the negotiated TLS protocol name, or NULL for cleartext sockets */
const char *h2o_socket_get_ssl_protocol_version(h2o_socket_t *sock)
{
    if (sock->ssl != NULL) {
#if H2O_USE_PICOTLS
        if (sock->ssl->ptls != NULL)
            return "TLSv1.3";
#endif
        if (sock->ssl->ossl != NULL)
            return SSL_get_version(sock->ssl->ossl);
    }
    return NULL;
}

/* returns 1/0 when session resumption state is known, -1 for cleartext sockets */
int h2o_socket_get_ssl_session_reused(h2o_socket_t *sock)
{
    if (sock->ssl != NULL) {
#if H2O_USE_PICOTLS
        if (sock->ssl->ptls != NULL)
            return ptls_is_psk_handshake(sock->ssl->ptls);
#endif
        if (sock->ssl->ossl != NULL)
            return (int)SSL_session_reused(sock->ssl->ossl);
    }
    return -1;
}

const char
*h2o_socket_get_ssl_cipher(h2o_socket_t *sock)
{
    /* returns the negotiated cipher name (AEAD name for picotls), or NULL */
    if (sock->ssl != NULL) {
#if H2O_USE_PICOTLS
        if (sock->ssl->ptls != NULL) {
            ptls_cipher_suite_t *cipher = ptls_get_cipher(sock->ssl->ptls);
            if (cipher != NULL)
                return cipher->aead->name;
        } else
#endif
            if (sock->ssl->ossl != NULL)
            return SSL_get_cipher_name(sock->ssl->ossl);
    }
    return NULL;
}

/* returns the cipher strength in bits (key size in bits*8-equivalent for picotls: key_size is
 * bytes — NOTE(review): the picotls path returns key_size without *8; confirm intended unit) */
int h2o_socket_get_ssl_cipher_bits(h2o_socket_t *sock)
{
    if (sock->ssl != NULL) {
#if H2O_USE_PICOTLS
        if (sock->ssl->ptls != NULL) {
            ptls_cipher_suite_t *cipher = ptls_get_cipher(sock->ssl->ptls);
            if (cipher == NULL)
                return 0;
            return (int)cipher->aead->key_size;
        } else
#endif
            if (sock->ssl->ossl != NULL)
            return SSL_get_cipher_bits(sock->ssl->ossl, NULL);
    }
    return 0;
}

/* returns the TLS session id (empty iovec when unavailable, during async resumption,
 * or on the picotls path which has no equivalent yet) */
h2o_iovec_t h2o_socket_get_ssl_session_id(h2o_socket_t *sock)
{
    if (sock->ssl != NULL) {
#if H2O_USE_PICOTLS
        if (sock->ssl->ptls != NULL) {
            /* FIXME */
        } else
#endif
            if (sock->ssl->ossl != NULL) {
            SSL_SESSION *session;
            if (sock->ssl->handshake.server.async_resumption.state == ASYNC_RESUMPTION_STATE_COMPLETE &&
                (session = SSL_get_session(sock->ssl->ossl)) != NULL) {
                unsigned id_len;
                const unsigned char *id = SSL_SESSION_get_id(session, &id_len);
                return h2o_iovec_init(id, id_len);
            }
        }
    }

    return h2o_iovec_init(NULL, 0);
}

/* base64-encodes the session id for access logging; allocates from `pool` when given,
 * otherwise from the heap (caller frees) */
h2o_iovec_t h2o_socket_log_ssl_session_id(h2o_socket_t *sock, h2o_mem_pool_t *pool)
{
    h2o_iovec_t base64id, rawid = h2o_socket_get_ssl_session_id(sock);

    if (rawid.base == NULL)
        return h2o_iovec_init(NULL, 0);

    base64id.base = pool != NULL ? h2o_mem_alloc_pool(pool, h2o_base64_encode_capacity(rawid.len))
                                 : h2o_mem_alloc(h2o_base64_encode_capacity(rawid.len));
    base64id.len = h2o_base64_encode(base64id.base, rawid.base, rawid.len, 1);
    return base64id;
}

/* stringifies the cipher strength for access logging; empty iovec for cleartext sockets */
h2o_iovec_t h2o_socket_log_ssl_cipher_bits(h2o_socket_t *sock, h2o_mem_pool_t *pool)
{
    int bits = h2o_socket_get_ssl_cipher_bits(sock);
    if (bits != 0) {
        char *s = (char *)(pool != NULL ? h2o_mem_alloc_pool(pool, sizeof(H2O_INT16_LONGEST_STR))
                                        : h2o_mem_alloc(sizeof(H2O_INT16_LONGEST_STR)));
        size_t len = sprintf(s, "%" PRId16, (int16_t)bits);
        return h2o_iovec_init(s, len);
    } else {
        return h2o_iovec_init(NULL, 0);
    }
}

/* total-orders two socket addresses (family, then address, then port, then IPv6 extras);
 * returns <0, 0 or >0 like memcmp */
int h2o_socket_compare_address(struct sockaddr *x, struct sockaddr *y)
{
#define CMP(a, b)                                                                                                                  \
    if (a != b)                                                                                                                    \
    return a < b ? -1 : 1

    CMP(x->sa_family, y->sa_family);

    if (x->sa_family == AF_UNIX) {
        struct sockaddr_un *xun = (void *)x, *yun = (void *)y;
        int r = strcmp(xun->sun_path, yun->sun_path);
        if (r != 0)
            return r;
    } else if (x->sa_family == AF_INET) {
        struct sockaddr_in *xin = (void *)x, *yin = (void *)y;
        CMP(ntohl(xin->sin_addr.s_addr), ntohl(yin->sin_addr.s_addr));
        CMP(ntohs(xin->sin_port), ntohs(yin->sin_port));
    } else if (x->sa_family == AF_INET6) {
        struct sockaddr_in6 *xin6 = (void *)x, *yin6 = (void *)y;
        int r = memcmp(xin6->sin6_addr.s6_addr, yin6->sin6_addr.s6_addr, sizeof(xin6->sin6_addr.s6_addr));
        if (r != 0)
            return r;
        CMP(ntohs(xin6->sin6_port), ntohs(yin6->sin6_port));
        CMP(xin6->sin6_flowinfo, yin6->sin6_flowinfo);
        CMP(xin6->sin6_scope_id, yin6->sin6_scope_id);
    } else {
        assert(!"unknown sa_family");
    }

#undef CMP
    return 0;
}

/* writes the numeric host representation of `sa` into `buf` (at least NI_MAXHOST bytes);
 * returns the string length, or SIZE_MAX on failure */
size_t h2o_socket_getnumerichost(struct sockaddr *sa, socklen_t salen, char *buf)
{
    if (sa->sa_family == AF_INET) {
        /* fast path for IPv4 addresses */
        struct sockaddr_in *sin = (void *)sa;
        uint32_t addr;
        addr = htonl(sin->sin_addr.s_addr);
        return sprintf(buf, "%d.%d.%d.%d", addr >> 24, (addr >> 16) & 255, (addr >> 8) & 255, addr & 255);
    }

    if (getnameinfo(sa, salen, buf, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) != 0)
        return SIZE_MAX;
    return strlen(buf);
}

/* returns the port number of an IPv4/IPv6 address, -1 for other families
 * NOTE(review): uses htons on the network-order port — equivalent to ntohs on the
 * common platforms, but ntohs would state the intent */
int32_t h2o_socket_getport(struct sockaddr *sa)
{
    switch (sa->sa_family) {
    case AF_INET:
        return htons(((struct sockaddr_in *)sa)->sin_port);
    case AF_INET6:
        return htons(((struct sockaddr_in6 *)sa)->sin6_port);
    default:
        return -1;
    }
}

/* allocates the OpenSSL connection object and wires it to the socket's BIO pair */
static void create_ossl(h2o_socket_t *sock)
{
    sock->ssl->ossl = SSL_new(sock->ssl->ssl_ctx);
    setup_bio(sock);
}

/* OpenSSL session-lookup callback: on first call records the id and kicks off the
 * asynchronous fetch (returning NULL pauses the handshake); once the response is in,
 * returns the stored session with *copy set so OpenSSL takes its own reference */
static SSL_SESSION *on_async_resumption_get(SSL *ssl,
#if OPENSSL_VERSION_NUMBER >= 0x1010000fL && !defined(LIBRESSL_VERSION_NUMBER)
                                            const
#endif
                                            unsigned char *data,
                                            int len, int *copy)
{
    h2o_socket_t *sock = BIO_get_data(SSL_get_rbio(ssl));

    switch (sock->ssl->handshake.server.async_resumption.state) {
    case ASYNC_RESUMPTION_STATE_RECORD:
        sock->ssl->handshake.server.async_resumption.state = ASYNC_RESUMPTION_STATE_REQUEST_SENT;
        resumption_get_async(sock, h2o_iovec_init(data, len));
        return NULL;
    case ASYNC_RESUMPTION_STATE_COMPLETE:
        *copy = 1;
        return sock->ssl->handshake.server.async_resumption.session_data;
    default:
        assert(!"FIXME");
        return NULL;
    }
}

/* OpenSSL new-session callback: serializes the session (DER) and hands it to the
 * async store; the serialized copy lives on the stack (alloca) only for the call */
static int on_async_resumption_new(SSL *ssl, SSL_SESSION *session)
{
    h2o_iovec_t data;
    const unsigned char *id;
    unsigned id_len;
    unsigned char *p;

    /* build data */
    data.len = i2d_SSL_SESSION(session, NULL);
    data.base = alloca(data.len);
    p = (void *)data.base;
    i2d_SSL_SESSION(session, &p);

    id = SSL_SESSION_get_id(session, &id_len);
    resumption_new(h2o_iovec_init(id, id_len), data);
    return 0;
}

/* finalizes the handshake: derives record_overhead from the negotiated cipher,
 * stores the session in the client-side cache, then invokes the user callback */
static void on_handshake_complete(h2o_socket_t *sock, const char *err)
{
    if (err == NULL) {
#if H2O_USE_PICOTLS
        if (sock->ssl->ptls != NULL) {
            sock->ssl->record_overhead = ptls_get_record_overhead(sock->ssl->ptls);
        } else
#endif
        {
            const SSL_CIPHER *cipher = SSL_get_current_cipher(sock->ssl->ossl);
            switch
(SSL_CIPHER_get_id(cipher)) { + case TLS1_CK_RSA_WITH_AES_128_GCM_SHA256: + case TLS1_CK_DHE_RSA_WITH_AES_128_GCM_SHA256: + case TLS1_CK_ECDHE_RSA_WITH_AES_128_GCM_SHA256: + case TLS1_CK_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256: + case TLS1_CK_RSA_WITH_AES_256_GCM_SHA384: + case TLS1_CK_DHE_RSA_WITH_AES_256_GCM_SHA384: + case TLS1_CK_ECDHE_RSA_WITH_AES_256_GCM_SHA384: + case TLS1_CK_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384: + sock->ssl->record_overhead = 5 /* header */ + 8 /* record_iv_length (RFC 5288 3) */ + 16 /* tag (RFC 5116 5.1) */; + break; +#if defined(TLS1_CK_DHE_RSA_CHACHA20_POLY1305) + case TLS1_CK_DHE_RSA_CHACHA20_POLY1305: + case TLS1_CK_ECDHE_RSA_CHACHA20_POLY1305: + case TLS1_CK_ECDHE_ECDSA_CHACHA20_POLY1305: + sock->ssl->record_overhead = 5 /* header */ + 16 /* tag */; + break; +#endif + default: + sock->ssl->record_overhead = 32; /* sufficiently large number that can hold most payloads */ + break; + } + } + } + + /* set ssl session into the cache */ + if (sock->ssl->ossl != NULL && !SSL_is_server(sock->ssl->ossl) && sock->ssl->handshake.client.session_cache != NULL) { + if (err == NULL || err == h2o_socket_error_ssl_cert_name_mismatch) { + SSL_SESSION *session = SSL_get1_session(sock->ssl->ossl); + h2o_cache_set(sock->ssl->handshake.client.session_cache, h2o_now(h2o_socket_get_loop(sock)), + sock->ssl->handshake.client.session_cache_key, sock->ssl->handshake.client.session_cache_key_hash, + h2o_iovec_init(session, 1)); + } + } + + h2o_socket_cb handshake_cb = sock->ssl->handshake.cb; + sock->_cb.write = NULL; + sock->ssl->handshake.cb = NULL; + if (err == NULL) + decode_ssl_input(sock); + handshake_cb(sock, err); +} + +static void proceed_handshake(h2o_socket_t *sock, const char *err) +{ + h2o_iovec_t first_input = {NULL}; + int ret = 0; + + sock->_cb.write = NULL; + + if (err != NULL) { + goto Complete; + } + + if (sock->ssl->ossl == NULL) { +#if H2O_USE_PICOTLS + /* prepare I/O */ + size_t consumed = sock->ssl->input.encrypted->size; + ptls_buffer_t wbuf; 
+ ptls_buffer_init(&wbuf, "", 0); + + if (sock->ssl->ptls != NULL) { + /* picotls in action, proceed the handshake */ + ret = ptls_handshake(sock->ssl->ptls, &wbuf, sock->ssl->input.encrypted->bytes, &consumed, NULL); + } else { + /* start using picotls if the first packet contains TLS 1.3 CH */ + ptls_context_t *ptls_ctx = h2o_socket_ssl_get_picotls_context(sock->ssl->ssl_ctx); + if (ptls_ctx != NULL) { + ptls_t *ptls = ptls_new(ptls_ctx, 1); + if (ptls == NULL) + h2o_fatal("no memory"); + ret = ptls_handshake(ptls, &wbuf, sock->ssl->input.encrypted->bytes, &consumed, NULL); + if ((ret == 0 || ret == PTLS_ERROR_IN_PROGRESS) && wbuf.off != 0) { + sock->ssl->ptls = ptls; + sock->ssl->handshake.server.async_resumption.state = ASYNC_RESUMPTION_STATE_COMPLETE; + } else { + ptls_free(ptls); + } + } + } + + if (sock->ssl->ptls != NULL) { + /* complete I/O done by picotls */ + h2o_buffer_consume(&sock->ssl->input.encrypted, consumed); + switch (ret) { + case 0: + case PTLS_ERROR_IN_PROGRESS: + if (wbuf.off != 0) { + h2o_socket_read_stop(sock); + write_ssl_bytes(sock, wbuf.base, wbuf.off); + flush_pending_ssl(sock, ret == 0 ? 
on_handshake_complete : proceed_handshake); + } else { + h2o_socket_read_start(sock, proceed_handshake); + } + break; + default: + /* FIXME send alert in wbuf before calling the callback */ + on_handshake_complete(sock, "picotls handshake error"); + break; + } + ptls_buffer_dispose(&wbuf); + return; + } + ptls_buffer_dispose(&wbuf); +#endif + + /* fallback to openssl if the attempt failed */ + create_ossl(sock); + } + + if (sock->ssl->ossl != NULL && SSL_is_server(sock->ssl->ossl) && + sock->ssl->handshake.server.async_resumption.state == ASYNC_RESUMPTION_STATE_RECORD) { + if (sock->ssl->input.encrypted->size <= 1024) { + /* retain a copy of input if performing async resumption */ + first_input = h2o_iovec_init(alloca(sock->ssl->input.encrypted->size), sock->ssl->input.encrypted->size); + memcpy(first_input.base, sock->ssl->input.encrypted->bytes, first_input.len); + } else { + sock->ssl->handshake.server.async_resumption.state = ASYNC_RESUMPTION_STATE_COMPLETE; + } + } + +Redo: + ERR_clear_error(); + if (SSL_is_server(sock->ssl->ossl)) { + ret = SSL_accept(sock->ssl->ossl); + switch (sock->ssl->handshake.server.async_resumption.state) { + case ASYNC_RESUMPTION_STATE_COMPLETE: + break; + case ASYNC_RESUMPTION_STATE_RECORD: + /* async resumption has not been triggered; proceed the state to complete */ + sock->ssl->handshake.server.async_resumption.state = ASYNC_RESUMPTION_STATE_COMPLETE; + break; + case ASYNC_RESUMPTION_STATE_REQUEST_SENT: { + /* sent async request, reset the ssl state, and wait for async response */ + assert(ret < 0); + SSL_free(sock->ssl->ossl); + create_ossl(sock); + clear_output_buffer(sock->ssl); + h2o_buffer_consume(&sock->ssl->input.encrypted, sock->ssl->input.encrypted->size); + h2o_buffer_reserve(&sock->ssl->input.encrypted, first_input.len); + memcpy(sock->ssl->input.encrypted->bytes, first_input.base, first_input.len); + sock->ssl->input.encrypted->size = first_input.len; + h2o_socket_read_stop(sock); + return; + } + default: + 
h2o_fatal("unexpected async resumption state"); + break; + } + } else { + ret = SSL_connect(sock->ssl->ossl); + } + + if (ret == 0 || (ret < 0 && SSL_get_error(sock->ssl->ossl, ret) != SSL_ERROR_WANT_READ)) { + /* failed */ + long verify_result = SSL_get_verify_result(sock->ssl->ossl); + if (verify_result != X509_V_OK) { + err = X509_verify_cert_error_string(verify_result); + } else { + err = "ssl handshake failure"; + } + goto Complete; + } + + if (sock->ssl->output.bufs.size != 0) { + h2o_socket_read_stop(sock); + flush_pending_ssl(sock, ret == 1 ? on_handshake_complete : proceed_handshake); + } else { + if (ret == 1) { + if (!SSL_is_server(sock->ssl->ossl)) { + X509 *cert = SSL_get_peer_certificate(sock->ssl->ossl); + if (cert != NULL) { + switch (validate_hostname(sock->ssl->handshake.client.server_name, cert)) { + case MatchFound: + /* ok */ + break; + case MatchNotFound: + err = h2o_socket_error_ssl_cert_name_mismatch; + break; + default: + err = h2o_socket_error_ssl_cert_invalid; + break; + } + X509_free(cert); + } else { + err = h2o_socket_error_ssl_no_cert; + } + } + goto Complete; + } + if (sock->ssl->input.encrypted->size != 0) + goto Redo; + h2o_socket_read_start(sock, proceed_handshake); + } + return; + +Complete: + h2o_socket_read_stop(sock); + on_handshake_complete(sock, err); +} + +void h2o_socket_ssl_handshake(h2o_socket_t *sock, SSL_CTX *ssl_ctx, const char *server_name, h2o_socket_cb handshake_cb) +{ + sock->ssl = h2o_mem_alloc(sizeof(*sock->ssl)); + memset(sock->ssl, 0, offsetof(struct st_h2o_socket_ssl_t, output.pool)); + + sock->ssl->ssl_ctx = ssl_ctx; + + /* setup the buffers; sock->input should be empty, sock->ssl->input.encrypted should contain the initial input, if any */ + h2o_buffer_init(&sock->ssl->input.encrypted, &h2o_socket_buffer_prototype); + if (sock->input->size != 0) { + h2o_buffer_t *tmp = sock->input; + sock->input = sock->ssl->input.encrypted; + sock->ssl->input.encrypted = tmp; + } + + 
h2o_mem_init_pool(&sock->ssl->output.pool); + + sock->ssl->handshake.cb = handshake_cb; + if (server_name == NULL) { + /* is server */ + if (SSL_CTX_sess_get_get_cb(sock->ssl->ssl_ctx) != NULL) + sock->ssl->handshake.server.async_resumption.state = ASYNC_RESUMPTION_STATE_RECORD; + if (sock->ssl->input.encrypted->size != 0) + proceed_handshake(sock, 0); + else + h2o_socket_read_start(sock, proceed_handshake); + } else { + create_ossl(sock); + h2o_cache_t *session_cache = h2o_socket_ssl_get_session_cache(sock->ssl->ssl_ctx); + if (session_cache != NULL) { + struct sockaddr_storage sa; + int32_t port; + if (h2o_socket_getpeername(sock, (struct sockaddr *)&sa) != 0 && + (port = h2o_socket_getport((struct sockaddr *)&sa)) != -1) { + /* session cache is available */ + h2o_iovec_t session_cache_key; + session_cache_key.base = h2o_mem_alloc(strlen(server_name) + sizeof(":" H2O_UINT16_LONGEST_STR)); + session_cache_key.len = sprintf(session_cache_key.base, "%s:%" PRIu16, server_name, (uint16_t)port); + sock->ssl->handshake.client.session_cache = session_cache; + sock->ssl->handshake.client.session_cache_key = session_cache_key; + sock->ssl->handshake.client.session_cache_key_hash = + h2o_cache_calchash(session_cache_key.base, session_cache_key.len); + + /* fetch from session cache */ + h2o_cache_ref_t *cacheref = h2o_cache_fetch(session_cache, h2o_now(h2o_socket_get_loop(sock)), + sock->ssl->handshake.client.session_cache_key, + sock->ssl->handshake.client.session_cache_key_hash); + if (cacheref != NULL) { + SSL_set_session(sock->ssl->ossl, (SSL_SESSION *)cacheref->value.base); + h2o_cache_release(session_cache, cacheref); + } + } + } + sock->ssl->handshake.client.server_name = h2o_strdup(NULL, server_name, SIZE_MAX).base; + SSL_set_tlsext_host_name(sock->ssl->ossl, sock->ssl->handshake.client.server_name); + proceed_handshake(sock, 0); + } +} + +void h2o_socket_ssl_resume_server_handshake(h2o_socket_t *sock, h2o_iovec_t session_data) +{ + if (session_data.len != 0) { + 
const unsigned char *p = (void *)session_data.base; + sock->ssl->handshake.server.async_resumption.session_data = d2i_SSL_SESSION(NULL, &p, (long)session_data.len); + /* FIXME warn on failure */ + } + + sock->ssl->handshake.server.async_resumption.state = ASYNC_RESUMPTION_STATE_COMPLETE; + proceed_handshake(sock, 0); + + if (sock->ssl->handshake.server.async_resumption.session_data != NULL) { + SSL_SESSION_free(sock->ssl->handshake.server.async_resumption.session_data); + sock->ssl->handshake.server.async_resumption.session_data = NULL; + } +} + +void h2o_socket_ssl_async_resumption_init(h2o_socket_ssl_resumption_get_async_cb get_async_cb, + h2o_socket_ssl_resumption_new_cb new_cb) +{ + resumption_get_async = get_async_cb; + resumption_new = new_cb; +} + +void h2o_socket_ssl_async_resumption_setup_ctx(SSL_CTX *ctx) +{ + SSL_CTX_sess_set_get_cb(ctx, on_async_resumption_get); + SSL_CTX_sess_set_new_cb(ctx, on_async_resumption_new); + /* if necessary, it is the responsibility of the caller to disable the internal cache */ +} + +#if H2O_USE_PICOTLS + +static int get_ptls_index(void) +{ + static int index = -1; + + if (index == -1) { + static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + pthread_mutex_lock(&mutex); + if (index == -1) { + index = SSL_CTX_get_ex_new_index(0, NULL, NULL, NULL, NULL); + assert(index != -1); + } + pthread_mutex_unlock(&mutex); + } + + return index; +} + +ptls_context_t *h2o_socket_ssl_get_picotls_context(SSL_CTX *ossl) +{ + return SSL_CTX_get_ex_data(ossl, get_ptls_index()); +} + +void h2o_socket_ssl_set_picotls_context(SSL_CTX *ossl, ptls_context_t *ptls) +{ + SSL_CTX_set_ex_data(ossl, get_ptls_index(), ptls); +} + +#endif + +static void on_dispose_ssl_ctx_session_cache(void *parent, void *ptr, CRYPTO_EX_DATA *ad, int idx, long argl, void *argp) +{ + h2o_cache_t *ssl_session_cache = (h2o_cache_t *)ptr; + if (ssl_session_cache != NULL) + h2o_cache_destroy(ssl_session_cache); +} + +static int get_ssl_session_cache_index(void) +{ + 
    /* (continued: body of get_ssl_session_cache_index) lazily allocates the
     * process-wide SSL_CTX ex_data slot used to store the session cache */
    static int index = -1;
    static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    /* lock unconditionally (no double-checked fast path, unlike get_ptls_index);
     * on_dispose_ssl_ctx_session_cache is registered as the slot's destructor */
    pthread_mutex_lock(&mutex);
    if (index == -1) {
        index = SSL_CTX_get_ex_new_index(0, NULL, NULL, NULL, on_dispose_ssl_ctx_session_cache);
        assert(index != -1);
    }
    pthread_mutex_unlock(&mutex);
    return index;
}

/* Returns the session cache associated with the SSL_CTX, or NULL if none was set. */
h2o_cache_t *h2o_socket_ssl_get_session_cache(SSL_CTX *ctx)
{
    return (h2o_cache_t *)SSL_CTX_get_ex_data(ctx, get_ssl_session_cache_index());
}

/* Associates a session cache with the SSL_CTX. The cache is destroyed together
 * with the SSL_CTX via on_dispose_ssl_ctx_session_cache (ownership transfers). */
void h2o_socket_ssl_set_session_cache(SSL_CTX *ctx, h2o_cache_t *cache)
{
    SSL_CTX_set_ex_data(ctx, get_ssl_session_cache_index(), cache);
}

/* Destructor for one cache entry; the entry's value.base holds an SSL_SESSION. */
void h2o_socket_ssl_destroy_session_cache_entry(h2o_iovec_t value)
{
    SSL_SESSION *session = (SSL_SESSION *)value.base;
    SSL_SESSION_free(session);
}

/* Returns the application protocol negotiated during the handshake (picotls,
 * ALPN, or NPN, checked in that order), or an empty vector if none. */
h2o_iovec_t h2o_socket_ssl_get_selected_protocol(h2o_socket_t *sock)
{
    const unsigned char *data = NULL;
    unsigned len = 0;

    assert(sock->ssl != NULL);

#if H2O_USE_PICOTLS
    if (sock->ssl->ptls != NULL) {
        const char *proto = ptls_get_negotiated_protocol(sock->ssl->ptls);
        return proto != NULL ?
h2o_iovec_init(proto, strlen(proto)) : h2o_iovec_init(NULL, 0); + } +#endif + +#if H2O_USE_ALPN + if (len == 0) + SSL_get0_alpn_selected(sock->ssl->ossl, &data, &len); +#endif +#if H2O_USE_NPN + if (len == 0) + SSL_get0_next_proto_negotiated(sock->ssl->ossl, &data, &len); +#endif + + return h2o_iovec_init(data, len); +} + +static int on_alpn_select(SSL *ssl, const unsigned char **out, unsigned char *outlen, const unsigned char *_in, unsigned int inlen, + void *_protocols) +{ + const h2o_iovec_t *protocols = _protocols; + size_t i; + + for (i = 0; protocols[i].len != 0; ++i) { + const unsigned char *in = _in, *in_end = in + inlen; + while (in != in_end) { + size_t cand_len = *in++; + if (in_end - in < cand_len) { + /* broken request */ + return SSL_TLSEXT_ERR_NOACK; + } + if (cand_len == protocols[i].len && memcmp(in, protocols[i].base, cand_len) == 0) { + goto Found; + } + in += cand_len; + } + } + /* not found */ + return SSL_TLSEXT_ERR_NOACK; + +Found: + *out = (const unsigned char *)protocols[i].base; + *outlen = (unsigned char)protocols[i].len; + return SSL_TLSEXT_ERR_OK; +} + +#if H2O_USE_ALPN + +void h2o_ssl_register_alpn_protocols(SSL_CTX *ctx, const h2o_iovec_t *protocols) +{ + SSL_CTX_set_alpn_select_cb(ctx, on_alpn_select, (void *)protocols); +} + +#endif + +#if H2O_USE_NPN + +static int on_npn_advertise(SSL *ssl, const unsigned char **out, unsigned *outlen, void *protocols) +{ + *out = protocols; + *outlen = (unsigned)strlen(protocols); + return SSL_TLSEXT_ERR_OK; +} + +void h2o_ssl_register_npn_protocols(SSL_CTX *ctx, const char *protocols) +{ + SSL_CTX_set_next_protos_advertised_cb(ctx, on_npn_advertise, (void *)protocols); +} + +#endif + +void h2o_sliding_counter_stop(h2o_sliding_counter_t *counter, uint64_t now) +{ + uint64_t elapsed; + + assert(counter->cur.start_at != 0); + + /* calculate the time used, and reset cur */ + if (now <= counter->cur.start_at) + elapsed = 0; + else + elapsed = now - counter->cur.start_at; + counter->cur.start_at = 0; + 
+ /* adjust prev */ + counter->prev.sum += elapsed; + counter->prev.sum -= counter->prev.slots[counter->prev.index]; + counter->prev.slots[counter->prev.index] = elapsed; + if (++counter->prev.index >= sizeof(counter->prev.slots) / sizeof(counter->prev.slots[0])) + counter->prev.index = 0; + + /* recalc average */ + counter->average = counter->prev.sum / (sizeof(counter->prev.slots) / sizeof(counter->prev.slots[0])); +} diff --git a/web/server/h2o/libh2o/lib/common/socket/evloop.c.h b/web/server/h2o/libh2o/lib/common/socket/evloop.c.h new file mode 100644 index 00000000..754ed23b --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socket/evloop.c.h @@ -0,0 +1,624 @@ +/* + * Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku, Fastly, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
+ */ +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <stdlib.h> +#include <sys/socket.h> +#include <sys/time.h> +#include <sys/uio.h> +#include <unistd.h> +#include "cloexec.h" +#include "h2o/linklist.h" + +#if !defined(H2O_USE_ACCEPT4) +#ifdef __linux__ +#define H2O_USE_ACCEPT4 1 +#elif __FreeBSD__ >= 10 +#define H2O_USE_ACCEPT4 1 +#else +#define H2O_USE_ACCEPT4 0 +#endif +#endif + +struct st_h2o_evloop_socket_t { + h2o_socket_t super; + int fd; + int _flags; + h2o_evloop_t *loop; + struct { + size_t cnt; + h2o_iovec_t *bufs; + union { + h2o_iovec_t *alloced_ptr; + h2o_iovec_t smallbufs[4]; + }; + } _wreq; + struct st_h2o_evloop_socket_t *_next_pending; + struct st_h2o_evloop_socket_t *_next_statechanged; +}; + +static void link_to_pending(struct st_h2o_evloop_socket_t *sock); +static void write_pending(struct st_h2o_evloop_socket_t *sock); +static h2o_evloop_t *create_evloop(size_t sz); +static void update_now(h2o_evloop_t *loop); +static int32_t adjust_max_wait(h2o_evloop_t *loop, int32_t max_wait); + +/* functions to be defined in the backends */ +static int evloop_do_proceed(h2o_evloop_t *loop, int32_t max_wait); +static void evloop_do_on_socket_create(struct st_h2o_evloop_socket_t *sock); +static void evloop_do_on_socket_close(struct st_h2o_evloop_socket_t *sock); +static void evloop_do_on_socket_export(struct st_h2o_evloop_socket_t *sock); + +#if H2O_USE_POLL || H2O_USE_EPOLL || H2O_USE_KQUEUE +/* explicitly specified */ +#else +#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__) +#define H2O_USE_KQUEUE 1 +#elif defined(__linux) +#define H2O_USE_EPOLL 1 +#else +#define H2O_USE_POLL 1 +#endif +#endif + +#if H2O_USE_POLL +#include "evloop/poll.c.h" +#elif H2O_USE_EPOLL +#include "evloop/epoll.c.h" +#elif H2O_USE_KQUEUE +#include "evloop/kqueue.c.h" +#else +#error "poller not specified" +#endif + +void link_to_pending(struct st_h2o_evloop_socket_t *sock) +{ + if (sock->_next_pending == sock) { + struct 
        /* (continued: body of link_to_pending) accepted connections are queued
         * separately from client-initiated ones so run_pending can prioritize
         * client sockets */
        st_h2o_evloop_socket_t **slot = (sock->_flags & H2O_SOCKET_FLAG_IS_ACCEPTED_CONNECTION) != 0
                                            ? &sock->loop->_pending_as_server
                                            : &sock->loop->_pending_as_client;
        sock->_next_pending = *slot;
        *slot = sock;
    }
}

/* Appends the socket to the loop's state-changed list (if not already linked;
 * `_next_statechanged == sock` means "not linked", mirroring `_next_pending`). */
static void link_to_statechanged(struct st_h2o_evloop_socket_t *sock)
{
    if (sock->_next_statechanged == sock) {
        sock->_next_statechanged = NULL;
        *sock->loop->_statechanged.tail_ref = sock;
        sock->loop->_statechanged.tail_ref = &sock->_next_statechanged;
    }
}

/* Reads as much as possible from `fd` into `*input` (non-blocking).
 * Returns NULL on success / EAGAIN, h2o_socket_error_closed on EOF with no
 * bytes read, or another h2o_socket_error_* constant on failure. */
static const char *on_read_core(int fd, h2o_buffer_t **input)
{
    int read_any = 0;

    while (1) {
        ssize_t rret;
        h2o_iovec_t buf = h2o_buffer_reserve(input, 4096);
        if (buf.base == NULL) {
            /* memory allocation failed */
            return h2o_socket_error_out_of_memory;
        }
        /* cap the per-call read size so it fits comfortably in a signed int;
         * retry on EINTR */
        while ((rret = read(fd, buf.base, buf.len <= INT_MAX / 2 ? buf.len : INT_MAX / 2 + 1)) == -1 && errno == EINTR)
            ;
        if (rret == -1) {
            if (errno == EAGAIN)
                break;
            else
                return h2o_socket_error_io;
        } else if (rret == 0) {
            if (!read_any)
                return h2o_socket_error_closed; /* TODO notify close */
            break;
        }
        (*input)->size += rret;
        /* a short read means the socket buffer has been drained; stop */
        if (buf.len != rret)
            break;
        read_any = 1;
    }
    return NULL;
}

/* Releases the heap-allocated write-request vector, if one was allocated
 * (i.e. if `bufs` does not point into the inline `smallbufs` array). */
static void wreq_free_buffer_if_allocated(struct st_h2o_evloop_socket_t *sock)
{
    if (sock->_wreq.smallbufs <= sock->_wreq.bufs &&
        sock->_wreq.bufs <= sock->_wreq.smallbufs + sizeof(sock->_wreq.smallbufs) / sizeof(sock->_wreq.smallbufs[0])) {
        /* no need to free */
    } else {
        free(sock->_wreq.alloced_ptr);
        sock->_wreq.bufs = sock->_wreq.smallbufs;
    }
}

/* Writes the iovecs to `fd` using writev, advancing `*bufs`/`*bufcnt` past the
 * bytes actually written. Returns 0 on success or EAGAIN (caller checks
 * `*bufcnt` to detect a partial write), -1 on hard I/O error. */
static int write_core(int fd, h2o_iovec_t **bufs, size_t *bufcnt)
{
    int iovcnt;
    ssize_t wret;

    if (*bufcnt != 0) {
        do {
            /* write (at most IOV_MAX iovecs per writev call) */
            iovcnt = IOV_MAX;
            if (*bufcnt < iovcnt)
                iovcnt = (int)*bufcnt;
            while ((wret = writev(fd, (struct iovec *)*bufs, iovcnt)) == -1 && errno == EINTR)
                ;
            if (wret == -1) {
                if (errno != EAGAIN)
                    return -1;
                break;
            }
            /* adjust the buffer to skip the bytes that have been written */
            while ((*bufs)->len < wret) {
                wret -=
                    (*bufs)->len;
                ++*bufs;
                --*bufcnt;
                assert(*bufcnt != 0);
            }
            if (((*bufs)->len -= wret) == 0) {
                ++*bufs;
                --*bufcnt;
            } else {
                /* partially-written iovec: advance its base pointer */
                (*bufs)->base += wret;
            }
        } while (*bufcnt != 0 && iovcnt == IOV_MAX);
    }

    return 0;
}

/* Invoked when the socket becomes writable; flushes the pending write request
 * and, once it completes (or fails), schedules the write-complete callback. */
void write_pending(struct st_h2o_evloop_socket_t *sock)
{
    assert(sock->super._cb.write != NULL);

    /* DONT_WRITE poll */
    if (sock->_wreq.cnt == 0)
        goto Complete;

    /* write */
    if (write_core(sock->fd, &sock->_wreq.bufs, &sock->_wreq.cnt) == 0 && sock->_wreq.cnt != 0) {
        /* partial write */
        return;
    }

    /* either completed or failed; on failure _wreq.cnt stays non-zero, which
     * run_socket interprets as an error */
    wreq_free_buffer_if_allocated(sock);

Complete:
    sock->_flags |= H2O_SOCKET_FLAG_IS_WRITE_NOTIFY;
    link_to_pending(sock);
    link_to_statechanged(sock); /* might need to disable the write polling */
}

/* Invoked when the socket becomes readable; reads into the appropriate buffer
 * (raw input, or the encrypted-input buffer when TLS is active), decodes TLS
 * records if the handshake has finished, then invokes the read callback. */
static void read_on_ready(struct st_h2o_evloop_socket_t *sock)
{
    const char *err = 0;
    size_t prev_bytes_read = sock->super.input->size;

    if ((sock->_flags & H2O_SOCKET_FLAG_DONT_READ) != 0)
        goto Notify;

    if ((err = on_read_core(sock->fd, sock->super.ssl == NULL ? &sock->super.input : &sock->super.ssl->input.encrypted)) != NULL)
        goto Notify;

    /* handshake.cb == NULL means the TLS handshake has completed */
    if (sock->super.ssl != NULL && sock->super.ssl->handshake.cb == NULL)
        err = decode_ssl_input(&sock->super);

Notify:
    /* the application may get notified even if no new data is available. The
     * behavior is intentional; it is designed as such so that the applications
     * can update their timeout counters when a partial SSL record arrives.
     */
    sock->super.bytes_read = sock->super.input->size - prev_bytes_read;
    sock->super._cb.read(&sock->super, err);
}

/* Backend implementation of socket disposal: closes the fd and marks the
 * socket DISPOSED; the memory itself is freed later in the update-states
 * phase (see update_status in the poller backends). */
void do_dispose_socket(h2o_socket_t *_sock)
{
    struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock;

    evloop_do_on_socket_close(sock);
    wreq_free_buffer_if_allocated(sock);
    if (sock->fd != -1) {
        close(sock->fd);
        sock->fd = -1;
    }
    sock->_flags = H2O_SOCKET_FLAG_IS_DISPOSED;
    link_to_statechanged(sock);
}

/* Backend implementation of h2o_socket_write: attempts an immediate write and,
 * if data remains, stores the remainder in _wreq and enables write polling.
 * `cb` is invoked (possibly with an error) once the write completes. */
void do_write(h2o_socket_t *_sock, h2o_iovec_t *_bufs, size_t bufcnt, h2o_socket_cb cb)
{
    struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock;
    h2o_iovec_t *bufs;
    h2o_iovec_t *tofree = NULL;

    assert(sock->super._cb.write == NULL);
    assert(sock->_wreq.cnt == 0);
    sock->super._cb.write = cb;

    /* cap the number of buffers, since we're using alloca */
    if (bufcnt > 10000)
        bufs = tofree = h2o_mem_alloc(sizeof(*bufs) * bufcnt);
    else
        bufs = alloca(sizeof(*bufs) * bufcnt);

    memcpy(bufs, _bufs, sizeof(*bufs) * bufcnt);

    /* try to write now */
    if (write_core(sock->fd, &bufs, &bufcnt) != 0) {
        /* fill in _wreq.bufs with fake data to indicate error; run_socket
         * reports h2o_socket_error_io when it sees a non-zero _wreq.cnt at
         * write-notify time */
        sock->_wreq.bufs = sock->_wreq.smallbufs;
        sock->_wreq.cnt = 1;
        *sock->_wreq.bufs = h2o_iovec_init(H2O_STRLIT("deadbeef"));
        sock->_flags |= H2O_SOCKET_FLAG_IS_WRITE_NOTIFY;
        link_to_pending(sock);
        goto Out;
    }
    if (bufcnt == 0) {
        /* write complete, schedule the callback */
        sock->_flags |= H2O_SOCKET_FLAG_IS_WRITE_NOTIFY;
        link_to_pending(sock);
        goto Out;
    }

    /* setup the buffer to send pending data (inline smallbufs when they fit,
     * else a heap-allocated vector recorded in alloced_ptr) */
    if (bufcnt <= sizeof(sock->_wreq.smallbufs) / sizeof(sock->_wreq.smallbufs[0])) {
        sock->_wreq.bufs = sock->_wreq.smallbufs;
    } else {
        sock->_wreq.bufs = h2o_mem_alloc(sizeof(h2o_iovec_t) * bufcnt);
        sock->_wreq.alloced_ptr = sock->_wreq.bufs;
    }
    memcpy(sock->_wreq.bufs, bufs, sizeof(h2o_iovec_t) * bufcnt);
    sock->_wreq.cnt = bufcnt;

    /* schedule the write */
    link_to_statechanged(sock);
Out:
    free(tofree);
}

/* Returns the file descriptor underlying the socket. */
int h2o_socket_get_fd(h2o_socket_t *_sock)
{
    struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock;
    return sock->fd;
}

/* Backend hook for h2o_socket_read_start: the poller registration itself
 * happens later, in the update-states phase. */
void do_read_start(h2o_socket_t *_sock)
{
    struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock;

    link_to_statechanged(sock);
}

/* Backend hook for h2o_socket_read_stop: clears any pending read-ready flag
 * and schedules deregistration from the poller. */
void do_read_stop(h2o_socket_t *_sock)
{
    struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock;

    sock->_flags &= ~H2O_SOCKET_FLAG_IS_READ_READY;
    link_to_statechanged(sock);
}

/* Enables / disables the DONT_READ mode, in which readiness is reported to the
 * application but no data is actually read off the socket (see read_on_ready). */
void h2o_socket_dont_read(h2o_socket_t *_sock, int dont_read)
{
    struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock;

    if (dont_read) {
        sock->_flags |= H2O_SOCKET_FLAG_DONT_READ;
    } else {
        sock->_flags &= ~H2O_SOCKET_FLAG_DONT_READ;
    }
}

/* Backend implementation of h2o_socket_export: detaches the fd (ownership
 * moves to `info`) and marks the socket structure disposed. Always returns 0. */
int do_export(h2o_socket_t *_sock, h2o_socket_export_t *info)
{
    struct st_h2o_evloop_socket_t *sock = (void *)_sock;

    assert((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) == 0);
    evloop_do_on_socket_export(sock);
    sock->_flags = H2O_SOCKET_FLAG_IS_DISPOSED;

    info->fd = sock->fd;
    sock->fd = -1;

    return 0;
}

/* Backend implementation of h2o_socket_import: wraps the exported fd in a new
 * socket object belonging to `loop`. */
h2o_socket_t *do_import(h2o_loop_t *loop, h2o_socket_export_t *info)
{
    return h2o_evloop_socket_create(loop, info->fd, 0);
}

/* Returns the event loop the socket belongs to. */
h2o_loop_t *h2o_socket_get_loop(h2o_socket_t *_sock)
{
    struct st_h2o_evloop_socket_t *sock = (void *)_sock;
    return sock->loop;
}

/* Fetches the local address; returns the sockaddr length, or 0 on failure.
 * `sa` must point to storage of at least sizeof(struct sockaddr_storage). */
socklen_t h2o_socket_getsockname(h2o_socket_t *_sock, struct sockaddr *sa)
{
    struct st_h2o_evloop_socket_t *sock = (void *)_sock;
    socklen_t len = sizeof(struct sockaddr_storage);
    if (getsockname(sock->fd, sa, &len) != 0)
        return 0;
    return len;
}

/* Fetches the peer address directly from the kernel (no cache); same contract
 * as h2o_socket_getsockname. */
socklen_t get_peername_uncached(h2o_socket_t *_sock, struct sockaddr *sa)
{
    struct st_h2o_evloop_socket_t *sock = (void *)_sock;
    socklen_t len = sizeof(struct sockaddr_storage);
    if (getpeername(sock->fd, sa, &len) != 0)
        return 0;
    return len;
}

/* Allocates and initializes a socket object wrapping `fd`, sets the fd to
 * non-blocking mode, and registers it with the backend poller.
 * NOTE(review): F_SETFL with just O_NONBLOCK discards any other status flags
 * already set on the fd — presumably none are expected here; confirm. */
static struct st_h2o_evloop_socket_t *create_socket(h2o_evloop_t *loop, int fd, int flags)
{
    struct st_h2o_evloop_socket_t *sock;

    fcntl(fd, F_SETFL, O_NONBLOCK);

    sock = h2o_mem_alloc(sizeof(*sock));
    memset(sock, 0, sizeof(*sock));
    h2o_buffer_init(&sock->super.input, &h2o_socket_buffer_prototype);
    sock->loop = loop;
    sock->fd = fd;
    sock->_flags = flags;
    sock->_wreq.bufs = sock->_wreq.smallbufs;
    /* a self-pointer means "not linked" for both intrusive lists */
    sock->_next_pending = sock;
    sock->_next_statechanged = sock;

    evloop_do_on_socket_create(sock);

    return sock;
}

/* Same as create_socket, but also disables Nagle's algorithm (TCP_NODELAY);
 * the setsockopt result is deliberately ignored (best-effort). */
static struct st_h2o_evloop_socket_t *create_socket_set_nodelay(h2o_evloop_t *loop, int fd, int flags)
{
    int on = 1;
    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));
    return create_socket(loop, fd, flags);
}

/* Public constructor wrapping an existing fd.
 * NOTE(review): the fcntl here is redundant — create_socket performs the same
 * F_SETFL immediately after. */
h2o_socket_t *h2o_evloop_socket_create(h2o_evloop_t *loop, int fd, int flags)
{
    fcntl(fd, F_SETFL, O_NONBLOCK);
    return &create_socket(loop, fd, flags)->super;
}

/* Accepts one pending connection from the listening socket, or returns NULL
 * on failure (e.g. EAGAIN). Uses accept4 where available so that the
 * CLOEXEC / NONBLOCK flags are applied atomically. */
h2o_socket_t *h2o_evloop_socket_accept(h2o_socket_t *_listener)
{
    struct st_h2o_evloop_socket_t *listener = (struct st_h2o_evloop_socket_t *)_listener;
    int fd;

#if H2O_USE_ACCEPT4
    if ((fd = accept4(listener->fd, NULL, NULL, SOCK_NONBLOCK | SOCK_CLOEXEC)) == -1)
        return NULL;
#else
    if ((fd = cloexec_accept(listener->fd, NULL, NULL)) == -1)
        return NULL;
    fcntl(fd, F_SETFL, O_NONBLOCK);
#endif

    return &create_socket_set_nodelay(listener->loop, fd, H2O_SOCKET_FLAG_IS_ACCEPTED_CONNECTION)->super;
}

/* Starts a non-blocking connect; `cb` is invoked (via the write-notify path in
 * run_socket, which checks SO_ERROR) once the connection succeeds or fails.
 * Returns NULL if the socket cannot be created or connect fails immediately. */
h2o_socket_t *h2o_socket_connect(h2o_loop_t *loop, struct sockaddr *addr, socklen_t addrlen, h2o_socket_cb cb)
{
    int fd;
    struct st_h2o_evloop_socket_t *sock;

    if ((fd = cloexec_socket(addr->sa_family, SOCK_STREAM, 0)) == -1)
        return NULL;
    fcntl(fd, F_SETFL, O_NONBLOCK);
    if (!(connect(fd, addr, addrlen) == 0 || errno == EINPROGRESS)) {
        close(fd);
        return NULL;
    }

    sock = create_socket_set_nodelay(loop, fd, H2O_SOCKET_FLAG_IS_CONNECTING);
    h2o_socket_notify_write(&sock->super, cb);
    return &sock->super;
}

/* Allocates and zero-initializes an event loop of `sz` bytes (the backend
 * embeds h2o_evloop_t at the head of its own, larger struct). */
h2o_evloop_t *create_evloop(size_t sz)
{
    h2o_evloop_t *loop = h2o_mem_alloc(sz);

    memset(loop, 0, sz);
    loop->_statechanged.tail_ref = &loop->_statechanged.head;
    h2o_linklist_init_anchor(&loop->_timeouts);

    update_now(loop);

    return loop;
}

/* Refreshes the loop's cached wall-clock time (milliseconds since epoch). */
void update_now(h2o_evloop_t *loop)
{
    struct timeval tv;
    gettimeofday(&tv, NULL);
    loop->_now = (uint64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
}

/* Clamps `max_wait` (milliseconds) so that the poller wakes up no later than
 * the earliest registered timeout. Also refreshes the cached time. */
int32_t adjust_max_wait(h2o_evloop_t *loop, int32_t max_wait)
{
    uint64_t wake_at = h2o_timeout_get_wake_at(&loop->_timeouts);

    update_now(loop);

    if (wake_at <= loop->_now) {
        max_wait = 0;
    } else {
        uint64_t delta = wake_at - loop->_now;
        if (delta < max_wait)
            max_wait = (int32_t)delta;
    }

    return max_wait;
}

/* Registers `cb` to be invoked once the socket becomes writable (used e.g. by
 * h2o_socket_connect to detect connection establishment). */
void h2o_socket_notify_write(h2o_socket_t *_sock, h2o_socket_cb cb)
{
    struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock;
    assert(sock->super._cb.write == NULL);
    assert(sock->_wreq.cnt == 0);

    sock->super._cb.write = cb;
    link_to_statechanged(sock);
}

/* Delivers the pending read / write notifications of one socket, translating
 * internal state (leftover _wreq, SO_ERROR of a connecting socket) into the
 * error argument of the callbacks. */
static void run_socket(struct st_h2o_evloop_socket_t *sock)
{
    if ((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0) {
        /* is freed in updatestates phase */
        return;
    }

    if ((sock->_flags & H2O_SOCKET_FLAG_IS_READ_READY) != 0) {
        sock->_flags &= ~H2O_SOCKET_FLAG_IS_READ_READY;
        read_on_ready(sock);
    }

    if ((sock->_flags & H2O_SOCKET_FLAG_IS_WRITE_NOTIFY) != 0) {
        const char *err = NULL;
        assert(sock->super._cb.write != NULL);
        sock->_flags &= ~H2O_SOCKET_FLAG_IS_WRITE_NOTIFY;
        if (sock->_wreq.cnt != 0) {
            /* a non-empty _wreq at notify time is the error marker set by do_write */
            err = h2o_socket_error_io;
            sock->_wreq.cnt = 0;
        } else if ((sock->_flags & H2O_SOCKET_FLAG_IS_CONNECTING) != 0) {
            sock->_flags &= ~H2O_SOCKET_FLAG_IS_CONNECTING;
            int so_err = 0;
            socklen_t l = sizeof(so_err);
            so_err = 0;
            if (getsockopt(sock->fd, SOL_SOCKET, SO_ERROR, &so_err, &l) != 0 || so_err != 0) {
                /* FIXME lookup the error table */
                err = h2o_socket_error_conn_fail;
            }
        }
        on_write_complete(&sock->super, err);
    }
}

/* Drains both pending-socket queues, running the callbacks of each socket.
 * Client sockets are drained fully before each server socket is handled. */
static void run_pending(h2o_evloop_t *loop)
{
    struct st_h2o_evloop_socket_t *sock;

    while (loop->_pending_as_server != NULL || loop->_pending_as_client != NULL) {
        while ((sock = loop->_pending_as_client) != NULL) {
            loop->_pending_as_client = sock->_next_pending;
            sock->_next_pending = sock; /* self-pointer == unlinked */
            run_socket(sock);
        }
        if ((sock = loop->_pending_as_server) != NULL) {
            loop->_pending_as_server = sock->_next_pending;
            sock->_next_pending = sock;
            run_socket(sock);
        }
    }
}

/* Destroys the loop: closes all sockets still linked to it and frees their
 * memory along with the loop itself. */
void h2o_evloop_destroy(h2o_evloop_t *loop)
{
    struct st_h2o_evloop_socket_t *sock;

    /* timeouts are governed by the application and MUST be destroyed prior to destroying the loop */
    assert(h2o_linklist_is_empty(&loop->_timeouts));

    /* dispose all sockets */
    while ((sock = loop->_pending_as_client) != NULL) {
        loop->_pending_as_client = sock->_next_pending;
        sock->_next_pending = sock;
        h2o_socket_close((h2o_socket_t *)sock);
    }
    while ((sock = loop->_pending_as_server) != NULL) {
        loop->_pending_as_server = sock->_next_pending;
        sock->_next_pending = sock;
        h2o_socket_close((h2o_socket_t *)sock);
    }

    /* now all sockets are disposed and placed in the statechanged linked list;
     * free their memory by walking _next_statechanged */
    while ((sock = loop->_statechanged.head) != NULL) {
        loop->_statechanged.head = sock->_next_statechanged;
        free(sock);
    }

    /* lastly we need to free the loop memory itself */
    free(loop);
}

/* Runs one iteration of the loop: polls (waiting at most `max_wait` ms),
 * dispatches socket callbacks, then runs expired timeouts.
 * Returns 0 on success, -1 on poller failure. */
int h2o_evloop_run(h2o_evloop_t *loop, int32_t max_wait)
{
    h2o_linklist_t *node;

    /* update socket states, poll, set readable flags, perform pending writes */
    if (evloop_do_proceed(loop, max_wait) != 0)
        return -1;

    /* run the pending callbacks */
    run_pending(loop);

    /* run the timeouts */
    for (node = loop->_timeouts.next; node != &loop->_timeouts; node = node->next) {
        h2o_timeout_t *timeout = H2O_STRUCT_FROM_MEMBER(h2o_timeout_t, _link, node);
        h2o_timeout_run(loop,
timeout, loop->_now); + } + /* assert h2o_timeout_run has called run_pending */ + assert(loop->_pending_as_client == NULL); + assert(loop->_pending_as_server == NULL); + + if (h2o_sliding_counter_is_running(&loop->exec_time_counter)) { + update_now(loop); + h2o_sliding_counter_stop(&loop->exec_time_counter, loop->_now); + } + + return 0; +} + +void h2o_timeout__do_init(h2o_evloop_t *loop, h2o_timeout_t *timeout) +{ + h2o_linklist_insert(&loop->_timeouts, &timeout->_link); +} + +void h2o_timeout__do_dispose(h2o_evloop_t *loop, h2o_timeout_t *timeout) +{ + h2o_linklist_unlink(&timeout->_link); +} + +void h2o_timeout__do_link(h2o_evloop_t *loop, h2o_timeout_t *timeout, h2o_timeout_entry_t *entry) +{ + /* nothing to do */ +} + +void h2o_timeout__do_post_callback(h2o_evloop_t *loop) +{ + run_pending(loop); +} diff --git a/web/server/h2o/libh2o/lib/common/socket/evloop/epoll.c.h b/web/server/h2o/libh2o/lib/common/socket/evloop/epoll.c.h new file mode 100644 index 00000000..247dac89 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socket/evloop/epoll.c.h @@ -0,0 +1,203 @@ +/* + * Copyright (c) 2014 DeNA Co., Ltd. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include <assert.h> +#include <limits.h> +#include <stdio.h> +#include <sys/epoll.h> + +#if 0 +#define DEBUG_LOG(...) fprintf(stderr, __VA_ARGS__) +#else +#define DEBUG_LOG(...) +#endif + +struct st_h2o_evloop_epoll_t { + h2o_evloop_t super; + int ep; +}; + +static int update_status(struct st_h2o_evloop_epoll_t *loop) +{ + while (loop->super._statechanged.head != NULL) { + /* detach the top */ + struct st_h2o_evloop_socket_t *sock = loop->super._statechanged.head; + loop->super._statechanged.head = sock->_next_statechanged; + sock->_next_statechanged = sock; + /* update the state */ + if ((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0) { + free(sock); + } else { + int changed = 0, op, ret; + struct epoll_event ev; + ev.events = 0; + if (h2o_socket_is_reading(&sock->super)) { + ev.events |= EPOLLIN; + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) == 0) { + sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; + changed = 1; + } + } else { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) { + sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; + changed = 1; + } + } + if (h2o_socket_is_writing(&sock->super)) { + ev.events |= EPOLLOUT; + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) == 0) { + sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; + changed = 1; + } + } else { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) { + sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; + changed = 1; + } + } + if (changed) { + if ((sock->_flags & H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED) != 0) { + if (ev.events != 0) + op = EPOLL_CTL_MOD; + else + op = EPOLL_CTL_DEL; + } else { + assert(ev.events != 0); + op = EPOLL_CTL_ADD; + } + ev.data.ptr = sock; 
+ while ((ret = epoll_ctl(loop->ep, op, sock->fd, &ev)) != 0 && errno == EINTR) + ; + if (ret != 0) + return -1; + if (op == EPOLL_CTL_DEL) + sock->_flags &= ~H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED; + else + sock->_flags |= H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED; + } + } + } + loop->super._statechanged.tail_ref = &loop->super._statechanged.head; + + return 0; +} + +int evloop_do_proceed(h2o_evloop_t *_loop, int32_t max_wait) +{ + struct st_h2o_evloop_epoll_t *loop = (struct st_h2o_evloop_epoll_t *)_loop; + struct epoll_event events[256]; + int nevents, i; + + /* collect (and update) status */ + if (update_status(loop) != 0) + return -1; + + /* poll */ + max_wait = adjust_max_wait(&loop->super, max_wait); + nevents = epoll_wait(loop->ep, events, sizeof(events) / sizeof(events[0]), max_wait); + update_now(&loop->super); + if (nevents == -1) + return -1; + + if (nevents != 0) + h2o_sliding_counter_start(&loop->super.exec_time_counter, loop->super._now); + + /* update readable flags, perform writes */ + for (i = 0; i != nevents; ++i) { + struct st_h2o_evloop_socket_t *sock = events[i].data.ptr; + int notified = 0; + if ((events[i].events & (EPOLLIN | EPOLLHUP | EPOLLERR)) != 0) { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) { + sock->_flags |= H2O_SOCKET_FLAG_IS_READ_READY; + link_to_pending(sock); + notified = 1; + } + } + if ((events[i].events & (EPOLLOUT | EPOLLHUP | EPOLLERR)) != 0) { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) { + write_pending(sock); + notified = 1; + } + } + if (!notified) { + static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; + static time_t last_reported = 0; + time_t now = time(NULL); + pthread_mutex_lock(&lock); + if (last_reported + 60 < now) { + last_reported = now; + fprintf(stderr, "ignoring epoll event (fd:%d,event:%x)\n", sock->fd, (int)events[i].events); + } + pthread_mutex_unlock(&lock); + } + } + + return 0; +} + +static void evloop_do_on_socket_create(struct st_h2o_evloop_socket_t *sock) +{ 
+} + +static void evloop_do_on_socket_close(struct st_h2o_evloop_socket_t *sock) +{ + struct st_h2o_evloop_epoll_t *loop = (void *)sock->loop; + int ret; + + if (sock->fd == -1) + return; + if ((sock->_flags & H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED) == 0) + return; + while ((ret = epoll_ctl(loop->ep, EPOLL_CTL_DEL, sock->fd, NULL)) != 0 && errno == EINTR) + ; + if (ret != 0) + fprintf(stderr, "socket_close: epoll(DEL) returned error %d (fd=%d)\n", errno, sock->fd); +} + +static void evloop_do_on_socket_export(struct st_h2o_evloop_socket_t *sock) +{ + struct st_h2o_evloop_epoll_t *loop = (void *)sock->loop; + int ret; + + if ((sock->_flags & H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED) == 0) + return; + while ((ret = epoll_ctl(loop->ep, EPOLL_CTL_DEL, sock->fd, NULL)) != 0 && errno == EINTR) + ; + if (ret != 0) + fprintf(stderr, "socket_export: epoll(DEL) returned error %d (fd=%d)\n", errno, sock->fd); +} + +h2o_evloop_t *h2o_evloop_create(void) +{ + struct st_h2o_evloop_epoll_t *loop = (struct st_h2o_evloop_epoll_t *)create_evloop(sizeof(*loop)); + + pthread_mutex_lock(&cloexec_mutex); + loop->ep = epoll_create(10); + while (fcntl(loop->ep, F_SETFD, FD_CLOEXEC) == -1) { + if (errno != EAGAIN) { + fprintf(stderr, "h2o_evloop_create: failed to set FD_CLOEXEC to the epoll fd (errno=%d)\n", errno); + abort(); + } + } + pthread_mutex_unlock(&cloexec_mutex); + + return &loop->super; +} diff --git a/web/server/h2o/libh2o/lib/common/socket/evloop/kqueue.c.h b/web/server/h2o/libh2o/lib/common/socket/evloop/kqueue.c.h new file mode 100644 index 00000000..21288ed7 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socket/evloop/kqueue.c.h @@ -0,0 +1,186 @@ +/* + * Copyright (c) 2014 DeNA Co., Ltd. 
+ * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include <assert.h> +#include <stdio.h> +#include <sys/types.h> +#include <sys/event.h> +#include <sys/time.h> + +#if 0 +#define DEBUG_LOG(...) fprintf(stderr, __VA_ARGS__) +#else +#define DEBUG_LOG(...) 
+#endif + +struct st_h2o_socket_loop_kqueue_t { + h2o_evloop_t super; + int kq; +}; + +static void ev_set(struct kevent *ev, int fd, int filter, int flags, struct st_h2o_evloop_socket_t *sock) +{ +#ifdef __NetBSD__ + EV_SET(ev, fd, filter, flags, 0, 0, (intptr_t)sock); +#else + EV_SET(ev, fd, filter, flags, 0, 0, sock); +#endif +} + +static int collect_status(struct st_h2o_socket_loop_kqueue_t *loop, struct kevent *changelist, int changelist_capacity) +{ + int change_index = 0; + +#define SET_AND_UPDATE(filter, flags) \ + do { \ + ev_set(changelist + change_index++, sock->fd, filter, flags, sock); \ + if (change_index == changelist_capacity) { \ + int ret; \ + while ((ret = kevent(loop->kq, changelist, change_index, NULL, 0, NULL)) != 0 && errno == EINTR) \ + ; \ + if (ret == -1) \ + return -1; \ + change_index = 0; \ + } \ + } while (0) + + while (loop->super._statechanged.head != NULL) { + /* detach the top */ + struct st_h2o_evloop_socket_t *sock = loop->super._statechanged.head; + loop->super._statechanged.head = sock->_next_statechanged; + sock->_next_statechanged = sock; + /* update the state */ + if ((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0) { + free(sock); + } else { + if (h2o_socket_is_reading(&sock->super)) { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) == 0) { + sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; + SET_AND_UPDATE(EVFILT_READ, EV_ADD); + } + } else { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) { + sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; + SET_AND_UPDATE(EVFILT_READ, EV_DELETE); + } + } + if (h2o_socket_is_writing(&sock->super)) { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) == 0) { + sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; + SET_AND_UPDATE(EVFILT_WRITE, EV_ADD); + } + } else { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) { + sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; + SET_AND_UPDATE(EVFILT_WRITE, EV_DELETE); + } + } + } + } + 
loop->super._statechanged.tail_ref = &loop->super._statechanged.head; + + return change_index; + +#undef SET_AND_UPDATE +} + +int evloop_do_proceed(h2o_evloop_t *_loop, int32_t max_wait) +{ + struct st_h2o_socket_loop_kqueue_t *loop = (struct st_h2o_socket_loop_kqueue_t *)_loop; + struct kevent changelist[64], events[128]; + int nchanges, nevents, i; + struct timespec ts; + + /* collect (and update) status */ + if ((nchanges = collect_status(loop, changelist, sizeof(changelist) / sizeof(changelist[0]))) == -1) + return -1; + + /* poll */ + max_wait = adjust_max_wait(&loop->super, max_wait); + ts.tv_sec = max_wait / 1000; + ts.tv_nsec = max_wait % 1000 * 1000 * 1000; + nevents = kevent(loop->kq, changelist, nchanges, events, sizeof(events) / sizeof(events[0]), &ts); + + update_now(&loop->super); + if (nevents == -1) + return -1; + + if (nevents != 0) + h2o_sliding_counter_start(&loop->super.exec_time_counter, loop->super._now); + + /* update readable flags, perform writes */ + for (i = 0; i != nevents; ++i) { + struct st_h2o_evloop_socket_t *sock = (void *)events[i].udata; + assert(sock->fd == events[i].ident); + switch (events[i].filter) { + case EVFILT_READ: + if (sock->_flags != H2O_SOCKET_FLAG_IS_DISPOSED) { + sock->_flags |= H2O_SOCKET_FLAG_IS_READ_READY; + link_to_pending(sock); + } + break; + case EVFILT_WRITE: + if (sock->_flags != H2O_SOCKET_FLAG_IS_DISPOSED) { + write_pending(sock); + } + break; + default: + break; /* ??? 
*/ + } + } + + return 0; +} + +static void evloop_do_on_socket_create(struct st_h2o_evloop_socket_t *sock) +{ +} + +static void evloop_do_on_socket_close(struct st_h2o_evloop_socket_t *sock) +{ +} + +static void evloop_do_on_socket_export(struct st_h2o_evloop_socket_t *sock) +{ + struct st_h2o_socket_loop_kqueue_t *loop = (void *)sock->loop; + struct kevent changelist[2]; + int change_index = 0, ret; + + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) + ev_set(changelist + change_index++, sock->fd, EVFILT_READ, EV_DELETE, 0); + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) + ev_set(changelist + change_index++, sock->fd, EVFILT_WRITE, EV_DELETE, 0); + if (change_index == 0) + return; + while ((ret = kevent(loop->kq, changelist, change_index, NULL, 0, NULL)) != 0 && errno == EINTR) + ; + if (ret == -1) + fprintf(stderr, "kevent returned error %d (fd=%d)", errno, sock->fd); +} + +h2o_evloop_t *h2o_evloop_create(void) +{ + struct st_h2o_socket_loop_kqueue_t *loop = (struct st_h2o_socket_loop_kqueue_t *)create_evloop(sizeof(*loop)); + + loop->kq = kqueue(); + + return &loop->super; +} diff --git a/web/server/h2o/libh2o/lib/common/socket/evloop/poll.c.h b/web/server/h2o/libh2o/lib/common/socket/evloop/poll.c.h new file mode 100644 index 00000000..8b3f3d14 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socket/evloop/poll.c.h @@ -0,0 +1,178 @@ +/* + * Copyright (c) 2014,2015 DeNA Co., Ltd., Kazuho Oku + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + 
* all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include <stdio.h> +#include <poll.h> + +#if 0 +#define DEBUG_LOG(...) fprintf(stderr, __VA_ARGS__) +#else +#define DEBUG_LOG(...) +#endif + +struct st_h2o_evloop_poll_t { + h2o_evloop_t super; + H2O_VECTOR(struct st_h2o_evloop_socket_t *) socks; +}; + +static void update_socks(struct st_h2o_evloop_poll_t *loop) +{ + /* update loop->socks */ + while (loop->super._statechanged.head != NULL) { + /* detach the top */ + struct st_h2o_evloop_socket_t *sock = loop->super._statechanged.head; + loop->super._statechanged.head = sock->_next_statechanged; + sock->_next_statechanged = sock; + /* update the state */ + if ((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0) { + assert(sock->fd == -1); + free(sock); + } else { + assert(sock->fd < loop->socks.size); + if (loop->socks.entries[sock->fd] == NULL) { + loop->socks.entries[sock->fd] = sock; + } else { + assert(loop->socks.entries[sock->fd] == sock); + } + if (h2o_socket_is_reading(&sock->super)) { + DEBUG_LOG("setting READ for fd: %d\n", sock->fd); + sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; + } else { + DEBUG_LOG("clearing READ for fd: %d\n", sock->fd); + sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; + } + if (h2o_socket_is_writing(&sock->super)) { + DEBUG_LOG("setting WRITE for fd: %d\n", sock->fd); + sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; + } else { + DEBUG_LOG("clearing WRITE for fd: %d\n", sock->fd); + sock->_flags &= 
~H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; + } + } + } + loop->super._statechanged.tail_ref = &loop->super._statechanged.head; +} + +int evloop_do_proceed(h2o_evloop_t *_loop, int32_t max_wait) +{ + struct st_h2o_evloop_poll_t *loop = (struct st_h2o_evloop_poll_t *)_loop; + H2O_VECTOR(struct pollfd) pollfds = {NULL}; + int fd, ret; + + /* update status */ + update_socks(loop); + + /* build list of fds to be polled */ + for (fd = 0; fd != loop->socks.size; ++fd) { + struct st_h2o_evloop_socket_t *sock = loop->socks.entries[fd]; + if (sock == NULL) + continue; + assert(fd == sock->fd); + if ((sock->_flags & (H2O_SOCKET_FLAG_IS_POLLED_FOR_READ | H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE)) != 0) { + h2o_vector_reserve(NULL, &pollfds, pollfds.size + 1); + struct pollfd *slot = pollfds.entries + pollfds.size++; + slot->fd = fd; + slot->events = 0; + slot->revents = 0; + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) + slot->events |= POLLIN; + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) + slot->events |= POLLOUT; + } + } + + /* call */ + max_wait = adjust_max_wait(&loop->super, max_wait); + ret = poll(pollfds.entries, (nfds_t)pollfds.size, max_wait); + update_now(&loop->super); + if (ret == -1) + goto Exit; + DEBUG_LOG("poll returned: %d\n", ret); + + /* update readable flags, perform writes */ + if (ret > 0) { + size_t i; + h2o_sliding_counter_start(&loop->super.exec_time_counter, loop->super._now); + for (i = 0; i != pollfds.size; ++i) { + /* set read_ready flag before calling the write cb, since app. 
code invoked by the latter may close the socket, clearing
+             * the former flag */
+            if ((pollfds.entries[i].revents & POLLIN) != 0) {
+                struct st_h2o_evloop_socket_t *sock = loop->socks.entries[pollfds.entries[i].fd];
+                assert(sock != NULL);
+                assert(sock->fd == pollfds.entries[i].fd);
+                if (sock->_flags != H2O_SOCKET_FLAG_IS_DISPOSED) {
+                    sock->_flags |= H2O_SOCKET_FLAG_IS_READ_READY;
+                    link_to_pending(sock);
+                    DEBUG_LOG("added fd %d as read_ready\n", sock->fd);
+                }
+            }
+            if ((pollfds.entries[i].revents & POLLOUT) != 0) {
+                struct st_h2o_evloop_socket_t *sock = loop->socks.entries[pollfds.entries[i].fd];
+                assert(sock != NULL);
+                assert(sock->fd == pollfds.entries[i].fd);
+                if (sock->_flags != H2O_SOCKET_FLAG_IS_DISPOSED) {
+                    /* log the fd of the socket being written; the previous code logged `fd`, a stale index left over from the
+                     * fd-collection loop above (it equals loop->socks.size here) */
+                    DEBUG_LOG("handling pending writes on fd %d\n", sock->fd);
+                    write_pending(sock);
+                }
+            }
+        }
+        ret = 0;
+    }
+
+Exit:
+    free(pollfds.entries);
+    return ret;
+}
+
+/* grows the fd-indexed socket table so that entries[sock->fd] is valid; newly added slots are zeroed */
+static void evloop_do_on_socket_create(struct st_h2o_evloop_socket_t *sock)
+{
+    struct st_h2o_evloop_poll_t *loop = (struct st_h2o_evloop_poll_t *)sock->loop;
+
+    if (sock->fd >= loop->socks.size) {
+        h2o_vector_reserve(NULL, &loop->socks, sock->fd + 1);
+        memset(loop->socks.entries + loop->socks.size, 0, (sock->fd + 1 - loop->socks.size) * sizeof(loop->socks.entries[0]));
+        loop->socks.size = sock->fd + 1;
+    }
+
+    if (loop->socks.entries[sock->fd] != NULL)
+        assert(loop->socks.entries[sock->fd]->_flags == H2O_SOCKET_FLAG_IS_DISPOSED);
+}
+
+static void evloop_do_on_socket_close(struct st_h2o_evloop_socket_t *sock)
+{
+    struct st_h2o_evloop_poll_t *loop = (struct st_h2o_evloop_poll_t *)sock->loop;
+
+    if (sock->fd != -1)
+        loop->socks.entries[sock->fd] = NULL;
+}
+
+static void evloop_do_on_socket_export(struct st_h2o_evloop_socket_t *sock)
+{
+    struct st_h2o_evloop_poll_t *loop = (struct st_h2o_evloop_poll_t *)sock->loop;
+    /* NOTE(review): the explicit assignment below is redundant with evloop_do_on_socket_close (which clears the slot when
+     * fd != -1); presumably kept as a belt-and-braces measure — confirm before removing */
+    evloop_do_on_socket_close(sock);
+    loop->socks.entries[sock->fd] = NULL;
+}
+
+h2o_evloop_t *h2o_evloop_create(void)
+{
+    struct st_h2o_evloop_poll_t *loop = (struct
st_h2o_evloop_poll_t *)create_evloop(sizeof(*loop)); + return &loop->super; +} diff --git a/web/server/h2o/libh2o/lib/common/socket/uv-binding.c.h b/web/server/h2o/libh2o/lib/common/socket/uv-binding.c.h new file mode 100644 index 00000000..44c71c16 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socket/uv-binding.c.h @@ -0,0 +1,283 @@ +/* + * Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku, Fastly, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
+ */ + +struct st_h2o_uv_socket_t { + h2o_socket_t super; + struct { + uv_stream_t *stream; + uv_close_cb close_cb; + } uv; + union { + uv_connect_t _creq; + uv_write_t _wreq; + }; +}; + +static void schedule_timer(h2o_timeout_t *timeout); + +static void alloc_inbuf_tcp(uv_handle_t *handle, size_t suggested_size, uv_buf_t *_buf) +{ + struct st_h2o_uv_socket_t *sock = handle->data; + + h2o_iovec_t buf = h2o_buffer_reserve(&sock->super.input, 4096); + memcpy(_buf, &buf, sizeof(buf)); +} + +static void alloc_inbuf_ssl(uv_handle_t *handle, size_t suggested_size, uv_buf_t *_buf) +{ + struct st_h2o_uv_socket_t *sock = handle->data; + + h2o_iovec_t buf = h2o_buffer_reserve(&sock->super.ssl->input.encrypted, 4096); + memcpy(_buf, &buf, sizeof(buf)); +} + +static void on_read_tcp(uv_stream_t *stream, ssize_t nread, const uv_buf_t *_unused) +{ + struct st_h2o_uv_socket_t *sock = stream->data; + + if (nread < 0) { + sock->super.bytes_read = 0; + sock->super._cb.read(&sock->super, h2o_socket_error_closed); + return; + } + + sock->super.input->size += nread; + sock->super.bytes_read = nread; + sock->super._cb.read(&sock->super, NULL); +} + +static void on_read_ssl(uv_stream_t *stream, ssize_t nread, const uv_buf_t *_unused) +{ + struct st_h2o_uv_socket_t *sock = stream->data; + size_t prev_bytes_read = sock->super.input->size; + const char *err = h2o_socket_error_io; + + if (nread > 0) { + sock->super.ssl->input.encrypted->size += nread; + if (sock->super.ssl->handshake.cb == NULL) + err = decode_ssl_input(&sock->super); + else + err = NULL; + } + sock->super.bytes_read = sock->super.input->size - prev_bytes_read; + sock->super._cb.read(&sock->super, err); +} + +static void on_do_write_complete(uv_write_t *wreq, int status) +{ + struct st_h2o_uv_socket_t *sock = H2O_STRUCT_FROM_MEMBER(struct st_h2o_uv_socket_t, _wreq, wreq); + if (sock->super._cb.write != NULL) + on_write_complete(&sock->super, status == 0 ? 
NULL : h2o_socket_error_io); +} + +static void free_sock(uv_handle_t *handle) +{ + struct st_h2o_uv_socket_t *sock = handle->data; + uv_close_cb cb = sock->uv.close_cb; + free(sock); + cb(handle); +} + +void do_dispose_socket(h2o_socket_t *_sock) +{ + struct st_h2o_uv_socket_t *sock = (struct st_h2o_uv_socket_t *)_sock; + sock->super._cb.write = NULL; /* avoid the write callback getting called when closing the socket (#1249) */ + uv_close((uv_handle_t *)sock->uv.stream, free_sock); +} + +int h2o_socket_get_fd(h2o_socket_t *_sock) +{ + int fd, ret; + struct st_h2o_uv_socket_t *sock = (struct st_h2o_uv_socket_t *)_sock; + + ret = uv_fileno((uv_handle_t *)sock->uv.stream, (uv_os_fd_t *)&fd); + if (ret) + return -1; + + return fd; +} + +void do_read_start(h2o_socket_t *_sock) +{ + struct st_h2o_uv_socket_t *sock = (struct st_h2o_uv_socket_t *)_sock; + + if (sock->super.ssl == NULL) + uv_read_start(sock->uv.stream, alloc_inbuf_tcp, on_read_tcp); + else + uv_read_start(sock->uv.stream, alloc_inbuf_ssl, on_read_ssl); +} + +void do_read_stop(h2o_socket_t *_sock) +{ + struct st_h2o_uv_socket_t *sock = (struct st_h2o_uv_socket_t *)_sock; + uv_read_stop(sock->uv.stream); +} + +void do_write(h2o_socket_t *_sock, h2o_iovec_t *bufs, size_t bufcnt, h2o_socket_cb cb) +{ + struct st_h2o_uv_socket_t *sock = (struct st_h2o_uv_socket_t *)_sock; + + assert(sock->super._cb.write == NULL); + sock->super._cb.write = cb; + + uv_write(&sock->_wreq, sock->uv.stream, (uv_buf_t *)bufs, (int)bufcnt, on_do_write_complete); +} + +static struct st_h2o_uv_socket_t *create_socket(h2o_loop_t *loop) +{ + uv_tcp_t *tcp = h2o_mem_alloc(sizeof(*tcp)); + + if (uv_tcp_init(loop, tcp) != 0) { + free(tcp); + return NULL; + } + return (void *)h2o_uv_socket_create((void *)tcp, (uv_close_cb)free); +} + +int do_export(h2o_socket_t *_sock, h2o_socket_export_t *info) +{ + struct st_h2o_uv_socket_t *sock = (void *)_sock; + uv_os_fd_t fd; + + if (uv_fileno((uv_handle_t *)sock->uv.stream, &fd) != 0) + return -1; + /* 
FIXME: consider how to overcome the epoll(2) problem; man says, + * "even after a file descriptor that is part of an epoll set has been closed, + * events may be reported for that file descriptor if other file descriptors + * referring to the same underlying file description remain open" + */ + if ((info->fd = dup(fd)) == -1) + return -1; + return 0; +} + +h2o_socket_t *do_import(h2o_loop_t *loop, h2o_socket_export_t *info) +{ + struct st_h2o_uv_socket_t *sock = create_socket(loop); + + if (sock == NULL) + return NULL; + if (uv_tcp_open((uv_tcp_t *)sock->uv.stream, info->fd) != 0) { + h2o_socket_close(&sock->super); + return NULL; + } + + return &sock->super; +} + +h2o_socket_t *h2o_uv_socket_create(uv_stream_t *stream, uv_close_cb close_cb) +{ + struct st_h2o_uv_socket_t *sock = h2o_mem_alloc(sizeof(*sock)); + + memset(sock, 0, sizeof(*sock)); + h2o_buffer_init(&sock->super.input, &h2o_socket_buffer_prototype); + sock->uv.stream = stream; + sock->uv.close_cb = close_cb; + stream->data = sock; + return &sock->super; +} + +static void on_connect(uv_connect_t *conn, int status) +{ + if (status == UV_ECANCELED) + return; + struct st_h2o_uv_socket_t *sock = H2O_STRUCT_FROM_MEMBER(struct st_h2o_uv_socket_t, _creq, conn); + h2o_socket_cb cb = sock->super._cb.write; + sock->super._cb.write = NULL; + cb(&sock->super, status == 0 ? 
NULL : h2o_socket_error_conn_fail); +} + +h2o_loop_t *h2o_socket_get_loop(h2o_socket_t *_sock) +{ + struct st_h2o_uv_socket_t *sock = (void *)_sock; + return sock->uv.stream->loop; +} + +h2o_socket_t *h2o_socket_connect(h2o_loop_t *loop, struct sockaddr *addr, socklen_t addrlen, h2o_socket_cb cb) +{ + struct st_h2o_uv_socket_t *sock = create_socket(loop); + + if (sock == NULL) + return NULL; + if (uv_tcp_connect(&sock->_creq, (void *)sock->uv.stream, addr, on_connect) != 0) { + h2o_socket_close(&sock->super); + return NULL; + } + sock->super._cb.write = cb; + return &sock->super; +} + +socklen_t h2o_socket_getsockname(h2o_socket_t *_sock, struct sockaddr *sa) +{ + struct st_h2o_uv_socket_t *sock = (void *)_sock; + int len = sizeof(struct sockaddr_storage); + if (uv_tcp_getsockname((void *)sock->uv.stream, sa, &len) != 0) + return 0; + return (socklen_t)len; +} + +socklen_t get_peername_uncached(h2o_socket_t *_sock, struct sockaddr *sa) +{ + struct st_h2o_uv_socket_t *sock = (void *)_sock; + int len = sizeof(struct sockaddr_storage); + if (uv_tcp_getpeername((void *)sock->uv.stream, sa, &len) != 0) + return 0; + return (socklen_t)len; +} + +static void on_timeout(uv_timer_t *timer) +{ + h2o_timeout_t *timeout = H2O_STRUCT_FROM_MEMBER(h2o_timeout_t, _backend.timer, timer); + + h2o_timeout_run(timer->loop, timeout, h2o_now(timer->loop)); + if (!h2o_linklist_is_empty(&timeout->_entries)) + schedule_timer(timeout); +} + +void schedule_timer(h2o_timeout_t *timeout) +{ + h2o_timeout_entry_t *entry = H2O_STRUCT_FROM_MEMBER(h2o_timeout_entry_t, _link, timeout->_entries.next); + uv_timer_start(&timeout->_backend.timer, on_timeout, + entry->registered_at + timeout->timeout - h2o_now(timeout->_backend.timer.loop), 0); +} + +void h2o_timeout__do_init(h2o_loop_t *loop, h2o_timeout_t *timeout) +{ + uv_timer_init(loop, &timeout->_backend.timer); +} + +void h2o_timeout__do_dispose(h2o_loop_t *loop, h2o_timeout_t *timeout) +{ + uv_close((uv_handle_t *)&timeout->_backend.timer, 
NULL); +} + +void h2o_timeout__do_link(h2o_loop_t *loop, h2o_timeout_t *timeout, h2o_timeout_entry_t *entry) +{ + /* register the timer if the entry just being added is the only entry */ + if (timeout->_entries.next == &entry->_link) + schedule_timer(timeout); +} + +void h2o_timeout__do_post_callback(h2o_loop_t *loop) +{ + /* nothing to do */ +} diff --git a/web/server/h2o/libh2o/lib/common/socketpool.c b/web/server/h2o/libh2o/lib/common/socketpool.c new file mode 100644 index 00000000..da69933f --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socketpool.c @@ -0,0 +1,342 @@ +/* + * Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
+ */
+#include <assert.h>
+#include <errno.h>
+#include <netdb.h>
+#include <stdlib.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include "h2o/hostinfo.h"
+#include "h2o/linklist.h"
+#include "h2o/socketpool.h"
+#include "h2o/timeout.h"
+#include "h2o/string_.h"
+
+/* a pooled (idle) connection, linked into h2o_socketpool_t::_shared.sockets */
+struct pool_entry_t {
+    h2o_socket_export_t sockinfo;
+    h2o_linklist_t link;
+    /* timestamp at which the entry was added; compared against h2o_now() - timeout in destroy_expired */
+    uint64_t added_at;
+};
+
+/* state of an in-flight h2o_socketpool_connect call (name resolution and/or TCP connect) */
+struct st_h2o_socketpool_connect_request_t {
+    void *data;
+    h2o_socketpool_connect_cb cb;
+    h2o_socketpool_t *pool;
+    h2o_loop_t *loop;
+    h2o_hostinfo_getaddr_req_t *getaddr_req;
+    h2o_socket_t *sock;
+};
+
+/* destroys an entry that has already been unlinked from the pool */
+static void destroy_detached(struct pool_entry_t *entry)
+{
+    h2o_socket_dispose_export(&entry->sockinfo);
+    free(entry);
+}
+
+/* unlinks the entry from the pool, then destroys it */
+static void destroy_attached(struct pool_entry_t *entry)
+{
+    h2o_linklist_unlink(&entry->link);
+    destroy_detached(entry);
+}
+
+/* destroys pooled entries whose `added_at` is older than `timeout`; the scan stops at the first non-expired entry
+ * (assumes the list is ordered oldest-first — the loop below relies on that) */
+static void destroy_expired(h2o_socketpool_t *pool)
+{
+    /* caller should lock the mutex */
+    uint64_t expire_before = h2o_now(pool->_interval_cb.loop) - pool->timeout;
+    while (!h2o_linklist_is_empty(&pool->_shared.sockets)) {
+        struct pool_entry_t *entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, link, pool->_shared.sockets.next);
+        if (entry->added_at > expire_before)
+            break;
+        destroy_attached(entry);
+        __sync_sub_and_fetch(&pool->_shared.count, 1);
+    }
+}
+
+/* interval timer callback: sweeps expired entries (best-effort; skipped when the mutex is contended) and re-arms itself */
+static void on_timeout(h2o_timeout_entry_t *timeout_entry)
+{
+    /* FIXME decrease the frequency of this function being called; the expiration
+     * check can be (should be) performed in the `connect` function as well
+     */
+    h2o_socketpool_t *pool = H2O_STRUCT_FROM_MEMBER(h2o_socketpool_t, _interval_cb.entry, timeout_entry);
+
+    if (pthread_mutex_trylock(&pool->_shared.mutex) == 0) {
+        destroy_expired(pool);
+        pthread_mutex_unlock(&pool->_shared.mutex);
+    }
+
+    h2o_timeout_link(pool->_interval_cb.loop, &pool->_interval_cb.timeout, &pool->_interval_cb.entry);
+}
+
+/* initializes the fields common to both pool types */
+static void common_init(h2o_socketpool_t *pool,
h2o_socketpool_type_t type, h2o_iovec_t host, int is_ssl, size_t capacity)
{
    /* (tail of common_init; signature head is above) zero-fills the pool and
     * records the target host, TLS flag and capacity; idle-socket expiry stays
     * disabled (UINT64_MAX sentinel) until h2o_socketpool_set_timeout is called */
    memset(pool, 0, sizeof(*pool));

    pool->type = type;
    pool->peer.host = h2o_strdup(NULL, host.base, host.len); /* owned copy; released in h2o_socketpool_dispose */
    pool->is_ssl = is_ssl;
    pool->capacity = capacity;
    pool->timeout = UINT64_MAX; /* "no timeout" sentinel */

    pthread_mutex_init(&pool->_shared.mutex, NULL);
    h2o_linklist_init_anchor(&pool->_shared.sockets);
}

/**
 * Initializes the pool to connect to a fixed socket address. The numeric form
 * of the address (or, for unix sockets, the path) doubles as the peer host
 * name stored by common_init.
 */
void h2o_socketpool_init_by_address(h2o_socketpool_t *pool, struct sockaddr *sa, socklen_t salen, int is_ssl, size_t capacity)
{
    char host[NI_MAXHOST];
    size_t host_len;

    /* the address is copied into a fixed-size buffer below; reject anything larger */
    assert(salen <= sizeof(pool->peer.sockaddr.bytes));

    if ((host_len = h2o_socket_getnumerichost(sa, salen, host)) == SIZE_MAX) {
        if (sa->sa_family != AF_UNIX)
            h2o_fatal("failed to convert a non-unix socket address to a numerical representation");
        /* use the sockaddr_un::sun_path as the SNI indicator (is that the right thing to do?) */
        strcpy(host, ((struct sockaddr_un *)sa)->sun_path);
        host_len = strlen(host);
    }

    common_init(pool, H2O_SOCKETPOOL_TYPE_SOCKADDR, h2o_iovec_init(host, host_len), is_ssl, capacity);
    memcpy(&pool->peer.sockaddr.bytes, sa, salen);
    pool->peer.sockaddr.len = salen;
}

/**
 * Initializes the pool from a host name and port. If `host` parses as a
 * numeric IPv4 literal the pool is set up in sockaddr mode (no DNS lookup at
 * connect time); otherwise it is set up in named mode and the port is stored
 * as a decimal service string for getaddrinfo-style resolution.
 */
void h2o_socketpool_init_by_hostport(h2o_socketpool_t *pool, h2o_iovec_t host, uint16_t port, int is_ssl, size_t capacity)
{
    struct sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));

    if (h2o_hostinfo_aton(host, &sin.sin_addr) == 0) {
        sin.sin_family = AF_INET;
        sin.sin_port = htons(port);
        h2o_socketpool_init_by_address(pool, (void *)&sin, sizeof(sin), is_ssl, capacity);
        return;
    }

    common_init(pool, H2O_SOCKETPOOL_TYPE_NAMED, host, is_ssl, capacity);
    /* buffer is sized for the longest possible uint16 ("65535"), so sprintf cannot overflow */
    pool->peer.named_serv.base = h2o_mem_alloc(sizeof(H2O_UINT16_LONGEST_STR));
    pool->peer.named_serv.len = sprintf(pool->peer.named_serv.base, "%u", (unsigned)port);
}

/**
 * Disposes of the pool: closes every pooled connection, destroys the mutex,
 * unregisters the expiry timer (only if h2o_socketpool_set_timeout was
 * called), and frees the strings owned by the pool.
 */
void h2o_socketpool_dispose(h2o_socketpool_t *pool)
{
    pthread_mutex_lock(&pool->_shared.mutex);
    while (!h2o_linklist_is_empty(&pool->_shared.sockets)) {
        struct pool_entry_t *entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, link, pool->_shared.sockets.next);
        destroy_attached(entry);
        /* keep the shared use-count in sync; other threads may still read it */
        __sync_sub_and_fetch(&pool->_shared.count, 1);
    }
    pthread_mutex_unlock(&pool->_shared.mutex);
    pthread_mutex_destroy(&pool->_shared.mutex);

    /* _interval_cb.loop is non-NULL iff h2o_socketpool_set_timeout registered the timer */
    if (pool->_interval_cb.loop != NULL) {
        h2o_timeout_unlink(&pool->_interval_cb.entry);
        h2o_timeout_dispose(pool->_interval_cb.loop, &pool->_interval_cb.timeout);
    }
    free(pool->peer.host.base);
    switch (pool->type) {
    case H2O_SOCKETPOOL_TYPE_NAMED:
        free(pool->peer.named_serv.base);
        break;
    case H2O_SOCKETPOOL_TYPE_SOCKADDR:
        /* nothing extra to free for the sockaddr variant */
        break;
    }
}

/**
 * Enables idle-connection expiry: pooled sockets older than `msec`
 * milliseconds are destroyed by the periodic on_timeout callback, which runs
 * on a 1000ms interval timer registered on `loop`.
 *
 * NOTE(review): must be called at most once per pool — a second call would
 * re-register the timer without unlinking the first; confirm against callers.
 */
void h2o_socketpool_set_timeout(h2o_socketpool_t *pool, h2o_loop_t *loop, uint64_t msec)
{
    pool->timeout = msec;

    pool->_interval_cb.loop = loop;
    h2o_timeout_init(loop, &pool->_interval_cb.timeout, 1000);
    pool->_interval_cb.entry.cb = on_timeout;

    h2o_timeout_link(loop, &pool->_interval_cb.timeout, &pool->_interval_cb.entry);
}

/* invokes (and then forgets) the user-supplied connect callback; the request
 * object is freed BEFORE the callback runs so that the callback may start a
 * new request without leaking */
static void call_connect_cb(h2o_socketpool_connect_request_t *req, const char *errstr)
{
    h2o_socketpool_connect_cb cb = req->cb;
    h2o_socket_t *sock = req->sock;
    void *data = req->data;

    free(req);
    cb(sock, errstr, data);
}

/* h2o_socket_connect completion handler: on failure closes the socket and
 * reports a generic error string to the user callback */
static void on_connect(h2o_socket_t *sock, const char *err)
{
    h2o_socketpool_connect_request_t *req = sock->data;
    const char *errstr = NULL;

    assert(req->sock == sock);

    if (err != NULL) {
        h2o_socket_close(sock);
        req->sock = NULL;
        errstr = "connection failed";
    }
    call_connect_cb(req, errstr);
}

/* on_close hook installed on every socket handed out by the pool; decrements
 * the pool-wide count of in-use + pooled connections */
static void on_close(void *data)
{
    h2o_socketpool_t *pool = data;
    __sync_sub_and_fetch(&pool->_shared.count, 1);
}

/* starts a non-blocking connect to `addr`; on immediate failure the shared
 * count (already incremented by h2o_socketpool_connect) is rolled back */
static void start_connect(h2o_socketpool_connect_request_t *req, struct sockaddr *addr, socklen_t addrlen)
{
    req->sock = h2o_socket_connect(req->loop, addr, addrlen, on_connect);
    if (req->sock == NULL) {
        __sync_sub_and_fetch(&req->pool->_shared.count, 1);
        call_connect_cb(req, "failed to connect to host");
        return;
    }
    req->sock->data = req;
    req->sock->on_close.cb = on_close;
    req->sock->on_close.data = req->pool;
}

/* name-resolution completion handler (named pools only): picks one address
 * from the getaddrinfo result and proceeds to connect */
static void on_getaddr(h2o_hostinfo_getaddr_req_t *getaddr_req, const char *errstr, struct addrinfo *res, void *_req)
{
    h2o_socketpool_connect_request_t *req = _req;

    assert(getaddr_req == req->getaddr_req);
    req->getaddr_req = NULL;

    if (errstr != NULL) {
        /* roll back the count incremented by h2o_socketpool_connect */
        __sync_sub_and_fetch(&req->pool->_shared.count, 1);
        call_connect_cb(req, errstr);
        return;
    }

    struct addrinfo *selected = h2o_hostinfo_select_one(res);
    start_connect(req, selected->ai_addr, selected->ai_addrlen);
}

/**
 * Obtains a connection to the pool's peer and reports it via `cb`.
 *
 * First tries to reuse a pooled idle connection: each candidate is liveness-
 * probed with a non-blocking 1-byte MSG_PEEK recv (EAGAIN/EWOULDBLOCK means
 * the connection is idle and healthy). Dead or unexpectedly-readable sockets
 * are discarded (with a once-only warning each) and the next candidate is
 * tried. Note the lock discipline: the pool mutex is released while probing
 * and re-acquired before picking the next entry.
 *
 * If no pooled connection survives, a new one is established — via async name
 * resolution for named pools, or directly for sockaddr pools. `*_req` (when
 * non-NULL) receives a handle usable with h2o_socketpool_cancel_connect; it
 * is left NULL when a pooled connection is returned synchronously.
 */
void h2o_socketpool_connect(h2o_socketpool_connect_request_t **_req, h2o_socketpool_t *pool, h2o_loop_t *loop,
                            h2o_multithread_receiver_t *getaddr_receiver, h2o_socketpool_connect_cb cb, void *data)
{
    struct pool_entry_t *entry = NULL;

    if (_req != NULL)
        *_req = NULL;

    /* fetch an entry and return it */
    pthread_mutex_lock(&pool->_shared.mutex);
    destroy_expired(pool);
    while (1) {
        if (h2o_linklist_is_empty(&pool->_shared.sockets))
            break;
        entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, link, pool->_shared.sockets.next);
        h2o_linklist_unlink(&entry->link);
        pthread_mutex_unlock(&pool->_shared.mutex);

        /* test if the connection is still alive */
        char buf[1];
        ssize_t rret = recv(entry->sockinfo.fd, buf, 1, MSG_PEEK);
        if (rret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            /* yes! return it */
            h2o_socket_t *sock = h2o_socket_import(loop, &entry->sockinfo);
            free(entry);
            sock->on_close.cb = on_close;
            sock->on_close.data = pool;
            cb(sock, NULL, data);
            return;
        }

        /* connection is dead, report, close, and retry */
        if (rret <= 0) {
            static long counter = 0;
            if (__sync_fetch_and_add(&counter, 1) == 0)
                fprintf(stderr, "[WARN] detected close by upstream before the expected timeout (see issue #679)\n");
        } else {
            static long counter = 0;
            if (__sync_fetch_and_add(&counter, 1) == 0)
                fprintf(stderr, "[WARN] unexpectedly received data to a pooled socket (see issue #679)\n");
        }
        destroy_detached(entry);
        pthread_mutex_lock(&pool->_shared.mutex);
    }
    pthread_mutex_unlock(&pool->_shared.mutex);

    /* FIXME respect `capacity` */
    __sync_add_and_fetch(&pool->_shared.count, 1);

    /* prepare request object */
    h2o_socketpool_connect_request_t *req = h2o_mem_alloc(sizeof(*req));
    /* positional init: {data, cb, pool, loop} must match the struct's field order */
    *req = (h2o_socketpool_connect_request_t){data, cb, pool, loop};
    if (_req != NULL)
        *_req = req;

    switch (pool->type) {
    case H2O_SOCKETPOOL_TYPE_NAMED:
        /* resolve the name, and connect */
        req->getaddr_req = h2o_hostinfo_getaddr(getaddr_receiver, pool->peer.host, pool->peer.named_serv, AF_UNSPEC, SOCK_STREAM,
                                                IPPROTO_TCP, AI_ADDRCONFIG | AI_NUMERICSERV, on_getaddr, req);
        break;
    case H2O_SOCKETPOOL_TYPE_SOCKADDR:
        /* connect (using sockaddr_in) */
        start_connect(req, (void *)&pool->peer.sockaddr.bytes, pool->peer.sockaddr.len);
        break;
    }
}

/**
 * Cancels an in-flight connect request: aborts the pending name resolution
 * and/or closes the half-established socket, then frees the request. The
 * user callback is NOT invoked.
 *
 * NOTE(review): unlike the error paths above, this does not appear to
 * decrement pool->_shared.count — presumably h2o_socket_close triggers the
 * on_close hook that does; confirm for the getaddr-only path.
 */
void h2o_socketpool_cancel_connect(h2o_socketpool_connect_request_t *req)
{
    if (req->getaddr_req != NULL) {
        h2o_hostinfo_getaddr_cancel(req->getaddr_req);
        req->getaddr_req = NULL;
    }
    if (req->sock != NULL)
        h2o_socket_close(req->sock);
    free(req);
}

/**
 * Returns a socket to the pool for later reuse. The socket must have been
 * obtained from this pool (asserted via the on_close hook data). On export
 * failure the socket is abandoned and -1 is returned; on success the entry is
 * timestamped and appended to the idle list, and 0 is returned.
 */
int h2o_socketpool_return(h2o_socketpool_t *pool, h2o_socket_t *sock)
{
    struct pool_entry_t *entry;

    /* reset the on_close callback */
    assert(sock->on_close.data == pool);
    sock->on_close.cb = NULL;
    sock->on_close.data = NULL;

    entry = h2o_mem_alloc(sizeof(*entry));
    if (h2o_socket_export(sock, &entry->sockinfo) != 0) {
        free(entry);
        /* the socket is gone, so it no longer counts against the pool */
        __sync_sub_and_fetch(&pool->_shared.count, 1);
        return -1;
    }
    memset(&entry->link, 0, sizeof(entry->link));
    entry->added_at = h2o_now(h2o_socket_get_loop(sock)); /* timestamp for expiry by destroy_expired */

    pthread_mutex_lock(&pool->_shared.mutex);
    destroy_expired(pool);
    h2o_linklist_insert(&pool->_shared.sockets, &entry->link);
    pthread_mutex_unlock(&pool->_shared.mutex);

    return 0;
}