diff options
Diffstat (limited to 'web/server/h2o/libh2o/lib/common')
20 files changed, 7259 insertions, 0 deletions
diff --git a/web/server/h2o/libh2o/lib/common/cache.c b/web/server/h2o/libh2o/lib/common/cache.c new file mode 100644 index 000000000..cc8d8f007 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/cache.c @@ -0,0 +1,273 @@ +/* + * Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
+ */ +#include <assert.h> +#include <pthread.h> +#include "khash.h" +#include "h2o/cache.h" +#include "h2o/linklist.h" +#include "h2o/memory.h" +#include "h2o/string_.h" + +static h2o_cache_hashcode_t get_keyhash(h2o_cache_ref_t *ref); +static int is_equal(h2o_cache_ref_t *x, h2o_cache_ref_t *y); + +/* khash table keyed by entry pointers; entries are hashed with get_keyhash and compared with is_equal */ KHASH_INIT(cache, h2o_cache_ref_t *, char, 0, get_keyhash, is_equal) + +/* the cache object; every stored entry is a member of the hash table and of both the lru and the age list */ struct st_h2o_cache_t { + int flags; /* H2O_CACHE_FLAG_* bits */ + khash_t(cache) * table; + size_t size; /* sum of value.len of the entries currently stored */ + size_t capacity; /* purge() evicts entries while size exceeds this */ + h2o_linklist_t lru; /* anchor of h2o_cache_ref_t::_lru_link */ + h2o_linklist_t age; /* anchor of h2o_cache_ref_t::_age_link */ + uint64_t duration; /* lifetime of an entry, in the same unit as the `now` arguments */ + void (*destroy_cb)(h2o_iovec_t value); /* optional; invoked with the value when an entry is finally freed */ + pthread_mutex_t mutex; /* only used if (flags & H2O_CACHE_FLAG_MULTITHREADED) != 0 */ +}; + +/* hash function for the table: returns the hash code stored in the entry */ static h2o_cache_hashcode_t get_keyhash(h2o_cache_ref_t *ref) +{ + return ref->keyhash; +} + +/* equality function for the table: two entries match when their keys have the same length and bytes */ static int is_equal(h2o_cache_ref_t *x, h2o_cache_ref_t *y) +{ + return x->key.len == y->key.len && memcmp(x->key.base, y->key.base, x->key.len) == 0; +} + +/* takes the cache mutex; a no-op unless the cache was created with H2O_CACHE_FLAG_MULTITHREADED */ static void lock_cache(h2o_cache_t *cache) +{ + if ((cache->flags & H2O_CACHE_FLAG_MULTITHREADED) != 0) + pthread_mutex_lock(&cache->mutex); +} + +/* counterpart of lock_cache */ static void unlock_cache(h2o_cache_t *cache) +{ + if ((cache->flags & H2O_CACHE_FLAG_MULTITHREADED) != 0) + pthread_mutex_unlock(&cache->mutex); +} + +/* detaches the entry at `iter` from both lists, subtracts its value length from cache->size and drops the reference held by the cache. When `reuse` is non-zero the hash slot is kept so that the caller can store another entry in it (see h2o_cache_set) */ static void erase_ref(h2o_cache_t *cache, khiter_t iter, int reuse) +{ + h2o_cache_ref_t *ref = kh_key(cache->table, iter); + + if (!reuse) + kh_del(cache, cache->table, iter); + h2o_linklist_unlink(&ref->_lru_link); + h2o_linklist_unlink(&ref->_age_link); + cache->size -= ref->value.len; + + h2o_cache_release(cache, ref); +} + +/* returns the remaining lifetime of `ref` at time `now` (ref->at + duration - now); the result is negative once the entry has expired */ static int64_t get_timeleft(h2o_cache_t *cache, h2o_cache_ref_t *ref, uint64_t now) +{ + return (int64_t)(ref->at + cache->duration) - now; +} + +/* evicts entries: first from the head of the lru list while the cache is over capacity, then from the head of the age list while the entry found there has expired */ static void purge(h2o_cache_t *cache, uint64_t now) +{ + /* by cache size */ + while (cache->capacity < cache->size) { + h2o_cache_ref_t *last; + assert(!h2o_linklist_is_empty(&cache->lru)); + last = H2O_STRUCT_FROM_MEMBER(h2o_cache_ref_t, _lru_link, cache->lru.next); + erase_ref(cache, kh_get(cache, 
cache->table, last), 0); + } + /* by TTL */ + while (!h2o_linklist_is_empty(&cache->age)) { + h2o_cache_ref_t *oldest = H2O_STRUCT_FROM_MEMBER(h2o_cache_ref_t, _age_link, cache->age.next); + if (get_timeleft(cache, oldest, now) >= 0) + break; + erase_ref(cache, kh_get(cache, cache->table, oldest), 0); + } +} + +/* computes the hash code of the `l` bytes at `s`, walking the key backwards with h = h * 31 + byte. Note that fetch/set/delete treat a keyhash of 0 as "not supplied" and call this function themselves */ h2o_cache_hashcode_t h2o_cache_calchash(const char *s, size_t l) +{ + h2o_cache_hashcode_t h = 0; + for (; l != 0; --l) + h = (h << 5) - h + ((unsigned char *)s)[l - 1]; + return h; +} + +/* creates an empty cache. `capacity` bounds the sum of value lengths, `duration` is the lifetime of an entry, `destroy_cb` (may be NULL) is called with the value when an entry is freed. The mutex is only initialized when H2O_CACHE_FLAG_MULTITHREADED is set */ h2o_cache_t *h2o_cache_create(int flags, size_t capacity, uint64_t duration, void (*destroy_cb)(h2o_iovec_t value)) +{ + h2o_cache_t *cache = h2o_mem_alloc(sizeof(*cache)); + + cache->flags = flags; + cache->table = kh_init(cache); + cache->size = 0; + cache->capacity = capacity; + h2o_linklist_init_anchor(&cache->lru); + h2o_linklist_init_anchor(&cache->age); + cache->duration = duration; + cache->destroy_cb = destroy_cb; + if ((cache->flags & H2O_CACHE_FLAG_MULTITHREADED) != 0) + pthread_mutex_init(&cache->mutex, NULL); + + return cache; +} + +/* removes all entries, then releases the hash table, the mutex (if used) and the cache object itself */ void h2o_cache_destroy(h2o_cache_t *cache) +{ + h2o_cache_clear(cache); + kh_destroy(cache, cache->table); + if ((cache->flags & H2O_CACHE_FLAG_MULTITHREADED) != 0) + pthread_mutex_destroy(&cache->mutex); + free(cache); +} + +/* removes every entry from the cache; entries still referenced by callers are freed when the last reference is released */ void h2o_cache_clear(h2o_cache_t *cache) +{ + lock_cache(cache); + + while (!h2o_linklist_is_empty(&cache->lru)) { + h2o_cache_ref_t *ref = H2O_STRUCT_FROM_MEMBER(h2o_cache_ref_t, _lru_link, cache->lru.next); + erase_ref(cache, kh_get(cache, cache->table, ref), 0); + } + /* every entry is linked into both lists (h2o_cache_set) and unlinked from both (erase_ref), so draining the lru list must have drained the age list as well */ + assert(h2o_linklist_is_empty(&cache->age)); + assert(kh_size(cache->table) == 0); + assert(cache->size == 0); + + unlock_cache(cache); +} + +/* looks up `key` (pass keyhash == 0 to have the hash computed here). Returns the entry with its reference count incremented, or NULL if there is no live entry; a returned entry must be given back with h2o_cache_release */ h2o_cache_ref_t *h2o_cache_fetch(h2o_cache_t *cache, uint64_t now, h2o_iovec_t key, h2o_cache_hashcode_t keyhash) +{ + h2o_cache_ref_t search_key, *ref; + khiter_t iter; + int64_t timeleft; + + if (keyhash == 0) + keyhash = h2o_cache_calchash(key.base, key.len); + search_key.key = key; + 
search_key.keyhash = keyhash; + + lock_cache(cache); + + /* evict over-capacity and expired entries before looking up */ purge(cache, now); + + if ((iter = kh_get(cache, cache->table, &search_key)) == kh_end(cache->table)) + goto NotFound; + + /* found */ + ref = kh_key(cache->table, iter); + timeleft = get_timeleft(cache, ref, now); + if (timeleft < 0) + goto NotFound; + /* with H2O_CACHE_FLAG_EARLY_UPDATE, report a miss exactly once per entry when fewer than 10 time units remain, so that one caller gets the chance to refresh the entry while later callers still receive the cached value */ if ((cache->flags & H2O_CACHE_FLAG_EARLY_UPDATE) != 0 && timeleft < 10 && !ref->_requested_early_update) { + ref->_requested_early_update = 1; + goto NotFound; + } + /* move the entry to the top of LRU */ + h2o_linklist_unlink(&ref->_lru_link); + h2o_linklist_insert(&cache->lru, &ref->_lru_link); + __sync_fetch_and_add(&ref->_refcnt, 1); /* the reference handed to the caller */ + + /* unlock and return the found entry */ + unlock_cache(cache); + return ref; + +NotFound: + unlock_cache(cache); + return NULL; +} + +/* drops one reference to `ref`. When the count goes from 1 to 0 the entry must already be detached from both lists; the value is passed to destroy_cb (if set) and the key copy and the entry itself are freed */ void h2o_cache_release(h2o_cache_t *cache, h2o_cache_ref_t *ref) +{ + if (__sync_fetch_and_sub(&ref->_refcnt, 1) == 1) { + assert(!h2o_linklist_is_linked(&ref->_lru_link)); + assert(!h2o_linklist_is_linked(&ref->_age_link)); + if (cache->destroy_cb != NULL) + cache->destroy_cb(ref->value); + free(ref->key.base); + free(ref); + } +} + +/* stores `value` under `key` (pass keyhash == 0 to have the hash computed here), replacing any entry with the same key. The key bytes are copied; the value is stored as given. Returns 1 if an existing entry was replaced, 0 otherwise */ int h2o_cache_set(h2o_cache_t *cache, uint64_t now, h2o_iovec_t key, h2o_cache_hashcode_t keyhash, h2o_iovec_t value) +{ + h2o_cache_ref_t *newref; + khiter_t iter; + int existed; + + if (keyhash == 0) + keyhash = h2o_cache_calchash(key.base, key.len); + + /* create newref */ + newref = h2o_mem_alloc(sizeof(*newref)); + *newref = (h2o_cache_ref_t){h2o_strdup(NULL, key.base, key.len), keyhash, now, value, 0, {NULL}, {NULL}, 1}; + + lock_cache(cache); + + /* set or replace the named value */ + iter = kh_get(cache, cache->table, newref); + if (iter != kh_end(cache->table)) { + erase_ref(cache, iter, 1); /* keeps the hash slot, which is overwritten on the next line */ + kh_key(cache->table, iter) = newref; + existed = 1; + } else { + int unused; + kh_put(cache, cache->table, newref, &unused); + existed = 0; + } + h2o_linklist_insert(&cache->lru, &newref->_lru_link); + h2o_linklist_insert(&cache->age, &newref->_age_link); + 
cache->size += newref->value.len; + + /* the insertion may have pushed the cache over capacity */ purge(cache, now); + + unlock_cache(cache); + + return existed; +} + +/* removes the entry stored under `key`, if any (pass keyhash == 0 to have the hash computed here); expired and over-capacity entries are purged as a side effect */ void h2o_cache_delete(h2o_cache_t *cache, uint64_t now, h2o_iovec_t key, h2o_cache_hashcode_t keyhash) +{ + h2o_cache_ref_t search_key; + khiter_t iter; + + if (keyhash == 0) + keyhash = h2o_cache_calchash(key.base, key.len); + search_key.key = key; + search_key.keyhash = keyhash; + + lock_cache(cache); + + purge(cache, now); + + if ((iter = kh_get(cache, cache->table, &search_key)) != kh_end(cache->table)) + erase_ref(cache, iter, 0); + + unlock_cache(cache); +} + +/* returns the capacity given to h2o_cache_create */ size_t h2o_cache_get_capacity(h2o_cache_t *cache) +{ + return cache->capacity; +} + +/* returns the entry lifetime given to h2o_cache_create */ uint64_t h2o_cache_get_duration(h2o_cache_t *cache) +{ + return cache->duration; +} diff --git a/web/server/h2o/libh2o/lib/common/file.c b/web/server/h2o/libh2o/lib/common/file.c new file mode 100644 index 000000000..3cf5ac5d1 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/file.c @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2015 DeNA Co., Ltd., Kazuho Oku + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include <errno.h> +#include <fcntl.h> +#include <stdint.h> +#include <stdlib.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> +#include "h2o/file.h" + +h2o_iovec_t h2o_file_read(const char *fn) +{ + int fd; + struct stat st; + h2o_iovec_t ret = {NULL}; + + /* open */ + if ((fd = open(fn, O_RDONLY | O_CLOEXEC)) == -1) + goto Error; + if (fstat(fd, &st)) + goto Error; + /* allocate memory */ + if (st.st_size > SIZE_MAX) { + errno = ENOMEM; + goto Error; + } + if ((ret.base = malloc((size_t)st.st_size)) == NULL) + goto Error; + /* read */ + while (ret.len != (size_t)st.st_size) { + ssize_t r; + while ((r = read(fd, ret.base + ret.len, (size_t)st.st_size - ret.len)) == -1 && errno == EINTR) + ; + if (r <= 0) + goto Error; + ret.len += r; + } + /* close */ + close(fd); + return ret; + +Error: + if (fd != -1) + close(fd); + free(ret.base); + return (h2o_iovec_t){NULL}; +} diff --git a/web/server/h2o/libh2o/lib/common/filecache.c b/web/server/h2o/libh2o/lib/common/filecache.c new file mode 100644 index 000000000..747a1ffa6 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/filecache.c @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2015 DeNA Co., Ltd., Kazuho Oku + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this 
permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include <assert.h> +#include <errno.h> +#include <fcntl.h> +#include <stddef.h> +#include <unistd.h> +#include "khash.h" +#include "h2o/memory.h" +#include "h2o/filecache.h" + +/* string-keyed khash set; the keys are the _path members of the cached h2o_filecache_ref_t objects */ KHASH_SET_INIT_STR(opencache_set) + +/* cache of opened files, keyed by path */ struct st_h2o_filecache_t { + khash_t(opencache_set) * hash; + h2o_linklist_t lru; /* anchor of h2o_filecache_ref_t::_lru */ + size_t capacity; /* maximum number of cached entries; 0 disables caching (see h2o_filecache_open_file) */ +}; + +/* removes the entry at `iter` from the hash and the list and drops the reference held by the cache */ static inline void release_from_cache(h2o_filecache_t *cache, khiter_t iter) +{ + const char *path = kh_key(cache->hash, iter); + /* the key is the _path member of the entry, so the entry is recovered from the key pointer */ h2o_filecache_ref_t *ref = H2O_STRUCT_FROM_MEMBER(h2o_filecache_ref_t, _path, path); + + /* detach from list */ + kh_del(opencache_set, cache->hash, iter); + h2o_linklist_unlink(&ref->_lru); + + /* and close */ + h2o_filecache_close_file(ref); +} + +/* creates an empty file cache that holds at most `capacity` entries */ h2o_filecache_t *h2o_filecache_create(size_t capacity) +{ + h2o_filecache_t *cache = h2o_mem_alloc(sizeof(*cache)); + + cache->hash = kh_init(opencache_set); + h2o_linklist_init_anchor(&cache->lru); + cache->capacity = capacity; + + return cache; +} + +/* releases all cached entries and frees the cache object */ void h2o_filecache_destroy(h2o_filecache_t *cache) +{ + h2o_filecache_clear(cache); + assert(kh_size(cache->hash) == 0); + assert(h2o_linklist_is_empty(&cache->lru)); + kh_destroy(opencache_set, cache->hash); + free(cache); +} + +/* releases every cached entry; entries still referenced by callers stay alive until their last h2o_filecache_close_file */ void h2o_filecache_clear(h2o_filecache_t *cache) +{ + khiter_t iter; + for (iter = kh_begin(cache->hash); iter != kh_end(cache->hash); ++iter) { + if (!kh_exist(cache->hash, iter)) + continue; + 
release_from_cache(cache, iter); + } + assert(kh_size(cache->hash) == 0); +} + +/* returns a reference to the opened file at `path`, taken from the cache when present and opened with `oflag` otherwise. Returns NULL with errno set when the open (or fstat) failed; a failure is stored in the cached entry and reported again on later lookups. Every non-NULL result must be released with h2o_filecache_close_file */ h2o_filecache_ref_t *h2o_filecache_open_file(h2o_filecache_t *cache, const char *path, int oflag) +{ + khiter_t iter = kh_get(opencache_set, cache->hash, path); + h2o_filecache_ref_t *ref; + int dummy; + + /* lookup cache, and return the one if found */ + if (iter != kh_end(cache->hash)) { + ref = H2O_STRUCT_FROM_MEMBER(h2o_filecache_ref_t, _path, kh_key(cache->hash, iter)); + ++ref->_refcnt; /* note: a hit does not change the position of the entry in the lru list */ + goto Exit; + } + + /* create a new cache entry */ + ref = h2o_mem_alloc(offsetof(h2o_filecache_ref_t, _path) + strlen(path) + 1); /* the path is stored inline at the end of the entry */ + ref->_refcnt = 1; + ref->_lru = (h2o_linklist_t){NULL}; + strcpy(ref->_path, path); + + /* if cache is used, then... */ + if (cache->capacity != 0) { + /* purge one entry from LRU if cache is full */ + if (kh_size(cache->hash) == cache->capacity) { + h2o_filecache_ref_t *purge_ref = H2O_STRUCT_FROM_MEMBER(h2o_filecache_ref_t, _lru, cache->lru.prev); + khiter_t purge_iter = kh_get(opencache_set, cache->hash, purge_ref->_path); + assert(purge_iter != kh_end(cache->hash)); + release_from_cache(cache, purge_iter); + } + /* assign the new entry */ + ++ref->_refcnt; /* second reference: one is held by the cache, one is returned to the caller */ + kh_put(opencache_set, cache->hash, ref->_path, &dummy); + h2o_linklist_insert(cache->lru.next, &ref->_lru); + } + + /* open the file, or memoize the error */ + if ((ref->fd = open(path, oflag)) != -1 && fstat(ref->fd, &ref->st) == 0) { + ref->_last_modified.str[0] = '\0'; /* both derived strings are generated lazily by the getters */ + ref->_etag.len = 0; + } else { + ref->open_err = errno; + if (ref->fd != -1) { + close(ref->fd); + ref->fd = -1; + } + } + +Exit: + /* if the cache entry retains an error, return it instead of the reference */ + if (ref->fd == -1) { + errno = ref->open_err; + h2o_filecache_close_file(ref); + ref = NULL; + } + return ref; +} + +/* drops one reference; when the last reference is gone the descriptor is closed and the entry is freed. By then the entry must no longer be linked into the cache's list */ void h2o_filecache_close_file(h2o_filecache_ref_t *ref) +{ + if (--ref->_refcnt != 0) + return; + assert(!h2o_linklist_is_linked(&ref->_lru)); + if (ref->fd != -1) { + close(ref->fd); + ref->fd = -1; + } + free(ref); 
+} + +/* returns the modification time of the file as a broken-down UTC time (gmtime_r), computing and caching it together with its RFC 1123 string on first use. When `outbuf` is not NULL, H2O_TIMESTR_RFC1123_LEN + 1 bytes of the string are copied into it */ struct tm *h2o_filecache_get_last_modified(h2o_filecache_ref_t *ref, char *outbuf) +{ + assert(ref->fd != -1); + if (ref->_last_modified.str[0] == '\0') { + gmtime_r(&ref->st.st_mtime, &ref->_last_modified.gm); + h2o_time2str_rfc1123(ref->_last_modified.str, &ref->_last_modified.gm); + } + if (outbuf != NULL) + memcpy(outbuf, ref->_last_modified.str, H2O_TIMESTR_RFC1123_LEN + 1); + return &ref->_last_modified.gm; +} + +/* copies the entity tag of the file, a quoted string built from the modification time and the size in hexadecimal, into `outbuf` including the terminating NUL and returns its length. The tag is generated on first use and cached. Unlike h2o_filecache_get_last_modified, `outbuf` must not be NULL */ size_t h2o_filecache_get_etag(h2o_filecache_ref_t *ref, char *outbuf) +{ + assert(ref->fd != -1); + if (ref->_etag.len == 0) + ref->_etag.len = sprintf(ref->_etag.buf, "\"%08x-%zx\"", (unsigned)ref->st.st_mtime, (size_t)ref->st.st_size); + memcpy(outbuf, ref->_etag.buf, ref->_etag.len + 1); + return ref->_etag.len; +} diff --git a/web/server/h2o/libh2o/lib/common/hostinfo.c b/web/server/h2o/libh2o/lib/common/hostinfo.c new file mode 100644 index 000000000..7b481e296 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/hostinfo.c @@ -0,0 +1,229 @@ +/* + * Copyright (c) 2015 DeNA Co., Ltd., Kazuho Oku + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include "h2o/hostinfo.h" + +/* an asynchronous getaddrinfo request. _in and _out share storage: _in carries the arguments until the lookup has run, _out carries the result afterwards */ struct st_h2o_hostinfo_getaddr_req_t { + h2o_multithread_receiver_t *_receiver; + h2o_hostinfo_getaddr_cb _cb; /* set to NULL when the request is cancelled while in flight or once the callback has been invoked */ + void *cbdata; + h2o_linklist_t _pending; /* linked to queue.pending while the request waits for a lookup thread */ + union { + struct { + char *name; + char *serv; + struct addrinfo hints; + } _in; + struct { + h2o_multithread_message_t message; + const char *errstr; + struct addrinfo *ai; + } _out; + }; +}; + +/* process-wide work queue shared by all lookup threads; all members are protected by mutex */ static struct { + pthread_mutex_t mutex; + pthread_cond_t cond; + h2o_linklist_t pending; /* anchor of h2o_hostinfo_getaddr_req_t::_pending */ + size_t num_threads; + size_t num_threads_idle; +} queue = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, {&queue.pending, &queue.pending}, 0, 0}; + +/* upper bound on the number of lookup threads (checked in h2o__hostinfo_getaddr_dispatch) */ size_t h2o_hostinfo_max_threads = 1; + +/* runs getaddrinfo for `req` on the calling thread and sends the outcome to the request's receiver */ static void lookup_and_respond(h2o_hostinfo_getaddr_req_t *req) +{ + struct addrinfo *res; + + /* _in must be fully consumed here, because the assignments to _out below overwrite the same storage */ int ret = getaddrinfo(req->_in.name, req->_in.serv, &req->_in.hints, &res); + req->_out.message = (h2o_multithread_message_t){{NULL}}; + if (ret != 0) { + req->_out.errstr = gai_strerror(ret); + req->_out.ai = NULL; + } else { + req->_out.errstr = NULL; + req->_out.ai = res; + } + + h2o_multithread_send_message(req->_receiver, &req->_out.message); +} + +/* body of a lookup thread: repeatedly drains queue.pending, releasing the mutex around each lookup, then waits on the condition variable; the loop never exits */ static void *lookup_thread_main(void *_unused) +{ + pthread_mutex_lock(&queue.mutex); + + while (1) { + --queue.num_threads_idle; + while (!h2o_linklist_is_empty(&queue.pending)) { + h2o_hostinfo_getaddr_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_hostinfo_getaddr_req_t, _pending, queue.pending.next); + h2o_linklist_unlink(&req->_pending); + pthread_mutex_unlock(&queue.mutex); + lookup_and_respond(req); + pthread_mutex_lock(&queue.mutex); + } + ++queue.num_threads_idle; + pthread_cond_wait(&queue.cond, &queue.mutex); + } + + 
h2o_fatal("unreachable"); + return NULL; +} + +/* starts one detached lookup thread running lookup_thread_main and accounts for it in queue; the caller holds queue.mutex (see h2o__hostinfo_getaddr_dispatch). Aborts the process if the very first thread cannot be created; later failures are only logged */ +static void create_lookup_thread(void) +{ + pthread_t tid; + pthread_attr_t attr; + int ret; + + pthread_attr_init(&attr); + /* the thread is never joined, so create it detached. The stack size is deliberately left at the platform default, which is what this function has always effectively used, since the attribute object was previously not passed to pthread_create */ + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + ret = pthread_create(&tid, &attr, lookup_thread_main, NULL); + pthread_attr_destroy(&attr); + if (ret != 0) { + if (queue.num_threads == 0) { + fprintf(stderr, "failed to start first thread for getaddrinfo:%s\n", strerror(ret)); + abort(); + } else { + /* pthread_create returns the error number and does not set errno, so perror() cannot be used */ + fprintf(stderr, "pthread_create(for getaddrinfo): %s\n", strerror(ret)); + } + return; + } + + ++queue.num_threads; + ++queue.num_threads_idle; +} + +/* starts an asynchronous address lookup. `name` and `serv` are copied (NUL-terminated) into storage allocated together with the request, the remaining arguments become the getaddrinfo hints. The returned handle can be passed to h2o_hostinfo_getaddr_cancel; the result is delivered through `receiver` */ h2o_hostinfo_getaddr_req_t *h2o_hostinfo_getaddr(h2o_multithread_receiver_t *receiver, h2o_iovec_t name, h2o_iovec_t serv, + int family, int socktype, int protocol, int flags, h2o_hostinfo_getaddr_cb cb, + void *cbdata) +{ + h2o_hostinfo_getaddr_req_t *req = h2o_mem_alloc(sizeof(*req) + name.len + 1 + serv.len + 1); + req->_receiver = receiver; + req->_cb = cb; + req->cbdata = cbdata; + req->_pending = (h2o_linklist_t){NULL}; + req->_in.name = (char *)req + sizeof(*req); + memcpy(req->_in.name, name.base, name.len); + req->_in.name[name.len] = '\0'; + req->_in.serv = req->_in.name + name.len + 1; + memcpy(req->_in.serv, serv.base, serv.len); + req->_in.serv[serv.len] = '\0'; + memset(&req->_in.hints, 0, sizeof(req->_in.hints)); + req->_in.hints.ai_family = family; + req->_in.hints.ai_socktype = socktype; + req->_in.hints.ai_protocol = protocol; + req->_in.hints.ai_flags = flags; + + h2o__hostinfo_getaddr_dispatch(req); + + return req; +} + +/* queues `req`, starts another lookup thread when none is idle and the limit has not been reached, and wakes up one waiting thread */ void h2o__hostinfo_getaddr_dispatch(h2o_hostinfo_getaddr_req_t *req) +{ + pthread_mutex_lock(&queue.mutex); + + h2o_linklist_insert(&queue.pending, &req->_pending); + + if (queue.num_threads_idle == 0 && queue.num_threads < h2o_hostinfo_max_threads) + create_lookup_thread(); + + pthread_cond_signal(&queue.cond); + pthread_mutex_unlock(&queue.mutex); +} + +/* cancels a request: one that is still queued is removed and freed right away; one that a lookup thread has already taken only has its callback cleared and is freed later by h2o_hostinfo_getaddr_receiver */ void h2o_hostinfo_getaddr_cancel(h2o_hostinfo_getaddr_req_t *req) +{ + int should_free = 
0; + + pthread_mutex_lock(&queue.mutex); + + if (h2o_linklist_is_linked(&req->_pending)) { + h2o_linklist_unlink(&req->_pending); + should_free = 1; + } else { + req->_cb = NULL; + } + + pthread_mutex_unlock(&queue.mutex); + + if (should_free) + free(req); +} + +/* message handler for lookup results: for every delivered request, invokes the callback unless the request has been cancelled, then frees the address list (if any) and the request */ void h2o_hostinfo_getaddr_receiver(h2o_multithread_receiver_t *receiver, h2o_linklist_t *messages) +{ + while (!h2o_linklist_is_empty(messages)) { + h2o_hostinfo_getaddr_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_hostinfo_getaddr_req_t, _out.message.link, messages->next); + h2o_linklist_unlink(&req->_out.message.link); + h2o_hostinfo_getaddr_cb cb = req->_cb; + if (cb != NULL) { + req->_cb = NULL; + cb(req, req->_out.errstr, req->_out.ai, req->cbdata); + } + if (req->_out.ai != NULL) + freeaddrinfo(req->_out.ai); + free(req); + } +} + +/* parses one component of a dotted-quad address: one to three decimal digits with a value of at most 255, starting at `p` and not reading past `end`. On success stores the value in `*value` and returns the position following the digits; otherwise returns NULL */ +static const char *fetch_aton_digit(const char *p, const char *end, unsigned char *value) +{ + size_t ndigits = 0; + int v = 0; + + while (p != end && ('0' <= *p && *p <= '9')) { + /* reject over-long input before accumulating, so that `v` can never overflow on a long run of digits */ + if (++ndigits > 3) + return NULL; + v = v * 10 + *p++ - '0'; + } + if (ndigits == 0) + return NULL; + if (v > 255) + return NULL; + *value = (unsigned char)v; + return p; +} + +/* converts `host`, which must consist of exactly four dot-separated decimal components (each 0-255, at most three digits), into `addr`; the components are stored in order as the four bytes of s_addr. Returns 0 on success, or -1 (leaving `addr` untouched) if `host` is not of that form */ int h2o_hostinfo_aton(h2o_iovec_t host, struct in_addr *addr) +{ + union { + int32_t n; + unsigned char c[4]; + } value; + const char *p = host.base, *end = p + host.len; + size_t ndots = 0; + + while (1) { + if ((p = fetch_aton_digit(p, end, value.c + ndots)) == NULL) + return -1; + if (ndots == 3) + break; + if (p == end || !(*p == '.')) + return -1; + ++p; + ++ndots; + } + if (p != end) + return -1; + + addr->s_addr = value.n; + return 0; +} diff --git a/web/server/h2o/libh2o/lib/common/http1client.c b/web/server/h2o/libh2o/lib/common/http1client.c new file mode 100644 index 000000000..8547ea817 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/http1client.c @@ -0,0 +1,582 @@ +/* + * Copyright (c) 2014 DeNA Co., Ltd. 
+ * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
+ */ +#include <arpa/inet.h> +#include <netdb.h> +#include <netinet/in.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/un.h> +#include "picohttpparser.h" +#include "h2o.h" + +/* private state of an HTTP/1 client; the public part comes first (`super`) */ struct st_h2o_http1client_private_t { + h2o_http1client_t super; + /* the callback of the current phase; the three members share storage, so only the one belonging to the current phase is valid */ union { + h2o_http1client_connect_cb on_connect; + h2o_http1client_head_cb on_head; + h2o_http1client_body_cb on_body; + } _cb; + h2o_timeout_entry_t _timeout; + int _method_is_head; + h2o_hostinfo_getaddr_req_t *_getaddr_req; /* pending address lookup, or NULL */ + int _can_keepalive; /* non-zero if the socket may be returned to the socket pool when the client is closed */ + /* state of the body reader selected in on_head; the members share storage */ union { + struct { + size_t bytesleft; + } content_length; + struct { + struct phr_chunked_decoder decoder; + size_t bytes_decoded_in_buf; + } chunked; + } _body_decoder; +}; + +/* disposes the client: cancels a pending address lookup, frees the TLS server name, returns the socket to the pool (if keep-alive is possible) or closes it, cancels a pending pool connect, unlinks the timeout and frees the object */ static void close_client(struct st_h2o_http1client_private_t *client) +{ + if (client->_getaddr_req != NULL) { + h2o_hostinfo_getaddr_cancel(client->_getaddr_req); + client->_getaddr_req = NULL; + } + if (client->super.ssl.server_name != NULL) + free(client->super.ssl.server_name); + if (client->super.sock != NULL) { + if (client->super.sockpool.pool != NULL && client->_can_keepalive) { + /* we do not send pipelined requests, and thus can trash all the received input at the end of the request */ + h2o_buffer_consume(&client->super.sock->input, client->super.sock->input->size); + h2o_socketpool_return(client->super.sockpool.pool, client->super.sock); + } else { + h2o_socket_close(client->super.sock); + } + } else { + if (client->super.sockpool.connect_req != NULL) { + h2o_socketpool_cancel_connect(client->super.sockpool.connect_req); + client->super.sockpool.connect_req = NULL; + } + } + if (h2o_timeout_is_linked(&client->_timeout)) + h2o_timeout_unlink(&client->_timeout); + free(client); +} + +/* reports `errstr` to the body callback and disposes the client without reusing the connection */ static void on_body_error(struct st_h2o_http1client_private_t *client, const char *errstr) +{ + client->_can_keepalive = 0; + client->_cb.on_body(&client->super, errstr); + close_client(client); +} + +/* timeout handler used while the body is being received */ static void on_body_timeout(h2o_timeout_entry_t *entry) +{ + struct st_h2o_http1client_private_t 
*client = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http1client_private_t, _timeout, entry); + on_body_error(client, "I/O timeout"); +} + +/* body reader used when the end of the body is only signalled by the peer closing the connection: any socket error is reported to the callback as end-of-stream */ static void on_body_until_close(h2o_socket_t *sock, const char *err) +{ + struct st_h2o_http1client_private_t *client = sock->data; + + h2o_timeout_unlink(&client->_timeout); + + if (err != NULL) { + client->_cb.on_body(&client->super, h2o_http1client_error_is_eos); + close_client(client); + return; + } + + if (sock->bytes_read != 0) { + if (client->_cb.on_body(&client->super, NULL) != 0) { + close_client(client); + return; + } + } + + /* more data is expected; re-arm the I/O timeout */ h2o_timeout_link(client->super.ctx->loop, client->super.ctx->io_timeout, &client->_timeout); +} + +/* body reader used when the length is known from the content-length header: counts down bytesleft and reports end-of-stream once it reaches zero */ static void on_body_content_length(h2o_socket_t *sock, const char *err) +{ + struct st_h2o_http1client_private_t *client = sock->data; + + h2o_timeout_unlink(&client->_timeout); + + if (err != NULL) { + on_body_error(client, "I/O error (body; content-length)"); + return; + } + + /* the second condition lets a zero-length body be reported as complete even though nothing has been read */ if (sock->bytes_read != 0 || client->_body_decoder.content_length.bytesleft == 0) { + const char *errstr; + int ret; + if (client->_body_decoder.content_length.bytesleft <= sock->bytes_read) { + if (client->_body_decoder.content_length.bytesleft < sock->bytes_read) { + /* remove the trailing garbage from buf, and disable keepalive */ + client->super.sock->input->size -= sock->bytes_read - client->_body_decoder.content_length.bytesleft; + client->_can_keepalive = 0; + } + client->_body_decoder.content_length.bytesleft = 0; + errstr = h2o_http1client_error_is_eos; + } else { + client->_body_decoder.content_length.bytesleft -= sock->bytes_read; + errstr = NULL; + } + ret = client->_cb.on_body(&client->super, errstr); + if (errstr == h2o_http1client_error_is_eos) { + close_client(client); + return; + } else if (ret != 0) { + /* the callback asked to stop before the body was complete; the connection cannot be reused */ client->_can_keepalive = 0; + close_client(client); + return; + } + } + + h2o_timeout_link(client->super.ctx->loop, client->super.ctx->io_timeout, &client->_timeout); +} + +/* body reader used for "transfer-encoding: chunked": decodes the newly received bytes inside the input buffer and reports end-of-stream when the decoder reports completion */ static void on_body_chunked(h2o_socket_t 
*sock, const char *err) +{ + struct st_h2o_http1client_private_t *client = sock->data; + h2o_buffer_t *inbuf; + + h2o_timeout_unlink(&client->_timeout); + + if (err != NULL) { + if (err == h2o_socket_error_closed && !phr_decode_chunked_is_in_data(&client->_body_decoder.chunked.decoder)) { + /* + * if the peer closed after a full chunk, treat this + * as if the transfer had complete, browsers appear to ignore + * a missing 0\r\n chunk + */ + client->_can_keepalive = 0; + client->_cb.on_body(&client->super, h2o_http1client_error_is_eos); + close_client(client); + } else { + on_body_error(client, "I/O error (body; chunked)"); + } + return; + } + + inbuf = client->super.sock->input; + if (sock->bytes_read != 0) { + const char *errstr; + int cb_ret; + size_t newsz = sock->bytes_read; + switch (phr_decode_chunked(&client->_body_decoder.chunked.decoder, inbuf->bytes + inbuf->size - newsz, &newsz)) { + case -1: /* error */ + newsz = sock->bytes_read; + client->_can_keepalive = 0; + errstr = "failed to parse the response (chunked)"; + break; + case -2: /* incomplete */ + errstr = NULL; + break; + default: /* complete, with garbage on tail; should disable keepalive */ + client->_can_keepalive = 0; + /* fallthru */ + case 0: /* complete */ + errstr = h2o_http1client_error_is_eos; + break; + } + inbuf->size -= sock->bytes_read - newsz; + cb_ret = client->_cb.on_body(&client->super, errstr); + if (errstr != NULL) { + close_client(client); + return; + } else if (cb_ret != 0) { + client->_can_keepalive = 0; + close_client(client); + return; + } + } + + h2o_timeout_link(client->super.ctx->loop, client->super.ctx->io_timeout, &client->_timeout); +} + +static void on_error_before_head(struct st_h2o_http1client_private_t *client, const char *errstr) +{ + assert(!client->_can_keepalive); + client->_cb.on_head(&client->super, errstr, 0, 0, h2o_iovec_init(NULL, 0), NULL, 0, 0); + close_client(client); +} + +static void on_head(h2o_socket_t *sock, const char *err) +{ + struct 
st_h2o_http1client_private_t *client = sock->data; + int minor_version, http_status, rlen, is_eos; + const char *msg; +#define MAX_HEADERS 100 + h2o_header_t *headers; + h2o_iovec_t *header_names; + size_t msg_len, num_headers, i; + h2o_socket_cb reader; + h2o_mem_pool_t pool; + + h2o_timeout_unlink(&client->_timeout); + + if (err != NULL) { + on_error_before_head(client, "I/O error (head)"); + return; + } + + /* the pool owns the header arrays and the copies of the original header names; it is cleared at Exit */ h2o_mem_init_pool(&pool); + + headers = h2o_mem_alloc_pool(&pool, sizeof(*headers) * MAX_HEADERS); + header_names = h2o_mem_alloc_pool(&pool, sizeof(*header_names) * MAX_HEADERS); + + /* continue parsing the responses until we see a final one */ + while (1) { + /* parse response */ + struct phr_header src_headers[MAX_HEADERS]; + num_headers = MAX_HEADERS; + rlen = phr_parse_response(sock->input->bytes, sock->input->size, &minor_version, &http_status, &msg, &msg_len, src_headers, + &num_headers, 0); + switch (rlen) { + case -1: /* error */ + on_error_before_head(client, "failed to parse the response"); + goto Exit; + case -2: /* incomplete */ + /* wait for more input; this function is called again on the next read */ h2o_timeout_link(client->super.ctx->loop, client->super.ctx->io_timeout, &client->_timeout); + goto Exit; + } + /* fill-in the headers */ + for (i = 0; i != num_headers; ++i) { + const h2o_token_t *token; + /* keep a copy of the name as received, because the name is lower-cased in place (inside the input buffer) on the next line */ char *orig_name = h2o_strdup(&pool, src_headers[i].name, src_headers[i].name_len).base; + h2o_strtolower((char *)src_headers[i].name, src_headers[i].name_len); + token = h2o_lookup_token(src_headers[i].name, src_headers[i].name_len); + if (token != NULL) { + /* known header: point to the token's name, which allows the pointer comparisons used when the headers are interpreted */ headers[i].name = (h2o_iovec_t *)&token->buf; + } else { + header_names[i] = h2o_iovec_init(src_headers[i].name, src_headers[i].name_len); + headers[i].name = &header_names[i]; + } + headers[i].value = h2o_iovec_init(src_headers[i].value, src_headers[i].value_len); + headers[i].orig_name = orig_name; + } + + /* anything other than an informational response (1xx except 101) is the final response */ if (!(100 <= http_status && http_status <= 199 && http_status != 101)) + break; + + if (client->super.informational_cb != NULL && + 
client->super.informational_cb(&client->super, minor_version, http_status, h2o_iovec_init(msg, msg_len), headers, + num_headers) != 0) { + close_client(client); + goto Exit; + } + h2o_buffer_consume(&client->super.sock->input, rlen); + if (client->super.sock->input->size == 0) { + h2o_timeout_link(client->super.ctx->loop, client->super.ctx->io_timeout, &client->_timeout); + goto Exit; + } + } + + /* parse the headers */ + reader = on_body_until_close; + client->_can_keepalive = minor_version >= 1; + for (i = 0; i != num_headers; ++i) { + if (headers[i].name == &H2O_TOKEN_CONNECTION->buf) { + if (h2o_contains_token(headers[i].value.base, headers[i].value.len, H2O_STRLIT("keep-alive"), ',')) { + client->_can_keepalive = 1; + } else { + client->_can_keepalive = 0; + } + } else if (headers[i].name == &H2O_TOKEN_TRANSFER_ENCODING->buf) { + if (h2o_memis(headers[i].value.base, headers[i].value.len, H2O_STRLIT("chunked"))) { + /* precond: _body_decoder.chunked is zero-filled */ + client->_body_decoder.chunked.decoder.consume_trailer = 1; + reader = on_body_chunked; + } else if (h2o_memis(headers[i].value.base, headers[i].value.len, H2O_STRLIT("identity"))) { + /* continue */ + } else { + on_error_before_head(client, "unexpected type of transfer-encoding"); + goto Exit; + } + } else if (headers[i].name == &H2O_TOKEN_CONTENT_LENGTH->buf) { + if ((client->_body_decoder.content_length.bytesleft = h2o_strtosize(headers[i].value.base, headers[i].value.len)) == + SIZE_MAX) { + on_error_before_head(client, "invalid content-length"); + goto Exit; + } + if (reader != on_body_chunked) + reader = on_body_content_length; + } + } + + /* RFC 2616 4.4 */ + if (client->_method_is_head || http_status == 101 || http_status == 204 || http_status == 304) { + is_eos = 1; + } else { + is_eos = 0; + /* close the connection if impossible to determine the end of the response (RFC 7230 3.3.3) */ + if (reader == on_body_until_close) + client->_can_keepalive = 0; + } + + /* call the callback. 
sock may be stealed and stealed sock need rlen.*/ + client->_cb.on_body = client->_cb.on_head(&client->super, is_eos ? h2o_http1client_error_is_eos : NULL, minor_version, + http_status, h2o_iovec_init(msg, msg_len), headers, num_headers, rlen); + + if (is_eos) { + close_client(client); + goto Exit; + } else if (client->_cb.on_body == NULL) { + client->_can_keepalive = 0; + close_client(client); + goto Exit; + } + + h2o_buffer_consume(&client->super.sock->input, rlen); + client->super.sock->bytes_read = client->super.sock->input->size; + + client->_timeout.cb = on_body_timeout; + h2o_socket_read_start(sock, reader); + reader(client->super.sock, 0); + +Exit: + h2o_mem_clear_pool(&pool); +#undef MAX_HEADERS +} + +static void on_head_timeout(h2o_timeout_entry_t *entry) +{ + struct st_h2o_http1client_private_t *client = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http1client_private_t, _timeout, entry); + on_error_before_head(client, "I/O timeout"); +} + +static void on_send_request(h2o_socket_t *sock, const char *err) +{ + struct st_h2o_http1client_private_t *client = sock->data; + + h2o_timeout_unlink(&client->_timeout); + + if (err != NULL) { + on_error_before_head(client, "I/O error (send request)"); + return; + } + + h2o_socket_read_start(client->super.sock, on_head); + client->_timeout.cb = on_head_timeout; + h2o_timeout_link(client->super.ctx->loop, client->super.ctx->io_timeout, &client->_timeout); +} + +static void on_send_timeout(h2o_timeout_entry_t *entry) +{ + struct st_h2o_http1client_private_t *client = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http1client_private_t, _timeout, entry); + on_error_before_head(client, "I/O timeout"); +} + +static void on_connect_error(struct st_h2o_http1client_private_t *client, const char *errstr) +{ + assert(errstr != NULL); + client->_cb.on_connect(&client->super, errstr, NULL, NULL, NULL); + close_client(client); +} + +static void on_connection_ready(struct st_h2o_http1client_private_t *client) +{ + h2o_iovec_t *reqbufs; + size_t 
reqbufcnt; + + if ((client->_cb.on_head = client->_cb.on_connect(&client->super, NULL, &reqbufs, &reqbufcnt, &client->_method_is_head)) == + NULL) { + close_client(client); + return; + } + h2o_socket_write(client->super.sock, reqbufs, reqbufcnt, on_send_request); + /* TODO no need to set the timeout if all data has been written into TCP sendbuf */ + client->_timeout.cb = on_send_timeout; + h2o_timeout_link(client->super.ctx->loop, client->super.ctx->io_timeout, &client->_timeout); +} + +static void on_handshake_complete(h2o_socket_t *sock, const char *err) +{ + struct st_h2o_http1client_private_t *client = sock->data; + + h2o_timeout_unlink(&client->_timeout); + + if (err == NULL) { + /* success */ + } else if (err == h2o_socket_error_ssl_cert_name_mismatch && + (SSL_CTX_get_verify_mode(client->super.ctx->ssl_ctx) & SSL_VERIFY_PEER) == 0) { + /* peer verification skipped */ + } else { + on_connect_error(client, err); + return; + } + + on_connection_ready(client); +} + +static void on_connect(h2o_socket_t *sock, const char *err) +{ + struct st_h2o_http1client_private_t *client = sock->data; + + if (err != NULL) { + h2o_timeout_unlink(&client->_timeout); + on_connect_error(client, err); + return; + } + if (client->super.ssl.server_name != NULL && client->super.sock->ssl == NULL) { + h2o_socket_ssl_handshake(client->super.sock, client->super.ctx->ssl_ctx, client->super.ssl.server_name, + on_handshake_complete); + return; + } + + h2o_timeout_unlink(&client->_timeout); + + on_connection_ready(client); +} + +static void on_pool_connect(h2o_socket_t *sock, const char *errstr, void *data) +{ + struct st_h2o_http1client_private_t *client = data; + + client->super.sockpool.connect_req = NULL; + + if (sock == NULL) { + assert(errstr != NULL); + on_connect_error(client, errstr); + return; + } + + client->super.sock = sock; + sock->data = client; + on_connect(sock, NULL); +} + +static void on_connect_timeout(h2o_timeout_entry_t *entry) +{ + struct st_h2o_http1client_private_t 
*client = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http1client_private_t, _timeout, entry); + on_connect_error(client, "connection timeout"); +} + +static void start_connect(struct st_h2o_http1client_private_t *client, struct sockaddr *addr, socklen_t addrlen) +{ + if ((client->super.sock = h2o_socket_connect(client->super.ctx->loop, addr, addrlen, on_connect)) == NULL) { + on_connect_error(client, "socket create error"); + return; + } + client->super.sock->data = client; +} + +static void on_getaddr(h2o_hostinfo_getaddr_req_t *getaddr_req, const char *errstr, struct addrinfo *res, void *_client) +{ + struct st_h2o_http1client_private_t *client = _client; + + assert(getaddr_req == client->_getaddr_req); + client->_getaddr_req = NULL; + + if (errstr != NULL) { + on_connect_error(client, errstr); + return; + } + + /* start connecting */ + struct addrinfo *selected = h2o_hostinfo_select_one(res); + start_connect(client, selected->ai_addr, selected->ai_addrlen); +} + +static struct st_h2o_http1client_private_t *create_client(h2o_http1client_t **_client, void *data, h2o_http1client_ctx_t *ctx, + h2o_iovec_t ssl_server_name, h2o_http1client_connect_cb cb) +{ + struct st_h2o_http1client_private_t *client = h2o_mem_alloc(sizeof(*client)); + + *client = (struct st_h2o_http1client_private_t){{ctx}}; + if (ssl_server_name.base != NULL) + client->super.ssl.server_name = h2o_strdup(NULL, ssl_server_name.base, ssl_server_name.len).base; + client->super.data = data; + client->_cb.on_connect = cb; + /* caller needs to setup _cb, timeout.cb, sock, and sock->data */ + + if (_client != NULL) + *_client = &client->super; + return client; +} + +const char *const h2o_http1client_error_is_eos = "end of stream"; + +void h2o_http1client_connect(h2o_http1client_t **_client, void *data, h2o_http1client_ctx_t *ctx, h2o_iovec_t host, uint16_t port, + int is_ssl, h2o_http1client_connect_cb cb) +{ + struct st_h2o_http1client_private_t *client; + char serv[sizeof("65536")]; + + /* setup */ + client = 
create_client(_client, data, ctx, is_ssl ? host : h2o_iovec_init(NULL, 0), cb); + client->_timeout.cb = on_connect_timeout; + h2o_timeout_link(ctx->loop, ctx->io_timeout, &client->_timeout); + + { /* directly call connect(2) if `host` is an IP address */ + struct sockaddr_in sin; + memset(&sin, 0, sizeof(sin)); + if (h2o_hostinfo_aton(host, &sin.sin_addr) == 0) { + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + start_connect(client, (void *)&sin, sizeof(sin)); + return; + } + } + { /* directly call connect(2) if `host` refers to an UNIX-domain socket */ + struct sockaddr_un sa; + const char *to_sa_err; + if ((to_sa_err = h2o_url_host_to_sun(host, &sa)) != h2o_url_host_to_sun_err_is_not_unix_socket) { + if (to_sa_err != NULL) { + on_connect_error(client, to_sa_err); + return; + } + start_connect(client, (void *)&sa, sizeof(sa)); + return; + } + } + /* resolve destination and then connect */ + client->_getaddr_req = + h2o_hostinfo_getaddr(ctx->getaddr_receiver, host, h2o_iovec_init(serv, sprintf(serv, "%u", (unsigned)port)), AF_UNSPEC, + SOCK_STREAM, IPPROTO_TCP, AI_ADDRCONFIG | AI_NUMERICSERV, on_getaddr, client); +} + +void h2o_http1client_connect_with_pool(h2o_http1client_t **_client, void *data, h2o_http1client_ctx_t *ctx, + h2o_socketpool_t *sockpool, h2o_http1client_connect_cb cb) +{ + struct st_h2o_http1client_private_t *client = + create_client(_client, data, ctx, sockpool->is_ssl ? 
sockpool->peer.host : h2o_iovec_init(NULL, 0), cb); + client->super.sockpool.pool = sockpool; + client->_timeout.cb = on_connect_timeout; + h2o_timeout_link(ctx->loop, ctx->io_timeout, &client->_timeout); + h2o_socketpool_connect(&client->super.sockpool.connect_req, sockpool, ctx->loop, ctx->getaddr_receiver, on_pool_connect, + client); +} + +void h2o_http1client_cancel(h2o_http1client_t *_client) +{ + struct st_h2o_http1client_private_t *client = (void *)_client; + client->_can_keepalive = 0; + close_client(client); +} + +h2o_socket_t *h2o_http1client_steal_socket(h2o_http1client_t *_client) +{ + struct st_h2o_http1client_private_t *client = (void *)_client; + h2o_socket_t *sock = client->super.sock; + h2o_socket_read_stop(sock); + client->super.sock = NULL; + return sock; +} diff --git a/web/server/h2o/libh2o/lib/common/memcached.c b/web/server/h2o/libh2o/lib/common/memcached.c new file mode 100644 index 000000000..752ea2fcb --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/memcached.c @@ -0,0 +1,429 @@ +/* + * Copyright (c) 2015 DeNA Co., Ltd., Kazuho Oku + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include <errno.h> +#include <inttypes.h> +#include <unistd.h> +#include "yrmcds.h" +#include "h2o/linklist.h" +#include "h2o/memcached.h" +#include "h2o/rand.h" +#include "h2o/string_.h" + +struct st_h2o_memcached_context_t { + pthread_mutex_t mutex; + pthread_cond_t cond; + h2o_linklist_t pending; + size_t num_threads_connected; + char *host; + uint16_t port; + int text_protocol; + h2o_iovec_t prefix; +}; + +struct st_h2o_memcached_conn_t { + h2o_memcached_context_t *ctx; + yrmcds yrmcds; + pthread_mutex_t mutex; + h2o_linklist_t inflight; + int writer_exit_requested; +}; + +enum en_h2o_memcached_req_type_t { REQ_TYPE_GET, REQ_TYPE_SET, REQ_TYPE_DELETE }; + +struct st_h2o_memcached_req_t { + enum en_h2o_memcached_req_type_t type; + h2o_linklist_t pending; + h2o_linklist_t inflight; + union { + struct { + h2o_multithread_receiver_t *receiver; + h2o_multithread_message_t message; + h2o_memcached_get_cb cb; + void *cb_data; + int value_is_encoded; + h2o_iovec_t value; + uint32_t serial; + } get; + struct { + h2o_iovec_t value; + uint32_t expiration; + } set; + } data; + struct { + size_t len; + char base[1]; + } key; +}; + +static h2o_memcached_req_t *create_req(h2o_memcached_context_t *ctx, enum en_h2o_memcached_req_type_t type, h2o_iovec_t key, + int encode_key) +{ + h2o_memcached_req_t *req = h2o_mem_alloc(offsetof(h2o_memcached_req_t, key.base) + ctx->prefix.len + + (encode_key ? 
(key.len + 2) / 3 * 4 + 1 : key.len)); + req->type = type; + req->pending = (h2o_linklist_t){NULL}; + req->inflight = (h2o_linklist_t){NULL}; + memset(&req->data, 0, sizeof(req->data)); + if (ctx->prefix.len != 0) + memcpy(req->key.base, ctx->prefix.base, ctx->prefix.len); + req->key.len = ctx->prefix.len; + if (encode_key) { + req->key.len += h2o_base64_encode(req->key.base + req->key.len, key.base, key.len, 1); + } else { + memcpy(req->key.base + req->key.len, key.base, key.len); + req->key.len += key.len; + } + return req; +} + +static void free_req(h2o_memcached_req_t *req) +{ + assert(!h2o_linklist_is_linked(&req->pending)); + switch (req->type) { + case REQ_TYPE_GET: + assert(!h2o_linklist_is_linked(&req->data.get.message.link)); + h2o_mem_set_secure(req->data.get.value.base, 0, req->data.get.value.len); + free(req->data.get.value.base); + break; + case REQ_TYPE_SET: + h2o_mem_set_secure(req->data.set.value.base, 0, req->data.set.value.len); + free(req->data.set.value.base); + break; + case REQ_TYPE_DELETE: + break; + default: + assert(!"FIXME"); + break; + } + free(req); +} + +static void discard_req(h2o_memcached_req_t *req) +{ + switch (req->type) { + case REQ_TYPE_GET: + h2o_multithread_send_message(req->data.get.receiver, &req->data.get.message); + break; + default: + free_req(req); + break; + } +} + +static h2o_memcached_req_t *pop_inflight(struct st_h2o_memcached_conn_t *conn, uint32_t serial) +{ + h2o_memcached_req_t *req; + + pthread_mutex_lock(&conn->mutex); + + if (conn->yrmcds.text_mode) { + /* in text mode, responses are returned in order (and we may receive responses for commands other than GET) */ + if (!h2o_linklist_is_empty(&conn->inflight)) { + req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, inflight, conn->inflight.next); + assert(req->type == REQ_TYPE_GET); + if (req->data.get.serial == serial) + goto Found; + } + } else { + /* in binary mode, responses are received out-of-order (and we would only recieve responses for GET) */ + 
h2o_linklist_t *node; + for (node = conn->inflight.next; node != &conn->inflight; node = node->next) { + req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, inflight, node); + assert(req->type == REQ_TYPE_GET); + if (req->data.get.serial == serial) + goto Found; + } + } + + /* not found */ + pthread_mutex_unlock(&conn->mutex); + return NULL; + +Found: + h2o_linklist_unlink(&req->inflight); + pthread_mutex_unlock(&conn->mutex); + return req; +} + +static void *writer_main(void *_conn) +{ + struct st_h2o_memcached_conn_t *conn = _conn; + yrmcds_error err; + + pthread_mutex_lock(&conn->ctx->mutex); + + while (!__sync_add_and_fetch(&conn->writer_exit_requested, 0)) { + while (!h2o_linklist_is_empty(&conn->ctx->pending)) { + h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, pending, conn->ctx->pending.next); + h2o_linklist_unlink(&req->pending); + pthread_mutex_unlock(&conn->ctx->mutex); + + switch (req->type) { + case REQ_TYPE_GET: + pthread_mutex_lock(&conn->mutex); + h2o_linklist_insert(&conn->inflight, &req->inflight); + pthread_mutex_unlock(&conn->mutex); + if ((err = yrmcds_get(&conn->yrmcds, req->key.base, req->key.len, 0, &req->data.get.serial)) != YRMCDS_OK) + goto Error; + break; + case REQ_TYPE_SET: + err = yrmcds_set(&conn->yrmcds, req->key.base, req->key.len, req->data.set.value.base, req->data.set.value.len, 0, + req->data.set.expiration, 0, !conn->yrmcds.text_mode, NULL); + discard_req(req); + if (err != YRMCDS_OK) + goto Error; + break; + case REQ_TYPE_DELETE: + err = yrmcds_remove(&conn->yrmcds, req->key.base, req->key.len, !conn->yrmcds.text_mode, NULL); + discard_req(req); + if (err != YRMCDS_OK) + goto Error; + break; + default: + fprintf(stderr, "[lib/common/memcached.c] unknown type:%d\n", (int)req->type); + err = YRMCDS_NOT_IMPLEMENTED; + goto Error; + } + + pthread_mutex_lock(&conn->ctx->mutex); + } + pthread_cond_wait(&conn->ctx->cond, &conn->ctx->mutex); + } + + pthread_mutex_unlock(&conn->ctx->mutex); + return NULL; + 
+Error: + fprintf(stderr, "[lib/common/memcached.c] failed to send request; %s\n", yrmcds_strerror(err)); + /* doc says the call can be used to interrupt yrmcds_recv */ + yrmcds_shutdown(&conn->yrmcds); + + return NULL; +} + +static void connect_to_server(h2o_memcached_context_t *ctx, yrmcds *yrmcds) +{ + size_t failcnt; + yrmcds_error err; + + for (failcnt = 0; (err = yrmcds_connect(yrmcds, ctx->host, ctx->port)) != YRMCDS_OK; ++failcnt) { + if (failcnt == 0) { + fprintf(stderr, "[lib/common/memcached.c] failed to connect to memcached at %s:%" PRIu16 ", %s\n", ctx->host, ctx->port, + yrmcds_strerror(err)); + } + ++failcnt; + usleep(2000000 + h2o_rand() % 3000000); /* sleep 2 to 5 seconds */ + } + /* connected */ + if (ctx->text_protocol) + yrmcds_text_mode(yrmcds); + fprintf(stderr, "[lib/common/memcached.c] connected to memcached at %s:%" PRIu16 "\n", ctx->host, ctx->port); +} + +static void reader_main(h2o_memcached_context_t *ctx) +{ + struct st_h2o_memcached_conn_t conn = {ctx, {0}, PTHREAD_MUTEX_INITIALIZER, {&conn.inflight, &conn.inflight}, 0}; + pthread_t writer_thread; + yrmcds_response resp; + yrmcds_error err; + + /* connect to server and start the writer thread */ + connect_to_server(conn.ctx, &conn.yrmcds); + if (pthread_create(&writer_thread, NULL, writer_main, &conn) != 0) { + perror("pthread_create"); + abort(); + } + + pthread_mutex_lock(&conn.ctx->mutex); + ++conn.ctx->num_threads_connected; + pthread_mutex_unlock(&conn.ctx->mutex); + + /* receive data until an error occurs */ + while (1) { + if ((err = yrmcds_recv(&conn.yrmcds, &resp)) != YRMCDS_OK) { + fprintf(stderr, "[lib/common/memcached.c] yrmcds_recv:%s\n", yrmcds_strerror(err)); + break; + } + h2o_memcached_req_t *req = pop_inflight(&conn, resp.serial); + if (req == NULL) { + if (conn.yrmcds.text_mode) + continue; + fprintf(stderr, "[lib/common/memcached.c] received unexpected serial\n"); + break; + } + if (resp.status == YRMCDS_STATUS_OK) { + req->data.get.value = 
h2o_iovec_init(h2o_mem_alloc(resp.data_len), resp.data_len); + memcpy(req->data.get.value.base, resp.data, resp.data_len); + h2o_mem_set_secure((void *)resp.data, 0, resp.data_len); + } + h2o_multithread_send_message(req->data.get.receiver, &req->data.get.message); + } + + /* send error to all the reqs in-flight */ + pthread_mutex_lock(&conn.mutex); + while (!h2o_linklist_is_empty(&conn.inflight)) { + h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, inflight, conn.inflight.next); + h2o_linklist_unlink(&req->inflight); + assert(req->type == REQ_TYPE_GET); + h2o_multithread_send_message(req->data.get.receiver, &req->data.get.message); + } + pthread_mutex_unlock(&conn.mutex); + + /* stop the writer thread */ + __sync_add_and_fetch(&conn.writer_exit_requested, 1); + pthread_mutex_lock(&conn.ctx->mutex); + pthread_cond_broadcast(&conn.ctx->cond); + pthread_mutex_unlock(&conn.ctx->mutex); + pthread_join(writer_thread, NULL); + + /* decrement num_threads_connected, and discard all the pending requests if no connections are alive */ + pthread_mutex_lock(&conn.ctx->mutex); + if (--conn.ctx->num_threads_connected == 0) { + while (!h2o_linklist_is_empty(&conn.ctx->pending)) { + h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, pending, conn.ctx->pending.next); + h2o_linklist_unlink(&req->pending); + discard_req(req); + } + } + pthread_mutex_unlock(&conn.ctx->mutex); + + /* close the connection */ + yrmcds_close(&conn.yrmcds); +} + +static void *thread_main(void *_ctx) +{ + h2o_memcached_context_t *ctx = _ctx; + + while (1) + reader_main(ctx); + return NULL; +} + +static void dispatch(h2o_memcached_context_t *ctx, h2o_memcached_req_t *req) +{ + pthread_mutex_lock(&ctx->mutex); + + if (ctx->num_threads_connected != 0) { + h2o_linklist_insert(&ctx->pending, &req->pending); + pthread_cond_signal(&ctx->cond); + } else { + discard_req(req); + } + + pthread_mutex_unlock(&ctx->mutex); +} + +void 
h2o_memcached_receiver(h2o_multithread_receiver_t *receiver, h2o_linklist_t *messages) +{ + while (!h2o_linklist_is_empty(messages)) { + h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, data.get.message.link, messages->next); + h2o_linklist_unlink(&req->data.get.message.link); + assert(req->type == REQ_TYPE_GET); + if (req->data.get.cb != NULL) { + if (req->data.get.value_is_encoded && req->data.get.value.len != 0) { + h2o_iovec_t decoded = h2o_decode_base64url(NULL, req->data.get.value.base, req->data.get.value.len); + h2o_mem_set_secure(req->data.get.value.base, 0, req->data.get.value.len); + free(req->data.get.value.base); + req->data.get.value = decoded; + } + req->data.get.cb(req->data.get.value, req->data.get.cb_data); + } + free_req(req); + } +} + +h2o_memcached_req_t *h2o_memcached_get(h2o_memcached_context_t *ctx, h2o_multithread_receiver_t *receiver, h2o_iovec_t key, + h2o_memcached_get_cb cb, void *cb_data, int flags) +{ + h2o_memcached_req_t *req = create_req(ctx, REQ_TYPE_GET, key, (flags & H2O_MEMCACHED_ENCODE_KEY) != 0); + req->data.get.receiver = receiver; + req->data.get.cb = cb; + req->data.get.cb_data = cb_data; + req->data.get.value_is_encoded = (flags & H2O_MEMCACHED_ENCODE_VALUE) != 0; + dispatch(ctx, req); + return req; +} + +void h2o_memcached_cancel_get(h2o_memcached_context_t *ctx, h2o_memcached_req_t *req) +{ + int do_free = 0; + + pthread_mutex_lock(&ctx->mutex); + req->data.get.cb = NULL; + if (h2o_linklist_is_linked(&req->pending)) { + h2o_linklist_unlink(&req->pending); + do_free = 1; + } + pthread_mutex_unlock(&ctx->mutex); + + if (do_free) + free_req(req); +} + +void h2o_memcached_set(h2o_memcached_context_t *ctx, h2o_iovec_t key, h2o_iovec_t value, uint32_t expiration, int flags) +{ + h2o_memcached_req_t *req = create_req(ctx, REQ_TYPE_SET, key, (flags & H2O_MEMCACHED_ENCODE_KEY) != 0); + if ((flags & H2O_MEMCACHED_ENCODE_VALUE) != 0) { + req->data.set.value.base = h2o_mem_alloc((value.len + 2) / 3 * 4 + 1); + 
req->data.set.value.len = h2o_base64_encode(req->data.set.value.base, value.base, value.len, 1); + } else { + req->data.set.value = h2o_iovec_init(h2o_mem_alloc(value.len), value.len); + memcpy(req->data.set.value.base, value.base, value.len); + } + req->data.set.expiration = expiration; + dispatch(ctx, req); +} + +void h2o_memcached_delete(h2o_memcached_context_t *ctx, h2o_iovec_t key, int flags) +{ + h2o_memcached_req_t *req = create_req(ctx, REQ_TYPE_DELETE, key, (flags & H2O_MEMCACHED_ENCODE_KEY) != 0); + dispatch(ctx, req); +} + +h2o_memcached_context_t *h2o_memcached_create_context(const char *host, uint16_t port, int text_protocol, size_t num_threads, + const char *prefix) +{ + h2o_memcached_context_t *ctx = h2o_mem_alloc(sizeof(*ctx)); + + pthread_mutex_init(&ctx->mutex, NULL); + pthread_cond_init(&ctx->cond, NULL); + h2o_linklist_init_anchor(&ctx->pending); + ctx->num_threads_connected = 0; + ctx->host = h2o_strdup(NULL, host, SIZE_MAX).base; + ctx->port = port; + ctx->text_protocol = text_protocol; + ctx->prefix = h2o_strdup(NULL, prefix, SIZE_MAX); + + { /* start the threads */ + pthread_t tid; + pthread_attr_t attr; + size_t i; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, 1); + for (i = 0; i != num_threads; ++i) + h2o_multithread_create_thread(&tid, &attr, thread_main, ctx); + pthread_attr_destroy(&attr); + } + + return ctx; +} diff --git a/web/server/h2o/libh2o/lib/common/memory.c b/web/server/h2o/libh2o/lib/common/memory.c new file mode 100644 index 000000000..ba9f2dba2 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/memory.c @@ -0,0 +1,400 @@ +/* + * Copyright (c) 2014 DeNA Co., Ltd. 
+ * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
+ */ +#include <assert.h> +#include <errno.h> +#include <fcntl.h> +#include <stddef.h> +#include <stdio.h> +#include <stdint.h> +#include <stdlib.h> +#include <string.h> +#include <sys/mman.h> +#include <unistd.h> +#include "h2o/memory.h" + +#if defined(__linux__) +#define USE_POSIX_FALLOCATE 1 +#elif __FreeBSD__ >= 9 +#define USE_POSIX_FALLOCATE 1 +#elif __NetBSD__ >= 7 +#define USE_POSIX_FALLOCATE 1 +#else +#define USE_POSIX_FALLOCATE 0 +#endif + +struct st_h2o_mem_recycle_chunk_t { + struct st_h2o_mem_recycle_chunk_t *next; +}; + +struct st_h2o_mem_pool_chunk_t { + struct st_h2o_mem_pool_chunk_t *next; + size_t _dummy; /* align to 2*sizeof(void*) */ + char bytes[4096 - sizeof(void *) * 2]; +}; + +struct st_h2o_mem_pool_direct_t { + struct st_h2o_mem_pool_direct_t *next; + size_t _dummy; /* align to 2*sizeof(void*) */ + char bytes[1]; +}; + +struct st_h2o_mem_pool_shared_ref_t { + struct st_h2o_mem_pool_shared_ref_t *next; + struct st_h2o_mem_pool_shared_entry_t *entry; +}; + +void *(*h2o_mem__set_secure)(void *, int, size_t) = memset; + +static __thread h2o_mem_recycle_t mempool_allocator = {16}; + +void h2o__fatal(const char *msg) +{ + fprintf(stderr, "fatal:%s\n", msg); + abort(); +} + +void *h2o_mem_alloc_recycle(h2o_mem_recycle_t *allocator, size_t sz) +{ + struct st_h2o_mem_recycle_chunk_t *chunk; + if (allocator->cnt == 0) + return h2o_mem_alloc(sz); + /* detach and return the pooled pointer */ + chunk = allocator->_link; + assert(chunk != NULL); + allocator->_link = chunk->next; + --allocator->cnt; + return chunk; +} + +void h2o_mem_free_recycle(h2o_mem_recycle_t *allocator, void *p) +{ + struct st_h2o_mem_recycle_chunk_t *chunk; + if (allocator->cnt == allocator->max) { + free(p); + return; + } + /* register the pointer to the pool */ + chunk = p; + chunk->next = allocator->_link; + allocator->_link = chunk; + ++allocator->cnt; +} + +void h2o_mem_init_pool(h2o_mem_pool_t *pool) +{ + pool->chunks = NULL; + pool->chunk_offset = sizeof(pool->chunks->bytes); 
+ pool->directs = NULL; + pool->shared_refs = NULL; +} + +void h2o_mem_clear_pool(h2o_mem_pool_t *pool) +{ + /* release the refcounted chunks */ + if (pool->shared_refs != NULL) { + struct st_h2o_mem_pool_shared_ref_t *ref = pool->shared_refs; + do { + h2o_mem_release_shared(ref->entry->bytes); + } while ((ref = ref->next) != NULL); + pool->shared_refs = NULL; + } + /* release the direct chunks */ + if (pool->directs != NULL) { + struct st_h2o_mem_pool_direct_t *direct = pool->directs, *next; + do { + next = direct->next; + free(direct); + } while ((direct = next) != NULL); + pool->directs = NULL; + } + /* free chunks, and reset the first chunk */ + while (pool->chunks != NULL) { + struct st_h2o_mem_pool_chunk_t *next = pool->chunks->next; + h2o_mem_free_recycle(&mempool_allocator, pool->chunks); + pool->chunks = next; + } + pool->chunk_offset = sizeof(pool->chunks->bytes); +} + +void *h2o_mem_alloc_pool(h2o_mem_pool_t *pool, size_t sz) +{ + void *ret; + + if (sz >= sizeof(pool->chunks->bytes) / 4) { + /* allocate large requests directly */ + struct st_h2o_mem_pool_direct_t *newp = h2o_mem_alloc(offsetof(struct st_h2o_mem_pool_direct_t, bytes) + sz); + newp->next = pool->directs; + pool->directs = newp; + return newp->bytes; + } + + /* return a valid pointer even for 0 sized allocs */ + if (sz == 0) + sz = 1; + + /* 16-bytes rounding */ + sz = (sz + 15) & ~15; + if (sizeof(pool->chunks->bytes) - pool->chunk_offset < sz) { + /* allocate new chunk */ + struct st_h2o_mem_pool_chunk_t *newp = h2o_mem_alloc_recycle(&mempool_allocator, sizeof(*newp)); + newp->next = pool->chunks; + pool->chunks = newp; + pool->chunk_offset = 0; + } + + ret = pool->chunks->bytes + pool->chunk_offset; + pool->chunk_offset += sz; + return ret; +} + +static void link_shared(h2o_mem_pool_t *pool, struct st_h2o_mem_pool_shared_entry_t *entry) +{ + struct st_h2o_mem_pool_shared_ref_t *ref = h2o_mem_alloc_pool(pool, sizeof(struct st_h2o_mem_pool_shared_ref_t)); + ref->entry = entry; + ref->next 
= pool->shared_refs; + pool->shared_refs = ref; +} + +void *h2o_mem_alloc_shared(h2o_mem_pool_t *pool, size_t sz, void (*dispose)(void *)) +{ + struct st_h2o_mem_pool_shared_entry_t *entry = h2o_mem_alloc(offsetof(struct st_h2o_mem_pool_shared_entry_t, bytes) + sz); + entry->refcnt = 1; + entry->dispose = dispose; + if (pool != NULL) + link_shared(pool, entry); + return entry->bytes; +} + +void h2o_mem_link_shared(h2o_mem_pool_t *pool, void *p) +{ + h2o_mem_addref_shared(p); + link_shared(pool, H2O_STRUCT_FROM_MEMBER(struct st_h2o_mem_pool_shared_entry_t, bytes, p)); +} + +static size_t topagesize(size_t capacity) +{ + size_t pagesize = getpagesize(); + return (offsetof(h2o_buffer_t, _buf) + capacity + pagesize - 1) / pagesize * pagesize; +} + +void h2o_buffer__do_free(h2o_buffer_t *buffer) +{ + /* caller should assert that the buffer is not part of the prototype */ + if (buffer->capacity == buffer->_prototype->_initial_buf.capacity) { + h2o_mem_free_recycle(&buffer->_prototype->allocator, buffer); + } else if (buffer->_fd != -1) { + close(buffer->_fd); + munmap((void *)buffer, topagesize(buffer->capacity)); + } else { + free(buffer); + } +} + +h2o_iovec_t h2o_buffer_reserve(h2o_buffer_t **_inbuf, size_t min_guarantee) +{ + h2o_buffer_t *inbuf = *_inbuf; + h2o_iovec_t ret; + + if (inbuf->bytes == NULL) { + h2o_buffer_prototype_t *prototype = H2O_STRUCT_FROM_MEMBER(h2o_buffer_prototype_t, _initial_buf, inbuf); + if (min_guarantee <= prototype->_initial_buf.capacity) { + min_guarantee = prototype->_initial_buf.capacity; + inbuf = h2o_mem_alloc_recycle(&prototype->allocator, offsetof(h2o_buffer_t, _buf) + min_guarantee); + } else { + inbuf = h2o_mem_alloc(offsetof(h2o_buffer_t, _buf) + min_guarantee); + } + *_inbuf = inbuf; + inbuf->size = 0; + inbuf->bytes = inbuf->_buf; + inbuf->capacity = min_guarantee; + inbuf->_prototype = prototype; + inbuf->_fd = -1; + } else { + if (min_guarantee <= inbuf->capacity - inbuf->size - (inbuf->bytes - inbuf->_buf)) { + /* ok */ + } 
else if ((inbuf->size + min_guarantee) * 2 <= inbuf->capacity) { + /* the capacity should be less than or equal to 2 times of: size + guarantee */ + memmove(inbuf->_buf, inbuf->bytes, inbuf->size); + inbuf->bytes = inbuf->_buf; + } else { + size_t new_capacity = inbuf->capacity; + do { + new_capacity *= 2; + } while (new_capacity - inbuf->size < min_guarantee); + if (inbuf->_prototype->mmap_settings != NULL && inbuf->_prototype->mmap_settings->threshold <= new_capacity) { + size_t new_allocsize = topagesize(new_capacity); + int fd; + h2o_buffer_t *newp; + if (inbuf->_fd == -1) { + char *tmpfn = alloca(strlen(inbuf->_prototype->mmap_settings->fn_template) + 1); + strcpy(tmpfn, inbuf->_prototype->mmap_settings->fn_template); + if ((fd = mkstemp(tmpfn)) == -1) { + fprintf(stderr, "failed to create temporary file:%s:%s\n", tmpfn, strerror(errno)); + goto MapError; + } + unlink(tmpfn); + } else { + fd = inbuf->_fd; + } + int fallocate_ret; +#if USE_POSIX_FALLOCATE + fallocate_ret = posix_fallocate(fd, 0, new_allocsize); +#else + fallocate_ret = ftruncate(fd, new_allocsize); +#endif + if (fallocate_ret != 0) { + perror("failed to resize temporary file"); + goto MapError; + } + if ((newp = (void *)mmap(NULL, new_allocsize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED) { + perror("mmap failed"); + goto MapError; + } + if (inbuf->_fd == -1) { + /* copy data (moving from malloc to mmap) */ + newp->size = inbuf->size; + newp->bytes = newp->_buf; + newp->capacity = new_capacity; + newp->_prototype = inbuf->_prototype; + newp->_fd = fd; + memcpy(newp->_buf, inbuf->bytes, inbuf->size); + h2o_buffer__do_free(inbuf); + *_inbuf = inbuf = newp; + } else { + /* munmap */ + size_t offset = inbuf->bytes - inbuf->_buf; + munmap((void *)inbuf, topagesize(inbuf->capacity)); + *_inbuf = inbuf = newp; + inbuf->capacity = new_capacity; + inbuf->bytes = newp->_buf + offset; + } + } else { + h2o_buffer_t *newp = h2o_mem_alloc(offsetof(h2o_buffer_t, _buf) + new_capacity); + 
newp->size = inbuf->size; + newp->bytes = newp->_buf; + newp->capacity = new_capacity; + newp->_prototype = inbuf->_prototype; + newp->_fd = -1; + memcpy(newp->_buf, inbuf->bytes, inbuf->size); + h2o_buffer__do_free(inbuf); + *_inbuf = inbuf = newp; + } + } + } + + ret.base = inbuf->bytes + inbuf->size; + ret.len = inbuf->_buf + inbuf->capacity - ret.base; + + return ret; + +MapError: + ret.base = NULL; + ret.len = 0; + return ret; +} + +void h2o_buffer_consume(h2o_buffer_t **_inbuf, size_t delta) +{ + h2o_buffer_t *inbuf = *_inbuf; + + if (delta != 0) { + assert(inbuf->bytes != NULL); + if (inbuf->size == delta) { + *_inbuf = &inbuf->_prototype->_initial_buf; + h2o_buffer__do_free(inbuf); + } else { + inbuf->size -= delta; + inbuf->bytes += delta; + } + } +} + +void h2o_buffer__dispose_linked(void *p) +{ + h2o_buffer_t **buf = p; + h2o_buffer_dispose(buf); +} + +void h2o_vector__expand(h2o_mem_pool_t *pool, h2o_vector_t *vector, size_t element_size, size_t new_capacity) +{ + void *new_entries; + assert(vector->capacity < new_capacity); + if (vector->capacity == 0) + vector->capacity = 4; + while (vector->capacity < new_capacity) + vector->capacity *= 2; + if (pool != NULL) { + new_entries = h2o_mem_alloc_pool(pool, element_size * vector->capacity); + h2o_memcpy(new_entries, vector->entries, element_size * vector->size); + } else { + new_entries = h2o_mem_realloc(vector->entries, element_size * vector->capacity); + } + vector->entries = new_entries; +} + +void h2o_mem_swap(void *_x, void *_y, size_t len) +{ + char *x = _x, *y = _y; + char buf[256]; + + while (len != 0) { + size_t blocksz = len < sizeof(buf) ? 
len : sizeof(buf); + memcpy(buf, x, blocksz); + memcpy(x, y, blocksz); + memcpy(y, buf, blocksz); + len -= blocksz; + x += blocksz; + y += blocksz; + } +} + +void h2o_dump_memory(FILE *fp, const char *buf, size_t len) +{ + size_t i, j; + + for (i = 0; i < len; i += 16) { + fprintf(fp, "%08zx", i); + for (j = 0; j != 16; ++j) { + if (i + j < len) + fprintf(fp, " %02x", (int)(unsigned char)buf[i + j]); + else + fprintf(fp, " "); + } + fprintf(fp, " "); + for (j = 0; j != 16 && i + j < len; ++j) { + int ch = buf[i + j]; + fputc(' ' <= ch && ch < 0x7f ? ch : '.', fp); + } + fprintf(fp, "\n"); + } +} + +void h2o_append_to_null_terminated_list(void ***list, void *element) +{ + size_t cnt; + + for (cnt = 0; (*list)[cnt] != NULL; ++cnt) + ; + *list = h2o_mem_realloc(*list, (cnt + 2) * sizeof(void *)); + (*list)[cnt++] = element; + (*list)[cnt] = NULL; +} diff --git a/web/server/h2o/libh2o/lib/common/multithread.c b/web/server/h2o/libh2o/lib/common/multithread.c new file mode 100644 index 000000000..b4e8ba836 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/multithread.c @@ -0,0 +1,274 @@ +/* + * Copyright (c) 2015-2016 DeNA Co., Ltd., Kazuho Oku, Tatsuhiko Kubo, + * Chul-Woong Yang + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include <assert.h> +#include <pthread.h> +#include "cloexec.h" +#include "h2o/multithread.h" + +struct st_h2o_multithread_queue_t { +#if H2O_USE_LIBUV + uv_async_t async; +#else + struct { + int write; + h2o_socket_t *read; + } async; +#endif + pthread_mutex_t mutex; + struct { + h2o_linklist_t active; + h2o_linklist_t inactive; + } receivers; +}; + +static void queue_cb(h2o_multithread_queue_t *queue) +{ + pthread_mutex_lock(&queue->mutex); + + while (!h2o_linklist_is_empty(&queue->receivers.active)) { + h2o_multithread_receiver_t *receiver = + H2O_STRUCT_FROM_MEMBER(h2o_multithread_receiver_t, _link, queue->receivers.active.next); + /* detach all the messages from the receiver */ + h2o_linklist_t messages; + h2o_linklist_init_anchor(&messages); + h2o_linklist_insert_list(&messages, &receiver->_messages); + /* relink the receiver to the inactive list */ + h2o_linklist_unlink(&receiver->_link); + h2o_linklist_insert(&queue->receivers.inactive, &receiver->_link); + + /* dispatch the messages */ + pthread_mutex_unlock(&queue->mutex); + receiver->cb(receiver, &messages); + assert(h2o_linklist_is_empty(&messages)); + pthread_mutex_lock(&queue->mutex); + } + + pthread_mutex_unlock(&queue->mutex); +} + +#ifdef H2O_NO_64BIT_ATOMICS +pthread_mutex_t h2o_conn_id_mutex = PTHREAD_MUTEX_INITIALIZER; +#endif + +#if H2O_USE_LIBUV +#else + +#include <errno.h> +#include <fcntl.h> +#include <unistd.h> + +static void on_read(h2o_socket_t *sock, const char *err) +{ + if (err != NULL) { + 
fprintf(stderr, "pipe error\n"); + abort(); + } + + h2o_buffer_consume(&sock->input, sock->input->size); + queue_cb(sock->data); +} + +static void init_async(h2o_multithread_queue_t *queue, h2o_loop_t *loop) +{ + int fds[2]; + + if (cloexec_pipe(fds) != 0) { + perror("pipe"); + abort(); + } + fcntl(fds[1], F_SETFL, O_NONBLOCK); + queue->async.write = fds[1]; + queue->async.read = h2o_evloop_socket_create(loop, fds[0], 0); + queue->async.read->data = queue; + h2o_socket_read_start(queue->async.read, on_read); +} + +#endif + +h2o_multithread_queue_t *h2o_multithread_create_queue(h2o_loop_t *loop) +{ + h2o_multithread_queue_t *queue = h2o_mem_alloc(sizeof(*queue)); + memset(queue, 0, sizeof(*queue)); + +#if H2O_USE_LIBUV + uv_async_init(loop, &queue->async, (uv_async_cb)queue_cb); +#else + init_async(queue, loop); +#endif + pthread_mutex_init(&queue->mutex, NULL); + h2o_linklist_init_anchor(&queue->receivers.active); + h2o_linklist_init_anchor(&queue->receivers.inactive); + + return queue; +} + +void h2o_multithread_destroy_queue(h2o_multithread_queue_t *queue) +{ + assert(h2o_linklist_is_empty(&queue->receivers.active)); + assert(h2o_linklist_is_empty(&queue->receivers.inactive)); +#if H2O_USE_LIBUV + uv_close((uv_handle_t *)&queue->async, (uv_close_cb)free); +#else + h2o_socket_read_stop(queue->async.read); + h2o_socket_close(queue->async.read); + close(queue->async.write); +#endif + pthread_mutex_destroy(&queue->mutex); +} + +void h2o_multithread_register_receiver(h2o_multithread_queue_t *queue, h2o_multithread_receiver_t *receiver, + h2o_multithread_receiver_cb cb) +{ + receiver->queue = queue; + receiver->_link = (h2o_linklist_t){NULL}; + h2o_linklist_init_anchor(&receiver->_messages); + receiver->cb = cb; + + pthread_mutex_lock(&queue->mutex); + h2o_linklist_insert(&queue->receivers.inactive, &receiver->_link); + pthread_mutex_unlock(&queue->mutex); +} + +void h2o_multithread_unregister_receiver(h2o_multithread_queue_t *queue, h2o_multithread_receiver_t 
*receiver) +{ + assert(queue == receiver->queue); + assert(h2o_linklist_is_empty(&receiver->_messages)); + pthread_mutex_lock(&queue->mutex); + h2o_linklist_unlink(&receiver->_link); + pthread_mutex_unlock(&queue->mutex); +} + +void h2o_multithread_send_message(h2o_multithread_receiver_t *receiver, h2o_multithread_message_t *message) +{ + int do_send = 0; + + pthread_mutex_lock(&receiver->queue->mutex); + if (message != NULL) { + assert(!h2o_linklist_is_linked(&message->link)); + if (h2o_linklist_is_empty(&receiver->_messages)) { + h2o_linklist_unlink(&receiver->_link); + h2o_linklist_insert(&receiver->queue->receivers.active, &receiver->_link); + do_send = 1; + } + h2o_linklist_insert(&receiver->_messages, &message->link); + } else { + if (h2o_linklist_is_empty(&receiver->_messages)) + do_send = 1; + } + pthread_mutex_unlock(&receiver->queue->mutex); + + if (do_send) { +#if H2O_USE_LIBUV + uv_async_send(&receiver->queue->async); +#else + while (write(receiver->queue->async.write, "", 1) == -1 && errno == EINTR) + ; +#endif + } +} + +void h2o_multithread_create_thread(pthread_t *tid, const pthread_attr_t *attr, void *(*func)(void *), void *arg) +{ + if (pthread_create(tid, attr, func, arg) != 0) { + perror("pthread_create"); + abort(); + } +} + +void h2o_sem_init(h2o_sem_t *sem, ssize_t capacity) +{ + pthread_mutex_init(&sem->_mutex, NULL); + pthread_cond_init(&sem->_cond, NULL); + sem->_cur = capacity; + sem->_capacity = capacity; +} + +void h2o_sem_destroy(h2o_sem_t *sem) +{ + assert(sem->_cur == sem->_capacity); + pthread_cond_destroy(&sem->_cond); + pthread_mutex_destroy(&sem->_mutex); +} + +void h2o_sem_wait(h2o_sem_t *sem) +{ + pthread_mutex_lock(&sem->_mutex); + while (sem->_cur <= 0) + pthread_cond_wait(&sem->_cond, &sem->_mutex); + --sem->_cur; + pthread_mutex_unlock(&sem->_mutex); +} + +void h2o_sem_post(h2o_sem_t *sem) +{ + pthread_mutex_lock(&sem->_mutex); + ++sem->_cur; + pthread_cond_signal(&sem->_cond); + pthread_mutex_unlock(&sem->_mutex); +} + 
+void h2o_sem_set_capacity(h2o_sem_t *sem, ssize_t new_capacity) +{ + pthread_mutex_lock(&sem->_mutex); + sem->_cur += new_capacity - sem->_capacity; + sem->_capacity = new_capacity; + pthread_cond_broadcast(&sem->_cond); + pthread_mutex_unlock(&sem->_mutex); +} + +/* barrier */ + +void h2o_barrier_init(h2o_barrier_t *barrier, size_t count) +{ + pthread_mutex_init(&barrier->_mutex, NULL); + pthread_cond_init(&barrier->_cond, NULL); + barrier->_count = count; +} + +int h2o_barrier_wait(h2o_barrier_t *barrier) +{ + int ret; + pthread_mutex_lock(&barrier->_mutex); + barrier->_count--; + if (barrier->_count == 0) { + pthread_cond_broadcast(&barrier->_cond); + ret = 1; + } else { + while (barrier->_count) + pthread_cond_wait(&barrier->_cond, &barrier->_mutex); + ret = 0; + } + pthread_mutex_unlock(&barrier->_mutex); + return ret; +} + +int h2o_barrier_done(h2o_barrier_t *barrier) +{ + return __sync_add_and_fetch(&barrier->_count, 0) == 0; +} + +void h2o_barrier_destroy(h2o_barrier_t *barrier) +{ + pthread_mutex_destroy(&barrier->_mutex); + pthread_cond_destroy(&barrier->_cond); +} diff --git a/web/server/h2o/libh2o/lib/common/serverutil.c b/web/server/h2o/libh2o/lib/common/serverutil.c new file mode 100644 index 000000000..8226f6efc --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/serverutil.c @@ -0,0 +1,317 @@ +/* + * Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku, Nick Desaulniers + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the 
Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include <errno.h> +#include <fcntl.h> +#include <grp.h> +#include <pthread.h> +#include <pwd.h> +#include <signal.h> +#include <spawn.h> +#include <stdint.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <sys/wait.h> +#include <unistd.h> +#if !defined(_SC_NPROCESSORS_ONLN) +#include <sys/sysctl.h> +#endif +#include "cloexec.h" +#include "h2o/memory.h" +#include "h2o/serverutil.h" +#include "h2o/socket.h" +#include "h2o/string_.h" + +void h2o_set_signal_handler(int signo, void (*cb)(int signo)) +{ + struct sigaction action; + + memset(&action, 0, sizeof(action)); + sigemptyset(&action.sa_mask); + action.sa_handler = cb; + sigaction(signo, &action, NULL); +} + +int h2o_setuidgid(const char *user) +{ + struct passwd pwbuf, *pw; + char buf[65536]; /* should be large enough */ + + errno = 0; + if (getpwnam_r(user, &pwbuf, buf, sizeof(buf), &pw) != 0) { + perror("getpwnam_r"); + return -1; + } + if (pw == NULL) { + fprintf(stderr, "unknown user:%s\n", user); + return -1; + } + if (setgid(pw->pw_gid) != 0) { + fprintf(stderr, "setgid(%d) failed:%s\n", (int)pw->pw_gid, strerror(errno)); + return -1; + } + if (initgroups(pw->pw_name, pw->pw_gid) != 0) { + fprintf(stderr, "initgroups(%s, %d) failed:%s\n", pw->pw_name, (int)pw->pw_gid, strerror(errno)); + return -1; + } + if (setuid(pw->pw_uid) != 0) { + fprintf(stderr, "setuid(%d) failed:%s\n", (int)pw->pw_uid, strerror(errno)); + return -1; + } + + return 0; +} + +size_t 
h2o_server_starter_get_fds(int **_fds) +{ + const char *ports_env, *start, *end, *eq; + size_t t; + H2O_VECTOR(int) fds = {NULL}; + + if ((ports_env = getenv("SERVER_STARTER_PORT")) == NULL) + return 0; + if (ports_env[0] == '\0') { + fprintf(stderr, "$SERVER_STARTER_PORT is empty\n"); + return SIZE_MAX; + } + + /* ports_env example: 127.0.0.1:80=3;/tmp/sock=4 */ + for (start = ports_env; *start != '\0'; start = *end == ';' ? end + 1 : end) { + if ((end = strchr(start, ';')) == NULL) + end = start + strlen(start); + if ((eq = memchr(start, '=', end - start)) == NULL) { + fprintf(stderr, "invalid $SERVER_STARTER_PORT, an element without `=` in: %s\n", ports_env); + goto Error; + } + if ((t = h2o_strtosize(eq + 1, end - eq - 1)) == SIZE_MAX) { + fprintf(stderr, "invalid file descriptor number in $SERVER_STARTER_PORT: %s\n", ports_env); + goto Error; + } + h2o_vector_reserve(NULL, &fds, fds.size + 1); + fds.entries[fds.size++] = (int)t; + } + + *_fds = fds.entries; + return fds.size; +Error: + free(fds.entries); + return SIZE_MAX; +} + +static char **build_spawn_env(void) +{ + extern char **environ; + size_t num; + + /* calculate number of envvars, as well as looking for H2O_ROOT= */ + for (num = 0; environ[num] != NULL; ++num) + if (strncmp(environ[num], "H2O_ROOT=", sizeof("H2O_ROOT=") - 1) == 0) + return NULL; + + /* not found */ + char **newenv = h2o_mem_alloc(sizeof(*newenv) * (num + 2) + sizeof("H2O_ROOT=" H2O_TO_STR(H2O_ROOT))); + memcpy(newenv, environ, sizeof(*newenv) * num); + newenv[num] = (char *)(newenv + num + 2); + newenv[num + 1] = NULL; + strcpy(newenv[num], "H2O_ROOT=" H2O_TO_STR(H2O_ROOT)); + + return newenv; +} + +pid_t h2o_spawnp(const char *cmd, char *const *argv, const int *mapped_fds, int cloexec_mutex_is_locked) +{ +#if defined(__linux__) + + /* posix_spawnp of Linux does not return error if the executable does not exist, see + * https://gist.github.com/kazuho/0c233e6f86d27d6e4f09 + */ + extern char **environ; + int pipefds[2] = {-1, -1}, 
errnum; + pid_t pid; + + /* create pipe, used for sending error codes */ + if (pipe2(pipefds, O_CLOEXEC) != 0) + goto Error; + + /* fork */ + if (!cloexec_mutex_is_locked) + pthread_mutex_lock(&cloexec_mutex); + if ((pid = fork()) == 0) { + /* in child process, map the file descriptors and execute; return the errnum through pipe if exec failed */ + if (mapped_fds != NULL) { + for (; *mapped_fds != -1; mapped_fds += 2) { + if (mapped_fds[0] != mapped_fds[1]) { + if (mapped_fds[1] != -1) + dup2(mapped_fds[0], mapped_fds[1]); + close(mapped_fds[0]); + } + } + } + char **env = build_spawn_env(); + if (env != NULL) + environ = env; + execvp(cmd, argv); + errnum = errno; + write(pipefds[1], &errnum, sizeof(errnum)); + _exit(EX_SOFTWARE); + } + if (!cloexec_mutex_is_locked) + pthread_mutex_unlock(&cloexec_mutex); + if (pid == -1) + goto Error; + + /* parent process */ + close(pipefds[1]); + pipefds[1] = -1; + ssize_t rret; + errnum = 0; + while ((rret = read(pipefds[0], &errnum, sizeof(errnum))) == -1 && errno == EINTR) + ; + if (rret != 0) { + /* spawn failed */ + while (waitpid(pid, NULL, 0) != pid) + ; + pid = -1; + errno = errnum; + goto Error; + } + + /* spawn succeeded */ + close(pipefds[0]); + return pid; + +Error: + errnum = errno; + if (pipefds[0] != -1) + close(pipefds[0]); + if (pipefds[1] != -1) + close(pipefds[1]); + errno = errnum; + return -1; + +#else + + posix_spawn_file_actions_t file_actions; + pid_t pid; + extern char **environ; + char **env = build_spawn_env(); + posix_spawn_file_actions_init(&file_actions); + if (mapped_fds != NULL) { + for (; *mapped_fds != -1; mapped_fds += 2) { + if (mapped_fds[1] != -1) + posix_spawn_file_actions_adddup2(&file_actions, mapped_fds[0], mapped_fds[1]); + posix_spawn_file_actions_addclose(&file_actions, mapped_fds[0]); + } + } + if (!cloexec_mutex_is_locked) + pthread_mutex_lock(&cloexec_mutex); + errno = posix_spawnp(&pid, cmd, &file_actions, NULL, argv, env != NULL ? 
env : environ); + if (!cloexec_mutex_is_locked) + pthread_mutex_unlock(&cloexec_mutex); + free(env); + if (errno != 0) + return -1; + + return pid; + +#endif +} + +int h2o_read_command(const char *cmd, char **argv, h2o_buffer_t **resp, int *child_status) +{ + int respfds[2] = {-1, -1}; + pid_t pid = -1; + int mutex_locked = 0, ret = -1; + + h2o_buffer_init(resp, &h2o_socket_buffer_prototype); + + pthread_mutex_lock(&cloexec_mutex); + mutex_locked = 1; + + /* create pipe for reading the result */ + if (pipe(respfds) != 0) + goto Exit; + if (fcntl(respfds[0], F_SETFD, O_CLOEXEC) < 0) + goto Exit; + + /* spawn */ + int mapped_fds[] = {respfds[1], 1, /* stdout of the child process is read from the pipe */ + -1}; + if ((pid = h2o_spawnp(cmd, argv, mapped_fds, 1)) == -1) + goto Exit; + close(respfds[1]); + respfds[1] = -1; + + pthread_mutex_unlock(&cloexec_mutex); + mutex_locked = 0; + + /* read the response from pipe */ + while (1) { + h2o_iovec_t buf = h2o_buffer_reserve(resp, 8192); + ssize_t r; + while ((r = read(respfds[0], buf.base, buf.len)) == -1 && errno == EINTR) + ; + if (r <= 0) + break; + (*resp)->size += r; + } + +Exit: + if (mutex_locked) + pthread_mutex_unlock(&cloexec_mutex); + if (pid != -1) { + /* wait for the child to complete */ + pid_t r; + while ((r = waitpid(pid, child_status, 0)) == -1 && errno == EINTR) + ; + if (r == pid) { + /* success */ + ret = 0; + } + } + if (respfds[0] != -1) + close(respfds[0]); + if (respfds[1] != -1) + close(respfds[1]); + if (ret != 0) + h2o_buffer_dispose(resp); + + return ret; +} + +size_t h2o_numproc(void) +{ +#if defined(_SC_NPROCESSORS_ONLN) + return (size_t)sysconf(_SC_NPROCESSORS_ONLN); +#elif defined(CTL_HW) && defined(HW_AVAILCPU) + int name[] = {CTL_HW, HW_AVAILCPU}; + int ncpu; + size_t ncpu_sz = sizeof(ncpu); + if (sysctl(name, sizeof(name) / sizeof(name[0]), &ncpu, &ncpu_sz, NULL, 0) != 0 || sizeof(ncpu) != ncpu_sz) { + fprintf(stderr, "[ERROR] failed to obtain number of CPU cores, assuming as one\n"); + 
ncpu = 1; + } + return ncpu; +#else + return 1; +#endif +} diff --git a/web/server/h2o/libh2o/lib/common/socket.c b/web/server/h2o/libh2o/lib/common/socket.c new file mode 100644 index 000000000..5b1c37e04 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socket.c @@ -0,0 +1,1433 @@ +/* + * Copyright (c) 2015 DeNA Co., Ltd., Kazuho Oku, Justin Zhu + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
+ */ +#include <errno.h> +#include <fcntl.h> +#include <inttypes.h> +#include <limits.h> +#include <netdb.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <string.h> +#include <sys/un.h> +#include <unistd.h> +#include <openssl/err.h> +#if defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__) +#include <sys/ioctl.h> +#endif +#if H2O_USE_PICOTLS +#include "picotls.h" +#endif +#include "h2o/socket.h" +#include "h2o/timeout.h" + +#if defined(__APPLE__) && defined(__clang__) +#pragma clang diagnostic ignored "-Wdeprecated-declarations" +#endif + +#ifndef IOV_MAX +#define IOV_MAX UIO_MAXIOV +#endif + +/* kernel-headers bundled with Ubuntu 14.04 does not have the constant defined in netinet/tcp.h */ +#if defined(__linux__) && !defined(TCP_NOTSENT_LOWAT) +#define TCP_NOTSENT_LOWAT 25 +#endif + +#define OPENSSL_HOSTNAME_VALIDATION_LINKAGE static +#include "../../deps/ssl-conservatory/openssl/openssl_hostname_validation.c" + +struct st_h2o_socket_ssl_t { + SSL_CTX *ssl_ctx; + SSL *ossl; +#if H2O_USE_PICOTLS + ptls_t *ptls; +#endif + int *did_write_in_read; /* used for detecting and closing the connection upon renegotiation (FIXME implement renegotiation) */ + size_t record_overhead; + struct { + h2o_socket_cb cb; + union { + struct { + struct { + enum { + ASYNC_RESUMPTION_STATE_COMPLETE = 0, /* just pass thru */ + ASYNC_RESUMPTION_STATE_RECORD, /* record first input, restore SSL state if it changes to REQUEST_SENT + */ + ASYNC_RESUMPTION_STATE_REQUEST_SENT /* async request has been sent, and is waiting for response */ + } state; + SSL_SESSION *session_data; + } async_resumption; + } server; + struct { + char *server_name; + h2o_cache_t *session_cache; + h2o_iovec_t session_cache_key; + h2o_cache_hashcode_t session_cache_key_hash; + } client; + }; + } handshake; + struct { + h2o_buffer_t *encrypted; + } input; + struct { + H2O_VECTOR(h2o_iovec_t) bufs; + h2o_mem_pool_t pool; /* placed at the last */ + } output; +}; + +struct st_h2o_ssl_context_t { 
+ SSL_CTX *ctx; + const h2o_iovec_t *protocols; + h2o_iovec_t _npn_list_of_protocols; +}; + +/* backend functions */ +static void do_dispose_socket(h2o_socket_t *sock); +static void do_write(h2o_socket_t *sock, h2o_iovec_t *bufs, size_t bufcnt, h2o_socket_cb cb); +static void do_read_start(h2o_socket_t *sock); +static void do_read_stop(h2o_socket_t *sock); +static int do_export(h2o_socket_t *_sock, h2o_socket_export_t *info); +static h2o_socket_t *do_import(h2o_loop_t *loop, h2o_socket_export_t *info); +static socklen_t get_peername_uncached(h2o_socket_t *sock, struct sockaddr *sa); + +/* internal functions called from the backend */ +static const char *decode_ssl_input(h2o_socket_t *sock); +static void on_write_complete(h2o_socket_t *sock, const char *err); + +#if H2O_USE_LIBUV +#include "socket/uv-binding.c.h" +#else +#include "socket/evloop.c.h" +#endif + +h2o_buffer_mmap_settings_t h2o_socket_buffer_mmap_settings = { + 32 * 1024 * 1024, /* 32MB, should better be greater than max frame size of HTTP2 for performance reasons */ + "/tmp/h2o.b.XXXXXX"}; + +__thread h2o_buffer_prototype_t h2o_socket_buffer_prototype = { + {16}, /* keep 16 recently used chunks */ + {H2O_SOCKET_INITIAL_INPUT_BUFFER_SIZE * 2}, /* minimum initial capacity */ + &h2o_socket_buffer_mmap_settings}; + +const char *h2o_socket_error_out_of_memory = "out of memory"; +const char *h2o_socket_error_io = "I/O error"; +const char *h2o_socket_error_closed = "socket closed by peer"; +const char *h2o_socket_error_conn_fail = "connection failure"; +const char *h2o_socket_error_ssl_no_cert = "no certificate"; +const char *h2o_socket_error_ssl_cert_invalid = "invalid certificate"; +const char *h2o_socket_error_ssl_cert_name_mismatch = "certificate name mismatch"; +const char *h2o_socket_error_ssl_decode = "SSL decode error"; + +static void (*resumption_get_async)(h2o_socket_t *sock, h2o_iovec_t session_id); +static void (*resumption_new)(h2o_iovec_t session_id, h2o_iovec_t session_data); + +static int 
read_bio(BIO *b, char *out, int len) +{ + h2o_socket_t *sock = BIO_get_data(b); + + if (len == 0) + return 0; + + if (sock->ssl->input.encrypted->size == 0) { + BIO_set_retry_read(b); + return -1; + } + + if (sock->ssl->input.encrypted->size < len) { + len = (int)sock->ssl->input.encrypted->size; + } + memcpy(out, sock->ssl->input.encrypted->bytes, len); + h2o_buffer_consume(&sock->ssl->input.encrypted, len); + + return len; +} + +static void write_ssl_bytes(h2o_socket_t *sock, const void *in, size_t len) +{ + if (len != 0) { + void *bytes_alloced = h2o_mem_alloc_pool(&sock->ssl->output.pool, len); + memcpy(bytes_alloced, in, len); + h2o_vector_reserve(&sock->ssl->output.pool, &sock->ssl->output.bufs, sock->ssl->output.bufs.size + 1); + sock->ssl->output.bufs.entries[sock->ssl->output.bufs.size++] = h2o_iovec_init(bytes_alloced, len); + } +} + +static int write_bio(BIO *b, const char *in, int len) +{ + h2o_socket_t *sock = BIO_get_data(b); + + /* FIXME no support for SSL renegotiation (yet) */ + if (sock->ssl->did_write_in_read != NULL) { + *sock->ssl->did_write_in_read = 1; + return -1; + } + + write_ssl_bytes(sock, in, len); + return len; +} + +static int puts_bio(BIO *b, const char *str) +{ + return write_bio(b, str, (int)strlen(str)); +} + +static long ctrl_bio(BIO *b, int cmd, long num, void *ptr) +{ + switch (cmd) { + case BIO_CTRL_GET_CLOSE: + return BIO_get_shutdown(b); + case BIO_CTRL_SET_CLOSE: + BIO_set_shutdown(b, (int)num); + return 1; + case BIO_CTRL_FLUSH: + return 1; + default: + return 0; + } +} + +static void setup_bio(h2o_socket_t *sock) +{ + static BIO_METHOD *bio_methods = NULL; + if (bio_methods == NULL) { + static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER; + pthread_mutex_lock(&init_lock); + if (bio_methods == NULL) { + BIO_METHOD *biom = BIO_meth_new(BIO_TYPE_FD, "h2o_socket"); + BIO_meth_set_write(biom, write_bio); + BIO_meth_set_read(biom, read_bio); + BIO_meth_set_puts(biom, puts_bio); + BIO_meth_set_ctrl(biom, ctrl_bio); + 
__sync_synchronize(); + bio_methods = biom; + } + pthread_mutex_unlock(&init_lock); + } + + BIO *bio = BIO_new(bio_methods); + if (bio == NULL) + h2o_fatal("no memory"); + BIO_set_data(bio, sock); + BIO_set_init(bio, 1); + SSL_set_bio(sock->ssl->ossl, bio, bio); +} + +const char *decode_ssl_input(h2o_socket_t *sock) +{ + assert(sock->ssl != NULL); + assert(sock->ssl->handshake.cb == NULL); + +#if H2O_USE_PICOTLS + if (sock->ssl->ptls != NULL) { + if (sock->ssl->input.encrypted->size != 0) { + const char *src = sock->ssl->input.encrypted->bytes, *src_end = src + sock->ssl->input.encrypted->size; + h2o_iovec_t reserved; + ptls_buffer_t rbuf; + int ret; + if ((reserved = h2o_buffer_reserve(&sock->input, sock->ssl->input.encrypted->size)).base == NULL) + return h2o_socket_error_out_of_memory; + ptls_buffer_init(&rbuf, reserved.base, reserved.len); + do { + size_t consumed = src_end - src; + if ((ret = ptls_receive(sock->ssl->ptls, &rbuf, src, &consumed)) != 0) + break; + src += consumed; + } while (src != src_end); + h2o_buffer_consume(&sock->ssl->input.encrypted, sock->ssl->input.encrypted->size - (src_end - src)); + if (rbuf.is_allocated) { + if ((reserved = h2o_buffer_reserve(&sock->input, rbuf.off)).base == NULL) + return h2o_socket_error_out_of_memory; + memcpy(reserved.base, rbuf.base, rbuf.off); + sock->input->size += rbuf.off; + ptls_buffer_dispose(&rbuf); + } else { + sock->input->size += rbuf.off; + } + if (!(ret == 0 || ret == PTLS_ERROR_IN_PROGRESS)) + return h2o_socket_error_ssl_decode; + } + return NULL; + } +#endif + + while (sock->ssl->input.encrypted->size != 0 || SSL_pending(sock->ssl->ossl)) { + int rlen; + h2o_iovec_t buf = h2o_buffer_reserve(&sock->input, 4096); + if (buf.base == NULL) + return h2o_socket_error_out_of_memory; + { /* call SSL_read (while detecting SSL renegotiation and reporting it as error) */ + int did_write_in_read = 0; + sock->ssl->did_write_in_read = &did_write_in_read; + ERR_clear_error(); + rlen = SSL_read(sock->ssl->ossl, 
buf.base, (int)buf.len); + sock->ssl->did_write_in_read = NULL; + if (did_write_in_read) + return "ssl renegotiation not supported"; + } + if (rlen == -1) { + if (SSL_get_error(sock->ssl->ossl, rlen) != SSL_ERROR_WANT_READ) { + return h2o_socket_error_ssl_decode; + } + break; + } else if (rlen == 0) { + break; + } else { + sock->input->size += rlen; + } + } + + return 0; +} + +static void flush_pending_ssl(h2o_socket_t *sock, h2o_socket_cb cb) +{ + do_write(sock, sock->ssl->output.bufs.entries, sock->ssl->output.bufs.size, cb); +} + +static void clear_output_buffer(struct st_h2o_socket_ssl_t *ssl) +{ + memset(&ssl->output.bufs, 0, sizeof(ssl->output.bufs)); + h2o_mem_clear_pool(&ssl->output.pool); +} + +static void destroy_ssl(struct st_h2o_socket_ssl_t *ssl) +{ +#if H2O_USE_PICOTLS + if (ssl->ptls != NULL) { + ptls_free(ssl->ptls); + ssl->ptls = NULL; + } +#endif + if (ssl->ossl != NULL) { + if (!SSL_is_server(ssl->ossl)) { + free(ssl->handshake.client.server_name); + free(ssl->handshake.client.session_cache_key.base); + } + SSL_free(ssl->ossl); + ssl->ossl = NULL; + } + h2o_buffer_dispose(&ssl->input.encrypted); + clear_output_buffer(ssl); + free(ssl); +} + +static void dispose_socket(h2o_socket_t *sock, const char *err) +{ + void (*close_cb)(void *data); + void *close_cb_data; + + if (sock->ssl != NULL) { + destroy_ssl(sock->ssl); + sock->ssl = NULL; + } + h2o_buffer_dispose(&sock->input); + if (sock->_peername != NULL) { + free(sock->_peername); + sock->_peername = NULL; + } + + close_cb = sock->on_close.cb; + close_cb_data = sock->on_close.data; + + do_dispose_socket(sock); + + if (close_cb != NULL) + close_cb(close_cb_data); +} + +static void shutdown_ssl(h2o_socket_t *sock, const char *err) +{ + int ret; + + if (err != NULL) + goto Close; + + if (sock->_cb.write != NULL) { + /* note: libuv calls the write callback after the socket is closed by uv_close (with status set to 0 if the write succeeded) + */ + sock->_cb.write = NULL; + goto Close; + } + +#if 
H2O_USE_PICOTLS + if (sock->ssl->ptls != NULL) { + ptls_buffer_t wbuf; + uint8_t wbuf_small[32]; + ptls_buffer_init(&wbuf, wbuf_small, sizeof(wbuf_small)); + if ((ret = ptls_send_alert(sock->ssl->ptls, &wbuf, PTLS_ALERT_LEVEL_WARNING, PTLS_ALERT_CLOSE_NOTIFY)) != 0) + goto Close; + write_ssl_bytes(sock, wbuf.base, wbuf.off); + ptls_buffer_dispose(&wbuf); + ret = 1; /* close the socket after sending close_notify */ + } else +#endif + if (sock->ssl->ossl != NULL) { + ERR_clear_error(); + if ((ret = SSL_shutdown(sock->ssl->ossl)) == -1) + goto Close; + } else { + goto Close; + } + + if (sock->ssl->output.bufs.size != 0) { + h2o_socket_read_stop(sock); + flush_pending_ssl(sock, ret == 1 ? dispose_socket : shutdown_ssl); + } else if (ret == 2 && SSL_get_error(sock->ssl->ossl, ret) == SSL_ERROR_WANT_READ) { + h2o_socket_read_start(sock, shutdown_ssl); + } else { + goto Close; + } + + return; +Close: + dispose_socket(sock, err); +} + +void h2o_socket_dispose_export(h2o_socket_export_t *info) +{ + assert(info->fd != -1); + if (info->ssl != NULL) { + destroy_ssl(info->ssl); + info->ssl = NULL; + } + h2o_buffer_dispose(&info->input); + close(info->fd); + info->fd = -1; +} + +int h2o_socket_export(h2o_socket_t *sock, h2o_socket_export_t *info) +{ + static h2o_buffer_prototype_t nonpooling_prototype; + + assert(!h2o_socket_is_writing(sock)); + + if (do_export(sock, info) == -1) + return -1; + + if ((info->ssl = sock->ssl) != NULL) { + sock->ssl = NULL; + h2o_buffer_set_prototype(&info->ssl->input.encrypted, &nonpooling_prototype); + } + info->input = sock->input; + h2o_buffer_set_prototype(&info->input, &nonpooling_prototype); + h2o_buffer_init(&sock->input, &h2o_socket_buffer_prototype); + + h2o_socket_close(sock); + + return 0; +} + +h2o_socket_t *h2o_socket_import(h2o_loop_t *loop, h2o_socket_export_t *info) +{ + h2o_socket_t *sock; + + assert(info->fd != -1); + + sock = do_import(loop, info); + info->fd = -1; /* just in case */ + if ((sock->ssl = info->ssl) != NULL) { + 
setup_bio(sock); + h2o_buffer_set_prototype(&sock->ssl->input.encrypted, &h2o_socket_buffer_prototype); + } + sock->input = info->input; + h2o_buffer_set_prototype(&sock->input, &h2o_socket_buffer_prototype); + return sock; +} + +void h2o_socket_close(h2o_socket_t *sock) +{ + if (sock->ssl == NULL) { + dispose_socket(sock, 0); + } else { + shutdown_ssl(sock, 0); + } +} + +static uint16_t calc_suggested_tls_payload_size(h2o_socket_t *sock, uint16_t suggested_tls_record_size) +{ + uint16_t ps = suggested_tls_record_size; + if (sock->ssl != NULL && sock->ssl->record_overhead < ps) + ps -= sock->ssl->record_overhead; + return ps; +} + +static void disable_latency_optimized_write(h2o_socket_t *sock, int (*adjust_notsent_lowat)(h2o_socket_t *, unsigned)) +{ + if (sock->_latency_optimization.notsent_is_minimized) { + adjust_notsent_lowat(sock, 0); + sock->_latency_optimization.notsent_is_minimized = 0; + } + sock->_latency_optimization.state = H2O_SOCKET_LATENCY_OPTIMIZATION_STATE_DISABLED; + sock->_latency_optimization.suggested_tls_payload_size = 16384; + sock->_latency_optimization.suggested_write_size = SIZE_MAX; +} + +static inline void prepare_for_latency_optimized_write(h2o_socket_t *sock, + const h2o_socket_latency_optimization_conditions_t *conditions, uint32_t rtt, + uint32_t mss, uint32_t cwnd_size, uint32_t cwnd_avail, uint64_t loop_time, + int (*adjust_notsent_lowat)(h2o_socket_t *, unsigned)) +{ + /* check RTT */ + if (rtt < conditions->min_rtt * (uint64_t)1000) + goto Disable; + if (rtt * conditions->max_additional_delay < loop_time * 1000 * 100) + goto Disable; + + /* latency-optimization is enabled */ + sock->_latency_optimization.state = H2O_SOCKET_LATENCY_OPTIMIZATION_STATE_DETERMINED; + + /* no need to: + * 1) adjust the write size if single_write_size << cwnd_size + * 2) align TLS record boundary to TCP packet boundary if packet loss-rate is low and BW isn't small (implied by cwnd size) + */ + if (mss * cwnd_size < conditions->max_cwnd) { + if 
(!sock->_latency_optimization.notsent_is_minimized) { + if (adjust_notsent_lowat(sock, 1 /* cannot be set to zero on Linux */) != 0) + goto Disable; + sock->_latency_optimization.notsent_is_minimized = 1; + } + sock->_latency_optimization.suggested_tls_payload_size = calc_suggested_tls_payload_size(sock, mss); + sock->_latency_optimization.suggested_write_size = + cwnd_avail * (size_t)sock->_latency_optimization.suggested_tls_payload_size; + } else { + if (sock->_latency_optimization.notsent_is_minimized) { + if (adjust_notsent_lowat(sock, 0) != 0) + goto Disable; + sock->_latency_optimization.notsent_is_minimized = 0; + } + sock->_latency_optimization.suggested_tls_payload_size = 16384; + sock->_latency_optimization.suggested_write_size = SIZE_MAX; + } + return; + +Disable: + disable_latency_optimized_write(sock, adjust_notsent_lowat); +} + +/** + * Obtains RTT, MSS, size of CWND (in the number of packets). + * Also writes to cwnd_avail minimum number of packets (of MSS size) sufficient to shut up poll-for-write under the precondition + * that TCP_NOTSENT_LOWAT is set to 1. + */ +static int obtain_tcp_info(int fd, uint32_t *rtt, uint32_t *mss, uint32_t *cwnd_size, uint32_t *cwnd_avail) +{ +#define CALC_CWND_PAIR_FROM_BYTE_UNITS(cwnd_bytes, inflight_bytes) \ + do { \ + *cwnd_size = (cwnd_bytes + *mss / 2) / *mss; \ + *cwnd_avail = cwnd_bytes > inflight_bytes ? (cwnd_bytes - inflight_bytes) / *mss + 2 : 2; \ + } while (0) + +#if defined(__linux__) && defined(TCP_INFO) + + struct tcp_info tcpi; + socklen_t tcpisz = sizeof(tcpi); + if (getsockopt(fd, IPPROTO_TCP, TCP_INFO, &tcpi, &tcpisz) != 0) + return -1; + *rtt = tcpi.tcpi_rtt; + *mss = tcpi.tcpi_snd_mss; + *cwnd_size = tcpi.tcpi_snd_cwnd; + *cwnd_avail = tcpi.tcpi_snd_cwnd > tcpi.tcpi_unacked ? 
tcpi.tcpi_snd_cwnd - tcpi.tcpi_unacked + 2 : 2; + return 0; + +#elif defined(__FreeBSD__) && defined(TCP_INFO) && 0 /* disabled since we wouldn't use it anyways; OS lacks TCP_NOTSENT_LOWAT */ + + struct tcp_info tcpi; + socklen_t tcpisz = sizeof(tcpi); + int bytes_inflight; + if (getsockopt(fd, IPPROTO_TCP, TCP_INFO, &tcpi, &tcpisz) != 0 || ioctl(fd, FIONWRITE, &bytes_inflight) == -1) + return -1; + *rtt = tcpi.tcpi_rtt; + *mss = tcpi.tcpi_snd_mss; + CALC_CWND_PAIR_FROM_BYTE_UNITS(tcpi.tcpi_snd_cwnd, bytes_inflight); + return 0; + +#elif defined(__APPLE__) && defined(TCP_CONNECTION_INFO) + + struct tcp_connection_info tcpi; + socklen_t tcpisz = sizeof(tcpi); + if (getsockopt(fd, IPPROTO_TCP, TCP_CONNECTION_INFO, &tcpi, &tcpisz) != 0 || tcpi.tcpi_maxseg == 0) + return -1; + *rtt = tcpi.tcpi_srtt * 1000; + *mss = tcpi.tcpi_maxseg; + CALC_CWND_PAIR_FROM_BYTE_UNITS(tcpi.tcpi_snd_cwnd, tcpi.tcpi_snd_sbbytes); + return 0; + +#else + /* TODO add support for NetBSD; note that the OS returns the number of packets for tcpi_snd_cwnd; see + * http://twitter.com/n_soda/status/740719125878575105 + */ + return -1; +#endif + +#undef CALC_CWND_PAIR_FROM_BYTE_UNITS +} + +#ifdef TCP_NOTSENT_LOWAT +static int adjust_notsent_lowat(h2o_socket_t *sock, unsigned notsent_lowat) +{ + return setsockopt(h2o_socket_get_fd(sock), IPPROTO_TCP, TCP_NOTSENT_LOWAT, &notsent_lowat, sizeof(notsent_lowat)); +} +#else +#define adjust_notsent_lowat NULL +#endif + +size_t h2o_socket_do_prepare_for_latency_optimized_write(h2o_socket_t *sock, + const h2o_socket_latency_optimization_conditions_t *conditions) +{ + uint32_t rtt = 0, mss = 0, cwnd_size = 0, cwnd_avail = 0; + uint64_t loop_time = UINT64_MAX; + int can_prepare = 1; + +#if !defined(TCP_NOTSENT_LOWAT) + /* the feature cannot be setup unless TCP_NOTSENT_LOWAT is available */ + can_prepare = 0; +#endif + +#if H2O_USE_LIBUV + /* poll-then-write is impossible with libuv */ + can_prepare = 0; +#else + if (can_prepare) + loop_time = 
h2o_evloop_get_execution_time(h2o_socket_get_loop(sock)); +#endif + + /* obtain TCP states */ + if (can_prepare && obtain_tcp_info(h2o_socket_get_fd(sock), &rtt, &mss, &cwnd_size, &cwnd_avail) != 0) + can_prepare = 0; + + /* determine suggested_write_size, suggested_tls_record_size and adjust TCP_NOTSENT_LOWAT based on the obtained information */ + if (can_prepare) { + prepare_for_latency_optimized_write(sock, conditions, rtt, mss, cwnd_size, cwnd_avail, loop_time, adjust_notsent_lowat); + } else { + disable_latency_optimized_write(sock, adjust_notsent_lowat); + } + + return sock->_latency_optimization.suggested_write_size; + +#undef CALC_CWND_PAIR_FROM_BYTE_UNITS +} + +void h2o_socket_write(h2o_socket_t *sock, h2o_iovec_t *bufs, size_t bufcnt, h2o_socket_cb cb) +{ + size_t i, prev_bytes_written = sock->bytes_written; + + for (i = 0; i != bufcnt; ++i) { + sock->bytes_written += bufs[i].len; +#if H2O_SOCKET_DUMP_WRITE + fprintf(stderr, "writing %zu bytes to fd:%d\n", bufs[i].len, h2o_socket_get_fd(sock)); + h2o_dump_memory(stderr, bufs[i].base, bufs[i].len); +#endif + } + + if (sock->ssl == NULL) { + do_write(sock, bufs, bufcnt, cb); + } else { + assert(sock->ssl->output.bufs.size == 0); + /* fill in the data */ + size_t ssl_record_size; + switch (sock->_latency_optimization.state) { + case H2O_SOCKET_LATENCY_OPTIMIZATION_STATE_TBD: + case H2O_SOCKET_LATENCY_OPTIMIZATION_STATE_DISABLED: + ssl_record_size = prev_bytes_written < 200 * 1024 ? 
calc_suggested_tls_payload_size(sock, 1400) : 16384; + break; + case H2O_SOCKET_LATENCY_OPTIMIZATION_STATE_DETERMINED: + sock->_latency_optimization.state = H2O_SOCKET_LATENCY_OPTIMIZATION_STATE_NEEDS_UPDATE; + /* fallthru */ + default: + ssl_record_size = sock->_latency_optimization.suggested_tls_payload_size; + break; + } + for (; bufcnt != 0; ++bufs, --bufcnt) { + size_t off = 0; + while (off != bufs[0].len) { + int ret; + size_t sz = bufs[0].len - off; + if (sz > ssl_record_size) + sz = ssl_record_size; +#if H2O_USE_PICOTLS + if (sock->ssl->ptls != NULL) { + size_t dst_size = sz + ptls_get_record_overhead(sock->ssl->ptls); + void *dst = h2o_mem_alloc_pool(&sock->ssl->output.pool, dst_size); + ptls_buffer_t wbuf; + ptls_buffer_init(&wbuf, dst, dst_size); + ret = ptls_send(sock->ssl->ptls, &wbuf, bufs[0].base + off, sz); + assert(ret == 0); + assert(!wbuf.is_allocated); + h2o_vector_reserve(&sock->ssl->output.pool, &sock->ssl->output.bufs, sock->ssl->output.bufs.size + 1); + sock->ssl->output.bufs.entries[sock->ssl->output.bufs.size++] = h2o_iovec_init(dst, wbuf.off); + } else +#endif + { + ret = SSL_write(sock->ssl->ossl, bufs[0].base + off, (int)sz); + if (ret != sz) { + /* The error happens if SSL_write is called after SSL_read returns a fatal error (e.g. due to corrupt TCP + * packet being received). We need to take care of this since some protocol implementations send data after + * the read-side of the connection gets closed (note that protocol implementations are (yet) incapable of + * distinguishing a normal shutdown and close due to an error using the `status` value of the read + * callback). 
+ */ + clear_output_buffer(sock->ssl); + flush_pending_ssl(sock, cb); +#ifndef H2O_USE_LIBUV + ((struct st_h2o_evloop_socket_t *)sock)->_flags |= H2O_SOCKET_FLAG_IS_WRITE_ERROR; +#endif + return; + } + } + off += sz; + } + } + flush_pending_ssl(sock, cb); + } +} + +void on_write_complete(h2o_socket_t *sock, const char *err) +{ + h2o_socket_cb cb; + + if (sock->ssl != NULL) + clear_output_buffer(sock->ssl); + + cb = sock->_cb.write; + sock->_cb.write = NULL; + cb(sock, err); +} + +void h2o_socket_read_start(h2o_socket_t *sock, h2o_socket_cb cb) +{ + sock->_cb.read = cb; + do_read_start(sock); +} + +void h2o_socket_read_stop(h2o_socket_t *sock) +{ + sock->_cb.read = NULL; + do_read_stop(sock); +} + +void h2o_socket_setpeername(h2o_socket_t *sock, struct sockaddr *sa, socklen_t len) +{ + if (sock->_peername != NULL) + free(sock->_peername); + sock->_peername = h2o_mem_alloc(offsetof(struct st_h2o_socket_peername_t, addr) + len); + sock->_peername->len = len; + memcpy(&sock->_peername->addr, sa, len); +} + +socklen_t h2o_socket_getpeername(h2o_socket_t *sock, struct sockaddr *sa) +{ + /* return cached, if exists */ + if (sock->_peername != NULL) { + memcpy(sa, &sock->_peername->addr, sock->_peername->len); + return sock->_peername->len; + } + /* call, copy to cache, and return */ + socklen_t len = get_peername_uncached(sock, sa); + h2o_socket_setpeername(sock, sa, len); + return len; +} + +const char *h2o_socket_get_ssl_protocol_version(h2o_socket_t *sock) +{ + if (sock->ssl != NULL) { +#if H2O_USE_PICOTLS + if (sock->ssl->ptls != NULL) + return "TLSv1.3"; +#endif + if (sock->ssl->ossl != NULL) + return SSL_get_version(sock->ssl->ossl); + } + return NULL; +} + +int h2o_socket_get_ssl_session_reused(h2o_socket_t *sock) +{ + if (sock->ssl != NULL) { +#if H2O_USE_PICOTLS + if (sock->ssl->ptls != NULL) + return ptls_is_psk_handshake(sock->ssl->ptls); +#endif + if (sock->ssl->ossl != NULL) + return (int)SSL_session_reused(sock->ssl->ossl); + } + return -1; +} + +const char 
*h2o_socket_get_ssl_cipher(h2o_socket_t *sock) +{ + if (sock->ssl != NULL) { +#if H2O_USE_PICOTLS + if (sock->ssl->ptls != NULL) { + ptls_cipher_suite_t *cipher = ptls_get_cipher(sock->ssl->ptls); + if (cipher != NULL) + return cipher->aead->name; + } else +#endif + if (sock->ssl->ossl != NULL) + return SSL_get_cipher_name(sock->ssl->ossl); + } + return NULL; +} + +int h2o_socket_get_ssl_cipher_bits(h2o_socket_t *sock) +{ + if (sock->ssl != NULL) { +#if H2O_USE_PICOTLS + if (sock->ssl->ptls != NULL) { + ptls_cipher_suite_t *cipher = ptls_get_cipher(sock->ssl->ptls); + if (cipher == NULL) + return 0; + return (int)cipher->aead->key_size; + } else +#endif + if (sock->ssl->ossl != NULL) + return SSL_get_cipher_bits(sock->ssl->ossl, NULL); + } + return 0; +} + +h2o_iovec_t h2o_socket_get_ssl_session_id(h2o_socket_t *sock) +{ + if (sock->ssl != NULL) { +#if H2O_USE_PICOTLS + if (sock->ssl->ptls != NULL) { + /* FIXME */ + } else +#endif + if (sock->ssl->ossl != NULL) { + SSL_SESSION *session; + if (sock->ssl->handshake.server.async_resumption.state == ASYNC_RESUMPTION_STATE_COMPLETE && + (session = SSL_get_session(sock->ssl->ossl)) != NULL) { + unsigned id_len; + const unsigned char *id = SSL_SESSION_get_id(session, &id_len); + return h2o_iovec_init(id, id_len); + } + } + } + + return h2o_iovec_init(NULL, 0); +} + +h2o_iovec_t h2o_socket_log_ssl_session_id(h2o_socket_t *sock, h2o_mem_pool_t *pool) +{ + h2o_iovec_t base64id, rawid = h2o_socket_get_ssl_session_id(sock); + + if (rawid.base == NULL) + return h2o_iovec_init(NULL, 0); + + base64id.base = pool != NULL ? 
h2o_mem_alloc_pool(pool, h2o_base64_encode_capacity(rawid.len)) + : h2o_mem_alloc(h2o_base64_encode_capacity(rawid.len)); + base64id.len = h2o_base64_encode(base64id.base, rawid.base, rawid.len, 1); + return base64id; +} + +h2o_iovec_t h2o_socket_log_ssl_cipher_bits(h2o_socket_t *sock, h2o_mem_pool_t *pool) +{ + int bits = h2o_socket_get_ssl_cipher_bits(sock); + if (bits != 0) { + char *s = (char *)(pool != NULL ? h2o_mem_alloc_pool(pool, sizeof(H2O_INT16_LONGEST_STR)) + : h2o_mem_alloc(sizeof(H2O_INT16_LONGEST_STR))); + size_t len = sprintf(s, "%" PRId16, (int16_t)bits); + return h2o_iovec_init(s, len); + } else { + return h2o_iovec_init(NULL, 0); + } +} + +int h2o_socket_compare_address(struct sockaddr *x, struct sockaddr *y) +{ +#define CMP(a, b) \ + if (a != b) \ + return a < b ? -1 : 1 + + CMP(x->sa_family, y->sa_family); + + if (x->sa_family == AF_UNIX) { + struct sockaddr_un *xun = (void *)x, *yun = (void *)y; + int r = strcmp(xun->sun_path, yun->sun_path); + if (r != 0) + return r; + } else if (x->sa_family == AF_INET) { + struct sockaddr_in *xin = (void *)x, *yin = (void *)y; + CMP(ntohl(xin->sin_addr.s_addr), ntohl(yin->sin_addr.s_addr)); + CMP(ntohs(xin->sin_port), ntohs(yin->sin_port)); + } else if (x->sa_family == AF_INET6) { + struct sockaddr_in6 *xin6 = (void *)x, *yin6 = (void *)y; + int r = memcmp(xin6->sin6_addr.s6_addr, yin6->sin6_addr.s6_addr, sizeof(xin6->sin6_addr.s6_addr)); + if (r != 0) + return r; + CMP(ntohs(xin6->sin6_port), ntohs(yin6->sin6_port)); + CMP(xin6->sin6_flowinfo, yin6->sin6_flowinfo); + CMP(xin6->sin6_scope_id, yin6->sin6_scope_id); + } else { + assert(!"unknown sa_family"); + } + +#undef CMP + return 0; +} + +size_t h2o_socket_getnumerichost(struct sockaddr *sa, socklen_t salen, char *buf) +{ + if (sa->sa_family == AF_INET) { + /* fast path for IPv4 addresses */ + struct sockaddr_in *sin = (void *)sa; + uint32_t addr; + addr = htonl(sin->sin_addr.s_addr); + return sprintf(buf, "%d.%d.%d.%d", addr >> 24, (addr >> 16) & 255, 
(addr >> 8) & 255, addr & 255); + } + + if (getnameinfo(sa, salen, buf, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) != 0) + return SIZE_MAX; + return strlen(buf); +} + +int32_t h2o_socket_getport(struct sockaddr *sa) +{ + switch (sa->sa_family) { + case AF_INET: + return htons(((struct sockaddr_in *)sa)->sin_port); + case AF_INET6: + return htons(((struct sockaddr_in6 *)sa)->sin6_port); + default: + return -1; + } +} + +static void create_ossl(h2o_socket_t *sock) +{ + sock->ssl->ossl = SSL_new(sock->ssl->ssl_ctx); + setup_bio(sock); +} + +static SSL_SESSION *on_async_resumption_get(SSL *ssl, +#if OPENSSL_VERSION_NUMBER >= 0x1010000fL && !defined(LIBRESSL_VERSION_NUMBER) + const +#endif + unsigned char *data, + int len, int *copy) +{ + h2o_socket_t *sock = BIO_get_data(SSL_get_rbio(ssl)); + + switch (sock->ssl->handshake.server.async_resumption.state) { + case ASYNC_RESUMPTION_STATE_RECORD: + sock->ssl->handshake.server.async_resumption.state = ASYNC_RESUMPTION_STATE_REQUEST_SENT; + resumption_get_async(sock, h2o_iovec_init(data, len)); + return NULL; + case ASYNC_RESUMPTION_STATE_COMPLETE: + *copy = 1; + return sock->ssl->handshake.server.async_resumption.session_data; + default: + assert(!"FIXME"); + return NULL; + } +} + +static int on_async_resumption_new(SSL *ssl, SSL_SESSION *session) +{ + h2o_iovec_t data; + const unsigned char *id; + unsigned id_len; + unsigned char *p; + + /* build data */ + data.len = i2d_SSL_SESSION(session, NULL); + data.base = alloca(data.len); + p = (void *)data.base; + i2d_SSL_SESSION(session, &p); + + id = SSL_SESSION_get_id(session, &id_len); + resumption_new(h2o_iovec_init(id, id_len), data); + return 0; +} + +static void on_handshake_complete(h2o_socket_t *sock, const char *err) +{ + if (err == NULL) { +#if H2O_USE_PICOTLS + if (sock->ssl->ptls != NULL) { + sock->ssl->record_overhead = ptls_get_record_overhead(sock->ssl->ptls); + } else +#endif + { + const SSL_CIPHER *cipher = SSL_get_current_cipher(sock->ssl->ossl); + switch 
(SSL_CIPHER_get_id(cipher)) { + case TLS1_CK_RSA_WITH_AES_128_GCM_SHA256: + case TLS1_CK_DHE_RSA_WITH_AES_128_GCM_SHA256: + case TLS1_CK_ECDHE_RSA_WITH_AES_128_GCM_SHA256: + case TLS1_CK_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256: + case TLS1_CK_RSA_WITH_AES_256_GCM_SHA384: + case TLS1_CK_DHE_RSA_WITH_AES_256_GCM_SHA384: + case TLS1_CK_ECDHE_RSA_WITH_AES_256_GCM_SHA384: + case TLS1_CK_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384: + sock->ssl->record_overhead = 5 /* header */ + 8 /* record_iv_length (RFC 5288 3) */ + 16 /* tag (RFC 5116 5.1) */; + break; +#if defined(TLS1_CK_DHE_RSA_CHACHA20_POLY1305) + case TLS1_CK_DHE_RSA_CHACHA20_POLY1305: + case TLS1_CK_ECDHE_RSA_CHACHA20_POLY1305: + case TLS1_CK_ECDHE_ECDSA_CHACHA20_POLY1305: + sock->ssl->record_overhead = 5 /* header */ + 16 /* tag */; + break; +#endif + default: + sock->ssl->record_overhead = 32; /* sufficiently large number that can hold most payloads */ + break; + } + } + } + + /* set ssl session into the cache */ + if (sock->ssl->ossl != NULL && !SSL_is_server(sock->ssl->ossl) && sock->ssl->handshake.client.session_cache != NULL) { + if (err == NULL || err == h2o_socket_error_ssl_cert_name_mismatch) { + SSL_SESSION *session = SSL_get1_session(sock->ssl->ossl); + h2o_cache_set(sock->ssl->handshake.client.session_cache, h2o_now(h2o_socket_get_loop(sock)), + sock->ssl->handshake.client.session_cache_key, sock->ssl->handshake.client.session_cache_key_hash, + h2o_iovec_init(session, 1)); + } + } + + h2o_socket_cb handshake_cb = sock->ssl->handshake.cb; + sock->_cb.write = NULL; + sock->ssl->handshake.cb = NULL; + if (err == NULL) + decode_ssl_input(sock); + handshake_cb(sock, err); +} + +static void proceed_handshake(h2o_socket_t *sock, const char *err) +{ + h2o_iovec_t first_input = {NULL}; + int ret = 0; + + sock->_cb.write = NULL; + + if (err != NULL) { + goto Complete; + } + + if (sock->ssl->ossl == NULL) { +#if H2O_USE_PICOTLS + /* prepare I/O */ + size_t consumed = sock->ssl->input.encrypted->size; + ptls_buffer_t wbuf; 
+ ptls_buffer_init(&wbuf, "", 0); + + if (sock->ssl->ptls != NULL) { + /* picotls in action, proceed the handshake */ + ret = ptls_handshake(sock->ssl->ptls, &wbuf, sock->ssl->input.encrypted->bytes, &consumed, NULL); + } else { + /* start using picotls if the first packet contains TLS 1.3 CH */ + ptls_context_t *ptls_ctx = h2o_socket_ssl_get_picotls_context(sock->ssl->ssl_ctx); + if (ptls_ctx != NULL) { + ptls_t *ptls = ptls_new(ptls_ctx, 1); + if (ptls == NULL) + h2o_fatal("no memory"); + ret = ptls_handshake(ptls, &wbuf, sock->ssl->input.encrypted->bytes, &consumed, NULL); + if ((ret == 0 || ret == PTLS_ERROR_IN_PROGRESS) && wbuf.off != 0) { + sock->ssl->ptls = ptls; + sock->ssl->handshake.server.async_resumption.state = ASYNC_RESUMPTION_STATE_COMPLETE; + } else { + ptls_free(ptls); + } + } + } + + if (sock->ssl->ptls != NULL) { + /* complete I/O done by picotls */ + h2o_buffer_consume(&sock->ssl->input.encrypted, consumed); + switch (ret) { + case 0: + case PTLS_ERROR_IN_PROGRESS: + if (wbuf.off != 0) { + h2o_socket_read_stop(sock); + write_ssl_bytes(sock, wbuf.base, wbuf.off); + flush_pending_ssl(sock, ret == 0 ? 
on_handshake_complete : proceed_handshake); + } else { + h2o_socket_read_start(sock, proceed_handshake); + } + break; + default: + /* FIXME send alert in wbuf before calling the callback */ + on_handshake_complete(sock, "picotls handshake error"); + break; + } + ptls_buffer_dispose(&wbuf); + return; + } + ptls_buffer_dispose(&wbuf); +#endif + + /* fallback to openssl if the attempt failed */ + create_ossl(sock); + } + + if (sock->ssl->ossl != NULL && SSL_is_server(sock->ssl->ossl) && + sock->ssl->handshake.server.async_resumption.state == ASYNC_RESUMPTION_STATE_RECORD) { + if (sock->ssl->input.encrypted->size <= 1024) { + /* retain a copy of input if performing async resumption */ + first_input = h2o_iovec_init(alloca(sock->ssl->input.encrypted->size), sock->ssl->input.encrypted->size); + memcpy(first_input.base, sock->ssl->input.encrypted->bytes, first_input.len); + } else { + sock->ssl->handshake.server.async_resumption.state = ASYNC_RESUMPTION_STATE_COMPLETE; + } + } + +Redo: + ERR_clear_error(); + if (SSL_is_server(sock->ssl->ossl)) { + ret = SSL_accept(sock->ssl->ossl); + switch (sock->ssl->handshake.server.async_resumption.state) { + case ASYNC_RESUMPTION_STATE_COMPLETE: + break; + case ASYNC_RESUMPTION_STATE_RECORD: + /* async resumption has not been triggered; proceed the state to complete */ + sock->ssl->handshake.server.async_resumption.state = ASYNC_RESUMPTION_STATE_COMPLETE; + break; + case ASYNC_RESUMPTION_STATE_REQUEST_SENT: { + /* sent async request, reset the ssl state, and wait for async response */ + assert(ret < 0); + SSL_free(sock->ssl->ossl); + create_ossl(sock); + clear_output_buffer(sock->ssl); + h2o_buffer_consume(&sock->ssl->input.encrypted, sock->ssl->input.encrypted->size); + h2o_buffer_reserve(&sock->ssl->input.encrypted, first_input.len); + memcpy(sock->ssl->input.encrypted->bytes, first_input.base, first_input.len); + sock->ssl->input.encrypted->size = first_input.len; + h2o_socket_read_stop(sock); + return; + } + default: + 
h2o_fatal("unexpected async resumption state"); + break; + } + } else { + ret = SSL_connect(sock->ssl->ossl); + } + + if (ret == 0 || (ret < 0 && SSL_get_error(sock->ssl->ossl, ret) != SSL_ERROR_WANT_READ)) { + /* failed */ + long verify_result = SSL_get_verify_result(sock->ssl->ossl); + if (verify_result != X509_V_OK) { + err = X509_verify_cert_error_string(verify_result); + } else { + err = "ssl handshake failure"; + } + goto Complete; + } + + if (sock->ssl->output.bufs.size != 0) { + h2o_socket_read_stop(sock); + flush_pending_ssl(sock, ret == 1 ? on_handshake_complete : proceed_handshake); + } else { + if (ret == 1) { + if (!SSL_is_server(sock->ssl->ossl)) { + X509 *cert = SSL_get_peer_certificate(sock->ssl->ossl); + if (cert != NULL) { + switch (validate_hostname(sock->ssl->handshake.client.server_name, cert)) { + case MatchFound: + /* ok */ + break; + case MatchNotFound: + err = h2o_socket_error_ssl_cert_name_mismatch; + break; + default: + err = h2o_socket_error_ssl_cert_invalid; + break; + } + X509_free(cert); + } else { + err = h2o_socket_error_ssl_no_cert; + } + } + goto Complete; + } + if (sock->ssl->input.encrypted->size != 0) + goto Redo; + h2o_socket_read_start(sock, proceed_handshake); + } + return; + +Complete: + h2o_socket_read_stop(sock); + on_handshake_complete(sock, err); +} + +void h2o_socket_ssl_handshake(h2o_socket_t *sock, SSL_CTX *ssl_ctx, const char *server_name, h2o_socket_cb handshake_cb) +{ + sock->ssl = h2o_mem_alloc(sizeof(*sock->ssl)); + memset(sock->ssl, 0, offsetof(struct st_h2o_socket_ssl_t, output.pool)); + + sock->ssl->ssl_ctx = ssl_ctx; + + /* setup the buffers; sock->input should be empty, sock->ssl->input.encrypted should contain the initial input, if any */ + h2o_buffer_init(&sock->ssl->input.encrypted, &h2o_socket_buffer_prototype); + if (sock->input->size != 0) { + h2o_buffer_t *tmp = sock->input; + sock->input = sock->ssl->input.encrypted; + sock->ssl->input.encrypted = tmp; + } + + 
h2o_mem_init_pool(&sock->ssl->output.pool); + + sock->ssl->handshake.cb = handshake_cb; + if (server_name == NULL) { + /* is server */ + if (SSL_CTX_sess_get_get_cb(sock->ssl->ssl_ctx) != NULL) + sock->ssl->handshake.server.async_resumption.state = ASYNC_RESUMPTION_STATE_RECORD; + if (sock->ssl->input.encrypted->size != 0) + proceed_handshake(sock, 0); + else + h2o_socket_read_start(sock, proceed_handshake); + } else { + create_ossl(sock); + h2o_cache_t *session_cache = h2o_socket_ssl_get_session_cache(sock->ssl->ssl_ctx); + if (session_cache != NULL) { + struct sockaddr_storage sa; + int32_t port; + if (h2o_socket_getpeername(sock, (struct sockaddr *)&sa) != 0 && + (port = h2o_socket_getport((struct sockaddr *)&sa)) != -1) { + /* session cache is available */ + h2o_iovec_t session_cache_key; + session_cache_key.base = h2o_mem_alloc(strlen(server_name) + sizeof(":" H2O_UINT16_LONGEST_STR)); + session_cache_key.len = sprintf(session_cache_key.base, "%s:%" PRIu16, server_name, (uint16_t)port); + sock->ssl->handshake.client.session_cache = session_cache; + sock->ssl->handshake.client.session_cache_key = session_cache_key; + sock->ssl->handshake.client.session_cache_key_hash = + h2o_cache_calchash(session_cache_key.base, session_cache_key.len); + + /* fetch from session cache */ + h2o_cache_ref_t *cacheref = h2o_cache_fetch(session_cache, h2o_now(h2o_socket_get_loop(sock)), + sock->ssl->handshake.client.session_cache_key, + sock->ssl->handshake.client.session_cache_key_hash); + if (cacheref != NULL) { + SSL_set_session(sock->ssl->ossl, (SSL_SESSION *)cacheref->value.base); + h2o_cache_release(session_cache, cacheref); + } + } + } + sock->ssl->handshake.client.server_name = h2o_strdup(NULL, server_name, SIZE_MAX).base; + SSL_set_tlsext_host_name(sock->ssl->ossl, sock->ssl->handshake.client.server_name); + proceed_handshake(sock, 0); + } +} + +void h2o_socket_ssl_resume_server_handshake(h2o_socket_t *sock, h2o_iovec_t session_data) +{ + if (session_data.len != 0) { + 
const unsigned char *p = (void *)session_data.base; + sock->ssl->handshake.server.async_resumption.session_data = d2i_SSL_SESSION(NULL, &p, (long)session_data.len); + /* FIXME warn on failure */ + } + + sock->ssl->handshake.server.async_resumption.state = ASYNC_RESUMPTION_STATE_COMPLETE; + proceed_handshake(sock, 0); + + if (sock->ssl->handshake.server.async_resumption.session_data != NULL) { + SSL_SESSION_free(sock->ssl->handshake.server.async_resumption.session_data); + sock->ssl->handshake.server.async_resumption.session_data = NULL; + } +} + +void h2o_socket_ssl_async_resumption_init(h2o_socket_ssl_resumption_get_async_cb get_async_cb, + h2o_socket_ssl_resumption_new_cb new_cb) +{ + resumption_get_async = get_async_cb; + resumption_new = new_cb; +} + +void h2o_socket_ssl_async_resumption_setup_ctx(SSL_CTX *ctx) +{ + SSL_CTX_sess_set_get_cb(ctx, on_async_resumption_get); + SSL_CTX_sess_set_new_cb(ctx, on_async_resumption_new); + /* if necessary, it is the responsibility of the caller to disable the internal cache */ +} + +#if H2O_USE_PICOTLS + +static int get_ptls_index(void) +{ + static int index = -1; + + if (index == -1) { + static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + pthread_mutex_lock(&mutex); + if (index == -1) { + index = SSL_CTX_get_ex_new_index(0, NULL, NULL, NULL, NULL); + assert(index != -1); + } + pthread_mutex_unlock(&mutex); + } + + return index; +} + +ptls_context_t *h2o_socket_ssl_get_picotls_context(SSL_CTX *ossl) +{ + return SSL_CTX_get_ex_data(ossl, get_ptls_index()); +} + +void h2o_socket_ssl_set_picotls_context(SSL_CTX *ossl, ptls_context_t *ptls) +{ + SSL_CTX_set_ex_data(ossl, get_ptls_index(), ptls); +} + +#endif + +static void on_dispose_ssl_ctx_session_cache(void *parent, void *ptr, CRYPTO_EX_DATA *ad, int idx, long argl, void *argp) +{ + h2o_cache_t *ssl_session_cache = (h2o_cache_t *)ptr; + if (ssl_session_cache != NULL) + h2o_cache_destroy(ssl_session_cache); +} + +static int get_ssl_session_cache_index(void) +{ + 
static int index = -1; + static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + pthread_mutex_lock(&mutex); + if (index == -1) { + index = SSL_CTX_get_ex_new_index(0, NULL, NULL, NULL, on_dispose_ssl_ctx_session_cache); + assert(index != -1); + } + pthread_mutex_unlock(&mutex); + return index; +} + +h2o_cache_t *h2o_socket_ssl_get_session_cache(SSL_CTX *ctx) +{ + return (h2o_cache_t *)SSL_CTX_get_ex_data(ctx, get_ssl_session_cache_index()); +} + +void h2o_socket_ssl_set_session_cache(SSL_CTX *ctx, h2o_cache_t *cache) +{ + SSL_CTX_set_ex_data(ctx, get_ssl_session_cache_index(), cache); +} + +void h2o_socket_ssl_destroy_session_cache_entry(h2o_iovec_t value) +{ + SSL_SESSION *session = (SSL_SESSION *)value.base; + SSL_SESSION_free(session); +} + +h2o_iovec_t h2o_socket_ssl_get_selected_protocol(h2o_socket_t *sock) +{ + const unsigned char *data = NULL; + unsigned len = 0; + + assert(sock->ssl != NULL); + +#if H2O_USE_PICOTLS + if (sock->ssl->ptls != NULL) { + const char *proto = ptls_get_negotiated_protocol(sock->ssl->ptls); + return proto != NULL ? 
h2o_iovec_init(proto, strlen(proto)) : h2o_iovec_init(NULL, 0); + } +#endif + +#if H2O_USE_ALPN + if (len == 0) + SSL_get0_alpn_selected(sock->ssl->ossl, &data, &len); +#endif +#if H2O_USE_NPN + if (len == 0) + SSL_get0_next_proto_negotiated(sock->ssl->ossl, &data, &len); +#endif + + return h2o_iovec_init(data, len); +} + +static int on_alpn_select(SSL *ssl, const unsigned char **out, unsigned char *outlen, const unsigned char *_in, unsigned int inlen, + void *_protocols) +{ + const h2o_iovec_t *protocols = _protocols; + size_t i; + + for (i = 0; protocols[i].len != 0; ++i) { + const unsigned char *in = _in, *in_end = in + inlen; + while (in != in_end) { + size_t cand_len = *in++; + if (in_end - in < cand_len) { + /* broken request */ + return SSL_TLSEXT_ERR_NOACK; + } + if (cand_len == protocols[i].len && memcmp(in, protocols[i].base, cand_len) == 0) { + goto Found; + } + in += cand_len; + } + } + /* not found */ + return SSL_TLSEXT_ERR_NOACK; + +Found: + *out = (const unsigned char *)protocols[i].base; + *outlen = (unsigned char)protocols[i].len; + return SSL_TLSEXT_ERR_OK; +} + +#if H2O_USE_ALPN + +void h2o_ssl_register_alpn_protocols(SSL_CTX *ctx, const h2o_iovec_t *protocols) +{ + SSL_CTX_set_alpn_select_cb(ctx, on_alpn_select, (void *)protocols); +} + +#endif + +#if H2O_USE_NPN + +static int on_npn_advertise(SSL *ssl, const unsigned char **out, unsigned *outlen, void *protocols) +{ + *out = protocols; + *outlen = (unsigned)strlen(protocols); + return SSL_TLSEXT_ERR_OK; +} + +void h2o_ssl_register_npn_protocols(SSL_CTX *ctx, const char *protocols) +{ + SSL_CTX_set_next_protos_advertised_cb(ctx, on_npn_advertise, (void *)protocols); +} + +#endif + +void h2o_sliding_counter_stop(h2o_sliding_counter_t *counter, uint64_t now) +{ + uint64_t elapsed; + + assert(counter->cur.start_at != 0); + + /* calculate the time used, and reset cur */ + if (now <= counter->cur.start_at) + elapsed = 0; + else + elapsed = now - counter->cur.start_at; + counter->cur.start_at = 0; + 
+ /* adjust prev */ + counter->prev.sum += elapsed; + counter->prev.sum -= counter->prev.slots[counter->prev.index]; + counter->prev.slots[counter->prev.index] = elapsed; + if (++counter->prev.index >= sizeof(counter->prev.slots) / sizeof(counter->prev.slots[0])) + counter->prev.index = 0; + + /* recalc average */ + counter->average = counter->prev.sum / (sizeof(counter->prev.slots) / sizeof(counter->prev.slots[0])); +} diff --git a/web/server/h2o/libh2o/lib/common/socket/evloop.c.h b/web/server/h2o/libh2o/lib/common/socket/evloop.c.h new file mode 100644 index 000000000..754ed23b9 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socket/evloop.c.h @@ -0,0 +1,624 @@ +/* + * Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku, Fastly, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
+ */ +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <stdlib.h> +#include <sys/socket.h> +#include <sys/time.h> +#include <sys/uio.h> +#include <unistd.h> +#include "cloexec.h" +#include "h2o/linklist.h" + +#if !defined(H2O_USE_ACCEPT4) +#ifdef __linux__ +#define H2O_USE_ACCEPT4 1 +#elif __FreeBSD__ >= 10 +#define H2O_USE_ACCEPT4 1 +#else +#define H2O_USE_ACCEPT4 0 +#endif +#endif + +struct st_h2o_evloop_socket_t { + h2o_socket_t super; + int fd; + int _flags; + h2o_evloop_t *loop; + struct { + size_t cnt; + h2o_iovec_t *bufs; + union { + h2o_iovec_t *alloced_ptr; + h2o_iovec_t smallbufs[4]; + }; + } _wreq; + struct st_h2o_evloop_socket_t *_next_pending; + struct st_h2o_evloop_socket_t *_next_statechanged; +}; + +static void link_to_pending(struct st_h2o_evloop_socket_t *sock); +static void write_pending(struct st_h2o_evloop_socket_t *sock); +static h2o_evloop_t *create_evloop(size_t sz); +static void update_now(h2o_evloop_t *loop); +static int32_t adjust_max_wait(h2o_evloop_t *loop, int32_t max_wait); + +/* functions to be defined in the backends */ +static int evloop_do_proceed(h2o_evloop_t *loop, int32_t max_wait); +static void evloop_do_on_socket_create(struct st_h2o_evloop_socket_t *sock); +static void evloop_do_on_socket_close(struct st_h2o_evloop_socket_t *sock); +static void evloop_do_on_socket_export(struct st_h2o_evloop_socket_t *sock); + +#if H2O_USE_POLL || H2O_USE_EPOLL || H2O_USE_KQUEUE +/* explicitly specified */ +#else +#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__) +#define H2O_USE_KQUEUE 1 +#elif defined(__linux) +#define H2O_USE_EPOLL 1 +#else +#define H2O_USE_POLL 1 +#endif +#endif + +#if H2O_USE_POLL +#include "evloop/poll.c.h" +#elif H2O_USE_EPOLL +#include "evloop/epoll.c.h" +#elif H2O_USE_KQUEUE +#include "evloop/kqueue.c.h" +#else +#error "poller not specified" +#endif + +void link_to_pending(struct st_h2o_evloop_socket_t *sock) +{ + if (sock->_next_pending == sock) { + struct 
st_h2o_evloop_socket_t **slot = (sock->_flags & H2O_SOCKET_FLAG_IS_ACCEPTED_CONNECTION) != 0 + ? &sock->loop->_pending_as_server + : &sock->loop->_pending_as_client; + sock->_next_pending = *slot; + *slot = sock; + } +} + +static void link_to_statechanged(struct st_h2o_evloop_socket_t *sock) +{ + if (sock->_next_statechanged == sock) { + sock->_next_statechanged = NULL; + *sock->loop->_statechanged.tail_ref = sock; + sock->loop->_statechanged.tail_ref = &sock->_next_statechanged; + } +} + +static const char *on_read_core(int fd, h2o_buffer_t **input) +{ + int read_any = 0; + + while (1) { + ssize_t rret; + h2o_iovec_t buf = h2o_buffer_reserve(input, 4096); + if (buf.base == NULL) { + /* memory allocation failed */ + return h2o_socket_error_out_of_memory; + } + while ((rret = read(fd, buf.base, buf.len <= INT_MAX / 2 ? buf.len : INT_MAX / 2 + 1)) == -1 && errno == EINTR) + ; + if (rret == -1) { + if (errno == EAGAIN) + break; + else + return h2o_socket_error_io; + } else if (rret == 0) { + if (!read_any) + return h2o_socket_error_closed; /* TODO notify close */ + break; + } + (*input)->size += rret; + if (buf.len != rret) + break; + read_any = 1; + } + return NULL; +} + +static void wreq_free_buffer_if_allocated(struct st_h2o_evloop_socket_t *sock) +{ + if (sock->_wreq.smallbufs <= sock->_wreq.bufs && + sock->_wreq.bufs <= sock->_wreq.smallbufs + sizeof(sock->_wreq.smallbufs) / sizeof(sock->_wreq.smallbufs[0])) { + /* no need to free */ + } else { + free(sock->_wreq.alloced_ptr); + sock->_wreq.bufs = sock->_wreq.smallbufs; + } +} + +static int write_core(int fd, h2o_iovec_t **bufs, size_t *bufcnt) +{ + int iovcnt; + ssize_t wret; + + if (*bufcnt != 0) { + do { + /* write */ + iovcnt = IOV_MAX; + if (*bufcnt < iovcnt) + iovcnt = (int)*bufcnt; + while ((wret = writev(fd, (struct iovec *)*bufs, iovcnt)) == -1 && errno == EINTR) + ; + if (wret == -1) { + if (errno != EAGAIN) + return -1; + break; + } + /* adjust the buffer */ + while ((*bufs)->len < wret) { + wret -= 
(*bufs)->len; + ++*bufs; + --*bufcnt; + assert(*bufcnt != 0); + } + if (((*bufs)->len -= wret) == 0) { + ++*bufs; + --*bufcnt; + } else { + (*bufs)->base += wret; + } + } while (*bufcnt != 0 && iovcnt == IOV_MAX); + } + + return 0; +} + +void write_pending(struct st_h2o_evloop_socket_t *sock) +{ + assert(sock->super._cb.write != NULL); + + /* DONT_WRITE poll */ + if (sock->_wreq.cnt == 0) + goto Complete; + + /* write */ + if (write_core(sock->fd, &sock->_wreq.bufs, &sock->_wreq.cnt) == 0 && sock->_wreq.cnt != 0) { + /* partial write */ + return; + } + + /* either completed or failed */ + wreq_free_buffer_if_allocated(sock); + +Complete: + sock->_flags |= H2O_SOCKET_FLAG_IS_WRITE_NOTIFY; + link_to_pending(sock); + link_to_statechanged(sock); /* might need to disable the write polling */ +} + +static void read_on_ready(struct st_h2o_evloop_socket_t *sock) +{ + const char *err = 0; + size_t prev_bytes_read = sock->super.input->size; + + if ((sock->_flags & H2O_SOCKET_FLAG_DONT_READ) != 0) + goto Notify; + + if ((err = on_read_core(sock->fd, sock->super.ssl == NULL ? &sock->super.input : &sock->super.ssl->input.encrypted)) != NULL) + goto Notify; + + if (sock->super.ssl != NULL && sock->super.ssl->handshake.cb == NULL) + err = decode_ssl_input(&sock->super); + +Notify: + /* the application may get notified even if no new data is avaiable. The + * behavior is intentional; it is designed as such so that the applications + * can update their timeout counters when a partial SSL record arrives. 
+ */ + sock->super.bytes_read = sock->super.input->size - prev_bytes_read; + sock->super._cb.read(&sock->super, err); +} + +void do_dispose_socket(h2o_socket_t *_sock) +{ + struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock; + + evloop_do_on_socket_close(sock); + wreq_free_buffer_if_allocated(sock); + if (sock->fd != -1) { + close(sock->fd); + sock->fd = -1; + } + sock->_flags = H2O_SOCKET_FLAG_IS_DISPOSED; + link_to_statechanged(sock); +} + +void do_write(h2o_socket_t *_sock, h2o_iovec_t *_bufs, size_t bufcnt, h2o_socket_cb cb) +{ + struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock; + h2o_iovec_t *bufs; + h2o_iovec_t *tofree = NULL; + + assert(sock->super._cb.write == NULL); + assert(sock->_wreq.cnt == 0); + sock->super._cb.write = cb; + + /* cap the number of buffers, since we're using alloca */ + if (bufcnt > 10000) + bufs = tofree = h2o_mem_alloc(sizeof(*bufs) * bufcnt); + else + bufs = alloca(sizeof(*bufs) * bufcnt); + + memcpy(bufs, _bufs, sizeof(*bufs) * bufcnt); + + /* try to write now */ + if (write_core(sock->fd, &bufs, &bufcnt) != 0) { + /* fill in _wreq.bufs with fake data to indicate error */ + sock->_wreq.bufs = sock->_wreq.smallbufs; + sock->_wreq.cnt = 1; + *sock->_wreq.bufs = h2o_iovec_init(H2O_STRLIT("deadbeef")); + sock->_flags |= H2O_SOCKET_FLAG_IS_WRITE_NOTIFY; + link_to_pending(sock); + goto Out; + } + if (bufcnt == 0) { + /* write complete, schedule the callback */ + sock->_flags |= H2O_SOCKET_FLAG_IS_WRITE_NOTIFY; + link_to_pending(sock); + goto Out; + } + + + /* setup the buffer to send pending data */ + if (bufcnt <= sizeof(sock->_wreq.smallbufs) / sizeof(sock->_wreq.smallbufs[0])) { + sock->_wreq.bufs = sock->_wreq.smallbufs; + } else { + sock->_wreq.bufs = h2o_mem_alloc(sizeof(h2o_iovec_t) * bufcnt); + sock->_wreq.alloced_ptr = sock->_wreq.bufs; + } + memcpy(sock->_wreq.bufs, bufs, sizeof(h2o_iovec_t) * bufcnt); + sock->_wreq.cnt = bufcnt; + + /* schedule the write */ + 
link_to_statechanged(sock); +Out: + free(tofree); +} + +int h2o_socket_get_fd(h2o_socket_t *_sock) +{ + struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock; + return sock->fd; +} + +void do_read_start(h2o_socket_t *_sock) +{ + struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock; + + link_to_statechanged(sock); +} + +void do_read_stop(h2o_socket_t *_sock) +{ + struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock; + + sock->_flags &= ~H2O_SOCKET_FLAG_IS_READ_READY; + link_to_statechanged(sock); +} + +void h2o_socket_dont_read(h2o_socket_t *_sock, int dont_read) +{ + struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock; + + if (dont_read) { + sock->_flags |= H2O_SOCKET_FLAG_DONT_READ; + } else { + sock->_flags &= ~H2O_SOCKET_FLAG_DONT_READ; + } +} + +int do_export(h2o_socket_t *_sock, h2o_socket_export_t *info) +{ + struct st_h2o_evloop_socket_t *sock = (void *)_sock; + + assert((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) == 0); + evloop_do_on_socket_export(sock); + sock->_flags = H2O_SOCKET_FLAG_IS_DISPOSED; + + info->fd = sock->fd; + sock->fd = -1; + + return 0; +} + +h2o_socket_t *do_import(h2o_loop_t *loop, h2o_socket_export_t *info) +{ + return h2o_evloop_socket_create(loop, info->fd, 0); +} + +h2o_loop_t *h2o_socket_get_loop(h2o_socket_t *_sock) +{ + struct st_h2o_evloop_socket_t *sock = (void *)_sock; + return sock->loop; +} + +socklen_t h2o_socket_getsockname(h2o_socket_t *_sock, struct sockaddr *sa) +{ + struct st_h2o_evloop_socket_t *sock = (void *)_sock; + socklen_t len = sizeof(struct sockaddr_storage); + if (getsockname(sock->fd, sa, &len) != 0) + return 0; + return len; +} + +socklen_t get_peername_uncached(h2o_socket_t *_sock, struct sockaddr *sa) +{ + struct st_h2o_evloop_socket_t *sock = (void *)_sock; + socklen_t len = sizeof(struct sockaddr_storage); + if (getpeername(sock->fd, sa, &len) != 0) + return 0; + return len; +} + +static struct 
st_h2o_evloop_socket_t *create_socket(h2o_evloop_t *loop, int fd, int flags) +{ + struct st_h2o_evloop_socket_t *sock; + + fcntl(fd, F_SETFL, O_NONBLOCK); + + sock = h2o_mem_alloc(sizeof(*sock)); + memset(sock, 0, sizeof(*sock)); + h2o_buffer_init(&sock->super.input, &h2o_socket_buffer_prototype); + sock->loop = loop; + sock->fd = fd; + sock->_flags = flags; + sock->_wreq.bufs = sock->_wreq.smallbufs; + sock->_next_pending = sock; + sock->_next_statechanged = sock; + + evloop_do_on_socket_create(sock); + + return sock; +} + +static struct st_h2o_evloop_socket_t *create_socket_set_nodelay(h2o_evloop_t *loop, int fd, int flags) +{ + int on = 1; + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)); + return create_socket(loop, fd, flags); +} + +h2o_socket_t *h2o_evloop_socket_create(h2o_evloop_t *loop, int fd, int flags) +{ + fcntl(fd, F_SETFL, O_NONBLOCK); + return &create_socket(loop, fd, flags)->super; +} + +h2o_socket_t *h2o_evloop_socket_accept(h2o_socket_t *_listener) +{ + struct st_h2o_evloop_socket_t *listener = (struct st_h2o_evloop_socket_t *)_listener; + int fd; + +#if H2O_USE_ACCEPT4 + if ((fd = accept4(listener->fd, NULL, NULL, SOCK_NONBLOCK | SOCK_CLOEXEC)) == -1) + return NULL; +#else + if ((fd = cloexec_accept(listener->fd, NULL, NULL)) == -1) + return NULL; + fcntl(fd, F_SETFL, O_NONBLOCK); +#endif + + return &create_socket_set_nodelay(listener->loop, fd, H2O_SOCKET_FLAG_IS_ACCEPTED_CONNECTION)->super; +} + +h2o_socket_t *h2o_socket_connect(h2o_loop_t *loop, struct sockaddr *addr, socklen_t addrlen, h2o_socket_cb cb) +{ + int fd; + struct st_h2o_evloop_socket_t *sock; + + if ((fd = cloexec_socket(addr->sa_family, SOCK_STREAM, 0)) == -1) + return NULL; + fcntl(fd, F_SETFL, O_NONBLOCK); + if (!(connect(fd, addr, addrlen) == 0 || errno == EINPROGRESS)) { + close(fd); + return NULL; + } + + sock = create_socket_set_nodelay(loop, fd, H2O_SOCKET_FLAG_IS_CONNECTING); + h2o_socket_notify_write(&sock->super, cb); + return &sock->super; +} + 
+h2o_evloop_t *create_evloop(size_t sz) +{ + h2o_evloop_t *loop = h2o_mem_alloc(sz); + + memset(loop, 0, sz); + loop->_statechanged.tail_ref = &loop->_statechanged.head; + h2o_linklist_init_anchor(&loop->_timeouts); + + update_now(loop); + + return loop; +} + +void update_now(h2o_evloop_t *loop) +{ + struct timeval tv; + gettimeofday(&tv, NULL); + loop->_now = (uint64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000; +} + +int32_t adjust_max_wait(h2o_evloop_t *loop, int32_t max_wait) +{ + uint64_t wake_at = h2o_timeout_get_wake_at(&loop->_timeouts); + + update_now(loop); + + if (wake_at <= loop->_now) { + max_wait = 0; + } else { + uint64_t delta = wake_at - loop->_now; + if (delta < max_wait) + max_wait = (int32_t)delta; + } + + return max_wait; +} + +void h2o_socket_notify_write(h2o_socket_t *_sock, h2o_socket_cb cb) +{ + struct st_h2o_evloop_socket_t *sock = (struct st_h2o_evloop_socket_t *)_sock; + assert(sock->super._cb.write == NULL); + assert(sock->_wreq.cnt == 0); + + sock->super._cb.write = cb; + link_to_statechanged(sock); +} + +static void run_socket(struct st_h2o_evloop_socket_t *sock) +{ + if ((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0) { + /* is freed in updatestates phase */ + return; + } + + if ((sock->_flags & H2O_SOCKET_FLAG_IS_READ_READY) != 0) { + sock->_flags &= ~H2O_SOCKET_FLAG_IS_READ_READY; + read_on_ready(sock); + } + + if ((sock->_flags & H2O_SOCKET_FLAG_IS_WRITE_NOTIFY) != 0) { + const char *err = NULL; + assert(sock->super._cb.write != NULL); + sock->_flags &= ~H2O_SOCKET_FLAG_IS_WRITE_NOTIFY; + if (sock->_wreq.cnt != 0) { + /* error */ + err = h2o_socket_error_io; + sock->_wreq.cnt = 0; + } else if ((sock->_flags & H2O_SOCKET_FLAG_IS_CONNECTING) != 0) { + sock->_flags &= ~H2O_SOCKET_FLAG_IS_CONNECTING; + int so_err = 0; + socklen_t l = sizeof(so_err); + so_err = 0; + if (getsockopt(sock->fd, SOL_SOCKET, SO_ERROR, &so_err, &l) != 0 || so_err != 0) { + /* FIXME lookup the error table */ + err = h2o_socket_error_conn_fail; + } + } + 
on_write_complete(&sock->super, err); + } +} + +static void run_pending(h2o_evloop_t *loop) +{ + struct st_h2o_evloop_socket_t *sock; + + while (loop->_pending_as_server != NULL || loop->_pending_as_client != NULL) { + while ((sock = loop->_pending_as_client) != NULL) { + loop->_pending_as_client = sock->_next_pending; + sock->_next_pending = sock; + run_socket(sock); + } + if ((sock = loop->_pending_as_server) != NULL) { + loop->_pending_as_server = sock->_next_pending; + sock->_next_pending = sock; + run_socket(sock); + } + } +} + +void h2o_evloop_destroy(h2o_evloop_t *loop) +{ + struct st_h2o_evloop_socket_t *sock; + + /* timeouts are governed by the application and MUST be destroyed prior to destroying the loop */ + assert(h2o_linklist_is_empty(&loop->_timeouts)); + + /* dispose all socket */ + while ((sock = loop->_pending_as_client) != NULL) { + loop->_pending_as_client = sock->_next_pending; + sock->_next_pending = sock; + h2o_socket_close((h2o_socket_t *)sock); + } + while ((sock = loop->_pending_as_server) != NULL) { + loop->_pending_as_server = sock->_next_pending; + sock->_next_pending = sock; + h2o_socket_close((h2o_socket_t *)sock); + } + + /* now all socket are disposedand and placed in linked list statechanged + * we can freeing memory in cycle by next_statechanged, + */ + while ((sock = loop->_statechanged.head) != NULL) { + loop->_statechanged.head = sock->_next_statechanged; + free(sock); + } + + /* lastly we need to free loop memory */ + free(loop); +} + +int h2o_evloop_run(h2o_evloop_t *loop, int32_t max_wait) +{ + h2o_linklist_t *node; + + /* update socket states, poll, set readable flags, perform pending writes */ + if (evloop_do_proceed(loop, max_wait) != 0) + return -1; + + /* run the pending callbacks */ + run_pending(loop); + + /* run the timeouts */ + for (node = loop->_timeouts.next; node != &loop->_timeouts; node = node->next) { + h2o_timeout_t *timeout = H2O_STRUCT_FROM_MEMBER(h2o_timeout_t, _link, node); + h2o_timeout_run(loop, 
timeout, loop->_now); + } + /* assert h2o_timeout_run has called run_pending */ + assert(loop->_pending_as_client == NULL); + assert(loop->_pending_as_server == NULL); + + if (h2o_sliding_counter_is_running(&loop->exec_time_counter)) { + update_now(loop); + h2o_sliding_counter_stop(&loop->exec_time_counter, loop->_now); + } + + return 0; +} + +void h2o_timeout__do_init(h2o_evloop_t *loop, h2o_timeout_t *timeout) +{ + h2o_linklist_insert(&loop->_timeouts, &timeout->_link); +} + +void h2o_timeout__do_dispose(h2o_evloop_t *loop, h2o_timeout_t *timeout) +{ + h2o_linklist_unlink(&timeout->_link); +} + +void h2o_timeout__do_link(h2o_evloop_t *loop, h2o_timeout_t *timeout, h2o_timeout_entry_t *entry) +{ + /* nothing to do */ +} + +void h2o_timeout__do_post_callback(h2o_evloop_t *loop) +{ + run_pending(loop); +} diff --git a/web/server/h2o/libh2o/lib/common/socket/evloop/epoll.c.h b/web/server/h2o/libh2o/lib/common/socket/evloop/epoll.c.h new file mode 100644 index 000000000..247dac893 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socket/evloop/epoll.c.h @@ -0,0 +1,203 @@ +/* + * Copyright (c) 2014 DeNA Co., Ltd. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include <assert.h> +#include <limits.h> +#include <stdio.h> +#include <sys/epoll.h> + +#if 0 +#define DEBUG_LOG(...) fprintf(stderr, __VA_ARGS__) +#else +#define DEBUG_LOG(...) +#endif + +struct st_h2o_evloop_epoll_t { + h2o_evloop_t super; + int ep; +}; + +static int update_status(struct st_h2o_evloop_epoll_t *loop) +{ + while (loop->super._statechanged.head != NULL) { + /* detach the top */ + struct st_h2o_evloop_socket_t *sock = loop->super._statechanged.head; + loop->super._statechanged.head = sock->_next_statechanged; + sock->_next_statechanged = sock; + /* update the state */ + if ((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0) { + free(sock); + } else { + int changed = 0, op, ret; + struct epoll_event ev; + ev.events = 0; + if (h2o_socket_is_reading(&sock->super)) { + ev.events |= EPOLLIN; + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) == 0) { + sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; + changed = 1; + } + } else { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) { + sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; + changed = 1; + } + } + if (h2o_socket_is_writing(&sock->super)) { + ev.events |= EPOLLOUT; + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) == 0) { + sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; + changed = 1; + } + } else { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) { + sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; + changed = 1; + } + } + if (changed) { + if ((sock->_flags & H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED) != 0) { + if (ev.events != 0) + op = EPOLL_CTL_MOD; + else + op = EPOLL_CTL_DEL; + } else { + assert(ev.events != 0); + op = EPOLL_CTL_ADD; + } + ev.data.ptr = sock; 
+ while ((ret = epoll_ctl(loop->ep, op, sock->fd, &ev)) != 0 && errno == EINTR) + ; + if (ret != 0) + return -1; + if (op == EPOLL_CTL_DEL) + sock->_flags &= ~H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED; + else + sock->_flags |= H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED; + } + } + } + loop->super._statechanged.tail_ref = &loop->super._statechanged.head; + + return 0; +} + +int evloop_do_proceed(h2o_evloop_t *_loop, int32_t max_wait) +{ + struct st_h2o_evloop_epoll_t *loop = (struct st_h2o_evloop_epoll_t *)_loop; + struct epoll_event events[256]; + int nevents, i; + + /* collect (and update) status */ + if (update_status(loop) != 0) + return -1; + + /* poll */ + max_wait = adjust_max_wait(&loop->super, max_wait); + nevents = epoll_wait(loop->ep, events, sizeof(events) / sizeof(events[0]), max_wait); + update_now(&loop->super); + if (nevents == -1) + return -1; + + if (nevents != 0) + h2o_sliding_counter_start(&loop->super.exec_time_counter, loop->super._now); + + /* update readable flags, perform writes */ + for (i = 0; i != nevents; ++i) { + struct st_h2o_evloop_socket_t *sock = events[i].data.ptr; + int notified = 0; + if ((events[i].events & (EPOLLIN | EPOLLHUP | EPOLLERR)) != 0) { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) { + sock->_flags |= H2O_SOCKET_FLAG_IS_READ_READY; + link_to_pending(sock); + notified = 1; + } + } + if ((events[i].events & (EPOLLOUT | EPOLLHUP | EPOLLERR)) != 0) { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) { + write_pending(sock); + notified = 1; + } + } + if (!notified) { + static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; + static time_t last_reported = 0; + time_t now = time(NULL); + pthread_mutex_lock(&lock); + if (last_reported + 60 < now) { + last_reported = now; + fprintf(stderr, "ignoring epoll event (fd:%d,event:%x)\n", sock->fd, (int)events[i].events); + } + pthread_mutex_unlock(&lock); + } + } + + return 0; +} + +static void evloop_do_on_socket_create(struct st_h2o_evloop_socket_t *sock) +{ 
+} + +static void evloop_do_on_socket_close(struct st_h2o_evloop_socket_t *sock) +{ + struct st_h2o_evloop_epoll_t *loop = (void *)sock->loop; + int ret; + + if (sock->fd == -1) + return; + if ((sock->_flags & H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED) == 0) + return; + while ((ret = epoll_ctl(loop->ep, EPOLL_CTL_DEL, sock->fd, NULL)) != 0 && errno == EINTR) + ; + if (ret != 0) + fprintf(stderr, "socket_close: epoll(DEL) returned error %d (fd=%d)\n", errno, sock->fd); +} + +static void evloop_do_on_socket_export(struct st_h2o_evloop_socket_t *sock) +{ + struct st_h2o_evloop_epoll_t *loop = (void *)sock->loop; + int ret; + + if ((sock->_flags & H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED) == 0) + return; + while ((ret = epoll_ctl(loop->ep, EPOLL_CTL_DEL, sock->fd, NULL)) != 0 && errno == EINTR) + ; + if (ret != 0) + fprintf(stderr, "socket_export: epoll(DEL) returned error %d (fd=%d)\n", errno, sock->fd); +} + +h2o_evloop_t *h2o_evloop_create(void) +{ + struct st_h2o_evloop_epoll_t *loop = (struct st_h2o_evloop_epoll_t *)create_evloop(sizeof(*loop)); + + pthread_mutex_lock(&cloexec_mutex); + loop->ep = epoll_create(10); + while (fcntl(loop->ep, F_SETFD, FD_CLOEXEC) == -1) { + if (errno != EAGAIN) { + fprintf(stderr, "h2o_evloop_create: failed to set FD_CLOEXEC to the epoll fd (errno=%d)\n", errno); + abort(); + } + } + pthread_mutex_unlock(&cloexec_mutex); + + return &loop->super; +} diff --git a/web/server/h2o/libh2o/lib/common/socket/evloop/kqueue.c.h b/web/server/h2o/libh2o/lib/common/socket/evloop/kqueue.c.h new file mode 100644 index 000000000..21288ed7c --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socket/evloop/kqueue.c.h @@ -0,0 +1,186 @@ +/* + * Copyright (c) 2014 DeNA Co., Ltd. 
+ * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include <assert.h> +#include <stdio.h> +#include <sys/types.h> +#include <sys/event.h> +#include <sys/time.h> + +#if 0 +#define DEBUG_LOG(...) fprintf(stderr, __VA_ARGS__) +#else +#define DEBUG_LOG(...) 
+#endif + +struct st_h2o_socket_loop_kqueue_t { + h2o_evloop_t super; + int kq; +}; + +static void ev_set(struct kevent *ev, int fd, int filter, int flags, struct st_h2o_evloop_socket_t *sock) +{ +#ifdef __NetBSD__ + EV_SET(ev, fd, filter, flags, 0, 0, (intptr_t)sock); +#else + EV_SET(ev, fd, filter, flags, 0, 0, sock); +#endif +} + +static int collect_status(struct st_h2o_socket_loop_kqueue_t *loop, struct kevent *changelist, int changelist_capacity) +{ + int change_index = 0; + +#define SET_AND_UPDATE(filter, flags) \ + do { \ + ev_set(changelist + change_index++, sock->fd, filter, flags, sock); \ + if (change_index == changelist_capacity) { \ + int ret; \ + while ((ret = kevent(loop->kq, changelist, change_index, NULL, 0, NULL)) != 0 && errno == EINTR) \ + ; \ + if (ret == -1) \ + return -1; \ + change_index = 0; \ + } \ + } while (0) + + while (loop->super._statechanged.head != NULL) { + /* detach the top */ + struct st_h2o_evloop_socket_t *sock = loop->super._statechanged.head; + loop->super._statechanged.head = sock->_next_statechanged; + sock->_next_statechanged = sock; + /* update the state */ + if ((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0) { + free(sock); + } else { + if (h2o_socket_is_reading(&sock->super)) { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) == 0) { + sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; + SET_AND_UPDATE(EVFILT_READ, EV_ADD); + } + } else { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) { + sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; + SET_AND_UPDATE(EVFILT_READ, EV_DELETE); + } + } + if (h2o_socket_is_writing(&sock->super)) { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) == 0) { + sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; + SET_AND_UPDATE(EVFILT_WRITE, EV_ADD); + } + } else { + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) { + sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; + SET_AND_UPDATE(EVFILT_WRITE, EV_DELETE); + } + } + } + } + 
loop->super._statechanged.tail_ref = &loop->super._statechanged.head; + + return change_index; + +#undef SET_AND_UPDATE +} + +int evloop_do_proceed(h2o_evloop_t *_loop, int32_t max_wait) +{ + struct st_h2o_socket_loop_kqueue_t *loop = (struct st_h2o_socket_loop_kqueue_t *)_loop; + struct kevent changelist[64], events[128]; + int nchanges, nevents, i; + struct timespec ts; + + /* collect (and update) status */ + if ((nchanges = collect_status(loop, changelist, sizeof(changelist) / sizeof(changelist[0]))) == -1) + return -1; + + /* poll */ + max_wait = adjust_max_wait(&loop->super, max_wait); + ts.tv_sec = max_wait / 1000; + ts.tv_nsec = max_wait % 1000 * 1000 * 1000; + nevents = kevent(loop->kq, changelist, nchanges, events, sizeof(events) / sizeof(events[0]), &ts); + + update_now(&loop->super); + if (nevents == -1) + return -1; + + if (nevents != 0) + h2o_sliding_counter_start(&loop->super.exec_time_counter, loop->super._now); + + /* update readable flags, perform writes */ + for (i = 0; i != nevents; ++i) { + struct st_h2o_evloop_socket_t *sock = (void *)events[i].udata; + assert(sock->fd == events[i].ident); + switch (events[i].filter) { + case EVFILT_READ: + if (sock->_flags != H2O_SOCKET_FLAG_IS_DISPOSED) { + sock->_flags |= H2O_SOCKET_FLAG_IS_READ_READY; + link_to_pending(sock); + } + break; + case EVFILT_WRITE: + if (sock->_flags != H2O_SOCKET_FLAG_IS_DISPOSED) { + write_pending(sock); + } + break; + default: + break; /* ??? 
*/ + } + } + + return 0; +} + +static void evloop_do_on_socket_create(struct st_h2o_evloop_socket_t *sock) +{ +} + +static void evloop_do_on_socket_close(struct st_h2o_evloop_socket_t *sock) +{ +} + +static void evloop_do_on_socket_export(struct st_h2o_evloop_socket_t *sock) +{ + struct st_h2o_socket_loop_kqueue_t *loop = (void *)sock->loop; + struct kevent changelist[2]; + int change_index = 0, ret; + + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) + ev_set(changelist + change_index++, sock->fd, EVFILT_READ, EV_DELETE, 0); + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) + ev_set(changelist + change_index++, sock->fd, EVFILT_WRITE, EV_DELETE, 0); + if (change_index == 0) + return; + while ((ret = kevent(loop->kq, changelist, change_index, NULL, 0, NULL)) != 0 && errno == EINTR) + ; + if (ret == -1) + fprintf(stderr, "kevent returned error %d (fd=%d)", errno, sock->fd); +} + +h2o_evloop_t *h2o_evloop_create(void) +{ + struct st_h2o_socket_loop_kqueue_t *loop = (struct st_h2o_socket_loop_kqueue_t *)create_evloop(sizeof(*loop)); + + loop->kq = kqueue(); + + return &loop->super; +} diff --git a/web/server/h2o/libh2o/lib/common/socket/evloop/poll.c.h b/web/server/h2o/libh2o/lib/common/socket/evloop/poll.c.h new file mode 100644 index 000000000..8b3f3d149 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socket/evloop/poll.c.h @@ -0,0 +1,178 @@ +/* + * Copyright (c) 2014,2015 DeNA Co., Ltd., Kazuho Oku + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in 
+ * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include <stdio.h> +#include <poll.h> + +#if 0 +#define DEBUG_LOG(...) fprintf(stderr, __VA_ARGS__) +#else +#define DEBUG_LOG(...) +#endif + +struct st_h2o_evloop_poll_t { + h2o_evloop_t super; + H2O_VECTOR(struct st_h2o_evloop_socket_t *) socks; +}; + +static void update_socks(struct st_h2o_evloop_poll_t *loop) +{ + /* update loop->socks */ + while (loop->super._statechanged.head != NULL) { + /* detach the top */ + struct st_h2o_evloop_socket_t *sock = loop->super._statechanged.head; + loop->super._statechanged.head = sock->_next_statechanged; + sock->_next_statechanged = sock; + /* update the state */ + if ((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0) { + assert(sock->fd == -1); + free(sock); + } else { + assert(sock->fd < loop->socks.size); + if (loop->socks.entries[sock->fd] == NULL) { + loop->socks.entries[sock->fd] = sock; + } else { + assert(loop->socks.entries[sock->fd] == sock); + } + if (h2o_socket_is_reading(&sock->super)) { + DEBUG_LOG("setting READ for fd: %d\n", sock->fd); + sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; + } else { + DEBUG_LOG("clearing READ for fd: %d\n", sock->fd); + sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; + } + if (h2o_socket_is_writing(&sock->super)) { + DEBUG_LOG("setting WRITE for fd: %d\n", sock->fd); + sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; + } else { + DEBUG_LOG("clearing WRITE for fd: %d\n", sock->fd); + sock->_flags &= 
~H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; + } + } + } + loop->super._statechanged.tail_ref = &loop->super._statechanged.head; +} + +int evloop_do_proceed(h2o_evloop_t *_loop, int32_t max_wait) +{ + struct st_h2o_evloop_poll_t *loop = (struct st_h2o_evloop_poll_t *)_loop; + H2O_VECTOR(struct pollfd) pollfds = {NULL}; + int fd, ret; + + /* update status */ + update_socks(loop); + + /* build list of fds to be polled */ + for (fd = 0; fd != loop->socks.size; ++fd) { + struct st_h2o_evloop_socket_t *sock = loop->socks.entries[fd]; + if (sock == NULL) + continue; + assert(fd == sock->fd); + if ((sock->_flags & (H2O_SOCKET_FLAG_IS_POLLED_FOR_READ | H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE)) != 0) { + h2o_vector_reserve(NULL, &pollfds, pollfds.size + 1); + struct pollfd *slot = pollfds.entries + pollfds.size++; + slot->fd = fd; + slot->events = 0; + slot->revents = 0; + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) + slot->events |= POLLIN; + if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) + slot->events |= POLLOUT; + } + } + + /* call */ + max_wait = adjust_max_wait(&loop->super, max_wait); + ret = poll(pollfds.entries, (nfds_t)pollfds.size, max_wait); + update_now(&loop->super); + if (ret == -1) + goto Exit; + DEBUG_LOG("poll returned: %d\n", ret); + + /* update readable flags, perform writes */ + if (ret > 0) { + size_t i; + h2o_sliding_counter_start(&loop->super.exec_time_counter, loop->super._now); + for (i = 0; i != pollfds.size; ++i) { + /* set read_ready flag before calling the write cb, since app. 
code invoked by the latter may close the socket, clearing + * the former flag */ + if ((pollfds.entries[i].revents & POLLIN) != 0) { + struct st_h2o_evloop_socket_t *sock = loop->socks.entries[pollfds.entries[i].fd]; + assert(sock != NULL); + assert(sock->fd == pollfds.entries[i].fd); + if (sock->_flags != H2O_SOCKET_FLAG_IS_DISPOSED) { + sock->_flags |= H2O_SOCKET_FLAG_IS_READ_READY; + link_to_pending(sock); + DEBUG_LOG("added fd %d as read_ready\n", sock->fd); + } + } + if ((pollfds.entries[i].revents & POLLOUT) != 0) { + struct st_h2o_evloop_socket_t *sock = loop->socks.entries[pollfds.entries[i].fd]; + assert(sock != NULL); + assert(sock->fd == pollfds.entries[i].fd); + if (sock->_flags != H2O_SOCKET_FLAG_IS_DISPOSED) { + DEBUG_LOG("handling pending writes on fd %d\n", fd); + write_pending(sock); + } + } + } + ret = 0; + } + +Exit: + free(pollfds.entries); + return ret; +} + +static void evloop_do_on_socket_create(struct st_h2o_evloop_socket_t *sock) +{ + struct st_h2o_evloop_poll_t *loop = (struct st_h2o_evloop_poll_t *)sock->loop; + + if (sock->fd >= loop->socks.size) { + h2o_vector_reserve(NULL, &loop->socks, sock->fd + 1); + memset(loop->socks.entries + loop->socks.size, 0, (sock->fd + 1 - loop->socks.size) * sizeof(loop->socks.entries[0])); + loop->socks.size = sock->fd + 1; + } + + if (loop->socks.entries[sock->fd] != NULL) + assert(loop->socks.entries[sock->fd]->_flags == H2O_SOCKET_FLAG_IS_DISPOSED); +} + +static void evloop_do_on_socket_close(struct st_h2o_evloop_socket_t *sock) +{ + struct st_h2o_evloop_poll_t *loop = (struct st_h2o_evloop_poll_t *)sock->loop; + + if (sock->fd != -1) + loop->socks.entries[sock->fd] = NULL; +} + +static void evloop_do_on_socket_export(struct st_h2o_evloop_socket_t *sock) +{ + struct st_h2o_evloop_poll_t *loop = (struct st_h2o_evloop_poll_t *)sock->loop; + evloop_do_on_socket_close(sock); + loop->socks.entries[sock->fd] = NULL; +} + +h2o_evloop_t *h2o_evloop_create(void) +{ + struct st_h2o_evloop_poll_t *loop = (struct 
st_h2o_evloop_poll_t *)create_evloop(sizeof(*loop)); + return &loop->super; +} diff --git a/web/server/h2o/libh2o/lib/common/socket/uv-binding.c.h b/web/server/h2o/libh2o/lib/common/socket/uv-binding.c.h new file mode 100644 index 000000000..44c71c166 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socket/uv-binding.c.h @@ -0,0 +1,283 @@ +/* + * Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku, Fastly, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
+ */ + +struct st_h2o_uv_socket_t { + h2o_socket_t super; + struct { + uv_stream_t *stream; + uv_close_cb close_cb; + } uv; + union { + uv_connect_t _creq; + uv_write_t _wreq; + }; +}; + +static void schedule_timer(h2o_timeout_t *timeout); + +static void alloc_inbuf_tcp(uv_handle_t *handle, size_t suggested_size, uv_buf_t *_buf) +{ + struct st_h2o_uv_socket_t *sock = handle->data; + + h2o_iovec_t buf = h2o_buffer_reserve(&sock->super.input, 4096); + memcpy(_buf, &buf, sizeof(buf)); +} + +static void alloc_inbuf_ssl(uv_handle_t *handle, size_t suggested_size, uv_buf_t *_buf) +{ + struct st_h2o_uv_socket_t *sock = handle->data; + + h2o_iovec_t buf = h2o_buffer_reserve(&sock->super.ssl->input.encrypted, 4096); + memcpy(_buf, &buf, sizeof(buf)); +} + +static void on_read_tcp(uv_stream_t *stream, ssize_t nread, const uv_buf_t *_unused) +{ + struct st_h2o_uv_socket_t *sock = stream->data; + + if (nread < 0) { + sock->super.bytes_read = 0; + sock->super._cb.read(&sock->super, h2o_socket_error_closed); + return; + } + + sock->super.input->size += nread; + sock->super.bytes_read = nread; + sock->super._cb.read(&sock->super, NULL); +} + +static void on_read_ssl(uv_stream_t *stream, ssize_t nread, const uv_buf_t *_unused) +{ + struct st_h2o_uv_socket_t *sock = stream->data; + size_t prev_bytes_read = sock->super.input->size; + const char *err = h2o_socket_error_io; + + if (nread > 0) { + sock->super.ssl->input.encrypted->size += nread; + if (sock->super.ssl->handshake.cb == NULL) + err = decode_ssl_input(&sock->super); + else + err = NULL; + } + sock->super.bytes_read = sock->super.input->size - prev_bytes_read; + sock->super._cb.read(&sock->super, err); +} + +static void on_do_write_complete(uv_write_t *wreq, int status) +{ + struct st_h2o_uv_socket_t *sock = H2O_STRUCT_FROM_MEMBER(struct st_h2o_uv_socket_t, _wreq, wreq); + if (sock->super._cb.write != NULL) + on_write_complete(&sock->super, status == 0 ? 
NULL : h2o_socket_error_io); +} + +static void free_sock(uv_handle_t *handle) +{ + struct st_h2o_uv_socket_t *sock = handle->data; + uv_close_cb cb = sock->uv.close_cb; + free(sock); + cb(handle); +} + +void do_dispose_socket(h2o_socket_t *_sock) +{ + struct st_h2o_uv_socket_t *sock = (struct st_h2o_uv_socket_t *)_sock; + sock->super._cb.write = NULL; /* avoid the write callback getting called when closing the socket (#1249) */ + uv_close((uv_handle_t *)sock->uv.stream, free_sock); +} + +int h2o_socket_get_fd(h2o_socket_t *_sock) +{ + int fd, ret; + struct st_h2o_uv_socket_t *sock = (struct st_h2o_uv_socket_t *)_sock; + + ret = uv_fileno((uv_handle_t *)sock->uv.stream, (uv_os_fd_t *)&fd); + if (ret) + return -1; + + return fd; +} + +void do_read_start(h2o_socket_t *_sock) +{ + struct st_h2o_uv_socket_t *sock = (struct st_h2o_uv_socket_t *)_sock; + + if (sock->super.ssl == NULL) + uv_read_start(sock->uv.stream, alloc_inbuf_tcp, on_read_tcp); + else + uv_read_start(sock->uv.stream, alloc_inbuf_ssl, on_read_ssl); +} + +void do_read_stop(h2o_socket_t *_sock) +{ + struct st_h2o_uv_socket_t *sock = (struct st_h2o_uv_socket_t *)_sock; + uv_read_stop(sock->uv.stream); +} + +void do_write(h2o_socket_t *_sock, h2o_iovec_t *bufs, size_t bufcnt, h2o_socket_cb cb) +{ + struct st_h2o_uv_socket_t *sock = (struct st_h2o_uv_socket_t *)_sock; + + assert(sock->super._cb.write == NULL); + sock->super._cb.write = cb; + + uv_write(&sock->_wreq, sock->uv.stream, (uv_buf_t *)bufs, (int)bufcnt, on_do_write_complete); +} + +static struct st_h2o_uv_socket_t *create_socket(h2o_loop_t *loop) +{ + uv_tcp_t *tcp = h2o_mem_alloc(sizeof(*tcp)); + + if (uv_tcp_init(loop, tcp) != 0) { + free(tcp); + return NULL; + } + return (void *)h2o_uv_socket_create((void *)tcp, (uv_close_cb)free); +} + +int do_export(h2o_socket_t *_sock, h2o_socket_export_t *info) +{ + struct st_h2o_uv_socket_t *sock = (void *)_sock; + uv_os_fd_t fd; + + if (uv_fileno((uv_handle_t *)sock->uv.stream, &fd) != 0) + return -1; + /* 
FIXME: consider how to overcome the epoll(2) problem; man says, + * "even after a file descriptor that is part of an epoll set has been closed, + * events may be reported for that file descriptor if other file descriptors + * referring to the same underlying file description remain open" + */ + if ((info->fd = dup(fd)) == -1) + return -1; + return 0; +} + +h2o_socket_t *do_import(h2o_loop_t *loop, h2o_socket_export_t *info) +{ + struct st_h2o_uv_socket_t *sock = create_socket(loop); + + if (sock == NULL) + return NULL; + if (uv_tcp_open((uv_tcp_t *)sock->uv.stream, info->fd) != 0) { + h2o_socket_close(&sock->super); + return NULL; + } + + return &sock->super; +} + +h2o_socket_t *h2o_uv_socket_create(uv_stream_t *stream, uv_close_cb close_cb) +{ + struct st_h2o_uv_socket_t *sock = h2o_mem_alloc(sizeof(*sock)); + + memset(sock, 0, sizeof(*sock)); + h2o_buffer_init(&sock->super.input, &h2o_socket_buffer_prototype); + sock->uv.stream = stream; + sock->uv.close_cb = close_cb; + stream->data = sock; + return &sock->super; +} + +static void on_connect(uv_connect_t *conn, int status) +{ + if (status == UV_ECANCELED) + return; + struct st_h2o_uv_socket_t *sock = H2O_STRUCT_FROM_MEMBER(struct st_h2o_uv_socket_t, _creq, conn); + h2o_socket_cb cb = sock->super._cb.write; + sock->super._cb.write = NULL; + cb(&sock->super, status == 0 ? 
NULL : h2o_socket_error_conn_fail); +} + +h2o_loop_t *h2o_socket_get_loop(h2o_socket_t *_sock) +{ + struct st_h2o_uv_socket_t *sock = (void *)_sock; + return sock->uv.stream->loop; +} + +h2o_socket_t *h2o_socket_connect(h2o_loop_t *loop, struct sockaddr *addr, socklen_t addrlen, h2o_socket_cb cb) +{ + struct st_h2o_uv_socket_t *sock = create_socket(loop); + + if (sock == NULL) + return NULL; + if (uv_tcp_connect(&sock->_creq, (void *)sock->uv.stream, addr, on_connect) != 0) { + h2o_socket_close(&sock->super); + return NULL; + } + sock->super._cb.write = cb; + return &sock->super; +} + +socklen_t h2o_socket_getsockname(h2o_socket_t *_sock, struct sockaddr *sa) +{ + struct st_h2o_uv_socket_t *sock = (void *)_sock; + int len = sizeof(struct sockaddr_storage); + if (uv_tcp_getsockname((void *)sock->uv.stream, sa, &len) != 0) + return 0; + return (socklen_t)len; +} + +socklen_t get_peername_uncached(h2o_socket_t *_sock, struct sockaddr *sa) +{ + struct st_h2o_uv_socket_t *sock = (void *)_sock; + int len = sizeof(struct sockaddr_storage); + if (uv_tcp_getpeername((void *)sock->uv.stream, sa, &len) != 0) + return 0; + return (socklen_t)len; +} + +static void on_timeout(uv_timer_t *timer) +{ + h2o_timeout_t *timeout = H2O_STRUCT_FROM_MEMBER(h2o_timeout_t, _backend.timer, timer); + + h2o_timeout_run(timer->loop, timeout, h2o_now(timer->loop)); + if (!h2o_linklist_is_empty(&timeout->_entries)) + schedule_timer(timeout); +} + +void schedule_timer(h2o_timeout_t *timeout) +{ + h2o_timeout_entry_t *entry = H2O_STRUCT_FROM_MEMBER(h2o_timeout_entry_t, _link, timeout->_entries.next); + uv_timer_start(&timeout->_backend.timer, on_timeout, + entry->registered_at + timeout->timeout - h2o_now(timeout->_backend.timer.loop), 0); +} + +void h2o_timeout__do_init(h2o_loop_t *loop, h2o_timeout_t *timeout) +{ + uv_timer_init(loop, &timeout->_backend.timer); +} + +void h2o_timeout__do_dispose(h2o_loop_t *loop, h2o_timeout_t *timeout) +{ + uv_close((uv_handle_t *)&timeout->_backend.timer, 
NULL); +} + +void h2o_timeout__do_link(h2o_loop_t *loop, h2o_timeout_t *timeout, h2o_timeout_entry_t *entry) +{ + /* register the timer if the entry just being added is the only entry */ + if (timeout->_entries.next == &entry->_link) + schedule_timer(timeout); +} + +void h2o_timeout__do_post_callback(h2o_loop_t *loop) +{ + /* nothing to do */ +} diff --git a/web/server/h2o/libh2o/lib/common/socketpool.c b/web/server/h2o/libh2o/lib/common/socketpool.c new file mode 100644 index 000000000..da69933f7 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/socketpool.c @@ -0,0 +1,342 @@ +/* + * Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
+ */ +#include <assert.h> +#include <errno.h> +#include <netdb.h> +#include <stdlib.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/un.h> +#include <netinet/in.h> +#include "h2o/hostinfo.h" +#include "h2o/linklist.h" +#include "h2o/socketpool.h" +#include "h2o/string_.h" +#include "h2o/timeout.h" + +struct pool_entry_t { + h2o_socket_export_t sockinfo; + h2o_linklist_t link; + uint64_t added_at; +}; + +struct st_h2o_socketpool_connect_request_t { + void *data; + h2o_socketpool_connect_cb cb; + h2o_socketpool_t *pool; + h2o_loop_t *loop; + h2o_hostinfo_getaddr_req_t *getaddr_req; + h2o_socket_t *sock; +}; + +static void destroy_detached(struct pool_entry_t *entry) +{ + h2o_socket_dispose_export(&entry->sockinfo); + free(entry); +} + +static void destroy_attached(struct pool_entry_t *entry) +{ + h2o_linklist_unlink(&entry->link); + destroy_detached(entry); +} + +static void destroy_expired(h2o_socketpool_t *pool) +{ + /* caller should lock the mutex */ + uint64_t expire_before = h2o_now(pool->_interval_cb.loop) - pool->timeout; + while (!h2o_linklist_is_empty(&pool->_shared.sockets)) { + struct pool_entry_t *entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, link, pool->_shared.sockets.next); + if (entry->added_at > expire_before) + break; + destroy_attached(entry); + __sync_sub_and_fetch(&pool->_shared.count, 1); + } +} + +static void on_timeout(h2o_timeout_entry_t *timeout_entry) +{ + /* FIXME decrease the frequency of this function being called; the expiration + * check can be (should be) performed in the `connect` fuction as well + */ + h2o_socketpool_t *pool = H2O_STRUCT_FROM_MEMBER(h2o_socketpool_t, _interval_cb.entry, timeout_entry); + + if (pthread_mutex_trylock(&pool->_shared.mutex) == 0) { + destroy_expired(pool); + pthread_mutex_unlock(&pool->_shared.mutex); + } + + h2o_timeout_link(pool->_interval_cb.loop, &pool->_interval_cb.timeout, &pool->_interval_cb.entry); +} + +static void common_init(h2o_socketpool_t *pool, 
h2o_socketpool_type_t type, h2o_iovec_t host, int is_ssl, size_t capacity) +{ + memset(pool, 0, sizeof(*pool)); + + pool->type = type; + pool->peer.host = h2o_strdup(NULL, host.base, host.len); + pool->is_ssl = is_ssl; + pool->capacity = capacity; + pool->timeout = UINT64_MAX; + + pthread_mutex_init(&pool->_shared.mutex, NULL); + h2o_linklist_init_anchor(&pool->_shared.sockets); +} + +void h2o_socketpool_init_by_address(h2o_socketpool_t *pool, struct sockaddr *sa, socklen_t salen, int is_ssl, size_t capacity) +{ + char host[NI_MAXHOST]; + size_t host_len; + + assert(salen <= sizeof(pool->peer.sockaddr.bytes)); + + if ((host_len = h2o_socket_getnumerichost(sa, salen, host)) == SIZE_MAX) { + if (sa->sa_family != AF_UNIX) + h2o_fatal("failed to convert a non-unix socket address to a numerical representation"); + /* use the sockaddr_un::sun_path as the SNI indicator (is that the right thing to do?) */ + strcpy(host, ((struct sockaddr_un *)sa)->sun_path); + host_len = strlen(host); + } + + common_init(pool, H2O_SOCKETPOOL_TYPE_SOCKADDR, h2o_iovec_init(host, host_len), is_ssl, capacity); + memcpy(&pool->peer.sockaddr.bytes, sa, salen); + pool->peer.sockaddr.len = salen; +} + +void h2o_socketpool_init_by_hostport(h2o_socketpool_t *pool, h2o_iovec_t host, uint16_t port, int is_ssl, size_t capacity) +{ + struct sockaddr_in sin; + memset(&sin, 0, sizeof(sin)); + + if (h2o_hostinfo_aton(host, &sin.sin_addr) == 0) { + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + h2o_socketpool_init_by_address(pool, (void *)&sin, sizeof(sin), is_ssl, capacity); + return; + } + + common_init(pool, H2O_SOCKETPOOL_TYPE_NAMED, host, is_ssl, capacity); + pool->peer.named_serv.base = h2o_mem_alloc(sizeof(H2O_UINT16_LONGEST_STR)); + pool->peer.named_serv.len = sprintf(pool->peer.named_serv.base, "%u", (unsigned)port); +} + +void h2o_socketpool_dispose(h2o_socketpool_t *pool) +{ + pthread_mutex_lock(&pool->_shared.mutex); + while (!h2o_linklist_is_empty(&pool->_shared.sockets)) { + struct 
pool_entry_t *entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, link, pool->_shared.sockets.next); + destroy_attached(entry); + __sync_sub_and_fetch(&pool->_shared.count, 1); + } + pthread_mutex_unlock(&pool->_shared.mutex); + pthread_mutex_destroy(&pool->_shared.mutex); + + if (pool->_interval_cb.loop != NULL) { + h2o_timeout_unlink(&pool->_interval_cb.entry); + h2o_timeout_dispose(pool->_interval_cb.loop, &pool->_interval_cb.timeout); + } + free(pool->peer.host.base); + switch (pool->type) { + case H2O_SOCKETPOOL_TYPE_NAMED: + free(pool->peer.named_serv.base); + break; + case H2O_SOCKETPOOL_TYPE_SOCKADDR: + break; + } +} + +void h2o_socketpool_set_timeout(h2o_socketpool_t *pool, h2o_loop_t *loop, uint64_t msec) +{ + pool->timeout = msec; + + pool->_interval_cb.loop = loop; + h2o_timeout_init(loop, &pool->_interval_cb.timeout, 1000); + pool->_interval_cb.entry.cb = on_timeout; + + h2o_timeout_link(loop, &pool->_interval_cb.timeout, &pool->_interval_cb.entry); +} + +static void call_connect_cb(h2o_socketpool_connect_request_t *req, const char *errstr) +{ + h2o_socketpool_connect_cb cb = req->cb; + h2o_socket_t *sock = req->sock; + void *data = req->data; + + free(req); + cb(sock, errstr, data); +} + +static void on_connect(h2o_socket_t *sock, const char *err) +{ + h2o_socketpool_connect_request_t *req = sock->data; + const char *errstr = NULL; + + assert(req->sock == sock); + + if (err != NULL) { + h2o_socket_close(sock); + req->sock = NULL; + errstr = "connection failed"; + } + call_connect_cb(req, errstr); +} + +static void on_close(void *data) +{ + h2o_socketpool_t *pool = data; + __sync_sub_and_fetch(&pool->_shared.count, 1); +} + +static void start_connect(h2o_socketpool_connect_request_t *req, struct sockaddr *addr, socklen_t addrlen) +{ + req->sock = h2o_socket_connect(req->loop, addr, addrlen, on_connect); + if (req->sock == NULL) { + __sync_sub_and_fetch(&req->pool->_shared.count, 1); + call_connect_cb(req, "failed to connect to host"); + return; + } + 
req->sock->data = req; + req->sock->on_close.cb = on_close; + req->sock->on_close.data = req->pool; +} + +static void on_getaddr(h2o_hostinfo_getaddr_req_t *getaddr_req, const char *errstr, struct addrinfo *res, void *_req) +{ + h2o_socketpool_connect_request_t *req = _req; + + assert(getaddr_req == req->getaddr_req); + req->getaddr_req = NULL; + + if (errstr != NULL) { + __sync_sub_and_fetch(&req->pool->_shared.count, 1); + call_connect_cb(req, errstr); + return; + } + + struct addrinfo *selected = h2o_hostinfo_select_one(res); + start_connect(req, selected->ai_addr, selected->ai_addrlen); +} + +void h2o_socketpool_connect(h2o_socketpool_connect_request_t **_req, h2o_socketpool_t *pool, h2o_loop_t *loop, + h2o_multithread_receiver_t *getaddr_receiver, h2o_socketpool_connect_cb cb, void *data) +{ + struct pool_entry_t *entry = NULL; + + if (_req != NULL) + *_req = NULL; + + /* fetch an entry and return it */ + pthread_mutex_lock(&pool->_shared.mutex); + destroy_expired(pool); + while (1) { + if (h2o_linklist_is_empty(&pool->_shared.sockets)) + break; + entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, link, pool->_shared.sockets.next); + h2o_linklist_unlink(&entry->link); + pthread_mutex_unlock(&pool->_shared.mutex); + + /* test if the connection is still alive */ + char buf[1]; + ssize_t rret = recv(entry->sockinfo.fd, buf, 1, MSG_PEEK); + if (rret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + /* yes! 
return it */ + h2o_socket_t *sock = h2o_socket_import(loop, &entry->sockinfo); + free(entry); + sock->on_close.cb = on_close; + sock->on_close.data = pool; + cb(sock, NULL, data); + return; + } + + /* connection is dead, report, close, and retry */ + if (rret <= 0) { + static long counter = 0; + if (__sync_fetch_and_add(&counter, 1) == 0) + fprintf(stderr, "[WARN] detected close by upstream before the expected timeout (see issue #679)\n"); + } else { + static long counter = 0; + if (__sync_fetch_and_add(&counter, 1) == 0) + fprintf(stderr, "[WARN] unexpectedly received data to a pooled socket (see issue #679)\n"); + } + destroy_detached(entry); + pthread_mutex_lock(&pool->_shared.mutex); + } + pthread_mutex_unlock(&pool->_shared.mutex); + + /* FIXME repsect `capacity` */ + __sync_add_and_fetch(&pool->_shared.count, 1); + + /* prepare request object */ + h2o_socketpool_connect_request_t *req = h2o_mem_alloc(sizeof(*req)); + *req = (h2o_socketpool_connect_request_t){data, cb, pool, loop}; + if (_req != NULL) + *_req = req; + + switch (pool->type) { + case H2O_SOCKETPOOL_TYPE_NAMED: + /* resolve the name, and connect */ + req->getaddr_req = h2o_hostinfo_getaddr(getaddr_receiver, pool->peer.host, pool->peer.named_serv, AF_UNSPEC, SOCK_STREAM, + IPPROTO_TCP, AI_ADDRCONFIG | AI_NUMERICSERV, on_getaddr, req); + break; + case H2O_SOCKETPOOL_TYPE_SOCKADDR: + /* connect (using sockaddr_in) */ + start_connect(req, (void *)&pool->peer.sockaddr.bytes, pool->peer.sockaddr.len); + break; + } +} + +void h2o_socketpool_cancel_connect(h2o_socketpool_connect_request_t *req) +{ + if (req->getaddr_req != NULL) { + h2o_hostinfo_getaddr_cancel(req->getaddr_req); + req->getaddr_req = NULL; + } + if (req->sock != NULL) + h2o_socket_close(req->sock); + free(req); +} + +int h2o_socketpool_return(h2o_socketpool_t *pool, h2o_socket_t *sock) +{ + struct pool_entry_t *entry; + + /* reset the on_close callback */ + assert(sock->on_close.data == pool); + sock->on_close.cb = NULL; + 
sock->on_close.data = NULL; + + entry = h2o_mem_alloc(sizeof(*entry)); + if (h2o_socket_export(sock, &entry->sockinfo) != 0) { + free(entry); + __sync_sub_and_fetch(&pool->_shared.count, 1); + return -1; + } + memset(&entry->link, 0, sizeof(entry->link)); + entry->added_at = h2o_now(h2o_socket_get_loop(sock)); + + pthread_mutex_lock(&pool->_shared.mutex); + destroy_expired(pool); + h2o_linklist_insert(&pool->_shared.sockets, &entry->link); + pthread_mutex_unlock(&pool->_shared.mutex); + + return 0; +} diff --git a/web/server/h2o/libh2o/lib/common/string.c b/web/server/h2o/libh2o/lib/common/string.c new file mode 100644 index 000000000..3c068f3ad --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/string.c @@ -0,0 +1,594 @@ +/* + * Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku, Justin Zhu, Fastly, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
+ */ +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include "h2o/string_.h" + +h2o_iovec_t h2o_strdup(h2o_mem_pool_t *pool, const char *s, size_t slen) +{ + h2o_iovec_t ret; + + if (slen == SIZE_MAX) + slen = strlen(s); + + if (pool != NULL) { + ret.base = h2o_mem_alloc_pool(pool, slen + 1); + } else { + ret.base = h2o_mem_alloc(slen + 1); + } + h2o_memcpy(ret.base, s, slen); + ret.base[slen] = '\0'; + ret.len = slen; + return ret; +} + +h2o_iovec_t h2o_strdup_shared(h2o_mem_pool_t *pool, const char *s, size_t slen) +{ + h2o_iovec_t ret; + + if (slen == SIZE_MAX) + slen = strlen(s); + + ret.base = h2o_mem_alloc_shared(pool, slen + 1, NULL); + memcpy(ret.base, s, slen); + ret.base[slen] = '\0'; + ret.len = slen; + return ret; +} + +h2o_iovec_t h2o_strdup_slashed(h2o_mem_pool_t *pool, const char *src, size_t len) +{ + h2o_iovec_t ret; + + ret.len = len != SIZE_MAX ? len : strlen(src); + ret.base = pool != NULL ? h2o_mem_alloc_pool(pool, ret.len + 2) : h2o_mem_alloc(ret.len + 2); + memcpy(ret.base, src, ret.len); + if (ret.len != 0 && ret.base[ret.len - 1] != '/') + ret.base[ret.len++] = '/'; + ret.base[ret.len] = '\0'; + + return ret; +} + +int h2o__lcstris_core(const char *target, const char *test, size_t test_len) +{ + for (; test_len != 0; --test_len) + if (h2o_tolower(*target++) != *test++) + return 0; + return 1; +} + +size_t h2o_strtosize(const char *s, size_t len) +{ + uint64_t v = 0, m = 1; + const char *p = s + len; + + if (len == 0) + goto Error; + + while (1) { + int ch = *--p; + if (!('0' <= ch && ch <= '9')) + goto Error; + v += (ch - '0') * m; + if (p == s) + break; + m *= 10; + /* do not even try to overflow */ + if (m == 10000000000000000000ULL) + goto Error; + } + + if (v >= SIZE_MAX) + goto Error; + return v; + +Error: + return SIZE_MAX; +} + +size_t h2o_strtosizefwd(char **s, size_t len) +{ + uint64_t v, c; + char *p = *s, *p_end = *s + len; + + if (len == 0) + goto Error; + + int ch = *p++; 
+ if (!('0' <= ch && ch <= '9')) + goto Error; + v = ch - '0'; + c = 1; + + while (1) { + ch = *p; + if (!('0' <= ch && ch <= '9')) + break; + v *= 10; + v += ch - '0'; + p++; + c++; + if (p == p_end) + break; + /* similar as above, do not even try to overflow */ + if (c == 20) + goto Error; + } + + if (v >= SIZE_MAX) + goto Error; + *s = p; + return v; + +Error: + return SIZE_MAX; +} + +static uint32_t decode_base64url_quad(const char *src) +{ + const char *src_end = src + 4; + uint32_t decoded = 0; + + while (1) { + if ('A' <= *src && *src <= 'Z') { + decoded |= *src - 'A'; + } else if ('a' <= *src && *src <= 'z') { + decoded |= *src - 'a' + 26; + } else if ('0' <= *src && *src <= '9') { + decoded |= *src - '0' + 52; + } else if (*src == '-') { + decoded |= 62; + } else if (*src == '_') { + decoded |= 63; +#if 1 /* curl uses normal base64 */ + } else if (*src == '+') { + decoded |= 62; + } else if (*src == '/') { + decoded |= 63; +#endif + } else { + return UINT32_MAX; + } + if (++src == src_end) + break; + decoded <<= 6; + } + + return decoded; +} + +h2o_iovec_t h2o_decode_base64url(h2o_mem_pool_t *pool, const char *src, size_t len) +{ + h2o_iovec_t decoded; + uint32_t t; + uint8_t *dst; + char remaining_input[4]; + + decoded.len = len * 3 / 4; + decoded.base = pool != NULL ? 
h2o_mem_alloc_pool(pool, decoded.len + 1) : h2o_mem_alloc(decoded.len + 1); + dst = (uint8_t *)decoded.base; + + while (len >= 4) { + if ((t = decode_base64url_quad(src)) == UINT32_MAX) + goto Error; + *dst++ = t >> 16; + *dst++ = t >> 8; + *dst++ = t; + src += 4; + len -= 4; + } + switch (len) { + case 0: + break; + case 1: + goto Error; + case 2: + remaining_input[0] = *src++; + remaining_input[1] = *src++; + remaining_input[2] = 'A'; + remaining_input[3] = 'A'; + if ((t = decode_base64url_quad(remaining_input)) == UINT32_MAX) + goto Error; + *dst++ = t >> 16; + break; + case 3: + remaining_input[0] = *src++; + remaining_input[1] = *src++; + remaining_input[2] = *src++; + remaining_input[3] = 'A'; + if ((t = decode_base64url_quad(remaining_input)) == UINT32_MAX) + goto Error; + *dst++ = t >> 16; + *dst++ = t >> 8; + break; + } + + assert((char *)dst - decoded.base == decoded.len); + decoded.base[decoded.len] = '\0'; + + return decoded; + +Error: + if (pool == NULL) + free(decoded.base); + return h2o_iovec_init(NULL, 0); +} + +size_t h2o_base64_encode(char *_dst, const void *_src, size_t len, int url_encoded) +{ + static const char *MAP = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789+/"; + static const char *MAP_URL_ENCODED = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789-_"; + + char *dst = _dst; + const uint8_t *src = _src; + const char *map = url_encoded ? 
MAP_URL_ENCODED : MAP; + uint32_t quad; + + for (; len >= 3; src += 3, len -= 3) { + quad = ((uint32_t)src[0] << 16) | ((uint32_t)src[1] << 8) | src[2]; + *dst++ = map[quad >> 18]; + *dst++ = map[(quad >> 12) & 63]; + *dst++ = map[(quad >> 6) & 63]; + *dst++ = map[quad & 63]; + } + if (len != 0) { + quad = (uint32_t)src[0] << 16; + *dst++ = map[quad >> 18]; + if (len == 2) { + quad |= (uint32_t)src[1] << 8; + *dst++ = map[(quad >> 12) & 63]; + *dst++ = map[(quad >> 6) & 63]; + if (!url_encoded) + *dst++ = '='; + } else { + *dst++ = map[(quad >> 12) & 63]; + if (!url_encoded) { + *dst++ = '='; + *dst++ = '='; + } + } + } + + *dst = '\0'; + return dst - _dst; +} + +static int decode_hex(int ch) +{ + if ('0' <= ch && ch <= '9') + return ch - '0'; + if ('A' <= ch && ch <= 'F') + return ch - 'A' + 0xa; + if ('a' <= ch && ch <= 'f') + return ch - 'a' + 0xa; + return -1; +} + +int h2o_hex_decode(void *_dst, const char *src, size_t src_len) +{ + unsigned char *dst = _dst; + + if (src_len % 2 != 0) + return -1; + for (; src_len != 0; src_len -= 2) { + int hi, lo; + if ((hi = decode_hex(*src++)) == -1 || (lo = decode_hex(*src++)) == -1) + return -1; + *dst++ = (hi << 4) | lo; + } + return 0; +} + +void h2o_hex_encode(char *dst, const void *_src, size_t src_len) +{ + const unsigned char *src = _src, *src_end = src + src_len; + for (; src != src_end; ++src) { + *dst++ = "0123456789abcdef"[*src >> 4]; + *dst++ = "0123456789abcdef"[*src & 0xf]; + } + *dst = '\0'; +} + +h2o_iovec_t h2o_uri_escape(h2o_mem_pool_t *pool, const char *s, size_t l, const char *preserve_chars) +{ + h2o_iovec_t encoded; + size_t i, capacity = l * 3 + 1; + + encoded.base = pool != NULL ? h2o_mem_alloc_pool(pool, capacity) : h2o_mem_alloc(capacity); + encoded.len = 0; + + /* RFC 3986: + path-noscheme = segment-nz-nc *( "/" segment ) + segment-nz-nc = 1*( unreserved / pct-encoded / sub-delims / "@" ) + unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~" + sub-delims = "!" 
/ "$" / "&" / "'" / "(" / ")" + / "*" / "+" / "," / ";" / "=" + */ + for (i = 0; i != l; ++i) { + int ch = s[i]; + if (('A' <= ch && ch <= 'Z') || ('a' <= ch && ch <= 'z') || ('0' <= ch && ch <= '9') || ch == '-' || ch == '.' || + ch == '_' || ch == '~' || ch == '!' || ch == '$' || ch == '&' || ch == '\'' || ch == '(' || ch == ')' || ch == '*' || + ch == '+' || ch == ',' || ch == ';' || ch == '=' || + (ch != '\0' && preserve_chars != NULL && strchr(preserve_chars, ch) != NULL)) { + encoded.base[encoded.len++] = ch; + } else { + encoded.base[encoded.len++] = '%'; + encoded.base[encoded.len++] = "0123456789ABCDEF"[(ch >> 4) & 0xf]; + encoded.base[encoded.len++] = "0123456789ABCDEF"[ch & 0xf]; + } + } + encoded.base[encoded.len] = '\0'; + + return encoded; +} + +h2o_iovec_t h2o_get_filext(const char *path, size_t len) +{ + const char *end = path + len, *p = end; + + while (--p != path) { + if (*p == '.') { + return h2o_iovec_init(p + 1, end - (p + 1)); + } else if (*p == '/') { + break; + } + } + return h2o_iovec_init(NULL, 0); +} + +static int is_ws(int ch) +{ + return ch == ' ' || ch == '\t' || ch == '\r' || ch == '\n'; +} + +h2o_iovec_t h2o_str_stripws(const char *s, size_t len) +{ + const char *end = s + len; + + while (s != end) { + if (!is_ws(*s)) + break; + ++s; + } + while (s != end) { + if (!is_ws(end[-1])) + break; + --end; + } + return h2o_iovec_init(s, end - s); +} + +size_t h2o_strstr(const char *haysack, size_t haysack_len, const char *needle, size_t needle_len) +{ + /* TODO optimize */ + if (haysack_len >= needle_len) { + size_t off, max = haysack_len - needle_len + 1; + if (needle_len == 0) + return 0; + for (off = 0; off != max; ++off) + if (haysack[off] == needle[0] && memcmp(haysack + off + 1, needle + 1, needle_len - 1) == 0) + return off; + } + return SIZE_MAX; +} + +/* note: returns a zero-width match as well */ +const char *h2o_next_token(h2o_iovec_t *iter, int separator, size_t *element_len, h2o_iovec_t *value) +{ + const char *cur = 
iter->base, *end = iter->base + iter->len, *token_start, *token_end; + + /* find start */ + for (;; ++cur) { + if (cur == end) + return NULL; + if (!(*cur == ' ' || *cur == '\t')) + break; + } + token_start = cur; + token_end = cur; + + /* find last */ + for (;; ++cur) { + if (cur == end) + break; + if (*cur == separator) { + ++cur; + break; + } + if (*cur == ',') { + if (token_start == cur) { + ++cur; + token_end = cur; + } + break; + } + if (value != NULL && *cur == '=') { + ++cur; + goto FindValue; + } + if (!(*cur == ' ' || *cur == '\t')) + token_end = cur + 1; + } + + /* found */ + *iter = h2o_iovec_init(cur, end - cur); + *element_len = token_end - token_start; + if (value != NULL) + *value = (h2o_iovec_t){NULL}; + return token_start; + +FindValue: + *iter = h2o_iovec_init(cur, end - cur); + *element_len = token_end - token_start; + if ((value->base = (char *)h2o_next_token(iter, separator, &value->len, NULL)) == NULL) { + *value = (h2o_iovec_t){"", 0}; + } else if (h2o_memis(value->base, value->len, H2O_STRLIT(","))) { + *value = (h2o_iovec_t){"", 0}; + iter->base -= 1; + iter->len += 1; + } + return token_start; +} + +int h2o_contains_token(const char *haysack, size_t haysack_len, const char *needle, size_t needle_len, int separator) +{ + h2o_iovec_t iter = h2o_iovec_init(haysack, haysack_len); + const char *token = NULL; + size_t token_len = 0; + + while ((token = h2o_next_token(&iter, separator, &token_len, NULL)) != NULL) { + if (h2o_lcstris(token, token_len, needle, needle_len)) { + return 1; + } + } + return 0; +} + +h2o_iovec_t h2o_htmlescape(h2o_mem_pool_t *pool, const char *src, size_t len) +{ + const char *s, *end = src + len; + size_t add_size = 0; + +#define ENTITY_MAP() \ + ENTITY('"', """); \ + ENTITY('&', "&"); \ + ENTITY('\'', "'"); \ + ENTITY('<', "<"); \ + ENTITY('>', ">"); + + for (s = src; s != end; ++s) { + if ((unsigned)(unsigned char)*s - '"' <= '>' - '"') { + switch (*s) { +#define ENTITY(code, quoted) \ + case code: \ + add_size += 
sizeof(quoted) - 2; \ + break + ENTITY_MAP() +#undef ENTITY + } + } + } + + /* escape and return the result if necessary */ + if (add_size != 0) { + /* allocate buffer and fill in the chars that are known not to require escaping */ + h2o_iovec_t escaped = {h2o_mem_alloc_pool(pool, len + add_size + 1), 0}; + /* fill-in the rest */ + for (s = src; s != end; ++s) { + switch (*s) { +#define ENTITY(code, quoted) \ + case code: \ + memcpy(escaped.base + escaped.len, quoted, sizeof(quoted) - 1); \ + escaped.len += sizeof(quoted) - 1; \ + break + ENTITY_MAP() +#undef ENTITY + default: + escaped.base[escaped.len++] = *s; + break; + } + } + assert(escaped.len == len + add_size); + escaped.base[escaped.len] = '\0'; + + return escaped; + } + +#undef ENTITY_MAP + + /* no need not escape; return the original */ + return h2o_iovec_init(src, len); +} + +h2o_iovec_t h2o_concat_list(h2o_mem_pool_t *pool, h2o_iovec_t *list, size_t count) +{ + h2o_iovec_t ret = {NULL, 0}; + size_t i; + + /* calc the length */ + for (i = 0; i != count; ++i) { + ret.len += list[i].len; + } + + /* allocate memory */ + if (pool != NULL) + ret.base = h2o_mem_alloc_pool(pool, ret.len + 1); + else + ret.base = h2o_mem_alloc(ret.len + 1); + + /* concatenate */ + ret.len = 0; + for (i = 0; i != count; ++i) { + h2o_memcpy(ret.base + ret.len, list[i].base, list[i].len); + ret.len += list[i].len; + } + ret.base[ret.len] = '\0'; + + return ret; +} + +int h2o_str_at_position(char *buf, const char *src, size_t src_len, int lineno, int column) +{ + const char *src_end = src + src_len; + int i; + + /* find the line */ + if (lineno <= 0 || column <= 0) + return -1; + for (--lineno; lineno != 0; --lineno) { + do { + if (src == src_end) + return -1; + } while (*src++ != '\n'); + } + + /* adjust the starting column */ + while (column > 40) { + if (src != src_end) + ++src; + --column; + } + + /* emit */ + for (i = 1; i <= 76; ++i) { + if (src == src_end || *src == '\n') + break; + *buf++ = *src++; + } + if (i < column) + 
column = i; + *buf++ = '\n'; + for (i = 1; i < column; ++i) + *buf++ = ' '; + *buf++ = '^'; + *buf++ = '\n'; + *buf = '\0'; + return 0; +} diff --git a/web/server/h2o/libh2o/lib/common/time.c b/web/server/h2o/libh2o/lib/common/time.c new file mode 100644 index 000000000..368143e78 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/time.c @@ -0,0 +1,175 @@ +/* + * Copyright (c) 2015 DeNA Co., Ltd. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
+ */ +#include <assert.h> +#include <stdio.h> +#include <string.h> +#include "h2o/time_.h" + +static char *emit_wday(char *dst, int wday) +{ + memcpy(dst, ("SunMonTueWedThuFriSat") + wday * 3, 3); + return dst + 3; +} + +static char *emit_mon(char *dst, int mon) +{ + memcpy(dst, ("JanFebMarAprMayJunJulAugSepOctNovDec") + mon * 3, 3); + return dst + 3; +} + +static char *emit_digits(char *dst, int n, size_t cnt) +{ + char *p = dst + cnt; + + /* emit digits from back */ + do { + *--p = '0' + n % 10; + n /= 10; + } while (p != dst); + + return dst + cnt; +} + +void h2o_time2str_rfc1123(char *buf, struct tm *gmt) +{ + char *p = buf; + + /* format: Fri, 19 Sep 2014 05:24:04 GMT */ + p = emit_wday(p, gmt->tm_wday); + *p++ = ','; + *p++ = ' '; + p = emit_digits(p, gmt->tm_mday, 2); + *p++ = ' '; + p = emit_mon(p, gmt->tm_mon); + *p++ = ' '; + p = emit_digits(p, gmt->tm_year + 1900, 4); + *p++ = ' '; + p = emit_digits(p, gmt->tm_hour, 2); + *p++ = ':'; + p = emit_digits(p, gmt->tm_min, 2); + *p++ = ':'; + p = emit_digits(p, gmt->tm_sec, 2); + memcpy(p, " GMT", 4); + p += 4; + *p = '\0'; + + assert(p - buf == H2O_TIMESTR_RFC1123_LEN); +} + +static int fetch_digits(const char *s, size_t n) +{ + int value = 0; + for (; n != 0; ++s, --n) { + if (!('0' <= *s && *s <= '9')) + return -1; + value = value * 10 + *s - '0'; + } + return value; +} + +int h2o_time_parse_rfc1123(const char *s, size_t len, struct tm *tm) +{ + if (len != H2O_TIMESTR_RFC1123_LEN) + return -1; + +/* 1 2 + * 01234567890123456789012345678 + * Fri, 19 Sep 2014 05:24:04 GMT + */ + +#define FETCH(dst, pos, n) \ + if ((dst = fetch_digits(s + pos, n)) == -1) \ + return -1; + FETCH(tm->tm_year, 12, 4); + tm->tm_year -= 1900; + /* month is parsed afterwards */ + FETCH(tm->tm_mday, 5, 2); + FETCH(tm->tm_hour, 17, 2); + FETCH(tm->tm_min, 20, 2); + FETCH(tm->tm_sec, 23, 2); +#undef FETCH + +#define PACK3(a, b, c) (((a)&0xff) << 16 | ((b)&0xff) << 8 | ((c)&0xff)) +#define MAP(c1, c2, c3, value) \ + case PACK3(c1, c2, 
c3): \ + tm->tm_mon = value; \ + break + switch (PACK3(s[8], s[9], s[10])) { + MAP('J', 'a', 'n', 0); + MAP('F', 'e', 'b', 1); + MAP('M', 'a', 'r', 2); + MAP('A', 'p', 'r', 3); + MAP('M', 'a', 'y', 4); + MAP('J', 'u', 'n', 5); + MAP('J', 'u', 'l', 6); + MAP('A', 'u', 'g', 7); + MAP('S', 'e', 'p', 8); + MAP('O', 'c', 't', 9); + MAP('N', 'o', 'v', 10); + MAP('D', 'e', 'c', 11); + default: + return -1; + } +#undef MAP +#undef PACK3 + + return 0; +} + +static int calc_gmt_offset(time_t t, struct tm *local) +{ + struct tm gmt; + int delta; + + gmtime_r(&t, &gmt); + delta = (local->tm_hour - gmt.tm_hour) * 60 + (local->tm_min - gmt.tm_min); + + if (local->tm_yday != gmt.tm_yday) { + int day_offset; + if (local->tm_year == gmt.tm_year) + day_offset = local->tm_yday - gmt.tm_yday; + else + day_offset = local->tm_year - gmt.tm_year; + delta += day_offset * 24 * 60; + } + return delta; +} + +void h2o_time2str_log(char *buf, time_t time) +{ + struct tm localt; + localtime_r(&time, &localt); + int gmt_off = calc_gmt_offset(time, &localt); + int gmt_sign; + + if (gmt_off >= 0) { + gmt_sign = '+'; + } else { + gmt_off = -gmt_off; + gmt_sign = '-'; + } + + int len = sprintf(buf, "%02d/%s/%d:%02d:%02d:%02d %c%02d%02d", localt.tm_mday, + ("Jan\0Feb\0Mar\0Apr\0May\0Jun\0Jul\0Aug\0Sep\0Oct\0Nov\0Dec\0") + localt.tm_mon * 4, localt.tm_year + 1900, + localt.tm_hour, localt.tm_min, localt.tm_sec, gmt_sign, gmt_off / 60, gmt_off % 60); + assert(len == H2O_TIMESTR_LOG_LEN); +} diff --git a/web/server/h2o/libh2o/lib/common/timeout.c b/web/server/h2o/libh2o/lib/common/timeout.c new file mode 100644 index 000000000..c50be0a3f --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/timeout.c @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2014 DeNA Co., Ltd. 
+ * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
+ */ +#include "h2o/timeout.h" + +void h2o_timeout_run(h2o_loop_t *loop, h2o_timeout_t *timeout, uint64_t now) +{ + uint64_t max_registered_at = now - timeout->timeout; + + while (!h2o_linklist_is_empty(&timeout->_entries)) { + h2o_timeout_entry_t *entry = H2O_STRUCT_FROM_MEMBER(h2o_timeout_entry_t, _link, timeout->_entries.next); + if (entry->registered_at > max_registered_at) { + break; + } + h2o_linklist_unlink(&entry->_link); + entry->registered_at = 0; + entry->cb(entry); + h2o_timeout__do_post_callback(loop); + } +} + +uint64_t h2o_timeout_get_wake_at(h2o_linklist_t *timeouts) +{ + h2o_linklist_t *node; + uint64_t wake_at = UINT64_MAX; + + /* change wake_at to the minimum value of the timeouts */ + for (node = timeouts->next; node != timeouts; node = node->next) { + h2o_timeout_t *timeout = H2O_STRUCT_FROM_MEMBER(h2o_timeout_t, _link, node); + if (!h2o_linklist_is_empty(&timeout->_entries)) { + h2o_timeout_entry_t *entry = H2O_STRUCT_FROM_MEMBER(h2o_timeout_entry_t, _link, timeout->_entries.next); + uint64_t entry_wake_at = entry->registered_at + timeout->timeout; + if (entry_wake_at < wake_at) + wake_at = entry_wake_at; + } + } + + return wake_at; +} + +void h2o_timeout_init(h2o_loop_t *loop, h2o_timeout_t *timeout, uint64_t millis) +{ + memset(timeout, 0, sizeof(*timeout)); + timeout->timeout = millis; + h2o_linklist_init_anchor(&timeout->_entries); + + h2o_timeout__do_init(loop, timeout); +} + +void h2o_timeout_dispose(h2o_loop_t *loop, h2o_timeout_t *timeout) +{ + assert(h2o_linklist_is_empty(&timeout->_entries)); + h2o_timeout__do_dispose(loop, timeout); +} + +void h2o_timeout_link(h2o_loop_t *loop, h2o_timeout_t *timeout, h2o_timeout_entry_t *entry) +{ + /* insert at tail, so that the entries are sorted in ascending order */ + h2o_linklist_insert(&timeout->_entries, &entry->_link); + /* set data */ + entry->registered_at = h2o_now(loop); + + h2o_timeout__do_link(loop, timeout, entry); +} + +void h2o_timeout_unlink(h2o_timeout_entry_t *entry) +{ + if 
(h2o_linklist_is_linked(&entry->_link)) { + h2o_linklist_unlink(&entry->_link); + entry->registered_at = 0; + } +} diff --git a/web/server/h2o/libh2o/lib/common/url.c b/web/server/h2o/libh2o/lib/common/url.c new file mode 100644 index 000000000..d65d18fb5 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/url.c @@ -0,0 +1,409 @@ +/* + * Copyright (c) 2014,2015 DeNA Co., Ltd. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
+ */ +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/un.h> +#include "h2o/memory.h" +#include "h2o/string_.h" +#include "h2o/url.h" + +const h2o_url_scheme_t H2O_URL_SCHEME_HTTP = {{H2O_STRLIT("http")}, 80}; +const h2o_url_scheme_t H2O_URL_SCHEME_HTTPS = {{H2O_STRLIT("https")}, 443}; + +static int decode_hex(int ch) +{ + if ('0' <= ch && ch <= '9') + return ch - '0'; + if ('A' <= ch && ch <= 'F') + return ch - 'A' + 0xa; + if ('a' <= ch && ch <= 'f') + return ch - 'a' + 0xa; + return -1; +} + +static size_t handle_special_paths(const char *path, size_t off, size_t last_slash) +{ + size_t orig_off = off, part_size = off - last_slash; + + if (part_size == 2 && path[off - 1] == '.') { + --off; + } else if (part_size == 3 && path[off - 2] == '.' && path[off - 1] == '.') { + off -= 2; + if (off > 1) { + for (--off; path[off - 1] != '/'; --off) + ; + } + } + return orig_off - off; +} + +/* Perform path normalization and URL decoding in one pass. + * See h2o_req_t for the purpose of @norm_indexes. 
*/ +static h2o_iovec_t rebuild_path(h2o_mem_pool_t *pool, const char *src, size_t src_len, size_t *query_at, size_t **norm_indexes) +{ + char *dst; + size_t src_off = 0, dst_off = 0, last_slash, rewind; + + { /* locate '?', and set len to the end of input path */ + const char *q = memchr(src, '?', src_len); + if (q != NULL) { + src_len = *query_at = q - src; + } else { + *query_at = SIZE_MAX; + } + } + + /* dst can be 1 byte more than src if src is missing the prefixing '/' */ + dst = h2o_mem_alloc_pool(pool, src_len + 1); + *norm_indexes = h2o_mem_alloc_pool(pool, (src_len + 1) * sizeof(*norm_indexes[0])); + + if (src[0] == '/') + src_off++; + last_slash = dst_off; + dst[dst_off] = '/'; + (*norm_indexes)[dst_off] = src_off; + dst_off++; + + /* decode %xx */ + while (src_off < src_len) { + int hi, lo; + char decoded; + + if (src[src_off] == '%' && (src_off + 2 < src_len) && (hi = decode_hex(src[src_off + 1])) != -1 && + (lo = decode_hex(src[src_off + 2])) != -1) { + decoded = (hi << 4) | lo; + src_off += 3; + } else { + decoded = src[src_off++]; + } + if (decoded == '/') { + rewind = handle_special_paths(dst, dst_off, last_slash); + if (rewind > 0) { + dst_off -= rewind; + last_slash = dst_off - 1; + continue; + } + last_slash = dst_off; + } + dst[dst_off] = decoded; + (*norm_indexes)[dst_off] = src_off; + dst_off++; + } + rewind = handle_special_paths(dst, dst_off, last_slash); + dst_off -= rewind; + + return h2o_iovec_init(dst, dst_off); +} + +h2o_iovec_t h2o_url_normalize_path(h2o_mem_pool_t *pool, const char *path, size_t len, size_t *query_at, size_t **norm_indexes) +{ + const char *p = path, *end = path + len; + h2o_iovec_t ret; + + *query_at = SIZE_MAX; + *norm_indexes = NULL; + + if (len == 0) { + ret = h2o_iovec_init("/", 1); + return ret; + } + + if (path[0] != '/') + goto Rewrite; + + for (; p + 1 < end; ++p) { + if ((p[0] == '/' && p[1] == '.') || p[0] == '%') { + /* detect false positives as well */ + goto Rewrite; + } else if (p[0] == '?') { + 
*query_at = p - path; + goto Return; + } + } + for (; p < end; ++p) { + if (p[0] == '?') { + *query_at = p - path; + goto Return; + } + } + +Return: + ret.base = (char *)path; + ret.len = p - path; + return ret; + +Rewrite: + ret = rebuild_path(pool, path, len, query_at, norm_indexes); + if (ret.len == 0) + goto RewriteError; + if (ret.base[0] != '/') + goto RewriteError; + if (h2o_strstr(ret.base, ret.len, H2O_STRLIT("/../")) != SIZE_MAX) + goto RewriteError; + if (ret.len >= 3 && memcmp(ret.base + ret.len - 3, "/..", 3) == 0) + goto RewriteError; + return ret; +RewriteError: + fprintf(stderr, "failed to normalize path: `%.*s` => `%.*s`\n", (int)len, path, (int)ret.len, ret.base); + ret = h2o_iovec_init("/", 1); + return ret; +} + +static const char *parse_scheme(const char *s, const char *end, const h2o_url_scheme_t **scheme) +{ + if (end - s >= 5 && memcmp(s, "http:", 5) == 0) { + *scheme = &H2O_URL_SCHEME_HTTP; + return s + 5; + } else if (end - s >= 6 && memcmp(s, "https:", 6) == 0) { + *scheme = &H2O_URL_SCHEME_HTTPS; + return s + 6; + } + return NULL; +} + +const char *h2o_url_parse_hostport(const char *s, size_t len, h2o_iovec_t *host, uint16_t *port) +{ + const char *token_start = s, *token_end, *end = s + len; + + *port = 65535; + + if (token_start == end) + return NULL; + + if (*token_start == '[') { + /* is IPv6 address */ + ++token_start; + if ((token_end = memchr(token_start, ']', end - token_start)) == NULL) + return NULL; + *host = h2o_iovec_init(token_start, token_end - token_start); + token_start = token_end + 1; + } else { + for (token_end = token_start; !(token_end == end || *token_end == '/' || *token_end == ':'); ++token_end) + ; + *host = h2o_iovec_init(token_start, token_end - token_start); + token_start = token_end; + } + + /* disallow zero-length host */ + if (host->len == 0) + return NULL; + + /* parse port */ + if (token_start != end && *token_start == ':') { + size_t p; + ++token_start; + if ((token_end = memchr(token_start, '/', end - 
token_start)) == NULL) + token_end = end; + if ((p = h2o_strtosize(token_start, token_end - token_start)) >= 65535) + return NULL; + *port = (uint16_t)p; + token_start = token_end; + } + + return token_start; +} + +static int parse_authority_and_path(const char *src, const char *url_end, h2o_url_t *parsed) +{ + const char *p = h2o_url_parse_hostport(src, url_end - src, &parsed->host, &parsed->_port); + if (p == NULL) + return -1; + parsed->authority = h2o_iovec_init(src, p - src); + if (p == url_end) { + parsed->path = h2o_iovec_init(H2O_STRLIT("/")); + } else { + if (*p != '/') + return -1; + parsed->path = h2o_iovec_init(p, url_end - p); + } + return 0; +} + +int h2o_url_parse(const char *url, size_t url_len, h2o_url_t *parsed) +{ + const char *url_end, *p; + + if (url_len == SIZE_MAX) + url_len = strlen(url); + url_end = url + url_len; + + /* check and skip scheme */ + if ((p = parse_scheme(url, url_end, &parsed->scheme)) == NULL) + return -1; + + /* skip "//" */ + if (!(url_end - p >= 2 && p[0] == '/' && p[1] == '/')) + return -1; + p += 2; + + return parse_authority_and_path(p, url_end, parsed); +} + +int h2o_url_parse_relative(const char *url, size_t url_len, h2o_url_t *parsed) +{ + const char *url_end, *p; + + if (url_len == SIZE_MAX) + url_len = strlen(url); + url_end = url + url_len; + + /* obtain scheme and port number */ + if ((p = parse_scheme(url, url_end, &parsed->scheme)) == NULL) { + parsed->scheme = NULL; + p = url; + } + + /* handle "//" */ + if (url_end - p >= 2 && p[0] == '/' && p[1] == '/') + return parse_authority_and_path(p + 2, url_end, parsed); + + /* reset authority, host, port, and set path */ + parsed->authority = (h2o_iovec_t){NULL}; + parsed->host = (h2o_iovec_t){NULL}; + parsed->_port = 65535; + parsed->path = h2o_iovec_init(p, url_end - p); + + return 0; +} + +h2o_iovec_t h2o_url_resolve(h2o_mem_pool_t *pool, const h2o_url_t *base, const h2o_url_t *relative, h2o_url_t *dest) +{ + h2o_iovec_t base_path, relative_path, ret; + + 
assert(base->path.len != 0); + assert(base->path.base[0] == '/'); + + if (relative == NULL) { + /* build URL using base copied to dest */ + *dest = *base; + base_path = base->path; + relative_path = h2o_iovec_init(NULL, 0); + goto Build; + } + + /* scheme */ + dest->scheme = relative->scheme != NULL ? relative->scheme : base->scheme; + + /* authority (and host:port) */ + if (relative->authority.base != NULL) { + assert(relative->host.base != NULL); + dest->authority = relative->authority; + dest->host = relative->host; + dest->_port = relative->_port; + } else { + assert(relative->host.base == NULL); + assert(relative->_port == 65535); + dest->authority = base->authority; + dest->host = base->host; + dest->_port = base->_port; + } + + /* path */ + base_path = base->path; + if (relative->path.base != NULL) { + relative_path = relative->path; + h2o_url_resolve_path(&base_path, &relative_path); + } else { + assert(relative->path.len == 0); + relative_path = (h2o_iovec_t){NULL}; + } + +Build: + /* build the output */ + ret = h2o_concat(pool, dest->scheme->name, h2o_iovec_init(H2O_STRLIT("://")), dest->authority, base_path, relative_path); + /* adjust dest */ + dest->authority.base = ret.base + dest->scheme->name.len + 3; + dest->host.base = dest->authority.base; + if (dest->authority.len != 0 && dest->authority.base[0] == '[') + ++dest->host.base; + dest->path.base = dest->authority.base + dest->authority.len; + dest->path.len = ret.base + ret.len - dest->path.base; + + return ret; +} + +void h2o_url_resolve_path(h2o_iovec_t *base, h2o_iovec_t *relative) +{ + size_t base_path_len = base->len, rel_path_offset = 0; + + if (relative->len != 0 && relative->base[0] == '/') { + base_path_len = 0; + } else { + /* relative path */ + while (base->base[--base_path_len] != '/') + ; + while (rel_path_offset != relative->len) { + if (relative->base[rel_path_offset] == '.') { + if (relative->len - rel_path_offset >= 2 && relative->base[rel_path_offset + 1] == '.' 
&& + (relative->len - rel_path_offset == 2 || relative->base[rel_path_offset + 2] == '/')) { + if (base_path_len != 0) { + while (base->base[--base_path_len] != '/') + ; + } + rel_path_offset += relative->len - rel_path_offset == 2 ? 2 : 3; + continue; + } + if (relative->len - rel_path_offset == 1) { + rel_path_offset += 1; + continue; + } else if (relative->base[rel_path_offset + 1] == '/') { + rel_path_offset += 2; + continue; + } + } + break; + } + base_path_len += 1; + } + + base->len = base_path_len; + *relative = h2o_iovec_init(relative->base + rel_path_offset, relative->len - rel_path_offset); +} + +void h2o_url_copy(h2o_mem_pool_t *pool, h2o_url_t *dest, const h2o_url_t *src) +{ + dest->scheme = src->scheme; + dest->authority = h2o_strdup(pool, src->authority.base, src->authority.len); + dest->host = h2o_strdup(pool, src->host.base, src->host.len); + dest->path = h2o_strdup(pool, src->path.base, src->path.len); + dest->_port = src->_port; +} + +const char *h2o_url_host_to_sun(h2o_iovec_t host, struct sockaddr_un *sa) +{ +#define PREFIX "unix:" + + if (host.len < sizeof(PREFIX) - 1 || memcmp(host.base, PREFIX, sizeof(PREFIX) - 1) != 0) + return h2o_url_host_to_sun_err_is_not_unix_socket; + + if (host.len - sizeof(PREFIX) - 1 >= sizeof(sa->sun_path)) + return "unix-domain socket path is too long"; + + memset(sa, 0, sizeof(*sa)); + sa->sun_family = AF_UNIX; + memcpy(sa->sun_path, host.base + sizeof(PREFIX) - 1, host.len - (sizeof(PREFIX) - 1)); + return NULL; + +#undef PREFIX +} + +const char *h2o_url_host_to_sun_err_is_not_unix_socket = "supplied name does not look like an unix-domain socket"; |