diff options
Diffstat (limited to '')
-rw-r--r-- | src/lib-dict/dict-redis.c | 831 |
1 files changed, 831 insertions, 0 deletions
diff --git a/src/lib-dict/dict-redis.c b/src/lib-dict/dict-redis.c new file mode 100644 index 0000000..01ec7b0 --- /dev/null +++ b/src/lib-dict/dict-redis.c @@ -0,0 +1,831 @@ +/* Copyright (c) 2008-2018 Dovecot authors, see the included COPYING redis */ + +#include "lib.h" +#include "array.h" +#include "str.h" +#include "istream.h" +#include "ostream.h" +#include "connection.h" +#include "dict-private.h" + +#define REDIS_DEFAULT_PORT 6379 +#define REDIS_DEFAULT_LOOKUP_TIMEOUT_MSECS (1000*30) +#define DICT_USERNAME_SEPARATOR '/' + +enum redis_input_state { + /* expecting +OK reply for AUTH */ + REDIS_INPUT_STATE_AUTH, + /* expecting +OK reply for SELECT */ + REDIS_INPUT_STATE_SELECT, + /* expecting $-1 / $<size> followed by GET reply */ + REDIS_INPUT_STATE_GET, + /* expecting +QUEUED */ + REDIS_INPUT_STATE_MULTI, + /* expecting +OK reply for DISCARD */ + REDIS_INPUT_STATE_DISCARD, + /* expecting *<nreplies> */ + REDIS_INPUT_STATE_EXEC, + /* expecting EXEC reply */ + REDIS_INPUT_STATE_EXEC_REPLY +}; + +struct redis_connection { + struct connection conn; + struct redis_dict *dict; + + string_t *last_reply; + unsigned int bytes_left; + bool value_not_found; + bool value_received; +}; + +struct redis_dict_reply { + unsigned int reply_count; + dict_transaction_commit_callback_t *callback; + void *context; +}; + +struct redis_dict { + struct dict dict; + char *password, *key_prefix, *expire_value; + unsigned int timeout_msecs, db_id; + + struct redis_connection conn; + + ARRAY(enum redis_input_state) input_states; + ARRAY(struct redis_dict_reply) replies; + + bool connected; + bool transaction_open; + bool db_id_set; +}; + +struct redis_dict_transaction_context { + struct dict_transaction_context ctx; + unsigned int cmd_count; + char *error; +}; + +static struct connection_list *redis_connections; + +static void +redis_input_state_add(struct redis_dict *dict, enum redis_input_state state) +{ + array_push_back(&dict->input_states, &state); +} + +static void redis_input_state_remove(struct redis_dict *dict) +{ + array_pop_front(&dict->input_states); +} + +static void redis_reply_callback(struct redis_connection *conn, + const struct redis_dict_reply *reply, + const struct dict_commit_result *result) +{ + if (conn->dict->dict.prev_ioloop != NULL) + io_loop_set_current(conn->dict->dict.prev_ioloop); + reply->callback(result, reply->context); + if (conn->dict->dict.prev_ioloop != NULL) + io_loop_set_current(conn->dict->dict.ioloop); +} + +static void +redis_disconnected(struct redis_connection *conn, const char *reason) +{ + const struct dict_commit_result result = { + DICT_COMMIT_RET_FAILED, reason + }; + const struct redis_dict_reply *reply; + + conn->dict->db_id_set = FALSE; + conn->dict->connected = FALSE; + connection_disconnect(&conn->conn); + + array_foreach(&conn->dict->replies, reply) + redis_reply_callback(conn, reply, &result); + array_clear(&conn->dict->replies); + array_clear(&conn->dict->input_states); + + if (conn->dict->dict.ioloop != NULL) + io_loop_stop(conn->dict->dict.ioloop); +} + +static void redis_conn_destroy(struct connection *_conn) +{ + struct redis_connection *conn = (struct redis_connection *)_conn; + + redis_disconnected(conn, connection_disconnect_reason(_conn)); +} + +static void redis_dict_wait_timeout(struct redis_dict *dict) +{ + const char *reason = t_strdup_printf( + "redis: Commit timed out in %u.%03u secs", + dict->timeout_msecs/1000, dict->timeout_msecs%1000); + redis_disconnected(&dict->conn, reason); +} + +static void redis_wait(struct redis_dict *dict) +{ + struct timeout *to; + + i_assert(dict->dict.ioloop == NULL); + + dict->dict.prev_ioloop = current_ioloop; + dict->dict.ioloop = io_loop_create(); + to = timeout_add(dict->timeout_msecs, redis_dict_wait_timeout, dict); + connection_switch_ioloop(&dict->conn.conn); + + do { + io_loop_run(dict->dict.ioloop); + } while (array_count(&dict->input_states) > 0); + + timeout_remove(&to); + io_loop_set_current(dict->dict.prev_ioloop); + connection_switch_ioloop(&dict->conn.conn); + io_loop_set_current(dict->dict.ioloop); + io_loop_destroy(&dict->dict.ioloop); + dict->dict.prev_ioloop = NULL; +} + +static int redis_input_get(struct redis_connection *conn, const char **error_r) +{ + const unsigned char *data; + size_t size; + const char *line; + + if (conn->bytes_left == 0) { + /* read the size first */ + line = i_stream_next_line(conn->conn.input); + if (line == NULL) + return 0; + if (strcmp(line, "$-1") == 0) { + conn->value_received = TRUE; + conn->value_not_found = TRUE; + if (conn->dict->dict.ioloop != NULL) + io_loop_stop(conn->dict->dict.ioloop); + redis_input_state_remove(conn->dict); + return 1; + } + if (line[0] != '$' || str_to_uint(line+1, &conn->bytes_left) < 0) { + *error_r = t_strdup_printf( + "redis: Unexpected input (wanted $size): %s", line); + return -1; + } + conn->bytes_left += 2; /* include trailing CRLF */ + } + + data = i_stream_get_data(conn->conn.input, &size); + if (size > conn->bytes_left) + size = conn->bytes_left; + str_append_data(conn->last_reply, data, size); + + conn->bytes_left -= size; + i_stream_skip(conn->conn.input, size); + + if (conn->bytes_left > 0) + return 0; + + /* reply fully read - drop trailing CRLF */ + conn->value_received = TRUE; + str_truncate(conn->last_reply, str_len(conn->last_reply)-2); + + if (conn->dict->dict.ioloop != NULL) + io_loop_stop(conn->dict->dict.ioloop); + redis_input_state_remove(conn->dict); + return 1; +} + +static int +redis_conn_input_more(struct redis_connection *conn, const char **error_r) +{ + struct redis_dict *dict = conn->dict; + struct redis_dict_reply *reply; + const enum redis_input_state *states; + enum redis_input_state state; + unsigned int count, num_replies; + const char *line; + + states = array_get(&dict->input_states, &count); + if (count == 0) { + line = i_stream_next_line(conn->conn.input); + if (line == NULL) + return 0; + *error_r = t_strdup_printf( + "redis: Unexpected input (expected nothing): %s", line); + return -1; + } + state = states[0]; + if (state == REDIS_INPUT_STATE_GET) + return redis_input_get(conn, error_r); + + line = i_stream_next_line(conn->conn.input); + if (line == NULL) + return 0; + + redis_input_state_remove(dict); + switch (state) { + case REDIS_INPUT_STATE_GET: + i_unreached(); + case REDIS_INPUT_STATE_AUTH: + case REDIS_INPUT_STATE_SELECT: + case REDIS_INPUT_STATE_MULTI: + case REDIS_INPUT_STATE_DISCARD: + if (line[0] != '+') + break; + return 1; + case REDIS_INPUT_STATE_EXEC: + if (line[0] != '*' || str_to_uint(line+1, &num_replies) < 0) + break; + + reply = array_front_modifiable(&dict->replies); + i_assert(reply->reply_count > 0); + if (reply->reply_count != num_replies) { + *error_r = t_strdup_printf( + "redis: EXEC expected %u replies, not %u", + reply->reply_count, num_replies); + return -1; + } + return 1; + case REDIS_INPUT_STATE_EXEC_REPLY: + if (*line != '+' && *line != ':') + break; + /* success, just ignore the actual reply */ + reply = array_front_modifiable(&dict->replies); + i_assert(reply->reply_count > 0); + if (--reply->reply_count == 0) { + const struct dict_commit_result result = { + DICT_COMMIT_RET_OK, NULL + }; + redis_reply_callback(conn, reply, &result); + array_pop_front(&dict->replies); + /* if we're running in a dict-ioloop, we're handling a + synchronous commit and need to stop now */ + if (array_count(&dict->replies) == 0 && + conn->dict->dict.ioloop != NULL) + io_loop_stop(conn->dict->dict.ioloop); + } + return 1; + } + str_truncate(dict->conn.last_reply, 0); + str_append(dict->conn.last_reply, line); + *error_r = t_strdup_printf("redis: Unexpected input (state=%d): %s", state, line); + return -1; +} + +static void redis_conn_input(struct connection *_conn) +{ + struct redis_connection *conn = (struct redis_connection *)_conn; + const char *error = NULL; + int ret; + + switch (i_stream_read(_conn->input)) { + case 0: + return; + case -1: + redis_disconnected(conn, i_stream_get_error(_conn->input)); + return; + default: + break; + } + + while ((ret = redis_conn_input_more(conn, &error)) > 0) ; + if (ret < 0) { + i_assert(error != NULL); + redis_disconnected(conn, error); + } +} + +static void redis_conn_connected(struct connection *_conn, bool success) +{ + struct redis_connection *conn = (struct redis_connection *)_conn; + + if (!success) { + e_error(conn->conn.event, "connect() failed: %m"); + } else { + conn->dict->connected = TRUE; + } + if (conn->dict->dict.ioloop != NULL) + io_loop_stop(conn->dict->dict.ioloop); +} + +static const struct connection_settings redis_conn_set = { + .input_max_size = SIZE_MAX, + .output_max_size = SIZE_MAX, + .client = TRUE +}; + +static const struct connection_vfuncs redis_conn_vfuncs = { + .destroy = redis_conn_destroy, + .input = redis_conn_input, + .client_connected = redis_conn_connected +}; + +static const char *redis_escape_username(const char *username) +{ + const char *p; + string_t *str = t_str_new(64); + + for (p = username; *p != '\0'; p++) { + switch (*p) { + case DICT_USERNAME_SEPARATOR: + str_append(str, "\\-"); + break; + case '\\': + str_append(str, "\\\\"); + break; + default: + str_append_c(str, *p); + } + } + return str_c(str); +} + +static int +redis_dict_init(struct dict *driver, const char *uri, + const struct dict_settings *set, + struct dict **dict_r, const char **error_r) +{ + struct redis_dict *dict; + struct ip_addr ip; + unsigned int secs; + in_port_t port = REDIS_DEFAULT_PORT; + const char *const *args, *unix_path = NULL; + int ret = 0; + + if (redis_connections == NULL) { + redis_connections = + connection_list_init(&redis_conn_set, + &redis_conn_vfuncs); + } + + dict = i_new(struct redis_dict, 1); + if (net_addr2ip("127.0.0.1", &ip) < 0) + i_unreached(); + dict->timeout_msecs = REDIS_DEFAULT_LOOKUP_TIMEOUT_MSECS; + dict->key_prefix = i_strdup(""); + dict->password = i_strdup(""); + + args = t_strsplit(uri, ":"); + for (; *args != NULL; args++) { + if (str_begins(*args, "path=")) { + unix_path = *args + 5; + } else if (str_begins(*args, "host=")) { + if (net_addr2ip(*args+5, &ip) < 0) { + *error_r = t_strdup_printf("Invalid IP: %s", + *args+5); + ret = -1; + } + } else if (str_begins(*args, "port=")) { + if (net_str2port(*args+5, &port) < 0) { + *error_r = t_strdup_printf("Invalid port: %s", + *args+5); + ret = -1; + } + } else if (str_begins(*args, "prefix=")) { + i_free(dict->key_prefix); + dict->key_prefix = i_strdup(*args + 7); + } else if (str_begins(*args, "db=")) { + if (str_to_uint(*args+3, &dict->db_id) < 0) { + *error_r = t_strdup_printf( + "Invalid db number: %s", *args+3); + ret = -1; + } + } else if (str_begins(*args, "expire_secs=")) { + const char *value = *args + 12; + + if (str_to_uint(value, &secs) < 0 || secs == 0) { + *error_r = t_strdup_printf( + "Invalid expire_secs: %s", value); + ret = -1; + } + i_free(dict->expire_value); + dict->expire_value = i_strdup(value); + } else if (str_begins(*args, "timeout_msecs=")) { + if (str_to_uint(*args+14, &dict->timeout_msecs) < 0) { + *error_r = t_strdup_printf( + "Invalid timeout_msecs: %s", *args+14); + ret = -1; + } + } else if (str_begins(*args, "password=")) { + i_free(dict->password); + dict->password = i_strdup(*args + 9); + } else { + *error_r = t_strdup_printf("Unknown parameter: %s", + *args); + ret = -1; + } + } + if (ret < 0) { + i_free(dict->password); + i_free(dict->key_prefix); + i_free(dict); + return -1; + } + + dict->conn.conn.event_parent = set->event_parent; + + if (unix_path != NULL) { + connection_init_client_unix(redis_connections, &dict->conn.conn, + unix_path); + } else { + connection_init_client_ip(redis_connections, &dict->conn.conn, + NULL, &ip, port); + } + event_set_append_log_prefix(dict->conn.conn.event, "redis: "); + dict->dict = *driver; + dict->conn.last_reply = str_new(default_pool, 256); + dict->conn.dict = dict; + + i_array_init(&dict->input_states, 4); + i_array_init(&dict->replies, 4); + + *dict_r = &dict->dict; + return 0; +} + +static void redis_dict_deinit(struct dict *_dict) +{ + struct redis_dict *dict = (struct redis_dict *)_dict; + + if (array_count(&dict->input_states) > 0) { + i_assert(dict->connected); + redis_wait(dict); + } + connection_deinit(&dict->conn.conn); + str_free(&dict->conn.last_reply); + array_free(&dict->replies); + array_free(&dict->input_states); + i_free(dict->expire_value); + i_free(dict->key_prefix); + i_free(dict->password); + i_free(dict); + + if (redis_connections->connections == NULL) + connection_list_deinit(&redis_connections); +} + +static void redis_dict_wait(struct dict *_dict) +{ + struct redis_dict *dict = (struct redis_dict *)_dict; + + if (array_count(&dict->input_states) > 0) + redis_wait(dict); +} + +static void redis_dict_lookup_timeout(struct redis_dict *dict) +{ + const char *reason = t_strdup_printf( + "redis: Lookup timed out in %u.%03u secs", + dict->timeout_msecs/1000, dict->timeout_msecs%1000); + redis_disconnected(&dict->conn, reason); +} + +static const char * +redis_dict_get_full_key(struct redis_dict *dict, const char *username, + const char *key) +{ + const char *username_sp = strchr(username, DICT_USERNAME_SEPARATOR); + + if (str_begins(key, DICT_PATH_SHARED)) + key += strlen(DICT_PATH_SHARED); + else if (str_begins(key, DICT_PATH_PRIVATE)) { + key = t_strdup_printf("%s%c%s", + username_sp == NULL ? username : + redis_escape_username(username), + DICT_USERNAME_SEPARATOR, + key + strlen(DICT_PATH_PRIVATE)); + } else { + i_unreached(); + } + if (*dict->key_prefix != '\0') + key = t_strconcat(dict->key_prefix, key, NULL); + return key; +} + +static void redis_dict_auth(struct redis_dict *dict) +{ + const char *cmd; + + if (*dict->password == '\0') + return; + + cmd = t_strdup_printf("*2\r\n$4\r\nAUTH\r\n$%d\r\n%s\r\n", + (int)strlen(dict->password), dict->password); + o_stream_nsend_str(dict->conn.conn.output, cmd); + redis_input_state_add(dict, REDIS_INPUT_STATE_AUTH); +} + +static void redis_dict_select_db(struct redis_dict *dict) +{ + const char *cmd, *db_str; + + if (dict->db_id_set) + return; + dict->db_id_set = TRUE; + if (dict->db_id == 0) { + /* 0 is the default */ + return; + } + db_str = dec2str(dict->db_id); + cmd = t_strdup_printf("*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", + (int)strlen(db_str), db_str); + o_stream_nsend_str(dict->conn.conn.output, cmd); + redis_input_state_add(dict, REDIS_INPUT_STATE_SELECT); +} + +static int redis_dict_lookup(struct dict *_dict, + const struct dict_op_settings *set, + pool_t pool, const char *key, + const char **value_r, const char **error_r) +{ + struct redis_dict *dict = (struct redis_dict *)_dict; + struct timeout *to; + const char *cmd; + + key = redis_dict_get_full_key(dict, set->username, key); + + dict->conn.value_received = FALSE; + dict->conn.value_not_found = FALSE; + + i_assert(dict->dict.ioloop == NULL); + + dict->dict.prev_ioloop = current_ioloop; + dict->dict.ioloop = io_loop_create(); + connection_switch_ioloop(&dict->conn.conn); + + if (dict->conn.conn.fd_in == -1 && + connection_client_connect(&dict->conn.conn) < 0) { + e_error(dict->conn.conn.event, "Couldn't connect"); + } else { + to = timeout_add(dict->timeout_msecs, + redis_dict_lookup_timeout, dict); + if (!dict->connected) { + /* wait for connection */ + io_loop_run(dict->dict.ioloop); + if (dict->connected) + redis_dict_auth(dict); + } + + if (dict->connected) { + redis_dict_select_db(dict); + cmd = t_strdup_printf("*2\r\n$3\r\nGET\r\n$%d\r\n%s\r\n", + (int)strlen(key), key); + o_stream_nsend_str(dict->conn.conn.output, cmd); + + str_truncate(dict->conn.last_reply, 0); + redis_input_state_add(dict, REDIS_INPUT_STATE_GET); + do { + io_loop_run(dict->dict.ioloop); + } while (array_count(&dict->input_states) > 0); + } + timeout_remove(&to); + } + + io_loop_set_current(dict->dict.prev_ioloop); + connection_switch_ioloop(&dict->conn.conn); + io_loop_set_current(dict->dict.ioloop); + io_loop_destroy(&dict->dict.ioloop); + dict->dict.prev_ioloop = NULL; + + if (!dict->conn.value_received) { + /* we failed in some way. make sure we disconnect since the + connection state isn't known anymore */ + *error_r = t_strdup_printf("redis: Communication failure (last reply: %s)", + str_c(dict->conn.last_reply)); + redis_disconnected(&dict->conn, *error_r); + return -1; + } + if (dict->conn.value_not_found) + return 0; + + *value_r = p_strdup(pool, str_c(dict->conn.last_reply)); + return 1; +} + +static struct dict_transaction_context * +redis_transaction_init(struct dict *_dict) +{ + struct redis_dict *dict = (struct redis_dict *)_dict; + struct redis_dict_transaction_context *ctx; + + i_assert(!dict->transaction_open); + dict->transaction_open = TRUE; + + ctx = i_new(struct redis_dict_transaction_context, 1); + ctx->ctx.dict = _dict; + + if (dict->conn.conn.fd_in == -1 && + connection_client_connect(&dict->conn.conn) < 0) { + e_error(dict->conn.conn.event, "Couldn't connect"); + } else if (!dict->connected) { + /* wait for connection */ + redis_wait(dict); + if (dict->connected) + redis_dict_auth(dict); + } + if (dict->connected) + redis_dict_select_db(dict); + return &ctx->ctx; +} + +static void +redis_transaction_commit(struct dict_transaction_context *_ctx, bool async, + dict_transaction_commit_callback_t *callback, + void *context) +{ + struct redis_dict_transaction_context *ctx = + (struct redis_dict_transaction_context *)_ctx; + struct redis_dict *dict = (struct redis_dict *)_ctx->dict; + struct redis_dict_reply *reply; + unsigned int i; + struct dict_commit_result result = { .ret = DICT_COMMIT_RET_OK }; + + i_assert(dict->transaction_open); + dict->transaction_open = FALSE; + + if (ctx->error != NULL) { + /* make sure we're disconnected */ + redis_disconnected(&dict->conn, ctx->error); + result.ret = -1; + result.error = ctx->error; + } else if (_ctx->changed) { + i_assert(ctx->cmd_count > 0); + + o_stream_nsend_str(dict->conn.conn.output, + "*1\r\n$4\r\nEXEC\r\n"); + reply = array_append_space(&dict->replies); + reply->callback = callback; + reply->context = context; + reply->reply_count = ctx->cmd_count; + redis_input_state_add(dict, REDIS_INPUT_STATE_EXEC); + for (i = 0; i < ctx->cmd_count; i++) + redis_input_state_add(dict, REDIS_INPUT_STATE_EXEC_REPLY); + if (async) { + i_free(ctx); + return; + } + redis_wait(dict); + } + callback(&result, context); + i_free(ctx->error); + i_free(ctx); +} + +static void redis_transaction_rollback(struct dict_transaction_context *_ctx) +{ + struct redis_dict_transaction_context *ctx = + (struct redis_dict_transaction_context *)_ctx; + struct redis_dict *dict = (struct redis_dict *)_ctx->dict; + struct redis_dict_reply *reply; + + i_assert(dict->transaction_open); + dict->transaction_open = FALSE; + + if (ctx->error != NULL) { + /* make sure we're disconnected */ + redis_disconnected(&dict->conn, ctx->error); + } else if (_ctx->changed) { + o_stream_nsend_str(dict->conn.conn.output, + "*1\r\n$7\r\nDISCARD\r\n"); + reply = array_append_space(&dict->replies); + reply->reply_count = 1; + redis_input_state_add(dict, REDIS_INPUT_STATE_DISCARD); + } + i_free(ctx->error); + i_free(ctx); +} + +static int redis_check_transaction(struct redis_dict_transaction_context *ctx) +{ + struct redis_dict *dict = (struct redis_dict *)ctx->ctx.dict; + + if (ctx->error != NULL) + return -1; + if (!dict->connected) { + ctx->error = i_strdup("Disconnected during transaction"); + return -1; + } + if (ctx->ctx.changed) + return 0; + + redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI); + if (o_stream_send_str(dict->conn.conn.output, + "*1\r\n$5\r\nMULTI\r\n") < 0) { + ctx->error = i_strdup_printf("write() failed: %s", + o_stream_get_error(dict->conn.conn.output)); + return -1; + } + return 0; +} + +static void +redis_append_expire(struct redis_dict_transaction_context *ctx, + string_t *cmd, const char *key) +{ + struct redis_dict *dict = (struct redis_dict *)ctx->ctx.dict; + + if (dict->expire_value == NULL) + return; + + str_printfa(cmd, "*3\r\n$6\r\nEXPIRE\r\n$%u\r\n%s\r\n$%u\r\n%s\r\n", + (unsigned int)strlen(key), key, + (unsigned int)strlen(dict->expire_value), + dict->expire_value); + redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI); + ctx->cmd_count++; +} + +static void redis_set(struct dict_transaction_context *_ctx, + const char *key, const char *value) +{ + struct redis_dict_transaction_context *ctx = + (struct redis_dict_transaction_context *)_ctx; + struct redis_dict *dict = (struct redis_dict *)_ctx->dict; + const struct dict_op_settings_private *set = &_ctx->set; + string_t *cmd; + + if (redis_check_transaction(ctx) < 0) + return; + + key = redis_dict_get_full_key(dict, set->username, key); + cmd = t_str_new(128); + str_printfa(cmd, "*3\r\n$3\r\nSET\r\n$%u\r\n%s\r\n$%u\r\n%s\r\n", + (unsigned int)strlen(key), key, + (unsigned int)strlen(value), value); + redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI); + ctx->cmd_count++; + redis_append_expire(ctx, cmd, key); + if (o_stream_send(dict->conn.conn.output, str_data(cmd), str_len(cmd)) < 0) { + ctx->error = i_strdup_printf("write() failed: %s", + o_stream_get_error(dict->conn.conn.output)); + } +} + +static void redis_unset(struct dict_transaction_context *_ctx, + const char *key) +{ + struct redis_dict_transaction_context *ctx = + (struct redis_dict_transaction_context *)_ctx; + struct redis_dict *dict = (struct redis_dict *)_ctx->dict; + const struct dict_op_settings_private *set = &_ctx->set; + const char *cmd; + + if (redis_check_transaction(ctx) < 0) + return; + + key = redis_dict_get_full_key(dict, set->username, key); + cmd = t_strdup_printf("*2\r\n$3\r\nDEL\r\n$%u\r\n%s\r\n", + (unsigned int)strlen(key), key); + if (o_stream_send_str(dict->conn.conn.output, cmd) < 0) { + ctx->error = i_strdup_printf("write() failed: %s", + o_stream_get_error(dict->conn.conn.output)); + } + redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI); + ctx->cmd_count++; +} + +static void redis_atomic_inc(struct dict_transaction_context *_ctx, + const char *key, long long diff) +{ + struct redis_dict_transaction_context *ctx = + (struct redis_dict_transaction_context *)_ctx; + struct redis_dict *dict = (struct redis_dict *)_ctx->dict; + const struct dict_op_settings_private *set = &_ctx->set; + const char *diffstr; + string_t *cmd; + + if (redis_check_transaction(ctx) < 0) + return; + + key = redis_dict_get_full_key(dict, set->username, key); + diffstr = t_strdup_printf("%lld", diff); + cmd = t_str_new(128); + str_printfa(cmd, "*3\r\n$6\r\nINCRBY\r\n$%u\r\n%s\r\n$%u\r\n%s\r\n", + (unsigned int)strlen(key), key, + (unsigned int)strlen(diffstr), diffstr); + redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI); + ctx->cmd_count++; + redis_append_expire(ctx, cmd, key); + if (o_stream_send(dict->conn.conn.output, str_data(cmd), str_len(cmd)) < 0) { + ctx->error = i_strdup_printf("write() failed: %s", + o_stream_get_error(dict->conn.conn.output)); + } +} + +struct dict dict_driver_redis = { + .name = "redis", + { + .init = redis_dict_init, + .deinit = redis_dict_deinit, + .wait = redis_dict_wait, + .lookup = redis_dict_lookup, + .transaction_init = redis_transaction_init, + .transaction_commit = redis_transaction_commit, + .transaction_rollback = redis_transaction_rollback, + .set = redis_set, + .unset = redis_unset, + .atomic_inc = redis_atomic_inc, + } +}; |