diff options
Diffstat (limited to '')
-rw-r--r-- | src/lib-dict/dict-client.c | 1492 |
1 files changed, 1492 insertions, 0 deletions
diff --git a/src/lib-dict/dict-client.c b/src/lib-dict/dict-client.c new file mode 100644 index 0000000..32b2da1 --- /dev/null +++ b/src/lib-dict/dict-client.c @@ -0,0 +1,1492 @@ +/* Copyright (c) 2005-2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "array.h" +#include "llist.h" +#include "str.h" +#include "strescape.h" +#include "file-lock.h" +#include "time-util.h" +#include "connection.h" +#include "ostream.h" +#include "eacces-error.h" +#include "dict-private.h" +#include "dict-client.h" + +#include <unistd.h> +#include <fcntl.h> + +/* Disconnect from dict server after this many milliseconds of idling after + sending a command. Because dict server does blocking dict accesses, it can + handle only one client at a time. This is why the default timeout is zero, + so that there won't be many dict processes just doing nothing. Zero means + that the socket is disconnected immediately after returning to ioloop. */ +#define DICT_CLIENT_DEFAULT_TIMEOUT_MSECS 0 + +/* Abort dict lookup after this many seconds. */ +#define DICT_CLIENT_REQUEST_TIMEOUT_MSECS 30000 +/* When dict lookup timeout is reached, wait a bit longer if the last dict + ioloop wait was shorter than this. */ +#define DICT_CLIENT_REQUEST_TIMEOUT_MIN_LAST_IOLOOP_WAIT_MSECS 1000 +/* Log a warning if dict lookup takes longer than this many milliseconds. */ +#define DICT_CLIENT_DEFAULT_WARN_SLOW_MSECS 5000 + +struct client_dict_cmd { + int refcount; + struct client_dict *dict; + struct timeval start_time; + char *query; + unsigned int async_id; + struct timeval async_id_received_time; + + uint64_t start_global_ioloop_usecs; + uint64_t start_dict_ioloop_usecs; + uint64_t start_lock_usecs; + + bool reconnected; + bool retry_errors; + bool no_replies; + bool unfinished; + bool background; + + void (*callback)(struct client_dict_cmd *cmd, + enum dict_protocol_reply reply, const char *value, + const char *const *extra_args, const char *error, + bool disconnected); + struct client_dict_iterate_context *iter; + struct client_dict_transaction_context *trans; + + struct { + dict_lookup_callback_t *lookup; + dict_transaction_commit_callback_t *commit; + void *context; + } api_callback; +}; + +struct dict_client_connection { + struct connection conn; + struct client_dict *dict; +}; + +struct client_dict { + struct dict dict; + struct dict_client_connection conn; + + char *uri; + enum dict_data_type value_type; + unsigned warn_slow_msecs; + + time_t last_failed_connect; + char *last_connect_error; + + struct io_wait_timer *wait_timer; + uint64_t last_timer_switch_usecs; + struct timeout *to_requests; + struct timeout *to_idle; + unsigned int idle_msecs; + + ARRAY(struct client_dict_cmd *) cmds; + struct client_dict_transaction_context *transactions; + + unsigned int transaction_id_counter; +}; + +struct client_dict_iter_result { + const char *key, *const *values; +}; + +struct client_dict_iterate_context { + struct dict_iterate_context ctx; + char *error; + char *path; + enum dict_iterate_flags flags; + int refcount; + + pool_t results_pool; + ARRAY(struct client_dict_iter_result) results; + unsigned int result_idx; + + bool cmd_sent; + bool seen_results; + bool finished; + bool deinit; +}; + +struct client_dict_transaction_context { + struct dict_transaction_context ctx; + struct client_dict_transaction_context *prev, *next; + + char *first_query; + char *error; + + unsigned int id; + unsigned int query_count; + + bool sent_begin:1; +}; + +static struct connection_list *dict_connections; + +static int client_dict_connect(struct client_dict *dict, const char **error_r); +static int client_dict_reconnect(struct client_dict *dict, const char *reason, + const char **error_r); +static void client_dict_disconnect(struct client_dict *dict, const char *reason); +static const char *dict_wait_warnings(const struct client_dict_cmd *cmd); + +static struct client_dict_cmd * +client_dict_cmd_init(struct client_dict *dict, const char *query) +{ + struct client_dict_cmd *cmd; + + io_loop_time_refresh(); + + cmd = i_new(struct client_dict_cmd, 1); + cmd->refcount = 1; + cmd->dict = dict; + cmd->query = i_strdup(query); + cmd->start_time = ioloop_timeval; + cmd->start_global_ioloop_usecs = ioloop_global_wait_usecs; + cmd->start_dict_ioloop_usecs = io_wait_timer_get_usecs(dict->wait_timer); + cmd->start_lock_usecs = file_lock_wait_get_total_usecs(); + return cmd; +} + +static void client_dict_cmd_ref(struct client_dict_cmd *cmd) +{ + i_assert(cmd->refcount > 0); + cmd->refcount++; +} + +static bool client_dict_cmd_unref(struct client_dict_cmd *cmd) +{ + i_assert(cmd->refcount > 0); + if (--cmd->refcount > 0) + return TRUE; + + i_assert(cmd->trans == NULL); + + i_free(cmd->query); + i_free(cmd); + return FALSE; +} + +static bool +dict_cmd_callback_line(struct client_dict_cmd *cmd, const char *const *args) +{ + const char *value = args[0]; + enum dict_protocol_reply reply; + + if (value == NULL) { + /* "" is a valid iteration reply */ + reply = DICT_PROTOCOL_REPLY_ITER_FINISHED; + } else { + reply = value[0]; + value++; + args++; + } + + cmd->unfinished = FALSE; + cmd->callback(cmd, reply, value, args, NULL, FALSE); + return !cmd->unfinished; +} + +static void +dict_cmd_callback_error(struct client_dict_cmd *cmd, const char *error, + bool disconnected) +{ + const char *null_arg = NULL; + + cmd->unfinished = FALSE; + if (cmd->callback != NULL) { + cmd->callback(cmd, DICT_PROTOCOL_REPLY_ERROR, + "", &null_arg, error, disconnected); + } + i_assert(!cmd->unfinished); +} + +static struct client_dict_cmd * +client_dict_cmd_first_nonbg(struct client_dict *dict) +{ + struct client_dict_cmd *const *cmds; + unsigned int i, count; + + cmds = array_get(&dict->cmds, &count); + for (i = 0; i < count; i++) { + if (!cmds[i]->background) + return cmds[i]; + } + return NULL; +} + +static void client_dict_input_timeout(struct client_dict *dict) +{ + struct client_dict_cmd *cmd; + const char *error; + uint64_t msecs_in_last_dict_ioloop_wait; + int cmd_diff; + + /* find the first non-background command. there must be at least one. */ + cmd = client_dict_cmd_first_nonbg(dict); + i_assert(cmd != NULL); + + cmd_diff = timeval_diff_msecs(&ioloop_timeval, &cmd->start_time); + if (cmd_diff < DICT_CLIENT_REQUEST_TIMEOUT_MSECS) { + /* need to re-create this timeout. the currently-oldest + command was added when another command was still + running with an older timeout. */ + timeout_remove(&dict->to_requests); + dict->to_requests = + timeout_add(DICT_CLIENT_REQUEST_TIMEOUT_MSECS - cmd_diff, + client_dict_input_timeout, dict); + return; + } + + /* If we've gotten here because all the time was spent in other ioloops + or locks, make sure there's a bit of time waiting for the dict + ioloop as well. There's a good chance that the reply can be read. */ + msecs_in_last_dict_ioloop_wait = + (io_wait_timer_get_usecs(dict->wait_timer) - + dict->last_timer_switch_usecs + 999) / 1000; + if (msecs_in_last_dict_ioloop_wait < DICT_CLIENT_REQUEST_TIMEOUT_MIN_LAST_IOLOOP_WAIT_MSECS) { + timeout_remove(&dict->to_requests); + dict->to_requests = + timeout_add(DICT_CLIENT_REQUEST_TIMEOUT_MIN_LAST_IOLOOP_WAIT_MSECS - + msecs_in_last_dict_ioloop_wait, + client_dict_input_timeout, dict); + return; + } + + (void)client_dict_reconnect(dict, t_strdup_printf( + "Dict server timeout: %s " + "(%u commands pending, oldest sent %u.%03u secs ago: %s, %s)", + connection_input_timeout_reason(&dict->conn.conn), + array_count(&dict->cmds), + cmd_diff/1000, cmd_diff%1000, cmd->query, + dict_wait_warnings(cmd)), &error); +} + +static int +client_dict_cmd_query_send(struct client_dict *dict, const char *query) +{ + struct const_iovec iov[2]; + ssize_t ret; + + iov[0].iov_base = query; + iov[0].iov_len = strlen(query); + iov[1].iov_base = "\n"; + iov[1].iov_len = 1; + ret = o_stream_sendv(dict->conn.conn.output, iov, 2); + if (ret < 0) + return -1; + i_assert((size_t)ret == iov[0].iov_len + 1); + return 0; +} + +static bool +client_dict_cmd_send(struct client_dict *dict, struct client_dict_cmd **_cmd, + const char **error_r) +{ + struct client_dict_cmd *cmd = *_cmd; + const char *error = NULL; + bool retry = cmd->retry_errors; + int ret; + + *_cmd = NULL; + + /* we're no longer idling. even with no_replies=TRUE we're going to + wait for COMMIT/ROLLBACK. */ + timeout_remove(&dict->to_idle); + + if (client_dict_connect(dict, &error) < 0) { + retry = FALSE; + ret = -1; + } else { + ret = client_dict_cmd_query_send(dict, cmd->query); + if (ret < 0) { + error = t_strdup_printf("write(%s) failed: %s", dict->conn.conn.name, + o_stream_get_error(dict->conn.conn.output)); + } + } + if (ret < 0 && retry) { + /* Reconnect and try again. */ + if (client_dict_reconnect(dict, error, &error) < 0) + ; + else if (client_dict_cmd_query_send(dict, cmd->query) < 0) { + error = t_strdup_printf("write(%s) failed: %s", dict->conn.conn.name, + o_stream_get_error(dict->conn.conn.output)); + } else { + ret = 0; + } + } + + if (cmd->no_replies) { + /* just send and forget */ + client_dict_cmd_unref(cmd); + return TRUE; + } else if (ret < 0) { + i_assert(error != NULL); + /* we didn't successfully send this command to dict */ + dict_cmd_callback_error(cmd, error, cmd->reconnected); + client_dict_cmd_unref(cmd); + if (error_r != NULL) + *error_r = error; + return FALSE; + } else { + if (dict->to_requests == NULL && !cmd->background) { + dict->to_requests = + timeout_add(DICT_CLIENT_REQUEST_TIMEOUT_MSECS, + client_dict_input_timeout, dict); + } + array_push_back(&dict->cmds, &cmd); + return TRUE; + } +} + +static bool +client_dict_transaction_send_begin(struct client_dict_transaction_context *ctx, + const struct dict_op_settings_private *set) +{ + struct client_dict *dict = (struct client_dict *)ctx->ctx.dict; + struct client_dict_cmd *cmd; + const char *query, *error; + + i_assert(ctx->error == NULL); + + ctx->sent_begin = TRUE; + + /* transactions commands don't have replies. only COMMIT has. */ + query = t_strdup_printf("%c%u\t%s", DICT_PROTOCOL_CMD_BEGIN, + ctx->id, + set->username == NULL ? "" : str_tabescape(set->username)); + cmd = client_dict_cmd_init(dict, query); + cmd->no_replies = TRUE; + cmd->retry_errors = TRUE; + if (!client_dict_cmd_send(dict, &cmd, &error)) { + ctx->error = i_strdup(error); + return FALSE; + } + return TRUE; +} + +static void +client_dict_send_transaction_query(struct client_dict_transaction_context *ctx, + const char *query) +{ + struct client_dict *dict = (struct client_dict *)ctx->ctx.dict; + const struct dict_op_settings_private *set = &ctx->ctx.set; + struct client_dict_cmd *cmd; + const char *error; + + if (ctx->error != NULL) + return; + + if (!ctx->sent_begin) { + if (!client_dict_transaction_send_begin(ctx, set)) + return; + } + + ctx->query_count++; + if (ctx->first_query == NULL) + ctx->first_query = i_strdup(query); + + cmd = client_dict_cmd_init(dict, query); + cmd->no_replies = TRUE; + if (!client_dict_cmd_send(dict, &cmd, &error)) + ctx->error = i_strdup(error); +} + +static bool client_dict_is_finished(struct client_dict *dict) +{ + return dict->transactions == NULL && array_count(&dict->cmds) == 0; +} + +static void client_dict_timeout(struct client_dict *dict) +{ + if (client_dict_is_finished(dict)) + client_dict_disconnect(dict, "Idle disconnection"); + else + timeout_remove(&dict->to_idle); +} + +static bool client_dict_have_nonbackground_cmds(struct client_dict *dict) +{ + struct client_dict_cmd *cmd; + + array_foreach_elem(&dict->cmds, cmd) { + if (!cmd->background) + return TRUE; + } + return FALSE; +} + +static void client_dict_add_timeout(struct client_dict *dict) +{ + if (dict->to_idle != NULL) { + if (dict->idle_msecs > 0) + timeout_reset(dict->to_idle); + } else if (client_dict_is_finished(dict)) { + dict->to_idle = timeout_add(dict->idle_msecs, + client_dict_timeout, dict); + timeout_remove(&dict->to_requests); + } else if (dict->transactions == NULL && + !client_dict_have_nonbackground_cmds(dict)) { + /* we had non-background commands, but now we're back to + having only background commands. remove timeouts. */ + timeout_remove(&dict->to_requests); + } +} + +static void client_dict_cmd_backgrounded(struct client_dict *dict) +{ + if (dict->to_requests == NULL) + return; + + if (!client_dict_have_nonbackground_cmds(dict)) { + /* we only have background-commands. + remove the request timeout. */ + timeout_remove(&dict->to_requests); + } +} + +static int +dict_conn_assign_next_async_id(struct dict_client_connection *conn, + const char *line) +{ + struct client_dict_cmd *const *cmds; + unsigned int i, count, async_id; + + i_assert(line[0] == DICT_PROTOCOL_REPLY_ASYNC_ID); + + if (str_to_uint(line+1, &async_id) < 0 || async_id == 0) { + e_error(conn->conn.event, "Received invalid async-id line: %s", + line); + return -1; + } + cmds = array_get(&conn->dict->cmds, &count); + for (i = 0; i < count; i++) { + if (cmds[i]->async_id == 0) { + cmds[i]->async_id = async_id; + cmds[i]->async_id_received_time = ioloop_timeval; + return 0; + } + } + e_error(conn->conn.event, "Received async-id line, but all %u " + "commands already have it: %s", + count, line); + return -1; +} + +static int dict_conn_find_async_id(struct dict_client_connection *conn, + const char *async_arg, + const char *line, unsigned int *idx_r) +{ + struct client_dict_cmd *const *cmds; + unsigned int i, count, async_id; + + i_assert(async_arg[0] == DICT_PROTOCOL_REPLY_ASYNC_REPLY); + + if (str_to_uint(async_arg+1, &async_id) < 0 || async_id == 0) { + e_error(conn->conn.event, "Received invalid async-reply line: %s", + line); + return -1; + } + + cmds = array_get(&conn->dict->cmds, &count); + for (i = 0; i < count; i++) { + if (cmds[i]->async_id == async_id) { + *idx_r = i; + return 0; + } + } + e_error(conn->conn.event, "Received reply for nonexistent async-id %u: %s", + async_id, line); + return -1; +} + +static int dict_conn_input_line(struct connection *_conn, const char *line) +{ + struct dict_client_connection *conn = + (struct dict_client_connection *)_conn; + struct client_dict *dict = conn->dict; + struct client_dict_cmd *const *cmds; + const char *const *args; + unsigned int i, count; + bool finished; + + if (dict->to_requests != NULL) + timeout_reset(dict->to_requests); + + if (line[0] == DICT_PROTOCOL_REPLY_ASYNC_ID) + return dict_conn_assign_next_async_id(conn, line) < 0 ? -1 : 1; + + cmds = array_get(&conn->dict->cmds, &count); + if (count == 0) { + e_error(conn->conn.event, "Received reply without pending commands: %s", + line); + return -1; + } + + args = t_strsplit_tabescaped(line); + if (args[0] != NULL && args[0][0] == DICT_PROTOCOL_REPLY_ASYNC_REPLY) { + if (dict_conn_find_async_id(conn, args[0], line, &i) < 0) + return -1; + args++; + } else { + i = 0; + } + i_assert(!cmds[i]->no_replies); + + client_dict_cmd_ref(cmds[i]); + finished = dict_cmd_callback_line(cmds[i], args); + if (!client_dict_cmd_unref(cmds[i])) { + /* disconnected during command handling */ + return -1; + } + if (!finished) { + /* more lines needed for this command */ + return 1; + } + client_dict_cmd_unref(cmds[i]); + array_delete(&dict->cmds, i, 1); + + client_dict_add_timeout(dict); + return 1; +} + +static int client_dict_connect(struct client_dict *dict, const char **error_r) +{ + const char *query, *error; + + if (dict->conn.conn.fd_in != -1) + return 0; + if (dict->last_failed_connect == ioloop_time) { + /* Try again later */ + *error_r = dict->last_connect_error; + return -1; + } + + if (connection_client_connect(&dict->conn.conn) < 0) { + dict->last_failed_connect = ioloop_time; + if (errno == EACCES) { + error = eacces_error_get("net_connect_unix", + dict->conn.conn.name); + } else { + error = t_strdup_printf( + "net_connect_unix(%s) failed: %m", dict->conn.conn.name); + } + i_free(dict->last_connect_error); + dict->last_connect_error = i_strdup(error); + *error_r = error; + return -1; + } + + query = t_strdup_printf("%c%u\t%u\t%d\t%s\t%s\n", + DICT_PROTOCOL_CMD_HELLO, + DICT_CLIENT_PROTOCOL_MAJOR_VERSION, + DICT_CLIENT_PROTOCOL_MINOR_VERSION, + dict->value_type, + "", + str_tabescape(dict->uri)); + o_stream_nsend_str(dict->conn.conn.output, query); + client_dict_add_timeout(dict); + return 0; +} + +static void +client_dict_abort_commands(struct client_dict *dict, const char *reason) +{ + ARRAY(struct client_dict_cmd *) cmds_copy; + struct client_dict_cmd *cmd; + + /* abort all commands */ + t_array_init(&cmds_copy, array_count(&dict->cmds)); + array_append_array(&cmds_copy, &dict->cmds); + array_clear(&dict->cmds); + + array_foreach_elem(&cmds_copy, cmd) { + dict_cmd_callback_error(cmd, reason, TRUE); + client_dict_cmd_unref(cmd); + } +} + +static void client_dict_disconnect(struct client_dict *dict, const char *reason) +{ + struct client_dict_transaction_context *ctx, *next; + + client_dict_abort_commands(dict, reason); + + /* all transactions that have sent BEGIN are no longer valid */ + for (ctx = dict->transactions; ctx != NULL; ctx = next) { + next = ctx->next; + if (ctx->sent_begin && ctx->error == NULL) + ctx->error = i_strdup(reason); + } + + timeout_remove(&dict->to_idle); + timeout_remove(&dict->to_requests); + connection_disconnect(&dict->conn.conn); +} + +static int client_dict_reconnect(struct client_dict *dict, const char *reason, + const char **error_r) +{ + ARRAY(struct client_dict_cmd *) retry_cmds; + struct client_dict_cmd *cmd; + const char *error; + int ret; + + t_array_init(&retry_cmds, array_count(&dict->cmds)); + for (unsigned int i = 0; i < array_count(&dict->cmds); ) { + cmd = array_idx_elem(&dict->cmds, i); + if (!cmd->retry_errors) { + i++; + } else if (cmd->iter != NULL && + cmd->iter->seen_results) { + /* don't retry iteration that already returned + something to the caller. otherwise we'd return + duplicates. */ + i++; + } else { + array_push_back(&retry_cmds, &cmd); + array_delete(&dict->cmds, i, 1); + } + } + client_dict_disconnect(dict, reason); + if (client_dict_connect(dict, error_r) < 0) { + reason = t_strdup_printf("%s - reconnect failed: %s", + reason, *error_r); + array_foreach_elem(&retry_cmds, cmd) { + dict_cmd_callback_error(cmd, reason, TRUE); + client_dict_cmd_unref(cmd); + } + return -1; + } + if (array_count(&retry_cmds) == 0) + return 0; + e_warning(dict->conn.conn.event, "%s - reconnected", reason); + + ret = 0; error = ""; + array_foreach_elem(&retry_cmds, cmd) { + cmd->reconnected = TRUE; + cmd->async_id = 0; + /* if it fails again, don't retry anymore */ + cmd->retry_errors = FALSE; + if (ret < 0) { + dict_cmd_callback_error(cmd, error, TRUE); + client_dict_cmd_unref(cmd); + } else if (!client_dict_cmd_send(dict, &cmd, &error)) + ret = -1; + } + return ret; +} + +static void dict_conn_destroy(struct connection *_conn) +{ + struct dict_client_connection *conn = + (struct dict_client_connection *)_conn; + + client_dict_disconnect(conn->dict, connection_disconnect_reason(_conn)); +} + +static const struct connection_settings dict_conn_set = { + .input_max_size = SIZE_MAX, + .output_max_size = SIZE_MAX, + .unix_client_connect_msecs = 1000, + .client = TRUE +}; + +static const struct connection_vfuncs dict_conn_vfuncs = { + .destroy = dict_conn_destroy, + .input_line = dict_conn_input_line +}; + +static int +client_dict_init(struct dict *driver, const char *uri, + const struct dict_settings *set, + struct dict **dict_r, const char **error_r) +{ + struct ioloop *old_ioloop = current_ioloop; + struct client_dict *dict; + const char *p, *dest_uri, *path; + unsigned int idle_msecs = DICT_CLIENT_DEFAULT_TIMEOUT_MSECS; + unsigned int warn_slow_msecs = DICT_CLIENT_DEFAULT_WARN_SLOW_MSECS; + + /* uri = [idle_msecs=<n>:] [warn_slow_msecs=<n>:] [<path>] ":" <uri> */ + for (;;) { + if (str_begins(uri, "idle_msecs=")) { + p = strchr(uri+11, ':'); + if (p == NULL) { + *error_r = t_strdup_printf("Invalid URI: %s", uri); + return -1; + } + if (str_to_uint(t_strdup_until(uri+11, p), &idle_msecs) < 0) { + *error_r = "Invalid idle_msecs"; + return -1; + } + uri = p+1; + } else if (str_begins(uri, "warn_slow_msecs=")) { + p = strchr(uri+11, ':'); + if (p == NULL) { + *error_r = t_strdup_printf("Invalid URI: %s", uri); + return -1; + } + if (str_to_uint(t_strdup_until(uri+16, p), &warn_slow_msecs) < 0) { + *error_r = "Invalid warn_slow_msecs"; + return -1; + } + uri = p+1; + } else { + break; + } + } + dest_uri = strchr(uri, ':'); + if (dest_uri == NULL) { + *error_r = t_strdup_printf("Invalid URI: %s", uri); + return -1; + } + + if (dict_connections == NULL) { + dict_connections = connection_list_init(&dict_conn_set, + &dict_conn_vfuncs); + } + + dict = i_new(struct client_dict, 1); + dict->dict = *driver; + dict->conn.dict = dict; + dict->conn.conn.event_parent = set->event_parent; + dict->idle_msecs = idle_msecs; + dict->warn_slow_msecs = warn_slow_msecs; + i_array_init(&dict->cmds, 32); + + if (uri[0] == ':') { + /* default path */ + path = t_strconcat(set->base_dir, + "/"DEFAULT_DICT_SERVER_SOCKET_FNAME, NULL); + } else if (uri[0] == '/') { + /* absolute path */ + path = t_strdup_until(uri, dest_uri); + } else { + /* relative path to base_dir */ + path = t_strconcat(set->base_dir, "/", + t_strdup_until(uri, dest_uri), NULL); + } + connection_init_client_unix(dict_connections, &dict->conn.conn, path); + dict->uri = i_strdup(dest_uri + 1); + + dict->dict.ioloop = io_loop_create(); + dict->wait_timer = io_wait_timer_add(); + io_loop_set_current(old_ioloop); + *dict_r = &dict->dict; + return 0; +} + +static void client_dict_deinit(struct dict *_dict) +{ + struct client_dict *dict = (struct client_dict *)_dict; + struct ioloop *old_ioloop = current_ioloop; + + client_dict_disconnect(dict, "Deinit"); + connection_deinit(&dict->conn.conn); + io_wait_timer_remove(&dict->wait_timer); + + i_assert(dict->transactions == NULL); + i_assert(array_count(&dict->cmds) == 0); + + io_loop_set_current(dict->dict.ioloop); + io_loop_destroy(&dict->dict.ioloop); + io_loop_set_current(old_ioloop); + + array_free(&dict->cmds); + i_free(dict->last_connect_error); + i_free(dict->uri); + i_free(dict); + + if (dict_connections->connections == NULL) + connection_list_deinit(&dict_connections); +} + +static void client_dict_wait(struct dict *_dict) +{ + struct client_dict *dict = (struct client_dict *)_dict; + + if (array_count(&dict->cmds) == 0) + return; + + i_assert(io_loop_is_empty(dict->dict.ioloop)); + dict->dict.prev_ioloop = current_ioloop; + io_loop_set_current(dict->dict.ioloop); + dict_switch_ioloop(_dict); + while (array_count(&dict->cmds) > 0) + io_loop_run(dict->dict.ioloop); + + io_loop_set_current(dict->dict.prev_ioloop); + dict->dict.prev_ioloop = NULL; + + dict_switch_ioloop(_dict); + i_assert(io_loop_is_empty(dict->dict.ioloop)); +} + +static bool client_dict_switch_ioloop(struct dict *_dict) +{ + struct client_dict *dict = (struct client_dict *)_dict; + + dict->last_timer_switch_usecs = + io_wait_timer_get_usecs(dict->wait_timer); + dict->wait_timer = io_wait_timer_move(&dict->wait_timer); + if (dict->to_idle != NULL) + dict->to_idle = io_loop_move_timeout(&dict->to_idle); + if (dict->to_requests != NULL) + dict->to_requests = io_loop_move_timeout(&dict->to_requests); + connection_switch_ioloop(&dict->conn.conn); + return array_count(&dict->cmds) > 0; +} + +static const char *dict_wait_warnings(const struct client_dict_cmd *cmd) +{ + int global_ioloop_msecs = (ioloop_global_wait_usecs - + cmd->start_global_ioloop_usecs + 999) / 1000; + int dict_ioloop_msecs = (io_wait_timer_get_usecs(cmd->dict->wait_timer) - + cmd->start_dict_ioloop_usecs + 999) / 1000; + int other_ioloop_msecs = global_ioloop_msecs - dict_ioloop_msecs; + int lock_msecs = (file_lock_wait_get_total_usecs() - + cmd->start_lock_usecs + 999) / 1000; + + return t_strdup_printf( + "%d.%03d in dict wait, %d.%03d in other ioloops, %d.%03d in locks", + dict_ioloop_msecs/1000, dict_ioloop_msecs%1000, + other_ioloop_msecs/1000, other_ioloop_msecs%1000, + lock_msecs/1000, lock_msecs%1000); +} + +static const char * +dict_warnings_sec(const struct client_dict_cmd *cmd, int msecs, + const char *const *extra_args) +{ + string_t *str = t_str_new(64); + struct timeval tv_start, tv_end; + unsigned int tv_start_usec, tv_end_usec; + + str_printfa(str, "%d.%03d secs (%s", msecs/1000, msecs%1000, + dict_wait_warnings(cmd)); + if (cmd->reconnected) { + int reconnected_msecs = + timeval_diff_msecs(&ioloop_timeval, + &cmd->dict->conn.conn.connect_started); + str_printfa(str, ", reconnected %u.%03u secs ago", + reconnected_msecs/1000, reconnected_msecs%1000); + } + if (cmd->async_id != 0) { + int async_reply_msecs = + timeval_diff_msecs(&ioloop_timeval, &cmd->async_id_received_time); + str_printfa(str, ", async-id reply %u.%03u secs ago", + async_reply_msecs/1000, async_reply_msecs%1000); + } + if (extra_args != NULL && + str_array_length(extra_args) >= 4 && + str_to_time(extra_args[0], &tv_start.tv_sec) == 0 && + str_to_uint(extra_args[1], &tv_start_usec) == 0 && + str_to_time(extra_args[2], &tv_end.tv_sec) == 0 && + str_to_uint(extra_args[3], &tv_end_usec) == 0) { + tv_start.tv_usec = tv_start_usec; + tv_end.tv_usec = tv_end_usec; + + int server_msecs_since_start = + timeval_diff_msecs(&ioloop_timeval, &tv_start); + int server_msecs = timeval_diff_msecs(&tv_end, &tv_start); + str_printfa(str, ", started on dict-server %u.%03d secs ago, " + "took %u.%03d secs", + server_msecs_since_start/1000, + server_msecs_since_start%1000, + server_msecs/1000, server_msecs%1000); + } + str_append_c(str, ')'); + return str_c(str); +} + +static void +client_dict_lookup_async_callback(struct client_dict_cmd *cmd, + enum dict_protocol_reply reply, + const char *value, + const char *const *extra_args, + const char *error, + bool disconnected ATTR_UNUSED) +{ + struct client_dict *dict = cmd->dict; + struct dict_lookup_result result; + const char *const values[] = { value, NULL }; + + i_zero(&result); + if (error != NULL) { + result.ret = -1; + result.error = error; + } else switch (reply) { + case DICT_PROTOCOL_REPLY_OK: + result.value = value; + result.values = values; + result.ret = 1; + break; + case DICT_PROTOCOL_REPLY_MULTI_OK: + result.values = t_strsplit_tabescaped(value); + result.value = result.values[0]; + result.ret = 1; + break; + case DICT_PROTOCOL_REPLY_NOTFOUND: + result.ret = 0; + break; + case DICT_PROTOCOL_REPLY_FAIL: + result.error = value[0] == '\0' ? "dict-server returned failure" : + t_strdup_printf("dict-server returned failure: %s", + value); + result.ret = -1; + break; + default: + result.error = t_strdup_printf( + "dict-client: Invalid lookup '%s' reply: %c%s", + cmd->query, reply, value); + client_dict_disconnect(dict, result.error); + result.ret = -1; + break; + } + + int diff = timeval_diff_msecs(&ioloop_timeval, &cmd->start_time); + if (result.error != NULL) { + /* include timing info always in error messages */ + result.error = t_strdup_printf("%s (reply took %s)", + result.error, dict_warnings_sec(cmd, diff, extra_args)); + } else if (!cmd->background && + diff >= (int)dict->warn_slow_msecs) { + e_warning(dict->conn.conn.event, "dict lookup took %s: %s", + dict_warnings_sec(cmd, diff, extra_args), + cmd->query); + } + + dict_pre_api_callback(&dict->dict); + cmd->api_callback.lookup(&result, cmd->api_callback.context); + dict_post_api_callback(&dict->dict); +} + +static void +client_dict_lookup_async(struct dict *_dict, const struct dict_op_settings *set, + const char *key, dict_lookup_callback_t *callback, + void *context) +{ + struct client_dict *dict = (struct client_dict *)_dict; + struct client_dict_cmd *cmd; + const char *query; + + query = t_strdup_printf("%c%s\t%s", DICT_PROTOCOL_CMD_LOOKUP, + str_tabescape(key), + set->username == NULL ? "" : str_tabescape(set->username)); + cmd = client_dict_cmd_init(dict, query); + cmd->callback = client_dict_lookup_async_callback; + cmd->api_callback.lookup = callback; + cmd->api_callback.context = context; + cmd->retry_errors = TRUE; + + client_dict_cmd_send(dict, &cmd, NULL); +} + +struct client_dict_sync_lookup { + char *error; + char *value; + int ret; +}; + +static void client_dict_lookup_callback(const struct dict_lookup_result *result, + struct client_dict_sync_lookup *lookup) +{ + lookup->ret = result->ret; + if (result->ret == -1) + lookup->error = i_strdup(result->error); + else if (result->ret == 1) + lookup->value = i_strdup(result->value); +} + +static int client_dict_lookup(struct dict *_dict, + const struct dict_op_settings *set, + pool_t pool, const char *key, + const char **value_r, const char **error_r) +{ + struct client_dict_sync_lookup lookup; + + i_zero(&lookup); + lookup.ret = -2; + + dict_lookup_async(_dict, set, key, client_dict_lookup_callback, &lookup); + if (lookup.ret == -2) + client_dict_wait(_dict); + + switch (lookup.ret) { + case -1: + *error_r = t_strdup(lookup.error); + i_free(lookup.error); + return -1; + case 0: + i_assert(lookup.value == NULL); + *value_r = NULL; + return 0; + case 1: + *value_r = p_strdup(pool, lookup.value); + i_free(lookup.value); + return 1; + } + i_unreached(); +} + +static void client_dict_iterate_unref(struct client_dict_iterate_context *ctx) +{ + i_assert(ctx->refcount > 0); + if (--ctx->refcount > 0) + return; + i_free(ctx->error); + i_free(ctx); +} + +static void +client_dict_iter_api_callback(struct client_dict_iterate_context *ctx, + struct client_dict_cmd *cmd, + const char *const *extra_args) +{ + struct client_dict *dict = cmd->dict; + + if (ctx->deinit) { + /* Iterator was already deinitialized. Stop if we're in + client_dict_wait(). */ + dict_post_api_callback(&dict->dict); + return; + } + if (ctx->finished) { + int diff = timeval_diff_msecs(&ioloop_timeval, &cmd->start_time); + if (ctx->error != NULL) { + /* include timing info always in error messages */ + char *new_error = i_strdup_printf("%s (reply took %s)", + ctx->error, dict_warnings_sec(cmd, diff, extra_args)); + i_free(ctx->error); + ctx->error = new_error; + } else if (!cmd->background && + diff >= (int)dict->warn_slow_msecs) { + e_warning(dict->conn.conn.event, "dict iteration took %s: %s", + dict_warnings_sec(cmd, diff, extra_args), + cmd->query); + } + } + if (ctx->ctx.async_callback != NULL) { + dict_pre_api_callback(&dict->dict); + ctx->ctx.async_callback(ctx->ctx.async_context); + dict_post_api_callback(&dict->dict); + } else { + /* synchronous lookup */ + io_loop_stop(dict->dict.ioloop); + } +} + +static void +client_dict_iter_async_callback(struct client_dict_cmd *cmd, + enum dict_protocol_reply reply, + const char *value, + const char *const *extra_args, + const char *error, + bool disconnected ATTR_UNUSED) +{ + struct client_dict_iterate_context *ctx = cmd->iter; + struct client_dict *dict = cmd->dict; + struct client_dict_iter_result *result; + const char *iter_key = NULL, *const *iter_values = NULL; + + if (ctx->deinit) { + cmd->background = TRUE; + client_dict_cmd_backgrounded(dict); + } + + if (error != NULL) { + /* failed */ + } else switch (reply) { + case DICT_PROTOCOL_REPLY_ITER_FINISHED: + /* end of iteration */ + i_assert(!ctx->finished); + ctx->finished = TRUE; + client_dict_iter_api_callback(ctx, cmd, extra_args); + client_dict_iterate_unref(ctx); + return; + case DICT_PROTOCOL_REPLY_OK: + /* key \t value */ + iter_key = value; + iter_values = extra_args; + extra_args++; + break; + case DICT_PROTOCOL_REPLY_FAIL: + error = t_strdup_printf("dict-server returned failure: %s", value); + break; + default: + break; + } + if ((iter_values == NULL || iter_values[0] == NULL) && error == NULL) { + /* broken protocol */ + error = t_strdup_printf("dict client (%s) sent broken iterate reply: %c%s", + dict->conn.conn.name, reply, value); + client_dict_disconnect(dict, error); + } + + if (error != NULL) { + if (ctx->error == NULL) + ctx->error = i_strdup(error); + i_assert(!ctx->finished); + ctx->finished = TRUE; + client_dict_iter_api_callback(ctx, cmd, extra_args); + client_dict_iterate_unref(ctx); + return; + } + cmd->unfinished = TRUE; + + if (ctx->deinit) { + /* iterator was already deinitialized */ + return; + } + + result = array_append_space(&ctx->results); + result->key = p_strdup(ctx->results_pool, iter_key); + result->values = p_strarray_dup(ctx->results_pool, iter_values); + + client_dict_iter_api_callback(ctx, cmd, NULL); +} + +static struct dict_iterate_context * +client_dict_iterate_init(struct dict *_dict, + const struct dict_op_settings *set ATTR_UNUSED, + const char *path, enum dict_iterate_flags flags) +{ + struct client_dict_iterate_context *ctx; + + ctx = i_new(struct client_dict_iterate_context, 1); + ctx->ctx.dict = _dict; + ctx->results_pool = pool_alloconly_create("client dict iteration", 512); + ctx->flags = flags; + ctx->path = i_strdup(path); + ctx->refcount = 1; + i_array_init(&ctx->results, 64); + return &ctx->ctx; +} + +static void +client_dict_iterate_cmd_send(struct client_dict_iterate_context *ctx) +{ + struct client_dict *dict = (struct client_dict *)ctx->ctx.dict; + const struct dict_op_settings_private *set = &ctx->ctx.set; + struct client_dict_cmd *cmd; + string_t *query = t_str_new(256); + + /* we can't do this query in _iterate_init(), because + _set_limit() hasn't been called yet at that point. */ + str_printfa(query, "%c%d\t%"PRIu64, DICT_PROTOCOL_CMD_ITERATE, + ctx->flags, ctx->ctx.max_rows); + str_append_c(query, '\t'); + str_append_tabescaped(query, ctx->path); + str_append_c(query, '\t'); + str_append_tabescaped(query, set->username == NULL ? "" : set->username); + + cmd = client_dict_cmd_init(dict, str_c(query)); + cmd->iter = ctx; + cmd->callback = client_dict_iter_async_callback; + cmd->retry_errors = TRUE; + + ctx->refcount++; + client_dict_cmd_send(dict, &cmd, NULL); +} + +static bool client_dict_iterate(struct dict_iterate_context *_ctx, + const char **key_r, const char *const **values_r) +{ + struct client_dict_iterate_context *ctx = + (struct client_dict_iterate_context *)_ctx; + const struct client_dict_iter_result *results; + unsigned int count; + + if (ctx->error != NULL) { + ctx->ctx.has_more = FALSE; + return FALSE; + } + + results = array_get(&ctx->results, &count); + if (ctx->result_idx < count) { + *key_r = results[ctx->result_idx].key; + *values_r = results[ctx->result_idx].values; + ctx->ctx.has_more = TRUE; + ctx->result_idx++; + ctx->seen_results = TRUE; + return TRUE; + } + if (!ctx->cmd_sent) { + ctx->cmd_sent = TRUE; + client_dict_iterate_cmd_send(ctx); + return client_dict_iterate(_ctx, key_r, values_r); + } + ctx->ctx.has_more = !ctx->finished; + ctx->result_idx = 0; + array_clear(&ctx->results); + p_clear(ctx->results_pool); + + if ((ctx->flags & DICT_ITERATE_FLAG_ASYNC) == 0 && ctx->ctx.has_more) { + client_dict_wait(_ctx->dict); + return client_dict_iterate(_ctx, key_r, values_r); + } + return FALSE; +} + +static int client_dict_iterate_deinit(struct dict_iterate_context *_ctx, + const char **error_r) +{ + struct client_dict *dict = (struct client_dict *)_ctx->dict; + struct client_dict_iterate_context *ctx = + (struct client_dict_iterate_context *)_ctx; + int ret = ctx->error != NULL ? -1 : 0; + + i_assert(!ctx->deinit); + ctx->deinit = TRUE; + + *error_r = t_strdup(ctx->error); + array_free(&ctx->results); + pool_unref(&ctx->results_pool); + i_free(ctx->path); + client_dict_iterate_unref(ctx); + + client_dict_add_timeout(dict); + return ret; +} + +static struct dict_transaction_context * +client_dict_transaction_init(struct dict *_dict) +{ + struct client_dict *dict = (struct client_dict *)_dict; + struct client_dict_transaction_context *ctx; + + ctx = i_new(struct client_dict_transaction_context, 1); + ctx->ctx.dict = _dict; + ctx->id = ++dict->transaction_id_counter; + + DLLIST_PREPEND(&dict->transactions, ctx); + return &ctx->ctx; +} + +static void +client_dict_transaction_free(struct client_dict_transaction_context **_ctx) +{ + struct client_dict_transaction_context *ctx = *_ctx; + + *_ctx = NULL; + i_free(ctx->first_query); + i_free(ctx->error); + i_free(ctx); +} + +static void +client_dict_transaction_commit_callback(struct client_dict_cmd *cmd, + enum dict_protocol_reply reply, + const char *value, + const char *const *extra_args, + const char *error, bool disconnected) +{ + struct client_dict *dict = cmd->dict; + struct dict_commit_result result = { + .ret = DICT_COMMIT_RET_FAILED, .error = NULL + }; + + i_assert(cmd->trans != NULL); + + if (error != NULL) { + /* failed */ + if (disconnected) + result.ret = DICT_COMMIT_RET_WRITE_UNCERTAIN; + result.error = error; + } else switch (reply) { + case DICT_PROTOCOL_REPLY_OK: + result.ret = DICT_COMMIT_RET_OK; + break; + case DICT_PROTOCOL_REPLY_NOTFOUND: + result.ret = DICT_COMMIT_RET_NOTFOUND; + break; + case DICT_PROTOCOL_REPLY_WRITE_UNCERTAIN: + result.ret = DICT_COMMIT_RET_WRITE_UNCERTAIN; + /* fallthrough */ + case DICT_PROTOCOL_REPLY_FAIL: { + /* value contains the obsolete trans_id */ + const char *error = extra_args[0]; + + result.error = t_strdup_printf("dict-server returned failure: %s", + error != NULL ? t_str_tabunescape(error) : ""); + if (error != NULL) + extra_args++; + break; + } + default: + result.ret = DICT_COMMIT_RET_FAILED; + result.error = t_strdup_printf( + "dict-client: Invalid commit reply: %c%s", + reply, value); + client_dict_disconnect(dict, result.error); + break; + } + + int diff = timeval_diff_msecs(&ioloop_timeval, &cmd->start_time); + if (result.error != NULL) { + /* include timing info always in error messages */ + result.error = t_strdup_printf("%s (reply took %s)", + result.error, dict_warnings_sec(cmd, diff, extra_args)); + } else if (!cmd->background && !cmd->trans->ctx.no_slowness_warning && + diff >= (int)dict->warn_slow_msecs) { + e_warning(dict->conn.conn.event, "dict commit took %s: " + "%s (%u commands, first: %s)", + dict_warnings_sec(cmd, diff, extra_args), + cmd->query, cmd->trans->query_count, + cmd->trans->first_query); + } + client_dict_transaction_free(&cmd->trans); + + dict_pre_api_callback(&dict->dict); + cmd->api_callback.commit(&result, cmd->api_callback.context); + dict_post_api_callback(&dict->dict); +} + + +static void +client_dict_transaction_commit(struct dict_transaction_context *_ctx, + bool async, + dict_transaction_commit_callback_t *callback, + void *context) +{ + struct client_dict_transaction_context *ctx = + (struct client_dict_transaction_context *)_ctx; + struct client_dict *dict = (struct client_dict *)_ctx->dict; + struct client_dict_cmd *cmd; + const char *query; + + DLLIST_REMOVE(&dict->transactions, ctx); + + if (ctx->sent_begin && ctx->error == NULL) { + query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_COMMIT, ctx->id); + cmd = client_dict_cmd_init(dict, query); + cmd->trans = ctx; + + cmd->callback = client_dict_transaction_commit_callback; + cmd->api_callback.commit = callback; + cmd->api_callback.context = context; + if (callback == dict_transaction_commit_async_noop_callback) + cmd->background = TRUE; + if (client_dict_cmd_send(dict, &cmd, NULL)) { + if (!async) + client_dict_wait(_ctx->dict); + } + } else if (ctx->error != NULL) { + /* already failed */ + struct dict_commit_result result = { + .ret = DICT_COMMIT_RET_FAILED, .error = ctx->error + }; + callback(&result, context); + client_dict_transaction_free(&ctx); + } else { + /* nothing changed */ + struct dict_commit_result result = { + .ret = DICT_COMMIT_RET_OK, .error = NULL + }; + callback(&result, context); + client_dict_transaction_free(&ctx); + } + + client_dict_add_timeout(dict); +} + +static void +client_dict_transaction_rollback(struct dict_transaction_context *_ctx) +{ + struct client_dict_transaction_context *ctx = + (struct client_dict_transaction_context *)_ctx; + struct client_dict *dict = (struct client_dict *)_ctx->dict; + + if (ctx->sent_begin) { + const char *query; + + query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_ROLLBACK, + ctx->id); + client_dict_send_transaction_query(ctx, query); + } + + DLLIST_REMOVE(&dict->transactions, ctx); + client_dict_transaction_free(&ctx); + client_dict_add_timeout(dict); +} + +static void client_dict_set(struct dict_transaction_context *_ctx, + const char *key, const char *value) +{ + struct client_dict_transaction_context *ctx = + (struct client_dict_transaction_context *)_ctx; + const char *query; + + query = t_strdup_printf("%c%u\t%s\t%s", + DICT_PROTOCOL_CMD_SET, ctx->id, + str_tabescape(key), + str_tabescape(value)); + client_dict_send_transaction_query(ctx, query); +} + +static void client_dict_unset(struct dict_transaction_context *_ctx, + const char *key) +{ + struct client_dict_transaction_context *ctx = + (struct client_dict_transaction_context *)_ctx; + const char *query; + + query = t_strdup_printf("%c%u\t%s", + DICT_PROTOCOL_CMD_UNSET, ctx->id, + str_tabescape(key)); + client_dict_send_transaction_query(ctx, query); +} + +static void client_dict_atomic_inc(struct dict_transaction_context *_ctx, + const char *key, long long diff) +{ + struct client_dict_transaction_context *ctx = + (struct client_dict_transaction_context *)_ctx; + const char *query; + + query = t_strdup_printf("%c%u\t%s\t%lld", + DICT_PROTOCOL_CMD_ATOMIC_INC, + ctx->id, str_tabescape(key), diff); + client_dict_send_transaction_query(ctx, query); +} + +static void client_dict_set_timestamp(struct dict_transaction_context *_ctx, + const struct timespec *ts) +{ + struct client_dict_transaction_context *ctx = + (struct client_dict_transaction_context *)_ctx; + const char *query; + + query = t_strdup_printf("%c%u\t%s\t%u", + DICT_PROTOCOL_CMD_TIMESTAMP, + ctx->id, dec2str(ts->tv_sec), + (unsigned int)ts->tv_nsec); + client_dict_send_transaction_query(ctx, query); +} + +struct dict dict_driver_client = { + .name = "proxy", + + { + .init = client_dict_init, + .deinit = client_dict_deinit, + .wait = client_dict_wait, + .lookup = client_dict_lookup, + .iterate_init = client_dict_iterate_init, + .iterate = client_dict_iterate, + .iterate_deinit = client_dict_iterate_deinit, + .transaction_init = client_dict_transaction_init, + .transaction_commit = client_dict_transaction_commit, + .transaction_rollback = client_dict_transaction_rollback, + .set = client_dict_set, + .unset = client_dict_unset, + .atomic_inc = client_dict_atomic_inc, + .lookup_async = client_dict_lookup_async, + .switch_ioloop = client_dict_switch_ioloop, + .set_timestamp = client_dict_set_timestamp, + } +}; |