diff options
Diffstat (limited to 'src/lib-dict/dict-memcached.c')
-rw-r--r-- | src/lib-dict/dict-memcached.c | 373 |
1 files changed, 373 insertions, 0 deletions
diff --git a/src/lib-dict/dict-memcached.c b/src/lib-dict/dict-memcached.c new file mode 100644 index 0000000..c5b7ce6 --- /dev/null +++ b/src/lib-dict/dict-memcached.c @@ -0,0 +1,373 @@ +/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING memcached */ + +#include "lib.h" +#include "array.h" +#include "str.h" +#include "istream.h" +#include "ostream.h" +#include "connection.h" +#include "dict-private.h" + +#define MEMCACHED_DEFAULT_PORT 11211 +#define MEMCACHED_DEFAULT_LOOKUP_TIMEOUT_MSECS (1000*30) + +/* we need only very limited memcached functionality, so just define the binary + protocol ourself instead requiring protocol_binary.h */ +#define MEMCACHED_REQUEST_HDR_MAGIC 0x80 +#define MEMCACHED_REPLY_HDR_MAGIC 0x81 + +#define MEMCACHED_REQUEST_HDR_LENGTH 24 +#define MEMCACHED_REPLY_HDR_LENGTH 24 + +#define MEMCACHED_CMD_GET 0x00 + +#define MEMCACHED_DATA_TYPE_RAW 0x00 + +enum memcached_response { + MEMCACHED_RESPONSE_OK = 0x0000, + MEMCACHED_RESPONSE_NOTFOUND = 0x0001, + MEMCACHED_RESPONSE_INTERNALERROR= 0x0084, + MEMCACHED_RESPONSE_BUSY = 0x0085, + MEMCACHED_RESPONSE_TEMPFAILURE = 0x0086, +}; + +struct memcached_connection { + struct connection conn; + struct memcached_dict *dict; + + buffer_t *cmd; + struct { + const unsigned char *value; + size_t value_len; + uint16_t status; /* enum memcached_response */ + bool reply_received; + } reply; +}; + +struct memcached_dict { + struct dict dict; + struct ip_addr ip; + char *key_prefix; + in_port_t port; + unsigned int timeout_msecs; + + struct memcached_connection conn; + + bool connected; +}; + +static struct connection_list *memcached_connections; + +static void memcached_conn_destroy(struct connection *_conn) +{ + struct memcached_connection *conn = (struct memcached_connection *)_conn; + + conn->dict->connected = FALSE; + connection_disconnect(_conn); + + if (conn->dict->dict.ioloop != NULL) + io_loop_stop(conn->dict->dict.ioloop); +} + +static int memcached_input_get(struct memcached_connection *conn) +{ + const unsigned char *data; + size_t size; + uint32_t body_len, value_pos; + uint16_t key_len, key_pos, status; + uint8_t extras_len, data_type; + + data = i_stream_get_data(conn->conn.input, &size); + if (size < MEMCACHED_REPLY_HDR_LENGTH) + return 0; + + if (data[0] != MEMCACHED_REPLY_HDR_MAGIC) { + e_error(conn->conn.event, "Invalid reply magic: %u != %u", + data[0], MEMCACHED_REPLY_HDR_MAGIC); + return -1; + } + memcpy(&body_len, data+8, 4); body_len = ntohl(body_len); + body_len += MEMCACHED_REPLY_HDR_LENGTH; + if (size < body_len) { + /* we haven't read the whole response yet */ + return 0; + } + + memcpy(&key_len, data+2, 2); key_len = ntohs(key_len); + extras_len = data[4]; + data_type = data[5]; + memcpy(&status, data+6, 2); status = ntohs(status); + if (data_type != MEMCACHED_DATA_TYPE_RAW) { + e_error(conn->conn.event, "Unsupported data type: %u != %u", + data[0], MEMCACHED_DATA_TYPE_RAW); + return -1; + } + + key_pos = MEMCACHED_REPLY_HDR_LENGTH + extras_len; + value_pos = key_pos + key_len; + if (value_pos > body_len) { + e_error(conn->conn.event, "Invalid key/extras lengths"); + return -1; + } + conn->reply.value = data + value_pos; + conn->reply.value_len = body_len - value_pos; + conn->reply.status = status; + + i_stream_skip(conn->conn.input, body_len); + conn->reply.reply_received = TRUE; + + if (conn->dict->dict.ioloop != NULL) + io_loop_stop(conn->dict->dict.ioloop); + return 1; +} + +static void memcached_conn_input(struct connection *_conn) +{ + struct memcached_connection *conn = (struct memcached_connection *)_conn; + + switch (i_stream_read(_conn->input)) { + case 0: + return; + case -1: + memcached_conn_destroy(_conn); + return; + default: + break; + } + + if (memcached_input_get(conn) < 0) + memcached_conn_destroy(_conn); +} + +static void memcached_conn_connected(struct connection *_conn, bool success) +{ + struct memcached_connection *conn = + (struct memcached_connection *)_conn; + + if (!success) { + e_error(conn->conn.event, "connect() failed: %m"); + } else { + conn->dict->connected = TRUE; + } + if (conn->dict->dict.ioloop != NULL) + io_loop_stop(conn->dict->dict.ioloop); +} + +static const struct connection_settings memcached_conn_set = { + .input_max_size = SIZE_MAX, + .output_max_size = SIZE_MAX, + .client = TRUE +}; + +static const struct connection_vfuncs memcached_conn_vfuncs = { + .destroy = memcached_conn_destroy, + .input = memcached_conn_input, + .client_connected = memcached_conn_connected +}; + +static int +memcached_dict_init(struct dict *driver, const char *uri, + const struct dict_settings *set, + struct dict **dict_r, const char **error_r) +{ + struct memcached_dict *dict; + const char *const *args; + int ret = 0; + + if (memcached_connections == NULL) { + memcached_connections = + connection_list_init(&memcached_conn_set, + &memcached_conn_vfuncs); + } + + dict = i_new(struct memcached_dict, 1); + if (net_addr2ip("127.0.0.1", &dict->ip) < 0) + i_unreached(); + dict->port = MEMCACHED_DEFAULT_PORT; + dict->timeout_msecs = MEMCACHED_DEFAULT_LOOKUP_TIMEOUT_MSECS; + dict->key_prefix = i_strdup(""); + + args = t_strsplit(uri, ":"); + for (; *args != NULL; args++) { + if (str_begins(*args, "host=")) { + if (net_addr2ip(*args+5, &dict->ip) < 0) { + *error_r = t_strdup_printf("Invalid IP: %s", + *args+5); + ret = -1; + } + } else if (str_begins(*args, "port=")) { + if (net_str2port(*args+5, &dict->port) < 0) { + *error_r = t_strdup_printf("Invalid port: %s", + *args+5); + ret = -1; + } + } else if (str_begins(*args, "prefix=")) { + i_free(dict->key_prefix); + dict->key_prefix = i_strdup(*args + 7); + } else if (str_begins(*args, "timeout_msecs=")) { + if (str_to_uint(*args+14, &dict->timeout_msecs) < 0) { + *error_r = t_strdup_printf( + "Invalid timeout_msecs: %s", *args+14); + ret = -1; + } + } else { + *error_r = t_strdup_printf("Unknown parameter: %s", + *args); + ret = -1; + } + } + if (ret < 0) { + i_free(dict->key_prefix); + i_free(dict); + return -1; + } + + dict->conn.conn.event_parent = set->event_parent; + + connection_init_client_ip(memcached_connections, &dict->conn.conn, + NULL, &dict->ip, dict->port); + event_set_append_log_prefix(dict->conn.conn.event, "memcached: "); + dict->dict = *driver; + dict->conn.cmd = buffer_create_dynamic(default_pool, 256); + dict->conn.dict = dict; + *dict_r = &dict->dict; + return 0; +} + +static void memcached_dict_deinit(struct dict *_dict) +{ + struct memcached_dict *dict = (struct memcached_dict *)_dict; + + connection_deinit(&dict->conn.conn); + buffer_free(&dict->conn.cmd); + i_free(dict->key_prefix); + i_free(dict); + + if (memcached_connections->connections == NULL) + connection_list_deinit(&memcached_connections); +} + +static void memcached_dict_lookup_timeout(struct memcached_dict *dict) +{ + e_error(dict->dict.event, "Lookup timed out in %u.%03u secs", + dict->timeout_msecs/1000, dict->timeout_msecs%1000); + io_loop_stop(dict->dict.ioloop); +} + +static void memcached_add_header(buffer_t *buf, unsigned int key_len) +{ + uint32_t body_len = htonl(key_len); + + i_assert(key_len <= 0xffff); + + buffer_append_c(buf, MEMCACHED_REQUEST_HDR_MAGIC); + buffer_append_c(buf, MEMCACHED_CMD_GET); + buffer_append_c(buf, (key_len >> 8) & 0xff); + buffer_append_c(buf, key_len & 0xff); + buffer_append_c(buf, 0); /* extras length */ + buffer_append_c(buf, MEMCACHED_DATA_TYPE_RAW); + buffer_append_zero(buf, 2); /* vbucket id - we probably don't care? */ + buffer_append(buf, &body_len, sizeof(body_len)); + buffer_append_zero(buf, 4+8); /* opaque + cas */ + i_assert(buf->used == MEMCACHED_REQUEST_HDR_LENGTH); +} + +static int +memcached_dict_lookup(struct dict *_dict, const struct dict_op_settings *set ATTR_UNUSED, + pool_t pool, const char *key, const char **value_r, + const char **error_r) +{ + struct memcached_dict *dict = (struct memcached_dict *)_dict; + struct ioloop *prev_ioloop = current_ioloop; + struct timeout *to; + size_t key_len; + + if (str_begins(key, DICT_PATH_SHARED)) + key += strlen(DICT_PATH_SHARED); + else { + *error_r = t_strdup_printf("memcached: Only shared keys supported currently"); + return -1; + } + if (*dict->key_prefix != '\0') + key = t_strconcat(dict->key_prefix, key, NULL); + key_len = strlen(key); + if (key_len > 0xffff) { + *error_r = t_strdup_printf( + "memcached: Key is too long (%zu bytes): %s", key_len, key); + return -1; + } + + i_assert(dict->dict.ioloop == NULL); + + dict->dict.ioloop = io_loop_create(); + connection_switch_ioloop(&dict->conn.conn); + + if (dict->conn.conn.fd_in == -1 && + connection_client_connect(&dict->conn.conn) < 0) { + e_error(dict->conn.conn.event, "Couldn't connect"); + } else { + to = timeout_add(dict->timeout_msecs, + memcached_dict_lookup_timeout, dict); + if (!dict->connected) { + /* wait for connection */ + io_loop_run(dict->dict.ioloop); + } + + if (dict->connected) { + buffer_set_used_size(dict->conn.cmd, 0); + memcached_add_header(dict->conn.cmd, key_len); + buffer_append(dict->conn.cmd, key, key_len); + + o_stream_nsend(dict->conn.conn.output, + dict->conn.cmd->data, + dict->conn.cmd->used); + + i_zero(&dict->conn.reply); + io_loop_run(dict->dict.ioloop); + } + timeout_remove(&to); + } + + io_loop_set_current(prev_ioloop); + connection_switch_ioloop(&dict->conn.conn); + io_loop_set_current(dict->dict.ioloop); + io_loop_destroy(&dict->dict.ioloop); + + if (!dict->conn.reply.reply_received) { + /* we failed in some way. make sure we disconnect since the + connection state isn't known anymore */ + memcached_conn_destroy(&dict->conn.conn); + *error_r = "Communication failure"; + return -1; + } + switch (dict->conn.reply.status) { + case MEMCACHED_RESPONSE_OK: + *value_r = p_strndup(pool, dict->conn.reply.value, + dict->conn.reply.value_len); + return 1; + case MEMCACHED_RESPONSE_NOTFOUND: + return 0; + case MEMCACHED_RESPONSE_INTERNALERROR: + *error_r = "Lookup failed: Internal error"; + return -1; + case MEMCACHED_RESPONSE_BUSY: + *error_r = "Lookup failed: Busy"; + return -1; + case MEMCACHED_RESPONSE_TEMPFAILURE: + *error_r = "Lookup failed: Temporary failure"; + return -1; + } + + *error_r = t_strdup_printf("Lookup failed: Error code=%u", + dict->conn.reply.status); + return -1; +} + +struct dict dict_driver_memcached = { + .name = "memcached", + { + .init = memcached_dict_init, + .deinit = memcached_dict_deinit, + .lookup = memcached_dict_lookup, + } +}; |