/* Copyright (c) 2008-2018 Dovecot authors, see the included COPYING redis */ #include "lib.h" #include "array.h" #include "str.h" #include "istream.h" #include "ostream.h" #include "connection.h" #include "dict-private.h" #define REDIS_DEFAULT_PORT 6379 #define REDIS_DEFAULT_LOOKUP_TIMEOUT_MSECS (1000*30) #define DICT_USERNAME_SEPARATOR '/' enum redis_input_state { /* expecting +OK reply for AUTH */ REDIS_INPUT_STATE_AUTH, /* expecting +OK reply for SELECT */ REDIS_INPUT_STATE_SELECT, /* expecting $-1 / $ followed by GET reply */ REDIS_INPUT_STATE_GET, /* expecting +QUEUED */ REDIS_INPUT_STATE_MULTI, /* expecting +OK reply for DISCARD */ REDIS_INPUT_STATE_DISCARD, /* expecting * */ REDIS_INPUT_STATE_EXEC, /* expecting EXEC reply */ REDIS_INPUT_STATE_EXEC_REPLY }; struct redis_connection { struct connection conn; struct redis_dict *dict; string_t *last_reply; unsigned int bytes_left; bool value_not_found; bool value_received; }; struct redis_dict_reply { unsigned int reply_count; dict_transaction_commit_callback_t *callback; void *context; }; struct redis_dict { struct dict dict; char *password, *key_prefix, *expire_value; unsigned int timeout_msecs, db_id; struct redis_connection conn; ARRAY(enum redis_input_state) input_states; ARRAY(struct redis_dict_reply) replies; bool connected; bool transaction_open; bool db_id_set; }; struct redis_dict_transaction_context { struct dict_transaction_context ctx; unsigned int cmd_count; char *error; }; static struct connection_list *redis_connections; static void redis_input_state_add(struct redis_dict *dict, enum redis_input_state state) { array_push_back(&dict->input_states, &state); } static void redis_input_state_remove(struct redis_dict *dict) { array_pop_front(&dict->input_states); } static void redis_reply_callback(struct redis_connection *conn, const struct redis_dict_reply *reply, const struct dict_commit_result *result) { i_assert(reply->callback != NULL); if (conn->dict->dict.prev_ioloop != NULL) io_loop_set_current(conn->dict->dict.prev_ioloop); reply->callback(result, reply->context); if (conn->dict->dict.prev_ioloop != NULL) io_loop_set_current(conn->dict->dict.ioloop); } static void redis_disconnected(struct redis_connection *conn, const char *reason) { const struct dict_commit_result result = { DICT_COMMIT_RET_FAILED, reason }; const struct redis_dict_reply *reply; conn->dict->db_id_set = FALSE; conn->dict->connected = FALSE; connection_disconnect(&conn->conn); array_foreach(&conn->dict->replies, reply) redis_reply_callback(conn, reply, &result); array_clear(&conn->dict->replies); array_clear(&conn->dict->input_states); if (conn->dict->dict.ioloop != NULL) io_loop_stop(conn->dict->dict.ioloop); } static void redis_conn_destroy(struct connection *_conn) { struct redis_connection *conn = (struct redis_connection *)_conn; redis_disconnected(conn, connection_disconnect_reason(_conn)); } static void redis_dict_wait_timeout(struct redis_dict *dict) { const char *reason = t_strdup_printf( "redis: Commit timed out in %u.%03u secs", dict->timeout_msecs/1000, dict->timeout_msecs%1000); redis_disconnected(&dict->conn, reason); } static void redis_wait(struct redis_dict *dict) { struct timeout *to; i_assert(dict->dict.ioloop == NULL); dict->dict.prev_ioloop = current_ioloop; dict->dict.ioloop = io_loop_create(); to = timeout_add(dict->timeout_msecs, redis_dict_wait_timeout, dict); connection_switch_ioloop(&dict->conn.conn); do { io_loop_run(dict->dict.ioloop); } while (array_count(&dict->input_states) > 0); timeout_remove(&to); io_loop_set_current(dict->dict.prev_ioloop); connection_switch_ioloop(&dict->conn.conn); io_loop_set_current(dict->dict.ioloop); io_loop_destroy(&dict->dict.ioloop); dict->dict.prev_ioloop = NULL; } static int redis_input_get(struct redis_connection *conn, const char **error_r) { const unsigned char *data; size_t size; const char *line; if (conn->bytes_left == 0) { /* read the size first */ line = i_stream_next_line(conn->conn.input); if (line == NULL) return 0; if (strcmp(line, "$-1") == 0) { conn->value_received = TRUE; conn->value_not_found = TRUE; if (conn->dict->dict.ioloop != NULL) io_loop_stop(conn->dict->dict.ioloop); redis_input_state_remove(conn->dict); return 1; } if (line[0] != '$' || str_to_uint(line+1, &conn->bytes_left) < 0) { *error_r = t_strdup_printf( "redis: Unexpected input (wanted $size): %s", line); return -1; } conn->bytes_left += 2; /* include trailing CRLF */ } data = i_stream_get_data(conn->conn.input, &size); if (size > conn->bytes_left) size = conn->bytes_left; str_append_data(conn->last_reply, data, size); conn->bytes_left -= size; i_stream_skip(conn->conn.input, size); if (conn->bytes_left > 0) return 0; /* reply fully read - drop trailing CRLF */ conn->value_received = TRUE; str_truncate(conn->last_reply, str_len(conn->last_reply)-2); if (conn->dict->dict.ioloop != NULL) io_loop_stop(conn->dict->dict.ioloop); redis_input_state_remove(conn->dict); return 1; } static int redis_conn_input_more(struct redis_connection *conn, const char **error_r) { struct redis_dict *dict = conn->dict; struct redis_dict_reply *reply; const enum redis_input_state *states; enum redis_input_state state; unsigned int count, num_replies; const char *line; states = array_get(&dict->input_states, &count); if (count == 0) { line = i_stream_next_line(conn->conn.input); if (line == NULL) return 0; *error_r = t_strdup_printf( "redis: Unexpected input (expected nothing): %s", line); return -1; } state = states[0]; if (state == REDIS_INPUT_STATE_GET) return redis_input_get(conn, error_r); line = i_stream_next_line(conn->conn.input); if (line == NULL) return 0; redis_input_state_remove(dict); switch (state) { case REDIS_INPUT_STATE_GET: i_unreached(); case REDIS_INPUT_STATE_AUTH: case REDIS_INPUT_STATE_SELECT: case REDIS_INPUT_STATE_MULTI: case REDIS_INPUT_STATE_DISCARD: if (line[0] != '+') break; return 1; case REDIS_INPUT_STATE_EXEC: if (line[0] != '*' || str_to_uint(line+1, &num_replies) < 0) break; reply = array_front_modifiable(&dict->replies); i_assert(reply->reply_count > 0); if (reply->reply_count != num_replies) { *error_r = t_strdup_printf( "redis: EXEC expected %u replies, not %u", reply->reply_count, num_replies); return -1; } return 1; case REDIS_INPUT_STATE_EXEC_REPLY: if (*line != '+' && *line != ':') break; /* success, just ignore the actual reply */ reply = array_front_modifiable(&dict->replies); i_assert(reply->reply_count > 0); if (--reply->reply_count == 0) { const struct dict_commit_result result = { DICT_COMMIT_RET_OK, NULL }; redis_reply_callback(conn, reply, &result); array_pop_front(&dict->replies); /* if we're running in a dict-ioloop, we're handling a synchronous commit and need to stop now */ if (array_count(&dict->replies) == 0 && conn->dict->dict.ioloop != NULL) io_loop_stop(conn->dict->dict.ioloop); } return 1; } str_truncate(dict->conn.last_reply, 0); str_append(dict->conn.last_reply, line); *error_r = t_strdup_printf("redis: Unexpected input (state=%d): %s", state, line); return -1; } static void redis_conn_input(struct connection *_conn) { struct redis_connection *conn = (struct redis_connection *)_conn; const char *error = NULL; int ret; switch (i_stream_read(_conn->input)) { case 0: return; case -1: redis_disconnected(conn, i_stream_get_error(_conn->input)); return; default: break; } while ((ret = redis_conn_input_more(conn, &error)) > 0) ; if (ret < 0) { i_assert(error != NULL); redis_disconnected(conn, error); } } static void redis_conn_connected(struct connection *_conn, bool success) { struct redis_connection *conn = (struct redis_connection *)_conn; if (!success) { e_error(conn->conn.event, "connect() failed: %m"); } else { conn->dict->connected = TRUE; } if (conn->dict->dict.ioloop != NULL) io_loop_stop(conn->dict->dict.ioloop); } static const struct connection_settings redis_conn_set = { .input_max_size = SIZE_MAX, .output_max_size = SIZE_MAX, .client = TRUE }; static const struct connection_vfuncs redis_conn_vfuncs = { .destroy = redis_conn_destroy, .input = redis_conn_input, .client_connected = redis_conn_connected }; static const char *redis_escape_username(const char *username) { const char *p; string_t *str = t_str_new(64); for (p = username; *p != '\0'; p++) { switch (*p) { case DICT_USERNAME_SEPARATOR: str_append(str, "\\-"); break; case '\\': str_append(str, "\\\\"); break; default: str_append_c(str, *p); } } return str_c(str); } static int redis_dict_init(struct dict *driver, const char *uri, const struct dict_settings *set, struct dict **dict_r, const char **error_r) { struct redis_dict *dict; struct ip_addr ip; unsigned int secs; in_port_t port = REDIS_DEFAULT_PORT; const char *const *args, *unix_path = NULL; int ret = 0; if (redis_connections == NULL) { redis_connections = connection_list_init(&redis_conn_set, &redis_conn_vfuncs); } dict = i_new(struct redis_dict, 1); if (net_addr2ip("127.0.0.1", &ip) < 0) i_unreached(); dict->timeout_msecs = REDIS_DEFAULT_LOOKUP_TIMEOUT_MSECS; dict->key_prefix = i_strdup(""); dict->password = i_strdup(""); args = t_strsplit(uri, ":"); for (; *args != NULL; args++) { if (str_begins(*args, "path=")) { unix_path = *args + 5; } else if (str_begins(*args, "host=")) { if (net_addr2ip(*args+5, &ip) < 0) { *error_r = t_strdup_printf("Invalid IP: %s", *args+5); ret = -1; } } else if (str_begins(*args, "port=")) { if (net_str2port(*args+5, &port) < 0) { *error_r = t_strdup_printf("Invalid port: %s", *args+5); ret = -1; } } else if (str_begins(*args, "prefix=")) { i_free(dict->key_prefix); dict->key_prefix = i_strdup(*args + 7); } else if (str_begins(*args, "db=")) { if (str_to_uint(*args+3, &dict->db_id) < 0) { *error_r = t_strdup_printf( "Invalid db number: %s", *args+3); ret = -1; } } else if (str_begins(*args, "expire_secs=")) { const char *value = *args + 12; if (str_to_uint(value, &secs) < 0 || secs == 0) { *error_r = t_strdup_printf( "Invalid expire_secs: %s", value); ret = -1; } i_free(dict->expire_value); dict->expire_value = i_strdup(value); } else if (str_begins(*args, "timeout_msecs=")) { if (str_to_uint(*args+14, &dict->timeout_msecs) < 0) { *error_r = t_strdup_printf( "Invalid timeout_msecs: %s", *args+14); ret = -1; } } else if (str_begins(*args, "password=")) { i_free(dict->password); dict->password = i_strdup(*args + 9); } else { *error_r = t_strdup_printf("Unknown parameter: %s", *args); ret = -1; } } if (ret < 0) { i_free(dict->password); i_free(dict->key_prefix); i_free(dict); return -1; } dict->conn.conn.event_parent = set->event_parent; if (unix_path != NULL) { connection_init_client_unix(redis_connections, &dict->conn.conn, unix_path); } else { connection_init_client_ip(redis_connections, &dict->conn.conn, NULL, &ip, port); } event_set_append_log_prefix(dict->conn.conn.event, "redis: "); dict->dict = *driver; dict->conn.last_reply = str_new(default_pool, 256); dict->conn.dict = dict; i_array_init(&dict->input_states, 4); i_array_init(&dict->replies, 4); *dict_r = &dict->dict; return 0; } static void redis_dict_deinit(struct dict *_dict) { struct redis_dict *dict = (struct redis_dict *)_dict; if (array_count(&dict->input_states) > 0) { i_assert(dict->connected); redis_wait(dict); } connection_deinit(&dict->conn.conn); str_free(&dict->conn.last_reply); array_free(&dict->replies); array_free(&dict->input_states); i_free(dict->expire_value); i_free(dict->key_prefix); i_free(dict->password); i_free(dict); if (redis_connections->connections == NULL) connection_list_deinit(&redis_connections); } static void redis_dict_wait(struct dict *_dict) { struct redis_dict *dict = (struct redis_dict *)_dict; if (array_count(&dict->input_states) > 0) redis_wait(dict); } static void redis_dict_lookup_timeout(struct redis_dict *dict) { const char *reason = t_strdup_printf( "redis: Lookup timed out in %u.%03u secs", dict->timeout_msecs/1000, dict->timeout_msecs%1000); redis_disconnected(&dict->conn, reason); } static const char * redis_dict_get_full_key(struct redis_dict *dict, const char *username, const char *key) { const char *username_sp = strchr(username, DICT_USERNAME_SEPARATOR); if (str_begins(key, DICT_PATH_SHARED)) key += strlen(DICT_PATH_SHARED); else if (str_begins(key, DICT_PATH_PRIVATE)) { key = t_strdup_printf("%s%c%s", username_sp == NULL ? username : redis_escape_username(username), DICT_USERNAME_SEPARATOR, key + strlen(DICT_PATH_PRIVATE)); } else { i_unreached(); } if (*dict->key_prefix != '\0') key = t_strconcat(dict->key_prefix, key, NULL); return key; } static void redis_dict_auth(struct redis_dict *dict) { const char *cmd; if (*dict->password == '\0') return; cmd = t_strdup_printf("*2\r\n$4\r\nAUTH\r\n$%d\r\n%s\r\n", (int)strlen(dict->password), dict->password); o_stream_nsend_str(dict->conn.conn.output, cmd); redis_input_state_add(dict, REDIS_INPUT_STATE_AUTH); } static void redis_dict_select_db(struct redis_dict *dict) { const char *cmd, *db_str; if (dict->db_id_set) return; dict->db_id_set = TRUE; if (dict->db_id == 0) { /* 0 is the default */ return; } db_str = dec2str(dict->db_id); cmd = t_strdup_printf("*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", (int)strlen(db_str), db_str); o_stream_nsend_str(dict->conn.conn.output, cmd); redis_input_state_add(dict, REDIS_INPUT_STATE_SELECT); } static int redis_dict_lookup(struct dict *_dict, const struct dict_op_settings *set, pool_t pool, const char *key, const char **value_r, const char **error_r) { struct redis_dict *dict = (struct redis_dict *)_dict; struct timeout *to; const char *cmd; key = redis_dict_get_full_key(dict, set->username, key); dict->conn.value_received = FALSE; dict->conn.value_not_found = FALSE; i_assert(dict->dict.ioloop == NULL); dict->dict.prev_ioloop = current_ioloop; dict->dict.ioloop = io_loop_create(); connection_switch_ioloop(&dict->conn.conn); if (dict->conn.conn.fd_in == -1 && connection_client_connect(&dict->conn.conn) < 0) { e_error(dict->conn.conn.event, "Couldn't connect"); } else { to = timeout_add(dict->timeout_msecs, redis_dict_lookup_timeout, dict); if (!dict->connected) { /* wait for connection */ io_loop_run(dict->dict.ioloop); if (dict->connected) redis_dict_auth(dict); } if (dict->connected) { redis_dict_select_db(dict); cmd = t_strdup_printf("*2\r\n$3\r\nGET\r\n$%d\r\n%s\r\n", (int)strlen(key), key); o_stream_nsend_str(dict->conn.conn.output, cmd); str_truncate(dict->conn.last_reply, 0); redis_input_state_add(dict, REDIS_INPUT_STATE_GET); do { io_loop_run(dict->dict.ioloop); } while (array_count(&dict->input_states) > 0); } timeout_remove(&to); } io_loop_set_current(dict->dict.prev_ioloop); connection_switch_ioloop(&dict->conn.conn); io_loop_set_current(dict->dict.ioloop); io_loop_destroy(&dict->dict.ioloop); dict->dict.prev_ioloop = NULL; if (!dict->conn.value_received) { /* we failed in some way. make sure we disconnect since the connection state isn't known anymore */ *error_r = t_strdup_printf("redis: Communication failure (last reply: %s)", str_c(dict->conn.last_reply)); redis_disconnected(&dict->conn, *error_r); return -1; } if (dict->conn.value_not_found) return 0; *value_r = p_strdup(pool, str_c(dict->conn.last_reply)); return 1; } static struct dict_transaction_context * redis_transaction_init(struct dict *_dict) { struct redis_dict *dict = (struct redis_dict *)_dict; struct redis_dict_transaction_context *ctx; i_assert(!dict->transaction_open); dict->transaction_open = TRUE; ctx = i_new(struct redis_dict_transaction_context, 1); ctx->ctx.dict = _dict; if (dict->conn.conn.fd_in == -1 && connection_client_connect(&dict->conn.conn) < 0) { e_error(dict->conn.conn.event, "Couldn't connect"); } else if (!dict->connected) { /* wait for connection */ redis_wait(dict); if (dict->connected) redis_dict_auth(dict); } if (dict->connected) redis_dict_select_db(dict); return &ctx->ctx; } static void redis_transaction_commit(struct dict_transaction_context *_ctx, bool async, dict_transaction_commit_callback_t *callback, void *context) { struct redis_dict_transaction_context *ctx = (struct redis_dict_transaction_context *)_ctx; struct redis_dict *dict = (struct redis_dict *)_ctx->dict; struct redis_dict_reply *reply; unsigned int i; struct dict_commit_result result = { .ret = DICT_COMMIT_RET_OK }; i_assert(dict->transaction_open); dict->transaction_open = FALSE; if (ctx->error != NULL) { /* make sure we're disconnected */ redis_disconnected(&dict->conn, ctx->error); result.ret = -1; result.error = ctx->error; callback(&result, context); } else if (_ctx->changed) { i_assert(ctx->cmd_count > 0); o_stream_nsend_str(dict->conn.conn.output, "*1\r\n$4\r\nEXEC\r\n"); reply = array_append_space(&dict->replies); reply->callback = callback; reply->context = context; reply->reply_count = ctx->cmd_count; redis_input_state_add(dict, REDIS_INPUT_STATE_EXEC); for (i = 0; i < ctx->cmd_count; i++) redis_input_state_add(dict, REDIS_INPUT_STATE_EXEC_REPLY); if (async) { i_free(ctx); return; } redis_wait(dict); } else { callback(&result, context); } i_free(ctx->error); i_free(ctx); } static void redis_transaction_rollback(struct dict_transaction_context *_ctx) { struct redis_dict_transaction_context *ctx = (struct redis_dict_transaction_context *)_ctx; struct redis_dict *dict = (struct redis_dict *)_ctx->dict; i_assert(dict->transaction_open); dict->transaction_open = FALSE; if (ctx->error != NULL) { /* make sure we're disconnected */ redis_disconnected(&dict->conn, ctx->error); } else if (_ctx->changed) { o_stream_nsend_str(dict->conn.conn.output, "*1\r\n$7\r\nDISCARD\r\n"); redis_input_state_add(dict, REDIS_INPUT_STATE_DISCARD); } i_free(ctx->error); i_free(ctx); } static int redis_check_transaction(struct redis_dict_transaction_context *ctx) { struct redis_dict *dict = (struct redis_dict *)ctx->ctx.dict; if (ctx->error != NULL) return -1; if (!dict->connected) { ctx->error = i_strdup("Disconnected during transaction"); return -1; } if (ctx->ctx.changed) return 0; redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI); if (o_stream_send_str(dict->conn.conn.output, "*1\r\n$5\r\nMULTI\r\n") < 0) { ctx->error = i_strdup_printf("write() failed: %s", o_stream_get_error(dict->conn.conn.output)); return -1; } return 0; } static void redis_append_expire(struct redis_dict_transaction_context *ctx, string_t *cmd, const char *key) { struct redis_dict *dict = (struct redis_dict *)ctx->ctx.dict; if (dict->expire_value == NULL) return; str_printfa(cmd, "*3\r\n$6\r\nEXPIRE\r\n$%u\r\n%s\r\n$%u\r\n%s\r\n", (unsigned int)strlen(key), key, (unsigned int)strlen(dict->expire_value), dict->expire_value); redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI); ctx->cmd_count++; } static void redis_set(struct dict_transaction_context *_ctx, const char *key, const char *value) { struct redis_dict_transaction_context *ctx = (struct redis_dict_transaction_context *)_ctx; struct redis_dict *dict = (struct redis_dict *)_ctx->dict; const struct dict_op_settings_private *set = &_ctx->set; string_t *cmd; if (redis_check_transaction(ctx) < 0) return; key = redis_dict_get_full_key(dict, set->username, key); cmd = t_str_new(128); str_printfa(cmd, "*3\r\n$3\r\nSET\r\n$%u\r\n%s\r\n$%u\r\n%s\r\n", (unsigned int)strlen(key), key, (unsigned int)strlen(value), value); redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI); ctx->cmd_count++; redis_append_expire(ctx, cmd, key); if (o_stream_send(dict->conn.conn.output, str_data(cmd), str_len(cmd)) < 0) { ctx->error = i_strdup_printf("write() failed: %s", o_stream_get_error(dict->conn.conn.output)); } } static void redis_unset(struct dict_transaction_context *_ctx, const char *key) { struct redis_dict_transaction_context *ctx = (struct redis_dict_transaction_context *)_ctx; struct redis_dict *dict = (struct redis_dict *)_ctx->dict; const struct dict_op_settings_private *set = &_ctx->set; const char *cmd; if (redis_check_transaction(ctx) < 0) return; key = redis_dict_get_full_key(dict, set->username, key); cmd = t_strdup_printf("*2\r\n$3\r\nDEL\r\n$%u\r\n%s\r\n", (unsigned int)strlen(key), key); if (o_stream_send_str(dict->conn.conn.output, cmd) < 0) { ctx->error = i_strdup_printf("write() failed: %s", o_stream_get_error(dict->conn.conn.output)); } redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI); ctx->cmd_count++; } static void redis_atomic_inc(struct dict_transaction_context *_ctx, const char *key, long long diff) { struct redis_dict_transaction_context *ctx = (struct redis_dict_transaction_context *)_ctx; struct redis_dict *dict = (struct redis_dict *)_ctx->dict; const struct dict_op_settings_private *set = &_ctx->set; const char *diffstr; string_t *cmd; if (redis_check_transaction(ctx) < 0) return; key = redis_dict_get_full_key(dict, set->username, key); diffstr = t_strdup_printf("%lld", diff); cmd = t_str_new(128); str_printfa(cmd, "*3\r\n$6\r\nINCRBY\r\n$%u\r\n%s\r\n$%u\r\n%s\r\n", (unsigned int)strlen(key), key, (unsigned int)strlen(diffstr), diffstr); redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI); ctx->cmd_count++; redis_append_expire(ctx, cmd, key); if (o_stream_send(dict->conn.conn.output, str_data(cmd), str_len(cmd)) < 0) { ctx->error = i_strdup_printf("write() failed: %s", o_stream_get_error(dict->conn.conn.output)); } } struct dict dict_driver_redis = { .name = "redis", { .init = redis_dict_init, .deinit = redis_dict_deinit, .wait = redis_dict_wait, .lookup = redis_dict_lookup, .transaction_init = redis_transaction_init, .transaction_commit = redis_transaction_commit, .transaction_rollback = redis_transaction_rollback, .set = redis_set, .unset = redis_unset, .atomic_inc = redis_atomic_inc, } };