summaryrefslogtreecommitdiffstats
path: root/src/lib-dict/dict-memcached.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib-dict/dict-memcached.c')
-rw-r--r--src/lib-dict/dict-memcached.c373
1 files changed, 373 insertions, 0 deletions
diff --git a/src/lib-dict/dict-memcached.c b/src/lib-dict/dict-memcached.c
new file mode 100644
index 0000000..c5b7ce6
--- /dev/null
+++ b/src/lib-dict/dict-memcached.c
@@ -0,0 +1,373 @@
+/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING memcached */
+
+#include "lib.h"
+#include "array.h"
+#include "str.h"
+#include "istream.h"
+#include "ostream.h"
+#include "connection.h"
+#include "dict-private.h"
+
+#define MEMCACHED_DEFAULT_PORT 11211
+#define MEMCACHED_DEFAULT_LOOKUP_TIMEOUT_MSECS (1000*30)
+
+/* we need only very limited memcached functionality, so just define the binary
+ protocol ourself instead requiring protocol_binary.h */
+#define MEMCACHED_REQUEST_HDR_MAGIC 0x80
+#define MEMCACHED_REPLY_HDR_MAGIC 0x81
+
+#define MEMCACHED_REQUEST_HDR_LENGTH 24
+#define MEMCACHED_REPLY_HDR_LENGTH 24
+
+#define MEMCACHED_CMD_GET 0x00
+
+#define MEMCACHED_DATA_TYPE_RAW 0x00
+
+enum memcached_response {
+ MEMCACHED_RESPONSE_OK = 0x0000,
+ MEMCACHED_RESPONSE_NOTFOUND = 0x0001,
+ MEMCACHED_RESPONSE_INTERNALERROR= 0x0084,
+ MEMCACHED_RESPONSE_BUSY = 0x0085,
+ MEMCACHED_RESPONSE_TEMPFAILURE = 0x0086,
+};
+
+struct memcached_connection {
+ struct connection conn;
+ struct memcached_dict *dict;
+
+ buffer_t *cmd;
+ struct {
+ const unsigned char *value;
+ size_t value_len;
+ uint16_t status; /* enum memcached_response */
+ bool reply_received;
+ } reply;
+};
+
+struct memcached_dict {
+ struct dict dict;
+ struct ip_addr ip;
+ char *key_prefix;
+ in_port_t port;
+ unsigned int timeout_msecs;
+
+ struct memcached_connection conn;
+
+ bool connected;
+};
+
+static struct connection_list *memcached_connections;
+
+static void memcached_conn_destroy(struct connection *_conn)
+{
+ struct memcached_connection *conn = (struct memcached_connection *)_conn;
+
+ conn->dict->connected = FALSE;
+ connection_disconnect(_conn);
+
+ if (conn->dict->dict.ioloop != NULL)
+ io_loop_stop(conn->dict->dict.ioloop);
+}
+
+static int memcached_input_get(struct memcached_connection *conn)
+{
+ const unsigned char *data;
+ size_t size;
+ uint32_t body_len, value_pos;
+ uint16_t key_len, key_pos, status;
+ uint8_t extras_len, data_type;
+
+ data = i_stream_get_data(conn->conn.input, &size);
+ if (size < MEMCACHED_REPLY_HDR_LENGTH)
+ return 0;
+
+ if (data[0] != MEMCACHED_REPLY_HDR_MAGIC) {
+ e_error(conn->conn.event, "Invalid reply magic: %u != %u",
+ data[0], MEMCACHED_REPLY_HDR_MAGIC);
+ return -1;
+ }
+ memcpy(&body_len, data+8, 4); body_len = ntohl(body_len);
+ body_len += MEMCACHED_REPLY_HDR_LENGTH;
+ if (size < body_len) {
+ /* we haven't read the whole response yet */
+ return 0;
+ }
+
+ memcpy(&key_len, data+2, 2); key_len = ntohs(key_len);
+ extras_len = data[4];
+ data_type = data[5];
+ memcpy(&status, data+6, 2); status = ntohs(status);
+ if (data_type != MEMCACHED_DATA_TYPE_RAW) {
+ e_error(conn->conn.event, "Unsupported data type: %u != %u",
+ data[0], MEMCACHED_DATA_TYPE_RAW);
+ return -1;
+ }
+
+ key_pos = MEMCACHED_REPLY_HDR_LENGTH + extras_len;
+ value_pos = key_pos + key_len;
+ if (value_pos > body_len) {
+ e_error(conn->conn.event, "Invalid key/extras lengths");
+ return -1;
+ }
+ conn->reply.value = data + value_pos;
+ conn->reply.value_len = body_len - value_pos;
+ conn->reply.status = status;
+
+ i_stream_skip(conn->conn.input, body_len);
+ conn->reply.reply_received = TRUE;
+
+ if (conn->dict->dict.ioloop != NULL)
+ io_loop_stop(conn->dict->dict.ioloop);
+ return 1;
+}
+
+static void memcached_conn_input(struct connection *_conn)
+{
+ struct memcached_connection *conn = (struct memcached_connection *)_conn;
+
+ switch (i_stream_read(_conn->input)) {
+ case 0:
+ return;
+ case -1:
+ memcached_conn_destroy(_conn);
+ return;
+ default:
+ break;
+ }
+
+ if (memcached_input_get(conn) < 0)
+ memcached_conn_destroy(_conn);
+}
+
+static void memcached_conn_connected(struct connection *_conn, bool success)
+{
+ struct memcached_connection *conn =
+ (struct memcached_connection *)_conn;
+
+ if (!success) {
+ e_error(conn->conn.event, "connect() failed: %m");
+ } else {
+ conn->dict->connected = TRUE;
+ }
+ if (conn->dict->dict.ioloop != NULL)
+ io_loop_stop(conn->dict->dict.ioloop);
+}
+
+static const struct connection_settings memcached_conn_set = {
+ .input_max_size = SIZE_MAX,
+ .output_max_size = SIZE_MAX,
+ .client = TRUE
+};
+
+static const struct connection_vfuncs memcached_conn_vfuncs = {
+ .destroy = memcached_conn_destroy,
+ .input = memcached_conn_input,
+ .client_connected = memcached_conn_connected
+};
+
+static int
+memcached_dict_init(struct dict *driver, const char *uri,
+ const struct dict_settings *set,
+ struct dict **dict_r, const char **error_r)
+{
+ struct memcached_dict *dict;
+ const char *const *args;
+ int ret = 0;
+
+ if (memcached_connections == NULL) {
+ memcached_connections =
+ connection_list_init(&memcached_conn_set,
+ &memcached_conn_vfuncs);
+ }
+
+ dict = i_new(struct memcached_dict, 1);
+ if (net_addr2ip("127.0.0.1", &dict->ip) < 0)
+ i_unreached();
+ dict->port = MEMCACHED_DEFAULT_PORT;
+ dict->timeout_msecs = MEMCACHED_DEFAULT_LOOKUP_TIMEOUT_MSECS;
+ dict->key_prefix = i_strdup("");
+
+ args = t_strsplit(uri, ":");
+ for (; *args != NULL; args++) {
+ if (str_begins(*args, "host=")) {
+ if (net_addr2ip(*args+5, &dict->ip) < 0) {
+ *error_r = t_strdup_printf("Invalid IP: %s",
+ *args+5);
+ ret = -1;
+ }
+ } else if (str_begins(*args, "port=")) {
+ if (net_str2port(*args+5, &dict->port) < 0) {
+ *error_r = t_strdup_printf("Invalid port: %s",
+ *args+5);
+ ret = -1;
+ }
+ } else if (str_begins(*args, "prefix=")) {
+ i_free(dict->key_prefix);
+ dict->key_prefix = i_strdup(*args + 7);
+ } else if (str_begins(*args, "timeout_msecs=")) {
+ if (str_to_uint(*args+14, &dict->timeout_msecs) < 0) {
+ *error_r = t_strdup_printf(
+ "Invalid timeout_msecs: %s", *args+14);
+ ret = -1;
+ }
+ } else {
+ *error_r = t_strdup_printf("Unknown parameter: %s",
+ *args);
+ ret = -1;
+ }
+ }
+ if (ret < 0) {
+ i_free(dict->key_prefix);
+ i_free(dict);
+ return -1;
+ }
+
+ dict->conn.conn.event_parent = set->event_parent;
+
+ connection_init_client_ip(memcached_connections, &dict->conn.conn,
+ NULL, &dict->ip, dict->port);
+ event_set_append_log_prefix(dict->conn.conn.event, "memcached: ");
+ dict->dict = *driver;
+ dict->conn.cmd = buffer_create_dynamic(default_pool, 256);
+ dict->conn.dict = dict;
+ *dict_r = &dict->dict;
+ return 0;
+}
+
+static void memcached_dict_deinit(struct dict *_dict)
+{
+ struct memcached_dict *dict = (struct memcached_dict *)_dict;
+
+ connection_deinit(&dict->conn.conn);
+ buffer_free(&dict->conn.cmd);
+ i_free(dict->key_prefix);
+ i_free(dict);
+
+ if (memcached_connections->connections == NULL)
+ connection_list_deinit(&memcached_connections);
+}
+
+static void memcached_dict_lookup_timeout(struct memcached_dict *dict)
+{
+ e_error(dict->dict.event, "Lookup timed out in %u.%03u secs",
+ dict->timeout_msecs/1000, dict->timeout_msecs%1000);
+ io_loop_stop(dict->dict.ioloop);
+}
+
+static void memcached_add_header(buffer_t *buf, unsigned int key_len)
+{
+ uint32_t body_len = htonl(key_len);
+
+ i_assert(key_len <= 0xffff);
+
+ buffer_append_c(buf, MEMCACHED_REQUEST_HDR_MAGIC);
+ buffer_append_c(buf, MEMCACHED_CMD_GET);
+ buffer_append_c(buf, (key_len >> 8) & 0xff);
+ buffer_append_c(buf, key_len & 0xff);
+ buffer_append_c(buf, 0); /* extras length */
+ buffer_append_c(buf, MEMCACHED_DATA_TYPE_RAW);
+ buffer_append_zero(buf, 2); /* vbucket id - we probably don't care? */
+ buffer_append(buf, &body_len, sizeof(body_len));
+ buffer_append_zero(buf, 4+8); /* opaque + cas */
+ i_assert(buf->used == MEMCACHED_REQUEST_HDR_LENGTH);
+}
+
+static int
+memcached_dict_lookup(struct dict *_dict, const struct dict_op_settings *set ATTR_UNUSED,
+ pool_t pool, const char *key, const char **value_r,
+ const char **error_r)
+{
+ struct memcached_dict *dict = (struct memcached_dict *)_dict;
+ struct ioloop *prev_ioloop = current_ioloop;
+ struct timeout *to;
+ size_t key_len;
+
+ if (str_begins(key, DICT_PATH_SHARED))
+ key += strlen(DICT_PATH_SHARED);
+ else {
+ *error_r = t_strdup_printf("memcached: Only shared keys supported currently");
+ return -1;
+ }
+ if (*dict->key_prefix != '\0')
+ key = t_strconcat(dict->key_prefix, key, NULL);
+ key_len = strlen(key);
+ if (key_len > 0xffff) {
+ *error_r = t_strdup_printf(
+ "memcached: Key is too long (%zu bytes): %s", key_len, key);
+ return -1;
+ }
+
+ i_assert(dict->dict.ioloop == NULL);
+
+ dict->dict.ioloop = io_loop_create();
+ connection_switch_ioloop(&dict->conn.conn);
+
+ if (dict->conn.conn.fd_in == -1 &&
+ connection_client_connect(&dict->conn.conn) < 0) {
+ e_error(dict->conn.conn.event, "Couldn't connect");
+ } else {
+ to = timeout_add(dict->timeout_msecs,
+ memcached_dict_lookup_timeout, dict);
+ if (!dict->connected) {
+ /* wait for connection */
+ io_loop_run(dict->dict.ioloop);
+ }
+
+ if (dict->connected) {
+ buffer_set_used_size(dict->conn.cmd, 0);
+ memcached_add_header(dict->conn.cmd, key_len);
+ buffer_append(dict->conn.cmd, key, key_len);
+
+ o_stream_nsend(dict->conn.conn.output,
+ dict->conn.cmd->data,
+ dict->conn.cmd->used);
+
+ i_zero(&dict->conn.reply);
+ io_loop_run(dict->dict.ioloop);
+ }
+ timeout_remove(&to);
+ }
+
+ io_loop_set_current(prev_ioloop);
+ connection_switch_ioloop(&dict->conn.conn);
+ io_loop_set_current(dict->dict.ioloop);
+ io_loop_destroy(&dict->dict.ioloop);
+
+ if (!dict->conn.reply.reply_received) {
+ /* we failed in some way. make sure we disconnect since the
+ connection state isn't known anymore */
+ memcached_conn_destroy(&dict->conn.conn);
+ *error_r = "Communication failure";
+ return -1;
+ }
+ switch (dict->conn.reply.status) {
+ case MEMCACHED_RESPONSE_OK:
+ *value_r = p_strndup(pool, dict->conn.reply.value,
+ dict->conn.reply.value_len);
+ return 1;
+ case MEMCACHED_RESPONSE_NOTFOUND:
+ return 0;
+ case MEMCACHED_RESPONSE_INTERNALERROR:
+ *error_r = "Lookup failed: Internal error";
+ return -1;
+ case MEMCACHED_RESPONSE_BUSY:
+ *error_r = "Lookup failed: Busy";
+ return -1;
+ case MEMCACHED_RESPONSE_TEMPFAILURE:
+ *error_r = "Lookup failed: Temporary failure";
+ return -1;
+ }
+
+ *error_r = t_strdup_printf("Lookup failed: Error code=%u",
+ dict->conn.reply.status);
+ return -1;
+}
+
+struct dict dict_driver_memcached = {
+ .name = "memcached",
+ {
+ .init = memcached_dict_init,
+ .deinit = memcached_dict_deinit,
+ .lookup = memcached_dict_lookup,
+ }
+};