summaryrefslogtreecommitdiffstats
path: root/debian/vendor-h2o/lib/common/memcached.c
diff options
context:
space:
mode:
Diffstat (limited to 'debian/vendor-h2o/lib/common/memcached.c')
-rw-r--r--debian/vendor-h2o/lib/common/memcached.c429
1 files changed, 0 insertions, 429 deletions
diff --git a/debian/vendor-h2o/lib/common/memcached.c b/debian/vendor-h2o/lib/common/memcached.c
deleted file mode 100644
index 752ea2f..0000000
--- a/debian/vendor-h2o/lib/common/memcached.c
+++ /dev/null
@@ -1,429 +0,0 @@
-/*
- * Copyright (c) 2015 DeNA Co., Ltd., Kazuho Oku
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to
- * deal in the Software without restriction, including without limitation the
- * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
- * sell copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- * IN THE SOFTWARE.
- */
-#include <errno.h>
-#include <inttypes.h>
-#include <unistd.h>
-#include "yrmcds.h"
-#include "h2o/linklist.h"
-#include "h2o/memcached.h"
-#include "h2o/rand.h"
-#include "h2o/string_.h"
-
-struct st_h2o_memcached_context_t {
- pthread_mutex_t mutex;
- pthread_cond_t cond;
- h2o_linklist_t pending;
- size_t num_threads_connected;
- char *host;
- uint16_t port;
- int text_protocol;
- h2o_iovec_t prefix;
-};
-
-struct st_h2o_memcached_conn_t {
- h2o_memcached_context_t *ctx;
- yrmcds yrmcds;
- pthread_mutex_t mutex;
- h2o_linklist_t inflight;
- int writer_exit_requested;
-};
-
-enum en_h2o_memcached_req_type_t { REQ_TYPE_GET, REQ_TYPE_SET, REQ_TYPE_DELETE };
-
-struct st_h2o_memcached_req_t {
- enum en_h2o_memcached_req_type_t type;
- h2o_linklist_t pending;
- h2o_linklist_t inflight;
- union {
- struct {
- h2o_multithread_receiver_t *receiver;
- h2o_multithread_message_t message;
- h2o_memcached_get_cb cb;
- void *cb_data;
- int value_is_encoded;
- h2o_iovec_t value;
- uint32_t serial;
- } get;
- struct {
- h2o_iovec_t value;
- uint32_t expiration;
- } set;
- } data;
- struct {
- size_t len;
- char base[1];
- } key;
-};
-
-static h2o_memcached_req_t *create_req(h2o_memcached_context_t *ctx, enum en_h2o_memcached_req_type_t type, h2o_iovec_t key,
- int encode_key)
-{
- h2o_memcached_req_t *req = h2o_mem_alloc(offsetof(h2o_memcached_req_t, key.base) + ctx->prefix.len +
- (encode_key ? (key.len + 2) / 3 * 4 + 1 : key.len));
- req->type = type;
- req->pending = (h2o_linklist_t){NULL};
- req->inflight = (h2o_linklist_t){NULL};
- memset(&req->data, 0, sizeof(req->data));
- if (ctx->prefix.len != 0)
- memcpy(req->key.base, ctx->prefix.base, ctx->prefix.len);
- req->key.len = ctx->prefix.len;
- if (encode_key) {
- req->key.len += h2o_base64_encode(req->key.base + req->key.len, key.base, key.len, 1);
- } else {
- memcpy(req->key.base + req->key.len, key.base, key.len);
- req->key.len += key.len;
- }
- return req;
-}
-
-static void free_req(h2o_memcached_req_t *req)
-{
- assert(!h2o_linklist_is_linked(&req->pending));
- switch (req->type) {
- case REQ_TYPE_GET:
- assert(!h2o_linklist_is_linked(&req->data.get.message.link));
- h2o_mem_set_secure(req->data.get.value.base, 0, req->data.get.value.len);
- free(req->data.get.value.base);
- break;
- case REQ_TYPE_SET:
- h2o_mem_set_secure(req->data.set.value.base, 0, req->data.set.value.len);
- free(req->data.set.value.base);
- break;
- case REQ_TYPE_DELETE:
- break;
- default:
- assert(!"FIXME");
- break;
- }
- free(req);
-}
-
-static void discard_req(h2o_memcached_req_t *req)
-{
- switch (req->type) {
- case REQ_TYPE_GET:
- h2o_multithread_send_message(req->data.get.receiver, &req->data.get.message);
- break;
- default:
- free_req(req);
- break;
- }
-}
-
-static h2o_memcached_req_t *pop_inflight(struct st_h2o_memcached_conn_t *conn, uint32_t serial)
-{
- h2o_memcached_req_t *req;
-
- pthread_mutex_lock(&conn->mutex);
-
- if (conn->yrmcds.text_mode) {
- /* in text mode, responses are returned in order (and we may receive responses for commands other than GET) */
- if (!h2o_linklist_is_empty(&conn->inflight)) {
- req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, inflight, conn->inflight.next);
- assert(req->type == REQ_TYPE_GET);
- if (req->data.get.serial == serial)
- goto Found;
- }
- } else {
- /* in binary mode, responses are received out-of-order (and we would only recieve responses for GET) */
- h2o_linklist_t *node;
- for (node = conn->inflight.next; node != &conn->inflight; node = node->next) {
- req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, inflight, node);
- assert(req->type == REQ_TYPE_GET);
- if (req->data.get.serial == serial)
- goto Found;
- }
- }
-
- /* not found */
- pthread_mutex_unlock(&conn->mutex);
- return NULL;
-
-Found:
- h2o_linklist_unlink(&req->inflight);
- pthread_mutex_unlock(&conn->mutex);
- return req;
-}
-
-static void *writer_main(void *_conn)
-{
- struct st_h2o_memcached_conn_t *conn = _conn;
- yrmcds_error err;
-
- pthread_mutex_lock(&conn->ctx->mutex);
-
- while (!__sync_add_and_fetch(&conn->writer_exit_requested, 0)) {
- while (!h2o_linklist_is_empty(&conn->ctx->pending)) {
- h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, pending, conn->ctx->pending.next);
- h2o_linklist_unlink(&req->pending);
- pthread_mutex_unlock(&conn->ctx->mutex);
-
- switch (req->type) {
- case REQ_TYPE_GET:
- pthread_mutex_lock(&conn->mutex);
- h2o_linklist_insert(&conn->inflight, &req->inflight);
- pthread_mutex_unlock(&conn->mutex);
- if ((err = yrmcds_get(&conn->yrmcds, req->key.base, req->key.len, 0, &req->data.get.serial)) != YRMCDS_OK)
- goto Error;
- break;
- case REQ_TYPE_SET:
- err = yrmcds_set(&conn->yrmcds, req->key.base, req->key.len, req->data.set.value.base, req->data.set.value.len, 0,
- req->data.set.expiration, 0, !conn->yrmcds.text_mode, NULL);
- discard_req(req);
- if (err != YRMCDS_OK)
- goto Error;
- break;
- case REQ_TYPE_DELETE:
- err = yrmcds_remove(&conn->yrmcds, req->key.base, req->key.len, !conn->yrmcds.text_mode, NULL);
- discard_req(req);
- if (err != YRMCDS_OK)
- goto Error;
- break;
- default:
- fprintf(stderr, "[lib/common/memcached.c] unknown type:%d\n", (int)req->type);
- err = YRMCDS_NOT_IMPLEMENTED;
- goto Error;
- }
-
- pthread_mutex_lock(&conn->ctx->mutex);
- }
- pthread_cond_wait(&conn->ctx->cond, &conn->ctx->mutex);
- }
-
- pthread_mutex_unlock(&conn->ctx->mutex);
- return NULL;
-
-Error:
- fprintf(stderr, "[lib/common/memcached.c] failed to send request; %s\n", yrmcds_strerror(err));
- /* doc says the call can be used to interrupt yrmcds_recv */
- yrmcds_shutdown(&conn->yrmcds);
-
- return NULL;
-}
-
-static void connect_to_server(h2o_memcached_context_t *ctx, yrmcds *yrmcds)
-{
- size_t failcnt;
- yrmcds_error err;
-
- for (failcnt = 0; (err = yrmcds_connect(yrmcds, ctx->host, ctx->port)) != YRMCDS_OK; ++failcnt) {
- if (failcnt == 0) {
- fprintf(stderr, "[lib/common/memcached.c] failed to connect to memcached at %s:%" PRIu16 ", %s\n", ctx->host, ctx->port,
- yrmcds_strerror(err));
- }
- ++failcnt;
- usleep(2000000 + h2o_rand() % 3000000); /* sleep 2 to 5 seconds */
- }
- /* connected */
- if (ctx->text_protocol)
- yrmcds_text_mode(yrmcds);
- fprintf(stderr, "[lib/common/memcached.c] connected to memcached at %s:%" PRIu16 "\n", ctx->host, ctx->port);
-}
-
-static void reader_main(h2o_memcached_context_t *ctx)
-{
- struct st_h2o_memcached_conn_t conn = {ctx, {0}, PTHREAD_MUTEX_INITIALIZER, {&conn.inflight, &conn.inflight}, 0};
- pthread_t writer_thread;
- yrmcds_response resp;
- yrmcds_error err;
-
- /* connect to server and start the writer thread */
- connect_to_server(conn.ctx, &conn.yrmcds);
- if (pthread_create(&writer_thread, NULL, writer_main, &conn) != 0) {
- perror("pthread_create");
- abort();
- }
-
- pthread_mutex_lock(&conn.ctx->mutex);
- ++conn.ctx->num_threads_connected;
- pthread_mutex_unlock(&conn.ctx->mutex);
-
- /* receive data until an error occurs */
- while (1) {
- if ((err = yrmcds_recv(&conn.yrmcds, &resp)) != YRMCDS_OK) {
- fprintf(stderr, "[lib/common/memcached.c] yrmcds_recv:%s\n", yrmcds_strerror(err));
- break;
- }
- h2o_memcached_req_t *req = pop_inflight(&conn, resp.serial);
- if (req == NULL) {
- if (conn.yrmcds.text_mode)
- continue;
- fprintf(stderr, "[lib/common/memcached.c] received unexpected serial\n");
- break;
- }
- if (resp.status == YRMCDS_STATUS_OK) {
- req->data.get.value = h2o_iovec_init(h2o_mem_alloc(resp.data_len), resp.data_len);
- memcpy(req->data.get.value.base, resp.data, resp.data_len);
- h2o_mem_set_secure((void *)resp.data, 0, resp.data_len);
- }
- h2o_multithread_send_message(req->data.get.receiver, &req->data.get.message);
- }
-
- /* send error to all the reqs in-flight */
- pthread_mutex_lock(&conn.mutex);
- while (!h2o_linklist_is_empty(&conn.inflight)) {
- h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, inflight, conn.inflight.next);
- h2o_linklist_unlink(&req->inflight);
- assert(req->type == REQ_TYPE_GET);
- h2o_multithread_send_message(req->data.get.receiver, &req->data.get.message);
- }
- pthread_mutex_unlock(&conn.mutex);
-
- /* stop the writer thread */
- __sync_add_and_fetch(&conn.writer_exit_requested, 1);
- pthread_mutex_lock(&conn.ctx->mutex);
- pthread_cond_broadcast(&conn.ctx->cond);
- pthread_mutex_unlock(&conn.ctx->mutex);
- pthread_join(writer_thread, NULL);
-
- /* decrement num_threads_connected, and discard all the pending requests if no connections are alive */
- pthread_mutex_lock(&conn.ctx->mutex);
- if (--conn.ctx->num_threads_connected == 0) {
- while (!h2o_linklist_is_empty(&conn.ctx->pending)) {
- h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, pending, conn.ctx->pending.next);
- h2o_linklist_unlink(&req->pending);
- discard_req(req);
- }
- }
- pthread_mutex_unlock(&conn.ctx->mutex);
-
- /* close the connection */
- yrmcds_close(&conn.yrmcds);
-}
-
-static void *thread_main(void *_ctx)
-{
- h2o_memcached_context_t *ctx = _ctx;
-
- while (1)
- reader_main(ctx);
- return NULL;
-}
-
-static void dispatch(h2o_memcached_context_t *ctx, h2o_memcached_req_t *req)
-{
- pthread_mutex_lock(&ctx->mutex);
-
- if (ctx->num_threads_connected != 0) {
- h2o_linklist_insert(&ctx->pending, &req->pending);
- pthread_cond_signal(&ctx->cond);
- } else {
- discard_req(req);
- }
-
- pthread_mutex_unlock(&ctx->mutex);
-}
-
-void h2o_memcached_receiver(h2o_multithread_receiver_t *receiver, h2o_linklist_t *messages)
-{
- while (!h2o_linklist_is_empty(messages)) {
- h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, data.get.message.link, messages->next);
- h2o_linklist_unlink(&req->data.get.message.link);
- assert(req->type == REQ_TYPE_GET);
- if (req->data.get.cb != NULL) {
- if (req->data.get.value_is_encoded && req->data.get.value.len != 0) {
- h2o_iovec_t decoded = h2o_decode_base64url(NULL, req->data.get.value.base, req->data.get.value.len);
- h2o_mem_set_secure(req->data.get.value.base, 0, req->data.get.value.len);
- free(req->data.get.value.base);
- req->data.get.value = decoded;
- }
- req->data.get.cb(req->data.get.value, req->data.get.cb_data);
- }
- free_req(req);
- }
-}
-
-h2o_memcached_req_t *h2o_memcached_get(h2o_memcached_context_t *ctx, h2o_multithread_receiver_t *receiver, h2o_iovec_t key,
- h2o_memcached_get_cb cb, void *cb_data, int flags)
-{
- h2o_memcached_req_t *req = create_req(ctx, REQ_TYPE_GET, key, (flags & H2O_MEMCACHED_ENCODE_KEY) != 0);
- req->data.get.receiver = receiver;
- req->data.get.cb = cb;
- req->data.get.cb_data = cb_data;
- req->data.get.value_is_encoded = (flags & H2O_MEMCACHED_ENCODE_VALUE) != 0;
- dispatch(ctx, req);
- return req;
-}
-
-void h2o_memcached_cancel_get(h2o_memcached_context_t *ctx, h2o_memcached_req_t *req)
-{
- int do_free = 0;
-
- pthread_mutex_lock(&ctx->mutex);
- req->data.get.cb = NULL;
- if (h2o_linklist_is_linked(&req->pending)) {
- h2o_linklist_unlink(&req->pending);
- do_free = 1;
- }
- pthread_mutex_unlock(&ctx->mutex);
-
- if (do_free)
- free_req(req);
-}
-
-void h2o_memcached_set(h2o_memcached_context_t *ctx, h2o_iovec_t key, h2o_iovec_t value, uint32_t expiration, int flags)
-{
- h2o_memcached_req_t *req = create_req(ctx, REQ_TYPE_SET, key, (flags & H2O_MEMCACHED_ENCODE_KEY) != 0);
- if ((flags & H2O_MEMCACHED_ENCODE_VALUE) != 0) {
- req->data.set.value.base = h2o_mem_alloc((value.len + 2) / 3 * 4 + 1);
- req->data.set.value.len = h2o_base64_encode(req->data.set.value.base, value.base, value.len, 1);
- } else {
- req->data.set.value = h2o_iovec_init(h2o_mem_alloc(value.len), value.len);
- memcpy(req->data.set.value.base, value.base, value.len);
- }
- req->data.set.expiration = expiration;
- dispatch(ctx, req);
-}
-
-void h2o_memcached_delete(h2o_memcached_context_t *ctx, h2o_iovec_t key, int flags)
-{
- h2o_memcached_req_t *req = create_req(ctx, REQ_TYPE_DELETE, key, (flags & H2O_MEMCACHED_ENCODE_KEY) != 0);
- dispatch(ctx, req);
-}
-
-h2o_memcached_context_t *h2o_memcached_create_context(const char *host, uint16_t port, int text_protocol, size_t num_threads,
- const char *prefix)
-{
- h2o_memcached_context_t *ctx = h2o_mem_alloc(sizeof(*ctx));
-
- pthread_mutex_init(&ctx->mutex, NULL);
- pthread_cond_init(&ctx->cond, NULL);
- h2o_linklist_init_anchor(&ctx->pending);
- ctx->num_threads_connected = 0;
- ctx->host = h2o_strdup(NULL, host, SIZE_MAX).base;
- ctx->port = port;
- ctx->text_protocol = text_protocol;
- ctx->prefix = h2o_strdup(NULL, prefix, SIZE_MAX);
-
- { /* start the threads */
- pthread_t tid;
- pthread_attr_t attr;
- size_t i;
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, 1);
- for (i = 0; i != num_threads; ++i)
- h2o_multithread_create_thread(&tid, &attr, thread_main, ctx);
- pthread_attr_destroy(&attr);
- }
-
- return ctx;
-}