From be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 04:57:58 +0200 Subject: Adding upstream version 1.44.3. Signed-off-by: Daniel Baumann --- web/server/h2o/libh2o/lib/common/multithread.c | 274 +++++++++++++++++++++++++ 1 file changed, 274 insertions(+) create mode 100644 web/server/h2o/libh2o/lib/common/multithread.c (limited to 'web/server/h2o/libh2o/lib/common/multithread.c') diff --git a/web/server/h2o/libh2o/lib/common/multithread.c b/web/server/h2o/libh2o/lib/common/multithread.c new file mode 100644 index 00000000..b4e8ba83 --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/multithread.c @@ -0,0 +1,274 @@ +/* + * Copyright (c) 2015-2016 DeNA Co., Ltd., Kazuho Oku, Tatsuhiko Kubo, + * Chul-Woong Yang + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include +#include +#include "cloexec.h" +#include "h2o/multithread.h" + +struct st_h2o_multithread_queue_t { +#if H2O_USE_LIBUV + uv_async_t async; +#else + struct { + int write; + h2o_socket_t *read; + } async; +#endif + pthread_mutex_t mutex; + struct { + h2o_linklist_t active; + h2o_linklist_t inactive; + } receivers; +}; + +static void queue_cb(h2o_multithread_queue_t *queue) +{ + pthread_mutex_lock(&queue->mutex); + + while (!h2o_linklist_is_empty(&queue->receivers.active)) { + h2o_multithread_receiver_t *receiver = + H2O_STRUCT_FROM_MEMBER(h2o_multithread_receiver_t, _link, queue->receivers.active.next); + /* detach all the messages from the receiver */ + h2o_linklist_t messages; + h2o_linklist_init_anchor(&messages); + h2o_linklist_insert_list(&messages, &receiver->_messages); + /* relink the receiver to the inactive list */ + h2o_linklist_unlink(&receiver->_link); + h2o_linklist_insert(&queue->receivers.inactive, &receiver->_link); + + /* dispatch the messages */ + pthread_mutex_unlock(&queue->mutex); + receiver->cb(receiver, &messages); + assert(h2o_linklist_is_empty(&messages)); + pthread_mutex_lock(&queue->mutex); + } + + pthread_mutex_unlock(&queue->mutex); +} + +#ifdef H2O_NO_64BIT_ATOMICS +pthread_mutex_t h2o_conn_id_mutex = PTHREAD_MUTEX_INITIALIZER; +#endif + +#if H2O_USE_LIBUV +#else + +#include +#include +#include + +static void on_read(h2o_socket_t *sock, const char *err) +{ + if (err != NULL) { + fprintf(stderr, "pipe error\n"); + abort(); + } + + h2o_buffer_consume(&sock->input, sock->input->size); + queue_cb(sock->data); +} + +static void init_async(h2o_multithread_queue_t *queue, h2o_loop_t *loop) +{ + int fds[2]; + + if (cloexec_pipe(fds) != 0) { + perror("pipe"); + abort(); + } + fcntl(fds[1], F_SETFL, O_NONBLOCK); + queue->async.write = fds[1]; + queue->async.read = h2o_evloop_socket_create(loop, fds[0], 0); + queue->async.read->data = queue; + h2o_socket_read_start(queue->async.read, on_read); +} + +#endif + +h2o_multithread_queue_t *h2o_multithread_create_queue(h2o_loop_t *loop) +{ + h2o_multithread_queue_t *queue = h2o_mem_alloc(sizeof(*queue)); + memset(queue, 0, sizeof(*queue)); + +#if H2O_USE_LIBUV + uv_async_init(loop, &queue->async, (uv_async_cb)queue_cb); +#else + init_async(queue, loop); +#endif + pthread_mutex_init(&queue->mutex, NULL); + h2o_linklist_init_anchor(&queue->receivers.active); + h2o_linklist_init_anchor(&queue->receivers.inactive); + + return queue; +} + +void h2o_multithread_destroy_queue(h2o_multithread_queue_t *queue) +{ + assert(h2o_linklist_is_empty(&queue->receivers.active)); + assert(h2o_linklist_is_empty(&queue->receivers.inactive)); +#if H2O_USE_LIBUV + uv_close((uv_handle_t *)&queue->async, (uv_close_cb)free); +#else + h2o_socket_read_stop(queue->async.read); + h2o_socket_close(queue->async.read); + close(queue->async.write); +#endif + pthread_mutex_destroy(&queue->mutex); +} + +void h2o_multithread_register_receiver(h2o_multithread_queue_t *queue, h2o_multithread_receiver_t *receiver, + h2o_multithread_receiver_cb cb) +{ + receiver->queue = queue; + receiver->_link = (h2o_linklist_t){NULL}; + h2o_linklist_init_anchor(&receiver->_messages); + receiver->cb = cb; + + pthread_mutex_lock(&queue->mutex); + h2o_linklist_insert(&queue->receivers.inactive, &receiver->_link); + pthread_mutex_unlock(&queue->mutex); +} + +void h2o_multithread_unregister_receiver(h2o_multithread_queue_t *queue, h2o_multithread_receiver_t *receiver) +{ + assert(queue == receiver->queue); + assert(h2o_linklist_is_empty(&receiver->_messages)); + pthread_mutex_lock(&queue->mutex); + h2o_linklist_unlink(&receiver->_link); + pthread_mutex_unlock(&queue->mutex); +} + +void h2o_multithread_send_message(h2o_multithread_receiver_t *receiver, h2o_multithread_message_t *message) +{ + int do_send = 0; + + pthread_mutex_lock(&receiver->queue->mutex); + if (message != NULL) { + assert(!h2o_linklist_is_linked(&message->link)); + if (h2o_linklist_is_empty(&receiver->_messages)) { + h2o_linklist_unlink(&receiver->_link); + h2o_linklist_insert(&receiver->queue->receivers.active, &receiver->_link); + do_send = 1; + } + h2o_linklist_insert(&receiver->_messages, &message->link); + } else { + if (h2o_linklist_is_empty(&receiver->_messages)) + do_send = 1; + } + pthread_mutex_unlock(&receiver->queue->mutex); + + if (do_send) { +#if H2O_USE_LIBUV + uv_async_send(&receiver->queue->async); +#else + while (write(receiver->queue->async.write, "", 1) == -1 && errno == EINTR) + ; +#endif + } +} + +void h2o_multithread_create_thread(pthread_t *tid, const pthread_attr_t *attr, void *(*func)(void *), void *arg) +{ + if (pthread_create(tid, attr, func, arg) != 0) { + perror("pthread_create"); + abort(); + } +} + +void h2o_sem_init(h2o_sem_t *sem, ssize_t capacity) +{ + pthread_mutex_init(&sem->_mutex, NULL); + pthread_cond_init(&sem->_cond, NULL); + sem->_cur = capacity; + sem->_capacity = capacity; +} + +void h2o_sem_destroy(h2o_sem_t *sem) +{ + assert(sem->_cur == sem->_capacity); + pthread_cond_destroy(&sem->_cond); + pthread_mutex_destroy(&sem->_mutex); +} + +void h2o_sem_wait(h2o_sem_t *sem) +{ + pthread_mutex_lock(&sem->_mutex); + while (sem->_cur <= 0) + pthread_cond_wait(&sem->_cond, &sem->_mutex); + --sem->_cur; + pthread_mutex_unlock(&sem->_mutex); +} + +void h2o_sem_post(h2o_sem_t *sem) +{ + pthread_mutex_lock(&sem->_mutex); + ++sem->_cur; + pthread_cond_signal(&sem->_cond); + pthread_mutex_unlock(&sem->_mutex); +} + +void h2o_sem_set_capacity(h2o_sem_t *sem, ssize_t new_capacity) +{ + pthread_mutex_lock(&sem->_mutex); + sem->_cur += new_capacity - sem->_capacity; + sem->_capacity = new_capacity; + pthread_cond_broadcast(&sem->_cond); + pthread_mutex_unlock(&sem->_mutex); +} + +/* barrier */ + +void h2o_barrier_init(h2o_barrier_t *barrier, size_t count) +{ + pthread_mutex_init(&barrier->_mutex, NULL); + pthread_cond_init(&barrier->_cond, NULL); + barrier->_count = count; +} + +int h2o_barrier_wait(h2o_barrier_t *barrier) +{ + int ret; + pthread_mutex_lock(&barrier->_mutex); + barrier->_count--; + if (barrier->_count == 0) { + pthread_cond_broadcast(&barrier->_cond); + ret = 1; + } else { + while (barrier->_count) + pthread_cond_wait(&barrier->_cond, &barrier->_mutex); + ret = 0; + } + pthread_mutex_unlock(&barrier->_mutex); + return ret; +} + +int h2o_barrier_done(h2o_barrier_t *barrier) +{ + return __sync_add_and_fetch(&barrier->_count, 0) == 0; +} + +void h2o_barrier_destroy(h2o_barrier_t *barrier) +{ + pthread_mutex_destroy(&barrier->_mutex); + pthread_cond_destroy(&barrier->_cond); +} -- cgit v1.2.3