Diffstat:
-rw-r--r--  web/server/h2o/libh2o/lib/common/memcached.c | 429
 1 file changed, 429 insertions(+), 0 deletions(-)
diff --git a/web/server/h2o/libh2o/lib/common/memcached.c b/web/server/h2o/libh2o/lib/common/memcached.c
new file mode 100644
index 00000000..752ea2fc
--- /dev/null
+++ b/web/server/h2o/libh2o/lib/common/memcached.c
@@ -0,0 +1,429 @@
+/*
+ * Copyright (c) 2015 DeNA Co., Ltd., Kazuho Oku
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to
+ * deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+#include <errno.h>
+#include <inttypes.h>
+#include <unistd.h>
+#include "yrmcds.h"
+#include "h2o/linklist.h"
+#include "h2o/memcached.h"
+#include "h2o/rand.h"
+#include "h2o/string_.h"
+
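+/* Shared state for one memcached client: a mutex/condvar-protected queue of
+ * pending requests that the worker threads drain, plus the connection
+ * parameters and an optional key prefix. */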
+struct st_h2o_memcached_context_t {
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ h2o_linklist_t pending;
+ size_t num_threads_connected;
+ char *host;
+ uint16_t port;
+ int text_protocol;
+ h2o_iovec_t prefix;
+};
+
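+/* Per-connection state owned by a reader thread: the yrmcds handle shared
+ * with its writer thread, and a mutex-protected list of in-flight GETs that
+ * are still waiting for a response. */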
+struct st_h2o_memcached_conn_t {
+ h2o_memcached_context_t *ctx;
+ yrmcds yrmcds;
+ pthread_mutex_t mutex;
+ h2o_linklist_t inflight;
+ int writer_exit_requested;
+};
+
+enum en_h2o_memcached_req_type_t { REQ_TYPE_GET, REQ_TYPE_SET, REQ_TYPE_DELETE };
+
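+/* A queued request. `pending` links it into the context-wide queue,
+ * `inflight` (GETs only) links it into a connection's in-flight list, and
+ * the (possibly prefixed and encoded) key is stored inline at the end of the
+ * allocation. */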
+struct st_h2o_memcached_req_t {
+ enum en_h2o_memcached_req_type_t type;
+ h2o_linklist_t pending;
+ h2o_linklist_t inflight;
+ union {
+ struct {
+ h2o_multithread_receiver_t *receiver;
+ h2o_multithread_message_t message;
+ h2o_memcached_get_cb cb;
+ void *cb_data;
+ int value_is_encoded;
+ h2o_iovec_t value;
+ uint32_t serial;
+ } get;
+ struct {
+ h2o_iovec_t value;
+ uint32_t expiration;
+ } set;
+ } data;
+ struct {
+ size_t len;
+ char base[1];
+ } key;
+};
+
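+/* Allocates a request with the key stored inline; the key is prepended with
+ * ctx->prefix and, when `encode_key` is set, base64-encoded so that
+ * arbitrary binary data can be used as a memcached key. */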
+static h2o_memcached_req_t *create_req(h2o_memcached_context_t *ctx, enum en_h2o_memcached_req_type_t type, h2o_iovec_t key,
+ int encode_key)
+{
+ h2o_memcached_req_t *req = h2o_mem_alloc(offsetof(h2o_memcached_req_t, key.base) + ctx->prefix.len +
+ (encode_key ? (key.len + 2) / 3 * 4 + 1 : key.len));
+ req->type = type;
+ req->pending = (h2o_linklist_t){NULL};
+ req->inflight = (h2o_linklist_t){NULL};
+ memset(&req->data, 0, sizeof(req->data));
+ if (ctx->prefix.len != 0)
+ memcpy(req->key.base, ctx->prefix.base, ctx->prefix.len);
+ req->key.len = ctx->prefix.len;
+ if (encode_key) {
+ req->key.len += h2o_base64_encode(req->key.base + req->key.len, key.base, key.len, 1);
+ } else {
+ memcpy(req->key.base + req->key.len, key.base, key.len);
+ req->key.len += key.len;
+ }
+ return req;
+}
+
+static void free_req(h2o_memcached_req_t *req)
+{
+ assert(!h2o_linklist_is_linked(&req->pending));
+ switch (req->type) {
+ case REQ_TYPE_GET:
+ assert(!h2o_linklist_is_linked(&req->data.get.message.link));
+ h2o_mem_set_secure(req->data.get.value.base, 0, req->data.get.value.len);
+ free(req->data.get.value.base);
+ break;
+ case REQ_TYPE_SET:
+ h2o_mem_set_secure(req->data.set.value.base, 0, req->data.set.value.len);
+ free(req->data.set.value.base);
+ break;
+ case REQ_TYPE_DELETE:
+ break;
+ default:
+ assert(!"FIXME");
+ break;
+ }
+ free(req);
+}
+
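+/* Drops a request that will not receive a response: GETs are bounced back to
+ * their receiver (which invokes the callback with an empty value and frees
+ * them), everything else is freed directly. */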
+static void discard_req(h2o_memcached_req_t *req)
+{
+ switch (req->type) {
+ case REQ_TYPE_GET:
+ h2o_multithread_send_message(req->data.get.receiver, &req->data.get.message);
+ break;
+ default:
+ free_req(req);
+ break;
+ }
+}
+
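+/* Removes and returns the in-flight GET whose serial matches, or NULL. In
+ * text mode only the head of the list is checked because responses arrive in
+ * order; in binary mode the whole list is scanned. */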
+static h2o_memcached_req_t *pop_inflight(struct st_h2o_memcached_conn_t *conn, uint32_t serial)
+{
+ h2o_memcached_req_t *req;
+
+ pthread_mutex_lock(&conn->mutex);
+
+ if (conn->yrmcds.text_mode) {
+ /* in text mode, responses are returned in order (and we may receive responses for commands other than GET) */
+ if (!h2o_linklist_is_empty(&conn->inflight)) {
+ req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, inflight, conn->inflight.next);
+ assert(req->type == REQ_TYPE_GET);
+ if (req->data.get.serial == serial)
+ goto Found;
+ }
+ } else {
+        /* in binary mode, responses may arrive out-of-order (and we only receive responses for GET) */
+ h2o_linklist_t *node;
+ for (node = conn->inflight.next; node != &conn->inflight; node = node->next) {
+ req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, inflight, node);
+ assert(req->type == REQ_TYPE_GET);
+ if (req->data.get.serial == serial)
+ goto Found;
+ }
+ }
+
+ /* not found */
+ pthread_mutex_unlock(&conn->mutex);
+ return NULL;
+
+Found:
+ h2o_linklist_unlink(&req->inflight);
+ pthread_mutex_unlock(&conn->mutex);
+ return req;
+}
+
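+/* Writer thread: drains the context-wide pending queue and issues the
+ * corresponding yrmcds commands. GETs are moved onto the connection's
+ * in-flight list before being sent; SETs and DELETEs are fire-and-forget.
+ * On a send error the connection is shut down, which also unblocks the
+ * reader thread sitting in yrmcds_recv. */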
+static void *writer_main(void *_conn)
+{
+ struct st_h2o_memcached_conn_t *conn = _conn;
+ yrmcds_error err;
+
+ pthread_mutex_lock(&conn->ctx->mutex);
+
+ while (!__sync_add_and_fetch(&conn->writer_exit_requested, 0)) {
+ while (!h2o_linklist_is_empty(&conn->ctx->pending)) {
+ h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, pending, conn->ctx->pending.next);
+ h2o_linklist_unlink(&req->pending);
+ pthread_mutex_unlock(&conn->ctx->mutex);
+
+ switch (req->type) {
+ case REQ_TYPE_GET:
+ pthread_mutex_lock(&conn->mutex);
+ h2o_linklist_insert(&conn->inflight, &req->inflight);
+ pthread_mutex_unlock(&conn->mutex);
+ if ((err = yrmcds_get(&conn->yrmcds, req->key.base, req->key.len, 0, &req->data.get.serial)) != YRMCDS_OK)
+ goto Error;
+ break;
+ case REQ_TYPE_SET:
+ err = yrmcds_set(&conn->yrmcds, req->key.base, req->key.len, req->data.set.value.base, req->data.set.value.len, 0,
+ req->data.set.expiration, 0, !conn->yrmcds.text_mode, NULL);
+ discard_req(req);
+ if (err != YRMCDS_OK)
+ goto Error;
+ break;
+ case REQ_TYPE_DELETE:
+ err = yrmcds_remove(&conn->yrmcds, req->key.base, req->key.len, !conn->yrmcds.text_mode, NULL);
+ discard_req(req);
+ if (err != YRMCDS_OK)
+ goto Error;
+ break;
+ default:
+ fprintf(stderr, "[lib/common/memcached.c] unknown type:%d\n", (int)req->type);
+ err = YRMCDS_NOT_IMPLEMENTED;
+ goto Error;
+ }
+
+ pthread_mutex_lock(&conn->ctx->mutex);
+ }
+ pthread_cond_wait(&conn->ctx->cond, &conn->ctx->mutex);
+ }
+
+ pthread_mutex_unlock(&conn->ctx->mutex);
+ return NULL;
+
+Error:
+ fprintf(stderr, "[lib/common/memcached.c] failed to send request; %s\n", yrmcds_strerror(err));
+ /* doc says the call can be used to interrupt yrmcds_recv */
+ yrmcds_shutdown(&conn->yrmcds);
+
+ return NULL;
+}
+
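+/* Connects to the configured server, retrying with a randomized 2-5 second
+ * backoff until it succeeds, then switches to the text protocol if
+ * requested. */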
+static void connect_to_server(h2o_memcached_context_t *ctx, yrmcds *yrmcds)
+{
+ size_t failcnt;
+ yrmcds_error err;
+
+ for (failcnt = 0; (err = yrmcds_connect(yrmcds, ctx->host, ctx->port)) != YRMCDS_OK; ++failcnt) {
+ if (failcnt == 0) {
+ fprintf(stderr, "[lib/common/memcached.c] failed to connect to memcached at %s:%" PRIu16 ", %s\n", ctx->host, ctx->port,
+ yrmcds_strerror(err));
+ }
+ ++failcnt;
+ usleep(2000000 + h2o_rand() % 3000000); /* sleep 2 to 5 seconds */
+ }
+ /* connected */
+ if (ctx->text_protocol)
+ yrmcds_text_mode(yrmcds);
+ fprintf(stderr, "[lib/common/memcached.c] connected to memcached at %s:%" PRIu16 "\n", ctx->host, ctx->port);
+}
+
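+/* Reader thread body: establishes a connection, spawns a writer thread for
+ * it, then loops in yrmcds_recv dispatching GET responses to their
+ * receivers. When the connection breaks it notifies every in-flight GET,
+ * stops the writer, and, if this was the last live connection, discards all
+ * pending requests before returning (thread_main will then reconnect). */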
+static void reader_main(h2o_memcached_context_t *ctx)
+{
+ struct st_h2o_memcached_conn_t conn = {ctx, {0}, PTHREAD_MUTEX_INITIALIZER, {&conn.inflight, &conn.inflight}, 0};
+ pthread_t writer_thread;
+ yrmcds_response resp;
+ yrmcds_error err;
+
+ /* connect to server and start the writer thread */
+ connect_to_server(conn.ctx, &conn.yrmcds);
+ if (pthread_create(&writer_thread, NULL, writer_main, &conn) != 0) {
+ perror("pthread_create");
+ abort();
+ }
+
+ pthread_mutex_lock(&conn.ctx->mutex);
+ ++conn.ctx->num_threads_connected;
+ pthread_mutex_unlock(&conn.ctx->mutex);
+
+ /* receive data until an error occurs */
+ while (1) {
+ if ((err = yrmcds_recv(&conn.yrmcds, &resp)) != YRMCDS_OK) {
+ fprintf(stderr, "[lib/common/memcached.c] yrmcds_recv:%s\n", yrmcds_strerror(err));
+ break;
+ }
+ h2o_memcached_req_t *req = pop_inflight(&conn, resp.serial);
+ if (req == NULL) {
+ if (conn.yrmcds.text_mode)
+ continue;
+ fprintf(stderr, "[lib/common/memcached.c] received unexpected serial\n");
+ break;
+ }
+ if (resp.status == YRMCDS_STATUS_OK) {
+ req->data.get.value = h2o_iovec_init(h2o_mem_alloc(resp.data_len), resp.data_len);
+ memcpy(req->data.get.value.base, resp.data, resp.data_len);
+ h2o_mem_set_secure((void *)resp.data, 0, resp.data_len);
+ }
+ h2o_multithread_send_message(req->data.get.receiver, &req->data.get.message);
+ }
+
+ /* send error to all the reqs in-flight */
+ pthread_mutex_lock(&conn.mutex);
+ while (!h2o_linklist_is_empty(&conn.inflight)) {
+ h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, inflight, conn.inflight.next);
+ h2o_linklist_unlink(&req->inflight);
+ assert(req->type == REQ_TYPE_GET);
+ h2o_multithread_send_message(req->data.get.receiver, &req->data.get.message);
+ }
+ pthread_mutex_unlock(&conn.mutex);
+
+ /* stop the writer thread */
+ __sync_add_and_fetch(&conn.writer_exit_requested, 1);
+ pthread_mutex_lock(&conn.ctx->mutex);
+ pthread_cond_broadcast(&conn.ctx->cond);
+ pthread_mutex_unlock(&conn.ctx->mutex);
+ pthread_join(writer_thread, NULL);
+
+ /* decrement num_threads_connected, and discard all the pending requests if no connections are alive */
+ pthread_mutex_lock(&conn.ctx->mutex);
+ if (--conn.ctx->num_threads_connected == 0) {
+ while (!h2o_linklist_is_empty(&conn.ctx->pending)) {
+ h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, pending, conn.ctx->pending.next);
+ h2o_linklist_unlink(&req->pending);
+ discard_req(req);
+ }
+ }
+ pthread_mutex_unlock(&conn.ctx->mutex);
+
+ /* close the connection */
+ yrmcds_close(&conn.yrmcds);
+}
+
+static void *thread_main(void *_ctx)
+{
+ h2o_memcached_context_t *ctx = _ctx;
+
+ while (1)
+ reader_main(ctx);
+ return NULL;
+}
+
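+/* Hands a request to the worker threads, or discards it immediately if no
+ * connection is currently established. */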
+static void dispatch(h2o_memcached_context_t *ctx, h2o_memcached_req_t *req)
+{
+ pthread_mutex_lock(&ctx->mutex);
+
+ if (ctx->num_threads_connected != 0) {
+ h2o_linklist_insert(&ctx->pending, &req->pending);
+ pthread_cond_signal(&ctx->cond);
+ } else {
+ discard_req(req);
+ }
+
+ pthread_mutex_unlock(&ctx->mutex);
+}
+
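+/* Runs on the event-loop thread that issued the GETs: decodes the value if
+ * it was stored base64-encoded, invokes the callback (unless the request was
+ * cancelled), and frees the request. */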
+void h2o_memcached_receiver(h2o_multithread_receiver_t *receiver, h2o_linklist_t *messages)
+{
+ while (!h2o_linklist_is_empty(messages)) {
+ h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, data.get.message.link, messages->next);
+ h2o_linklist_unlink(&req->data.get.message.link);
+ assert(req->type == REQ_TYPE_GET);
+ if (req->data.get.cb != NULL) {
+ if (req->data.get.value_is_encoded && req->data.get.value.len != 0) {
+ h2o_iovec_t decoded = h2o_decode_base64url(NULL, req->data.get.value.base, req->data.get.value.len);
+ h2o_mem_set_secure(req->data.get.value.base, 0, req->data.get.value.len);
+ free(req->data.get.value.base);
+ req->data.get.value = decoded;
+ }
+ req->data.get.cb(req->data.get.value, req->data.get.cb_data);
+ }
+ free_req(req);
+ }
+}
+
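+/* Queues a GET. The result is delivered asynchronously through
+ * h2o_memcached_receiver on the thread that owns `receiver`; the returned
+ * handle can be passed to h2o_memcached_cancel_get. */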
+h2o_memcached_req_t *h2o_memcached_get(h2o_memcached_context_t *ctx, h2o_multithread_receiver_t *receiver, h2o_iovec_t key,
+ h2o_memcached_get_cb cb, void *cb_data, int flags)
+{
+ h2o_memcached_req_t *req = create_req(ctx, REQ_TYPE_GET, key, (flags & H2O_MEMCACHED_ENCODE_KEY) != 0);
+ req->data.get.receiver = receiver;
+ req->data.get.cb = cb;
+ req->data.get.cb_data = cb_data;
+ req->data.get.value_is_encoded = (flags & H2O_MEMCACHED_ENCODE_VALUE) != 0;
+ dispatch(ctx, req);
+ return req;
+}
+
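+/* Cancels a previously issued GET: clears the callback so the receiver will
+ * not invoke it, and frees the request immediately only if it is still
+ * sitting in the pending queue (otherwise the receiver frees it later). */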
+void h2o_memcached_cancel_get(h2o_memcached_context_t *ctx, h2o_memcached_req_t *req)
+{
+ int do_free = 0;
+
+ pthread_mutex_lock(&ctx->mutex);
+ req->data.get.cb = NULL;
+ if (h2o_linklist_is_linked(&req->pending)) {
+ h2o_linklist_unlink(&req->pending);
+ do_free = 1;
+ }
+ pthread_mutex_unlock(&ctx->mutex);
+
+ if (do_free)
+ free_req(req);
+}
+
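+/* Queues a fire-and-forget SET; the value is copied (and, if requested,
+ * base64-encoded) so the caller's buffer can be reused right away. */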
+void h2o_memcached_set(h2o_memcached_context_t *ctx, h2o_iovec_t key, h2o_iovec_t value, uint32_t expiration, int flags)
+{
+ h2o_memcached_req_t *req = create_req(ctx, REQ_TYPE_SET, key, (flags & H2O_MEMCACHED_ENCODE_KEY) != 0);
+ if ((flags & H2O_MEMCACHED_ENCODE_VALUE) != 0) {
+ req->data.set.value.base = h2o_mem_alloc((value.len + 2) / 3 * 4 + 1);
+ req->data.set.value.len = h2o_base64_encode(req->data.set.value.base, value.base, value.len, 1);
+ } else {
+ req->data.set.value = h2o_iovec_init(h2o_mem_alloc(value.len), value.len);
+ memcpy(req->data.set.value.base, value.base, value.len);
+ }
+ req->data.set.expiration = expiration;
+ dispatch(ctx, req);
+}
+
+void h2o_memcached_delete(h2o_memcached_context_t *ctx, h2o_iovec_t key, int flags)
+{
+ h2o_memcached_req_t *req = create_req(ctx, REQ_TYPE_DELETE, key, (flags & H2O_MEMCACHED_ENCODE_KEY) != 0);
+ dispatch(ctx, req);
+}
+
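+/*
+ * Creates the shared context and starts `num_threads` detached worker
+ * threads, each maintaining its own connection to the server.
+ *
+ * Illustrative usage sketch (not part of this file; `queue`, `receiver`,
+ * `key` and `on_get` are assumed to be provided by the caller):
+ *
+ *     h2o_memcached_context_t *memc =
+ *         h2o_memcached_create_context("127.0.0.1", 11211, 0, 2, "h2o:");
+ *     h2o_multithread_register_receiver(queue, &receiver, h2o_memcached_receiver);
+ *     h2o_memcached_get(memc, &receiver, key, on_get, NULL, H2O_MEMCACHED_ENCODE_KEY);
+ */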
+h2o_memcached_context_t *h2o_memcached_create_context(const char *host, uint16_t port, int text_protocol, size_t num_threads,
+ const char *prefix)
+{
+ h2o_memcached_context_t *ctx = h2o_mem_alloc(sizeof(*ctx));
+
+ pthread_mutex_init(&ctx->mutex, NULL);
+ pthread_cond_init(&ctx->cond, NULL);
+ h2o_linklist_init_anchor(&ctx->pending);
+ ctx->num_threads_connected = 0;
+ ctx->host = h2o_strdup(NULL, host, SIZE_MAX).base;
+ ctx->port = port;
+ ctx->text_protocol = text_protocol;
+ ctx->prefix = h2o_strdup(NULL, prefix, SIZE_MAX);
+
+ { /* start the threads */
+ pthread_t tid;
+ pthread_attr_t attr;
+ size_t i;
+ pthread_attr_init(&attr);
+        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ for (i = 0; i != num_threads; ++i)
+ h2o_multithread_create_thread(&tid, &attr, thread_main, ctx);
+ pthread_attr_destroy(&attr);
+ }
+
+ return ctx;
+}