From be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 04:57:58 +0200 Subject: Adding upstream version 1.44.3. Signed-off-by: Daniel Baumann --- web/server/h2o/libh2o/lib/common/memcached.c | 429 +++++++++++++++++++++++++++ 1 file changed, 429 insertions(+) create mode 100644 web/server/h2o/libh2o/lib/common/memcached.c (limited to 'web/server/h2o/libh2o/lib/common/memcached.c') diff --git a/web/server/h2o/libh2o/lib/common/memcached.c b/web/server/h2o/libh2o/lib/common/memcached.c new file mode 100644 index 00000000..752ea2fc --- /dev/null +++ b/web/server/h2o/libh2o/lib/common/memcached.c @@ -0,0 +1,429 @@ +/* + * Copyright (c) 2015 DeNA Co., Ltd., Kazuho Oku + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include +#include +#include +#include "yrmcds.h" +#include "h2o/linklist.h" +#include "h2o/memcached.h" +#include "h2o/rand.h" +#include "h2o/string_.h" + +struct st_h2o_memcached_context_t { + pthread_mutex_t mutex; + pthread_cond_t cond; + h2o_linklist_t pending; + size_t num_threads_connected; + char *host; + uint16_t port; + int text_protocol; + h2o_iovec_t prefix; +}; + +struct st_h2o_memcached_conn_t { + h2o_memcached_context_t *ctx; + yrmcds yrmcds; + pthread_mutex_t mutex; + h2o_linklist_t inflight; + int writer_exit_requested; +}; + +enum en_h2o_memcached_req_type_t { REQ_TYPE_GET, REQ_TYPE_SET, REQ_TYPE_DELETE }; + +struct st_h2o_memcached_req_t { + enum en_h2o_memcached_req_type_t type; + h2o_linklist_t pending; + h2o_linklist_t inflight; + union { + struct { + h2o_multithread_receiver_t *receiver; + h2o_multithread_message_t message; + h2o_memcached_get_cb cb; + void *cb_data; + int value_is_encoded; + h2o_iovec_t value; + uint32_t serial; + } get; + struct { + h2o_iovec_t value; + uint32_t expiration; + } set; + } data; + struct { + size_t len; + char base[1]; + } key; +}; + +static h2o_memcached_req_t *create_req(h2o_memcached_context_t *ctx, enum en_h2o_memcached_req_type_t type, h2o_iovec_t key, + int encode_key) +{ + h2o_memcached_req_t *req = h2o_mem_alloc(offsetof(h2o_memcached_req_t, key.base) + ctx->prefix.len + + (encode_key ? (key.len + 2) / 3 * 4 + 1 : key.len)); + req->type = type; + req->pending = (h2o_linklist_t){NULL}; + req->inflight = (h2o_linklist_t){NULL}; + memset(&req->data, 0, sizeof(req->data)); + if (ctx->prefix.len != 0) + memcpy(req->key.base, ctx->prefix.base, ctx->prefix.len); + req->key.len = ctx->prefix.len; + if (encode_key) { + req->key.len += h2o_base64_encode(req->key.base + req->key.len, key.base, key.len, 1); + } else { + memcpy(req->key.base + req->key.len, key.base, key.len); + req->key.len += key.len; + } + return req; +} + +static void free_req(h2o_memcached_req_t *req) +{ + assert(!h2o_linklist_is_linked(&req->pending)); + switch (req->type) { + case REQ_TYPE_GET: + assert(!h2o_linklist_is_linked(&req->data.get.message.link)); + h2o_mem_set_secure(req->data.get.value.base, 0, req->data.get.value.len); + free(req->data.get.value.base); + break; + case REQ_TYPE_SET: + h2o_mem_set_secure(req->data.set.value.base, 0, req->data.set.value.len); + free(req->data.set.value.base); + break; + case REQ_TYPE_DELETE: + break; + default: + assert(!"FIXME"); + break; + } + free(req); +} + +static void discard_req(h2o_memcached_req_t *req) +{ + switch (req->type) { + case REQ_TYPE_GET: + h2o_multithread_send_message(req->data.get.receiver, &req->data.get.message); + break; + default: + free_req(req); + break; + } +} + +static h2o_memcached_req_t *pop_inflight(struct st_h2o_memcached_conn_t *conn, uint32_t serial) +{ + h2o_memcached_req_t *req; + + pthread_mutex_lock(&conn->mutex); + + if (conn->yrmcds.text_mode) { + /* in text mode, responses are returned in order (and we may receive responses for commands other than GET) */ + if (!h2o_linklist_is_empty(&conn->inflight)) { + req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, inflight, conn->inflight.next); + assert(req->type == REQ_TYPE_GET); + if (req->data.get.serial == serial) + goto Found; + } + } else { + /* in binary mode, responses are received out-of-order (and we would only recieve responses for GET) */ + h2o_linklist_t *node; + for (node = conn->inflight.next; node != &conn->inflight; node = node->next) { + req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, inflight, node); + assert(req->type == REQ_TYPE_GET); + if (req->data.get.serial == serial) + goto Found; + } + } + + /* not found */ + pthread_mutex_unlock(&conn->mutex); + return NULL; + +Found: + h2o_linklist_unlink(&req->inflight); + pthread_mutex_unlock(&conn->mutex); + return req; +} + +static void *writer_main(void *_conn) +{ + struct st_h2o_memcached_conn_t *conn = _conn; + yrmcds_error err; + + pthread_mutex_lock(&conn->ctx->mutex); + + while (!__sync_add_and_fetch(&conn->writer_exit_requested, 0)) { + while (!h2o_linklist_is_empty(&conn->ctx->pending)) { + h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, pending, conn->ctx->pending.next); + h2o_linklist_unlink(&req->pending); + pthread_mutex_unlock(&conn->ctx->mutex); + + switch (req->type) { + case REQ_TYPE_GET: + pthread_mutex_lock(&conn->mutex); + h2o_linklist_insert(&conn->inflight, &req->inflight); + pthread_mutex_unlock(&conn->mutex); + if ((err = yrmcds_get(&conn->yrmcds, req->key.base, req->key.len, 0, &req->data.get.serial)) != YRMCDS_OK) + goto Error; + break; + case REQ_TYPE_SET: + err = yrmcds_set(&conn->yrmcds, req->key.base, req->key.len, req->data.set.value.base, req->data.set.value.len, 0, + req->data.set.expiration, 0, !conn->yrmcds.text_mode, NULL); + discard_req(req); + if (err != YRMCDS_OK) + goto Error; + break; + case REQ_TYPE_DELETE: + err = yrmcds_remove(&conn->yrmcds, req->key.base, req->key.len, !conn->yrmcds.text_mode, NULL); + discard_req(req); + if (err != YRMCDS_OK) + goto Error; + break; + default: + fprintf(stderr, "[lib/common/memcached.c] unknown type:%d\n", (int)req->type); + err = YRMCDS_NOT_IMPLEMENTED; + goto Error; + } + + pthread_mutex_lock(&conn->ctx->mutex); + } + pthread_cond_wait(&conn->ctx->cond, &conn->ctx->mutex); + } + + pthread_mutex_unlock(&conn->ctx->mutex); + return NULL; + +Error: + fprintf(stderr, "[lib/common/memcached.c] failed to send request; %s\n", yrmcds_strerror(err)); + /* doc says the call can be used to interrupt yrmcds_recv */ + yrmcds_shutdown(&conn->yrmcds); + + return NULL; +} + +static void connect_to_server(h2o_memcached_context_t *ctx, yrmcds *yrmcds) +{ + size_t failcnt; + yrmcds_error err; + + for (failcnt = 0; (err = yrmcds_connect(yrmcds, ctx->host, ctx->port)) != YRMCDS_OK; ++failcnt) { + if (failcnt == 0) { + fprintf(stderr, "[lib/common/memcached.c] failed to connect to memcached at %s:%" PRIu16 ", %s\n", ctx->host, ctx->port, + yrmcds_strerror(err)); + } + ++failcnt; + usleep(2000000 + h2o_rand() % 3000000); /* sleep 2 to 5 seconds */ + } + /* connected */ + if (ctx->text_protocol) + yrmcds_text_mode(yrmcds); + fprintf(stderr, "[lib/common/memcached.c] connected to memcached at %s:%" PRIu16 "\n", ctx->host, ctx->port); +} + +static void reader_main(h2o_memcached_context_t *ctx) +{ + struct st_h2o_memcached_conn_t conn = {ctx, {0}, PTHREAD_MUTEX_INITIALIZER, {&conn.inflight, &conn.inflight}, 0}; + pthread_t writer_thread; + yrmcds_response resp; + yrmcds_error err; + + /* connect to server and start the writer thread */ + connect_to_server(conn.ctx, &conn.yrmcds); + if (pthread_create(&writer_thread, NULL, writer_main, &conn) != 0) { + perror("pthread_create"); + abort(); + } + + pthread_mutex_lock(&conn.ctx->mutex); + ++conn.ctx->num_threads_connected; + pthread_mutex_unlock(&conn.ctx->mutex); + + /* receive data until an error occurs */ + while (1) { + if ((err = yrmcds_recv(&conn.yrmcds, &resp)) != YRMCDS_OK) { + fprintf(stderr, "[lib/common/memcached.c] yrmcds_recv:%s\n", yrmcds_strerror(err)); + break; + } + h2o_memcached_req_t *req = pop_inflight(&conn, resp.serial); + if (req == NULL) { + if (conn.yrmcds.text_mode) + continue; + fprintf(stderr, "[lib/common/memcached.c] received unexpected serial\n"); + break; + } + if (resp.status == YRMCDS_STATUS_OK) { + req->data.get.value = h2o_iovec_init(h2o_mem_alloc(resp.data_len), resp.data_len); + memcpy(req->data.get.value.base, resp.data, resp.data_len); + h2o_mem_set_secure((void *)resp.data, 0, resp.data_len); + } + h2o_multithread_send_message(req->data.get.receiver, &req->data.get.message); + } + + /* send error to all the reqs in-flight */ + pthread_mutex_lock(&conn.mutex); + while (!h2o_linklist_is_empty(&conn.inflight)) { + h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, inflight, conn.inflight.next); + h2o_linklist_unlink(&req->inflight); + assert(req->type == REQ_TYPE_GET); + h2o_multithread_send_message(req->data.get.receiver, &req->data.get.message); + } + pthread_mutex_unlock(&conn.mutex); + + /* stop the writer thread */ + __sync_add_and_fetch(&conn.writer_exit_requested, 1); + pthread_mutex_lock(&conn.ctx->mutex); + pthread_cond_broadcast(&conn.ctx->cond); + pthread_mutex_unlock(&conn.ctx->mutex); + pthread_join(writer_thread, NULL); + + /* decrement num_threads_connected, and discard all the pending requests if no connections are alive */ + pthread_mutex_lock(&conn.ctx->mutex); + if (--conn.ctx->num_threads_connected == 0) { + while (!h2o_linklist_is_empty(&conn.ctx->pending)) { + h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, pending, conn.ctx->pending.next); + h2o_linklist_unlink(&req->pending); + discard_req(req); + } + } + pthread_mutex_unlock(&conn.ctx->mutex); + + /* close the connection */ + yrmcds_close(&conn.yrmcds); +} + +static void *thread_main(void *_ctx) +{ + h2o_memcached_context_t *ctx = _ctx; + + while (1) + reader_main(ctx); + return NULL; +} + +static void dispatch(h2o_memcached_context_t *ctx, h2o_memcached_req_t *req) +{ + pthread_mutex_lock(&ctx->mutex); + + if (ctx->num_threads_connected != 0) { + h2o_linklist_insert(&ctx->pending, &req->pending); + pthread_cond_signal(&ctx->cond); + } else { + discard_req(req); + } + + pthread_mutex_unlock(&ctx->mutex); +} + +void h2o_memcached_receiver(h2o_multithread_receiver_t *receiver, h2o_linklist_t *messages) +{ + while (!h2o_linklist_is_empty(messages)) { + h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, data.get.message.link, messages->next); + h2o_linklist_unlink(&req->data.get.message.link); + assert(req->type == REQ_TYPE_GET); + if (req->data.get.cb != NULL) { + if (req->data.get.value_is_encoded && req->data.get.value.len != 0) { + h2o_iovec_t decoded = h2o_decode_base64url(NULL, req->data.get.value.base, req->data.get.value.len); + h2o_mem_set_secure(req->data.get.value.base, 0, req->data.get.value.len); + free(req->data.get.value.base); + req->data.get.value = decoded; + } + req->data.get.cb(req->data.get.value, req->data.get.cb_data); + } + free_req(req); + } +} + +h2o_memcached_req_t *h2o_memcached_get(h2o_memcached_context_t *ctx, h2o_multithread_receiver_t *receiver, h2o_iovec_t key, + h2o_memcached_get_cb cb, void *cb_data, int flags) +{ + h2o_memcached_req_t *req = create_req(ctx, REQ_TYPE_GET, key, (flags & H2O_MEMCACHED_ENCODE_KEY) != 0); + req->data.get.receiver = receiver; + req->data.get.cb = cb; + req->data.get.cb_data = cb_data; + req->data.get.value_is_encoded = (flags & H2O_MEMCACHED_ENCODE_VALUE) != 0; + dispatch(ctx, req); + return req; +} + +void h2o_memcached_cancel_get(h2o_memcached_context_t *ctx, h2o_memcached_req_t *req) +{ + int do_free = 0; + + pthread_mutex_lock(&ctx->mutex); + req->data.get.cb = NULL; + if (h2o_linklist_is_linked(&req->pending)) { + h2o_linklist_unlink(&req->pending); + do_free = 1; + } + pthread_mutex_unlock(&ctx->mutex); + + if (do_free) + free_req(req); +} + +void h2o_memcached_set(h2o_memcached_context_t *ctx, h2o_iovec_t key, h2o_iovec_t value, uint32_t expiration, int flags) +{ + h2o_memcached_req_t *req = create_req(ctx, REQ_TYPE_SET, key, (flags & H2O_MEMCACHED_ENCODE_KEY) != 0); + if ((flags & H2O_MEMCACHED_ENCODE_VALUE) != 0) { + req->data.set.value.base = h2o_mem_alloc((value.len + 2) / 3 * 4 + 1); + req->data.set.value.len = h2o_base64_encode(req->data.set.value.base, value.base, value.len, 1); + } else { + req->data.set.value = h2o_iovec_init(h2o_mem_alloc(value.len), value.len); + memcpy(req->data.set.value.base, value.base, value.len); + } + req->data.set.expiration = expiration; + dispatch(ctx, req); +} + +void h2o_memcached_delete(h2o_memcached_context_t *ctx, h2o_iovec_t key, int flags) +{ + h2o_memcached_req_t *req = create_req(ctx, REQ_TYPE_DELETE, key, (flags & H2O_MEMCACHED_ENCODE_KEY) != 0); + dispatch(ctx, req); +} + +h2o_memcached_context_t *h2o_memcached_create_context(const char *host, uint16_t port, int text_protocol, size_t num_threads, + const char *prefix) +{ + h2o_memcached_context_t *ctx = h2o_mem_alloc(sizeof(*ctx)); + + pthread_mutex_init(&ctx->mutex, NULL); + pthread_cond_init(&ctx->cond, NULL); + h2o_linklist_init_anchor(&ctx->pending); + ctx->num_threads_connected = 0; + ctx->host = h2o_strdup(NULL, host, SIZE_MAX).base; + ctx->port = port; + ctx->text_protocol = text_protocol; + ctx->prefix = h2o_strdup(NULL, prefix, SIZE_MAX); + + { /* start the threads */ + pthread_t tid; + pthread_attr_t attr; + size_t i; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, 1); + for (i = 0; i != num_threads; ++i) + h2o_multithread_create_thread(&tid, &attr, thread_main, ctx); + pthread_attr_destroy(&attr); + } + + return ctx; +} -- cgit v1.2.3