summaryrefslogtreecommitdiffstats
path: root/src/lib-dict/dict-client.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib-dict/dict-client.c')
-rw-r--r--src/lib-dict/dict-client.c1492
1 files changed, 1492 insertions, 0 deletions
diff --git a/src/lib-dict/dict-client.c b/src/lib-dict/dict-client.c
new file mode 100644
index 0000000..32b2da1
--- /dev/null
+++ b/src/lib-dict/dict-client.c
@@ -0,0 +1,1492 @@
+/* Copyright (c) 2005-2018 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "array.h"
+#include "llist.h"
+#include "str.h"
+#include "strescape.h"
+#include "file-lock.h"
+#include "time-util.h"
+#include "connection.h"
+#include "ostream.h"
+#include "eacces-error.h"
+#include "dict-private.h"
+#include "dict-client.h"
+
+#include <unistd.h>
+#include <fcntl.h>
+
+/* Disconnect from dict server after this many milliseconds of idling after
+ sending a command. Because dict server does blocking dict accesses, it can
+ handle only one client at a time. This is why the default timeout is zero,
+ so that there won't be many dict processes just doing nothing. Zero means
+ that the socket is disconnected immediately after returning to ioloop. */
+#define DICT_CLIENT_DEFAULT_TIMEOUT_MSECS 0
+
+/* Abort dict lookup after this many seconds. */
+#define DICT_CLIENT_REQUEST_TIMEOUT_MSECS 30000
+/* When dict lookup timeout is reached, wait a bit longer if the last dict
+ ioloop wait was shorter than this. */
+#define DICT_CLIENT_REQUEST_TIMEOUT_MIN_LAST_IOLOOP_WAIT_MSECS 1000
+/* Log a warning if dict lookup takes longer than this many milliseconds. */
+#define DICT_CLIENT_DEFAULT_WARN_SLOW_MSECS 5000
+
+struct client_dict_cmd {
+ int refcount;
+ struct client_dict *dict;
+ struct timeval start_time;
+ char *query;
+ unsigned int async_id;
+ struct timeval async_id_received_time;
+
+ uint64_t start_global_ioloop_usecs;
+ uint64_t start_dict_ioloop_usecs;
+ uint64_t start_lock_usecs;
+
+ bool reconnected;
+ bool retry_errors;
+ bool no_replies;
+ bool unfinished;
+ bool background;
+
+ void (*callback)(struct client_dict_cmd *cmd,
+ enum dict_protocol_reply reply, const char *value,
+ const char *const *extra_args, const char *error,
+ bool disconnected);
+ struct client_dict_iterate_context *iter;
+ struct client_dict_transaction_context *trans;
+
+ struct {
+ dict_lookup_callback_t *lookup;
+ dict_transaction_commit_callback_t *commit;
+ void *context;
+ } api_callback;
+};
+
+struct dict_client_connection {
+ struct connection conn;
+ struct client_dict *dict;
+};
+
+struct client_dict {
+ struct dict dict;
+ struct dict_client_connection conn;
+
+ char *uri;
+ enum dict_data_type value_type;
+ unsigned warn_slow_msecs;
+
+ time_t last_failed_connect;
+ char *last_connect_error;
+
+ struct io_wait_timer *wait_timer;
+ uint64_t last_timer_switch_usecs;
+ struct timeout *to_requests;
+ struct timeout *to_idle;
+ unsigned int idle_msecs;
+
+ ARRAY(struct client_dict_cmd *) cmds;
+ struct client_dict_transaction_context *transactions;
+
+ unsigned int transaction_id_counter;
+};
+
+struct client_dict_iter_result {
+ const char *key, *const *values;
+};
+
+struct client_dict_iterate_context {
+ struct dict_iterate_context ctx;
+ char *error;
+ char *path;
+ enum dict_iterate_flags flags;
+ int refcount;
+
+ pool_t results_pool;
+ ARRAY(struct client_dict_iter_result) results;
+ unsigned int result_idx;
+
+ bool cmd_sent;
+ bool seen_results;
+ bool finished;
+ bool deinit;
+};
+
+struct client_dict_transaction_context {
+ struct dict_transaction_context ctx;
+ struct client_dict_transaction_context *prev, *next;
+
+ char *first_query;
+ char *error;
+
+ unsigned int id;
+ unsigned int query_count;
+
+ bool sent_begin:1;
+};
+
+static struct connection_list *dict_connections;
+
+static int client_dict_connect(struct client_dict *dict, const char **error_r);
+static int client_dict_reconnect(struct client_dict *dict, const char *reason,
+ const char **error_r);
+static void client_dict_disconnect(struct client_dict *dict, const char *reason);
+static const char *dict_wait_warnings(const struct client_dict_cmd *cmd);
+
+static struct client_dict_cmd *
+client_dict_cmd_init(struct client_dict *dict, const char *query)
+{
+ struct client_dict_cmd *cmd;
+
+ io_loop_time_refresh();
+
+ cmd = i_new(struct client_dict_cmd, 1);
+ cmd->refcount = 1;
+ cmd->dict = dict;
+ cmd->query = i_strdup(query);
+ cmd->start_time = ioloop_timeval;
+ cmd->start_global_ioloop_usecs = ioloop_global_wait_usecs;
+ cmd->start_dict_ioloop_usecs = io_wait_timer_get_usecs(dict->wait_timer);
+ cmd->start_lock_usecs = file_lock_wait_get_total_usecs();
+ return cmd;
+}
+
+static void client_dict_cmd_ref(struct client_dict_cmd *cmd)
+{
+ i_assert(cmd->refcount > 0);
+ cmd->refcount++;
+}
+
+static bool client_dict_cmd_unref(struct client_dict_cmd *cmd)
+{
+ i_assert(cmd->refcount > 0);
+ if (--cmd->refcount > 0)
+ return TRUE;
+
+ i_assert(cmd->trans == NULL);
+
+ i_free(cmd->query);
+ i_free(cmd);
+ return FALSE;
+}
+
+static bool
+dict_cmd_callback_line(struct client_dict_cmd *cmd, const char *const *args)
+{
+ const char *value = args[0];
+ enum dict_protocol_reply reply;
+
+ if (value == NULL) {
+ /* "" is a valid iteration reply */
+ reply = DICT_PROTOCOL_REPLY_ITER_FINISHED;
+ } else {
+ reply = value[0];
+ value++;
+ args++;
+ }
+
+ cmd->unfinished = FALSE;
+ cmd->callback(cmd, reply, value, args, NULL, FALSE);
+ return !cmd->unfinished;
+}
+
+static void
+dict_cmd_callback_error(struct client_dict_cmd *cmd, const char *error,
+ bool disconnected)
+{
+ const char *null_arg = NULL;
+
+ cmd->unfinished = FALSE;
+ if (cmd->callback != NULL) {
+ cmd->callback(cmd, DICT_PROTOCOL_REPLY_ERROR,
+ "", &null_arg, error, disconnected);
+ }
+ i_assert(!cmd->unfinished);
+}
+
+static struct client_dict_cmd *
+client_dict_cmd_first_nonbg(struct client_dict *dict)
+{
+ struct client_dict_cmd *const *cmds;
+ unsigned int i, count;
+
+ cmds = array_get(&dict->cmds, &count);
+ for (i = 0; i < count; i++) {
+ if (!cmds[i]->background)
+ return cmds[i];
+ }
+ return NULL;
+}
+
+static void client_dict_input_timeout(struct client_dict *dict)
+{
+ struct client_dict_cmd *cmd;
+ const char *error;
+ uint64_t msecs_in_last_dict_ioloop_wait;
+ int cmd_diff;
+
+ /* find the first non-background command. there must be at least one. */
+ cmd = client_dict_cmd_first_nonbg(dict);
+ i_assert(cmd != NULL);
+
+ cmd_diff = timeval_diff_msecs(&ioloop_timeval, &cmd->start_time);
+ if (cmd_diff < DICT_CLIENT_REQUEST_TIMEOUT_MSECS) {
+ /* need to re-create this timeout. the currently-oldest
+ command was added when another command was still
+ running with an older timeout. */
+ timeout_remove(&dict->to_requests);
+ dict->to_requests =
+ timeout_add(DICT_CLIENT_REQUEST_TIMEOUT_MSECS - cmd_diff,
+ client_dict_input_timeout, dict);
+ return;
+ }
+
+ /* If we've gotten here because all the time was spent in other ioloops
+ or locks, make sure there's a bit of time waiting for the dict
+ ioloop as well. There's a good chance that the reply can be read. */
+ msecs_in_last_dict_ioloop_wait =
+ (io_wait_timer_get_usecs(dict->wait_timer) -
+ dict->last_timer_switch_usecs + 999) / 1000;
+ if (msecs_in_last_dict_ioloop_wait < DICT_CLIENT_REQUEST_TIMEOUT_MIN_LAST_IOLOOP_WAIT_MSECS) {
+ timeout_remove(&dict->to_requests);
+ dict->to_requests =
+ timeout_add(DICT_CLIENT_REQUEST_TIMEOUT_MIN_LAST_IOLOOP_WAIT_MSECS -
+ msecs_in_last_dict_ioloop_wait,
+ client_dict_input_timeout, dict);
+ return;
+ }
+
+ (void)client_dict_reconnect(dict, t_strdup_printf(
+ "Dict server timeout: %s "
+ "(%u commands pending, oldest sent %u.%03u secs ago: %s, %s)",
+ connection_input_timeout_reason(&dict->conn.conn),
+ array_count(&dict->cmds),
+ cmd_diff/1000, cmd_diff%1000, cmd->query,
+ dict_wait_warnings(cmd)), &error);
+}
+
+static int
+client_dict_cmd_query_send(struct client_dict *dict, const char *query)
+{
+ struct const_iovec iov[2];
+ ssize_t ret;
+
+ iov[0].iov_base = query;
+ iov[0].iov_len = strlen(query);
+ iov[1].iov_base = "\n";
+ iov[1].iov_len = 1;
+ ret = o_stream_sendv(dict->conn.conn.output, iov, 2);
+ if (ret < 0)
+ return -1;
+ i_assert((size_t)ret == iov[0].iov_len + 1);
+ return 0;
+}
+
+static bool
+client_dict_cmd_send(struct client_dict *dict, struct client_dict_cmd **_cmd,
+ const char **error_r)
+{
+ struct client_dict_cmd *cmd = *_cmd;
+ const char *error = NULL;
+ bool retry = cmd->retry_errors;
+ int ret;
+
+ *_cmd = NULL;
+
+ /* we're no longer idling. even with no_replies=TRUE we're going to
+ wait for COMMIT/ROLLBACK. */
+ timeout_remove(&dict->to_idle);
+
+ if (client_dict_connect(dict, &error) < 0) {
+ retry = FALSE;
+ ret = -1;
+ } else {
+ ret = client_dict_cmd_query_send(dict, cmd->query);
+ if (ret < 0) {
+ error = t_strdup_printf("write(%s) failed: %s", dict->conn.conn.name,
+ o_stream_get_error(dict->conn.conn.output));
+ }
+ }
+ if (ret < 0 && retry) {
+ /* Reconnect and try again. */
+ if (client_dict_reconnect(dict, error, &error) < 0)
+ ;
+ else if (client_dict_cmd_query_send(dict, cmd->query) < 0) {
+ error = t_strdup_printf("write(%s) failed: %s", dict->conn.conn.name,
+ o_stream_get_error(dict->conn.conn.output));
+ } else {
+ ret = 0;
+ }
+ }
+
+ if (cmd->no_replies) {
+ /* just send and forget */
+ client_dict_cmd_unref(cmd);
+ return TRUE;
+ } else if (ret < 0) {
+ i_assert(error != NULL);
+ /* we didn't successfully send this command to dict */
+ dict_cmd_callback_error(cmd, error, cmd->reconnected);
+ client_dict_cmd_unref(cmd);
+ if (error_r != NULL)
+ *error_r = error;
+ return FALSE;
+ } else {
+ if (dict->to_requests == NULL && !cmd->background) {
+ dict->to_requests =
+ timeout_add(DICT_CLIENT_REQUEST_TIMEOUT_MSECS,
+ client_dict_input_timeout, dict);
+ }
+ array_push_back(&dict->cmds, &cmd);
+ return TRUE;
+ }
+}
+
+static bool
+client_dict_transaction_send_begin(struct client_dict_transaction_context *ctx,
+ const struct dict_op_settings_private *set)
+{
+ struct client_dict *dict = (struct client_dict *)ctx->ctx.dict;
+ struct client_dict_cmd *cmd;
+ const char *query, *error;
+
+ i_assert(ctx->error == NULL);
+
+ ctx->sent_begin = TRUE;
+
+ /* transactions commands don't have replies. only COMMIT has. */
+ query = t_strdup_printf("%c%u\t%s", DICT_PROTOCOL_CMD_BEGIN,
+ ctx->id,
+ set->username == NULL ? "" : str_tabescape(set->username));
+ cmd = client_dict_cmd_init(dict, query);
+ cmd->no_replies = TRUE;
+ cmd->retry_errors = TRUE;
+ if (!client_dict_cmd_send(dict, &cmd, &error)) {
+ ctx->error = i_strdup(error);
+ return FALSE;
+ }
+ return TRUE;
+}
+
+static void
+client_dict_send_transaction_query(struct client_dict_transaction_context *ctx,
+ const char *query)
+{
+ struct client_dict *dict = (struct client_dict *)ctx->ctx.dict;
+ const struct dict_op_settings_private *set = &ctx->ctx.set;
+ struct client_dict_cmd *cmd;
+ const char *error;
+
+ if (ctx->error != NULL)
+ return;
+
+ if (!ctx->sent_begin) {
+ if (!client_dict_transaction_send_begin(ctx, set))
+ return;
+ }
+
+ ctx->query_count++;
+ if (ctx->first_query == NULL)
+ ctx->first_query = i_strdup(query);
+
+ cmd = client_dict_cmd_init(dict, query);
+ cmd->no_replies = TRUE;
+ if (!client_dict_cmd_send(dict, &cmd, &error))
+ ctx->error = i_strdup(error);
+}
+
+static bool client_dict_is_finished(struct client_dict *dict)
+{
+ return dict->transactions == NULL && array_count(&dict->cmds) == 0;
+}
+
+static void client_dict_timeout(struct client_dict *dict)
+{
+ if (client_dict_is_finished(dict))
+ client_dict_disconnect(dict, "Idle disconnection");
+ else
+ timeout_remove(&dict->to_idle);
+}
+
+static bool client_dict_have_nonbackground_cmds(struct client_dict *dict)
+{
+ struct client_dict_cmd *cmd;
+
+ array_foreach_elem(&dict->cmds, cmd) {
+ if (!cmd->background)
+ return TRUE;
+ }
+ return FALSE;
+}
+
+static void client_dict_add_timeout(struct client_dict *dict)
+{
+ if (dict->to_idle != NULL) {
+ if (dict->idle_msecs > 0)
+ timeout_reset(dict->to_idle);
+ } else if (client_dict_is_finished(dict)) {
+ dict->to_idle = timeout_add(dict->idle_msecs,
+ client_dict_timeout, dict);
+ timeout_remove(&dict->to_requests);
+ } else if (dict->transactions == NULL &&
+ !client_dict_have_nonbackground_cmds(dict)) {
+ /* we had non-background commands, but now we're back to
+ having only background commands. remove timeouts. */
+ timeout_remove(&dict->to_requests);
+ }
+}
+
+static void client_dict_cmd_backgrounded(struct client_dict *dict)
+{
+ if (dict->to_requests == NULL)
+ return;
+
+ if (!client_dict_have_nonbackground_cmds(dict)) {
+ /* we only have background-commands.
+ remove the request timeout. */
+ timeout_remove(&dict->to_requests);
+ }
+}
+
+static int
+dict_conn_assign_next_async_id(struct dict_client_connection *conn,
+ const char *line)
+{
+ struct client_dict_cmd *const *cmds;
+ unsigned int i, count, async_id;
+
+ i_assert(line[0] == DICT_PROTOCOL_REPLY_ASYNC_ID);
+
+ if (str_to_uint(line+1, &async_id) < 0 || async_id == 0) {
+ e_error(conn->conn.event, "Received invalid async-id line: %s",
+ line);
+ return -1;
+ }
+ cmds = array_get(&conn->dict->cmds, &count);
+ for (i = 0; i < count; i++) {
+ if (cmds[i]->async_id == 0) {
+ cmds[i]->async_id = async_id;
+ cmds[i]->async_id_received_time = ioloop_timeval;
+ return 0;
+ }
+ }
+ e_error(conn->conn.event, "Received async-id line, but all %u "
+ "commands already have it: %s",
+ count, line);
+ return -1;
+}
+
+static int dict_conn_find_async_id(struct dict_client_connection *conn,
+ const char *async_arg,
+ const char *line, unsigned int *idx_r)
+{
+ struct client_dict_cmd *const *cmds;
+ unsigned int i, count, async_id;
+
+ i_assert(async_arg[0] == DICT_PROTOCOL_REPLY_ASYNC_REPLY);
+
+ if (str_to_uint(async_arg+1, &async_id) < 0 || async_id == 0) {
+ e_error(conn->conn.event, "Received invalid async-reply line: %s",
+ line);
+ return -1;
+ }
+
+ cmds = array_get(&conn->dict->cmds, &count);
+ for (i = 0; i < count; i++) {
+ if (cmds[i]->async_id == async_id) {
+ *idx_r = i;
+ return 0;
+ }
+ }
+ e_error(conn->conn.event, "Received reply for nonexistent async-id %u: %s",
+ async_id, line);
+ return -1;
+}
+
+static int dict_conn_input_line(struct connection *_conn, const char *line)
+{
+ struct dict_client_connection *conn =
+ (struct dict_client_connection *)_conn;
+ struct client_dict *dict = conn->dict;
+ struct client_dict_cmd *const *cmds;
+ const char *const *args;
+ unsigned int i, count;
+ bool finished;
+
+ if (dict->to_requests != NULL)
+ timeout_reset(dict->to_requests);
+
+ if (line[0] == DICT_PROTOCOL_REPLY_ASYNC_ID)
+ return dict_conn_assign_next_async_id(conn, line) < 0 ? -1 : 1;
+
+ cmds = array_get(&conn->dict->cmds, &count);
+ if (count == 0) {
+ e_error(conn->conn.event, "Received reply without pending commands: %s",
+ line);
+ return -1;
+ }
+
+ args = t_strsplit_tabescaped(line);
+ if (args[0] != NULL && args[0][0] == DICT_PROTOCOL_REPLY_ASYNC_REPLY) {
+ if (dict_conn_find_async_id(conn, args[0], line, &i) < 0)
+ return -1;
+ args++;
+ } else {
+ i = 0;
+ }
+ i_assert(!cmds[i]->no_replies);
+
+ client_dict_cmd_ref(cmds[i]);
+ finished = dict_cmd_callback_line(cmds[i], args);
+ if (!client_dict_cmd_unref(cmds[i])) {
+ /* disconnected during command handling */
+ return -1;
+ }
+ if (!finished) {
+ /* more lines needed for this command */
+ return 1;
+ }
+ client_dict_cmd_unref(cmds[i]);
+ array_delete(&dict->cmds, i, 1);
+
+ client_dict_add_timeout(dict);
+ return 1;
+}
+
+static int client_dict_connect(struct client_dict *dict, const char **error_r)
+{
+ const char *query, *error;
+
+ if (dict->conn.conn.fd_in != -1)
+ return 0;
+ if (dict->last_failed_connect == ioloop_time) {
+ /* Try again later */
+ *error_r = dict->last_connect_error;
+ return -1;
+ }
+
+ if (connection_client_connect(&dict->conn.conn) < 0) {
+ dict->last_failed_connect = ioloop_time;
+ if (errno == EACCES) {
+ error = eacces_error_get("net_connect_unix",
+ dict->conn.conn.name);
+ } else {
+ error = t_strdup_printf(
+ "net_connect_unix(%s) failed: %m", dict->conn.conn.name);
+ }
+ i_free(dict->last_connect_error);
+ dict->last_connect_error = i_strdup(error);
+ *error_r = error;
+ return -1;
+ }
+
+ query = t_strdup_printf("%c%u\t%u\t%d\t%s\t%s\n",
+ DICT_PROTOCOL_CMD_HELLO,
+ DICT_CLIENT_PROTOCOL_MAJOR_VERSION,
+ DICT_CLIENT_PROTOCOL_MINOR_VERSION,
+ dict->value_type,
+ "",
+ str_tabescape(dict->uri));
+ o_stream_nsend_str(dict->conn.conn.output, query);
+ client_dict_add_timeout(dict);
+ return 0;
+}
+
+static void
+client_dict_abort_commands(struct client_dict *dict, const char *reason)
+{
+ ARRAY(struct client_dict_cmd *) cmds_copy;
+ struct client_dict_cmd *cmd;
+
+ /* abort all commands */
+ t_array_init(&cmds_copy, array_count(&dict->cmds));
+ array_append_array(&cmds_copy, &dict->cmds);
+ array_clear(&dict->cmds);
+
+ array_foreach_elem(&cmds_copy, cmd) {
+ dict_cmd_callback_error(cmd, reason, TRUE);
+ client_dict_cmd_unref(cmd);
+ }
+}
+
+static void client_dict_disconnect(struct client_dict *dict, const char *reason)
+{
+ struct client_dict_transaction_context *ctx, *next;
+
+ client_dict_abort_commands(dict, reason);
+
+ /* all transactions that have sent BEGIN are no longer valid */
+ for (ctx = dict->transactions; ctx != NULL; ctx = next) {
+ next = ctx->next;
+ if (ctx->sent_begin && ctx->error == NULL)
+ ctx->error = i_strdup(reason);
+ }
+
+ timeout_remove(&dict->to_idle);
+ timeout_remove(&dict->to_requests);
+ connection_disconnect(&dict->conn.conn);
+}
+
+static int client_dict_reconnect(struct client_dict *dict, const char *reason,
+ const char **error_r)
+{
+ ARRAY(struct client_dict_cmd *) retry_cmds;
+ struct client_dict_cmd *cmd;
+ const char *error;
+ int ret;
+
+ t_array_init(&retry_cmds, array_count(&dict->cmds));
+ for (unsigned int i = 0; i < array_count(&dict->cmds); ) {
+ cmd = array_idx_elem(&dict->cmds, i);
+ if (!cmd->retry_errors) {
+ i++;
+ } else if (cmd->iter != NULL &&
+ cmd->iter->seen_results) {
+ /* don't retry iteration that already returned
+ something to the caller. otherwise we'd return
+ duplicates. */
+ i++;
+ } else {
+ array_push_back(&retry_cmds, &cmd);
+ array_delete(&dict->cmds, i, 1);
+ }
+ }
+ client_dict_disconnect(dict, reason);
+ if (client_dict_connect(dict, error_r) < 0) {
+ reason = t_strdup_printf("%s - reconnect failed: %s",
+ reason, *error_r);
+ array_foreach_elem(&retry_cmds, cmd) {
+ dict_cmd_callback_error(cmd, reason, TRUE);
+ client_dict_cmd_unref(cmd);
+ }
+ return -1;
+ }
+ if (array_count(&retry_cmds) == 0)
+ return 0;
+ e_warning(dict->conn.conn.event, "%s - reconnected", reason);
+
+ ret = 0; error = "";
+ array_foreach_elem(&retry_cmds, cmd) {
+ cmd->reconnected = TRUE;
+ cmd->async_id = 0;
+ /* if it fails again, don't retry anymore */
+ cmd->retry_errors = FALSE;
+ if (ret < 0) {
+ dict_cmd_callback_error(cmd, error, TRUE);
+ client_dict_cmd_unref(cmd);
+ } else if (!client_dict_cmd_send(dict, &cmd, &error))
+ ret = -1;
+ }
+ return ret;
+}
+
+static void dict_conn_destroy(struct connection *_conn)
+{
+ struct dict_client_connection *conn =
+ (struct dict_client_connection *)_conn;
+
+ client_dict_disconnect(conn->dict, connection_disconnect_reason(_conn));
+}
+
+static const struct connection_settings dict_conn_set = {
+ .input_max_size = SIZE_MAX,
+ .output_max_size = SIZE_MAX,
+ .unix_client_connect_msecs = 1000,
+ .client = TRUE
+};
+
+static const struct connection_vfuncs dict_conn_vfuncs = {
+ .destroy = dict_conn_destroy,
+ .input_line = dict_conn_input_line
+};
+
+static int
+client_dict_init(struct dict *driver, const char *uri,
+ const struct dict_settings *set,
+ struct dict **dict_r, const char **error_r)
+{
+ struct ioloop *old_ioloop = current_ioloop;
+ struct client_dict *dict;
+ const char *p, *dest_uri, *path;
+ unsigned int idle_msecs = DICT_CLIENT_DEFAULT_TIMEOUT_MSECS;
+ unsigned int warn_slow_msecs = DICT_CLIENT_DEFAULT_WARN_SLOW_MSECS;
+
+ /* uri = [idle_msecs=<n>:] [warn_slow_msecs=<n>:] [<path>] ":" <uri> */
+ for (;;) {
+ if (str_begins(uri, "idle_msecs=")) {
+ p = strchr(uri+11, ':');
+ if (p == NULL) {
+ *error_r = t_strdup_printf("Invalid URI: %s", uri);
+ return -1;
+ }
+ if (str_to_uint(t_strdup_until(uri+11, p), &idle_msecs) < 0) {
+ *error_r = "Invalid idle_msecs";
+ return -1;
+ }
+ uri = p+1;
+ } else if (str_begins(uri, "warn_slow_msecs=")) {
+ p = strchr(uri+11, ':');
+ if (p == NULL) {
+ *error_r = t_strdup_printf("Invalid URI: %s", uri);
+ return -1;
+ }
+ if (str_to_uint(t_strdup_until(uri+16, p), &warn_slow_msecs) < 0) {
+ *error_r = "Invalid warn_slow_msecs";
+ return -1;
+ }
+ uri = p+1;
+ } else {
+ break;
+ }
+ }
+ dest_uri = strchr(uri, ':');
+ if (dest_uri == NULL) {
+ *error_r = t_strdup_printf("Invalid URI: %s", uri);
+ return -1;
+ }
+
+ if (dict_connections == NULL) {
+ dict_connections = connection_list_init(&dict_conn_set,
+ &dict_conn_vfuncs);
+ }
+
+ dict = i_new(struct client_dict, 1);
+ dict->dict = *driver;
+ dict->conn.dict = dict;
+ dict->conn.conn.event_parent = set->event_parent;
+ dict->idle_msecs = idle_msecs;
+ dict->warn_slow_msecs = warn_slow_msecs;
+ i_array_init(&dict->cmds, 32);
+
+ if (uri[0] == ':') {
+ /* default path */
+ path = t_strconcat(set->base_dir,
+ "/"DEFAULT_DICT_SERVER_SOCKET_FNAME, NULL);
+ } else if (uri[0] == '/') {
+ /* absolute path */
+ path = t_strdup_until(uri, dest_uri);
+ } else {
+ /* relative path to base_dir */
+ path = t_strconcat(set->base_dir, "/",
+ t_strdup_until(uri, dest_uri), NULL);
+ }
+ connection_init_client_unix(dict_connections, &dict->conn.conn, path);
+ dict->uri = i_strdup(dest_uri + 1);
+
+ dict->dict.ioloop = io_loop_create();
+ dict->wait_timer = io_wait_timer_add();
+ io_loop_set_current(old_ioloop);
+ *dict_r = &dict->dict;
+ return 0;
+}
+
+static void client_dict_deinit(struct dict *_dict)
+{
+ struct client_dict *dict = (struct client_dict *)_dict;
+ struct ioloop *old_ioloop = current_ioloop;
+
+ client_dict_disconnect(dict, "Deinit");
+ connection_deinit(&dict->conn.conn);
+ io_wait_timer_remove(&dict->wait_timer);
+
+ i_assert(dict->transactions == NULL);
+ i_assert(array_count(&dict->cmds) == 0);
+
+ io_loop_set_current(dict->dict.ioloop);
+ io_loop_destroy(&dict->dict.ioloop);
+ io_loop_set_current(old_ioloop);
+
+ array_free(&dict->cmds);
+ i_free(dict->last_connect_error);
+ i_free(dict->uri);
+ i_free(dict);
+
+ if (dict_connections->connections == NULL)
+ connection_list_deinit(&dict_connections);
+}
+
+static void client_dict_wait(struct dict *_dict)
+{
+ struct client_dict *dict = (struct client_dict *)_dict;
+
+ if (array_count(&dict->cmds) == 0)
+ return;
+
+ i_assert(io_loop_is_empty(dict->dict.ioloop));
+ dict->dict.prev_ioloop = current_ioloop;
+ io_loop_set_current(dict->dict.ioloop);
+ dict_switch_ioloop(_dict);
+ while (array_count(&dict->cmds) > 0)
+ io_loop_run(dict->dict.ioloop);
+
+ io_loop_set_current(dict->dict.prev_ioloop);
+ dict->dict.prev_ioloop = NULL;
+
+ dict_switch_ioloop(_dict);
+ i_assert(io_loop_is_empty(dict->dict.ioloop));
+}
+
+static bool client_dict_switch_ioloop(struct dict *_dict)
+{
+ struct client_dict *dict = (struct client_dict *)_dict;
+
+ dict->last_timer_switch_usecs =
+ io_wait_timer_get_usecs(dict->wait_timer);
+ dict->wait_timer = io_wait_timer_move(&dict->wait_timer);
+ if (dict->to_idle != NULL)
+ dict->to_idle = io_loop_move_timeout(&dict->to_idle);
+ if (dict->to_requests != NULL)
+ dict->to_requests = io_loop_move_timeout(&dict->to_requests);
+ connection_switch_ioloop(&dict->conn.conn);
+ return array_count(&dict->cmds) > 0;
+}
+
+static const char *dict_wait_warnings(const struct client_dict_cmd *cmd)
+{
+ int global_ioloop_msecs = (ioloop_global_wait_usecs -
+ cmd->start_global_ioloop_usecs + 999) / 1000;
+ int dict_ioloop_msecs = (io_wait_timer_get_usecs(cmd->dict->wait_timer) -
+ cmd->start_dict_ioloop_usecs + 999) / 1000;
+ int other_ioloop_msecs = global_ioloop_msecs - dict_ioloop_msecs;
+ int lock_msecs = (file_lock_wait_get_total_usecs() -
+ cmd->start_lock_usecs + 999) / 1000;
+
+ return t_strdup_printf(
+ "%d.%03d in dict wait, %d.%03d in other ioloops, %d.%03d in locks",
+ dict_ioloop_msecs/1000, dict_ioloop_msecs%1000,
+ other_ioloop_msecs/1000, other_ioloop_msecs%1000,
+ lock_msecs/1000, lock_msecs%1000);
+}
+
+static const char *
+dict_warnings_sec(const struct client_dict_cmd *cmd, int msecs,
+ const char *const *extra_args)
+{
+ string_t *str = t_str_new(64);
+ struct timeval tv_start, tv_end;
+ unsigned int tv_start_usec, tv_end_usec;
+
+ str_printfa(str, "%d.%03d secs (%s", msecs/1000, msecs%1000,
+ dict_wait_warnings(cmd));
+ if (cmd->reconnected) {
+ int reconnected_msecs =
+ timeval_diff_msecs(&ioloop_timeval,
+ &cmd->dict->conn.conn.connect_started);
+ str_printfa(str, ", reconnected %u.%03u secs ago",
+ reconnected_msecs/1000, reconnected_msecs%1000);
+ }
+ if (cmd->async_id != 0) {
+ int async_reply_msecs =
+ timeval_diff_msecs(&ioloop_timeval, &cmd->async_id_received_time);
+ str_printfa(str, ", async-id reply %u.%03u secs ago",
+ async_reply_msecs/1000, async_reply_msecs%1000);
+ }
+ if (extra_args != NULL &&
+ str_array_length(extra_args) >= 4 &&
+ str_to_time(extra_args[0], &tv_start.tv_sec) == 0 &&
+ str_to_uint(extra_args[1], &tv_start_usec) == 0 &&
+ str_to_time(extra_args[2], &tv_end.tv_sec) == 0 &&
+ str_to_uint(extra_args[3], &tv_end_usec) == 0) {
+ tv_start.tv_usec = tv_start_usec;
+ tv_end.tv_usec = tv_end_usec;
+
+ int server_msecs_since_start =
+ timeval_diff_msecs(&ioloop_timeval, &tv_start);
+ int server_msecs = timeval_diff_msecs(&tv_end, &tv_start);
+ str_printfa(str, ", started on dict-server %u.%03d secs ago, "
+ "took %u.%03d secs",
+ server_msecs_since_start/1000,
+ server_msecs_since_start%1000,
+ server_msecs/1000, server_msecs%1000);
+ }
+ str_append_c(str, ')');
+ return str_c(str);
+}
+
+static void
+client_dict_lookup_async_callback(struct client_dict_cmd *cmd,
+ enum dict_protocol_reply reply,
+ const char *value,
+ const char *const *extra_args,
+ const char *error,
+ bool disconnected ATTR_UNUSED)
+{
+ struct client_dict *dict = cmd->dict;
+ struct dict_lookup_result result;
+ const char *const values[] = { value, NULL };
+
+ i_zero(&result);
+ if (error != NULL) {
+ result.ret = -1;
+ result.error = error;
+ } else switch (reply) {
+ case DICT_PROTOCOL_REPLY_OK:
+ result.value = value;
+ result.values = values;
+ result.ret = 1;
+ break;
+ case DICT_PROTOCOL_REPLY_MULTI_OK:
+ result.values = t_strsplit_tabescaped(value);
+ result.value = result.values[0];
+ result.ret = 1;
+ break;
+ case DICT_PROTOCOL_REPLY_NOTFOUND:
+ result.ret = 0;
+ break;
+ case DICT_PROTOCOL_REPLY_FAIL:
+ result.error = value[0] == '\0' ? "dict-server returned failure" :
+ t_strdup_printf("dict-server returned failure: %s",
+ value);
+ result.ret = -1;
+ break;
+ default:
+ result.error = t_strdup_printf(
+ "dict-client: Invalid lookup '%s' reply: %c%s",
+ cmd->query, reply, value);
+ client_dict_disconnect(dict, result.error);
+ result.ret = -1;
+ break;
+ }
+
+ int diff = timeval_diff_msecs(&ioloop_timeval, &cmd->start_time);
+ if (result.error != NULL) {
+ /* include timing info always in error messages */
+ result.error = t_strdup_printf("%s (reply took %s)",
+ result.error, dict_warnings_sec(cmd, diff, extra_args));
+ } else if (!cmd->background &&
+ diff >= (int)dict->warn_slow_msecs) {
+ e_warning(dict->conn.conn.event, "dict lookup took %s: %s",
+ dict_warnings_sec(cmd, diff, extra_args),
+ cmd->query);
+ }
+
+ dict_pre_api_callback(&dict->dict);
+ cmd->api_callback.lookup(&result, cmd->api_callback.context);
+ dict_post_api_callback(&dict->dict);
+}
+
+static void
+client_dict_lookup_async(struct dict *_dict, const struct dict_op_settings *set,
+ const char *key, dict_lookup_callback_t *callback,
+ void *context)
+{
+ struct client_dict *dict = (struct client_dict *)_dict;
+ struct client_dict_cmd *cmd;
+ const char *query;
+
+ query = t_strdup_printf("%c%s\t%s", DICT_PROTOCOL_CMD_LOOKUP,
+ str_tabescape(key),
+ set->username == NULL ? "" : str_tabescape(set->username));
+ cmd = client_dict_cmd_init(dict, query);
+ cmd->callback = client_dict_lookup_async_callback;
+ cmd->api_callback.lookup = callback;
+ cmd->api_callback.context = context;
+ cmd->retry_errors = TRUE;
+
+ client_dict_cmd_send(dict, &cmd, NULL);
+}
+
+struct client_dict_sync_lookup {
+ char *error;
+ char *value;
+ int ret;
+};
+
+static void client_dict_lookup_callback(const struct dict_lookup_result *result,
+ struct client_dict_sync_lookup *lookup)
+{
+ lookup->ret = result->ret;
+ if (result->ret == -1)
+ lookup->error = i_strdup(result->error);
+ else if (result->ret == 1)
+ lookup->value = i_strdup(result->value);
+}
+
+static int client_dict_lookup(struct dict *_dict,
+ const struct dict_op_settings *set,
+ pool_t pool, const char *key,
+ const char **value_r, const char **error_r)
+{
+ struct client_dict_sync_lookup lookup;
+
+ i_zero(&lookup);
+ lookup.ret = -2;
+
+ dict_lookup_async(_dict, set, key, client_dict_lookup_callback, &lookup);
+ if (lookup.ret == -2)
+ client_dict_wait(_dict);
+
+ switch (lookup.ret) {
+ case -1:
+ *error_r = t_strdup(lookup.error);
+ i_free(lookup.error);
+ return -1;
+ case 0:
+ i_assert(lookup.value == NULL);
+ *value_r = NULL;
+ return 0;
+ case 1:
+ *value_r = p_strdup(pool, lookup.value);
+ i_free(lookup.value);
+ return 1;
+ }
+ i_unreached();
+}
+
+static void client_dict_iterate_unref(struct client_dict_iterate_context *ctx)
+{
+ i_assert(ctx->refcount > 0);
+ if (--ctx->refcount > 0)
+ return;
+ i_free(ctx->error);
+ i_free(ctx);
+}
+
+static void
+client_dict_iter_api_callback(struct client_dict_iterate_context *ctx,
+ struct client_dict_cmd *cmd,
+ const char *const *extra_args)
+{
+ struct client_dict *dict = cmd->dict;
+
+ if (ctx->deinit) {
+ /* Iterator was already deinitialized. Stop if we're in
+ client_dict_wait(). */
+ dict_post_api_callback(&dict->dict);
+ return;
+ }
+ if (ctx->finished) {
+ int diff = timeval_diff_msecs(&ioloop_timeval, &cmd->start_time);
+ if (ctx->error != NULL) {
+ /* include timing info always in error messages */
+ char *new_error = i_strdup_printf("%s (reply took %s)",
+ ctx->error, dict_warnings_sec(cmd, diff, extra_args));
+ i_free(ctx->error);
+ ctx->error = new_error;
+ } else if (!cmd->background &&
+ diff >= (int)dict->warn_slow_msecs) {
+ e_warning(dict->conn.conn.event, "dict iteration took %s: %s",
+ dict_warnings_sec(cmd, diff, extra_args),
+ cmd->query);
+ }
+ }
+ if (ctx->ctx.async_callback != NULL) {
+ dict_pre_api_callback(&dict->dict);
+ ctx->ctx.async_callback(ctx->ctx.async_context);
+ dict_post_api_callback(&dict->dict);
+ } else {
+ /* synchronous lookup */
+ io_loop_stop(dict->dict.ioloop);
+ }
+}
+
+static void
+client_dict_iter_async_callback(struct client_dict_cmd *cmd,
+ enum dict_protocol_reply reply,
+ const char *value,
+ const char *const *extra_args,
+ const char *error,
+ bool disconnected ATTR_UNUSED)
+{
+ struct client_dict_iterate_context *ctx = cmd->iter;
+ struct client_dict *dict = cmd->dict;
+ struct client_dict_iter_result *result;
+ const char *iter_key = NULL, *const *iter_values = NULL;
+
+ if (ctx->deinit) {
+ cmd->background = TRUE;
+ client_dict_cmd_backgrounded(dict);
+ }
+
+ if (error != NULL) {
+ /* failed */
+ } else switch (reply) {
+ case DICT_PROTOCOL_REPLY_ITER_FINISHED:
+ /* end of iteration */
+ i_assert(!ctx->finished);
+ ctx->finished = TRUE;
+ client_dict_iter_api_callback(ctx, cmd, extra_args);
+ client_dict_iterate_unref(ctx);
+ return;
+ case DICT_PROTOCOL_REPLY_OK:
+ /* key \t value */
+ iter_key = value;
+ iter_values = extra_args;
+ extra_args++;
+ break;
+ case DICT_PROTOCOL_REPLY_FAIL:
+ error = t_strdup_printf("dict-server returned failure: %s", value);
+ break;
+ default:
+ break;
+ }
+ if ((iter_values == NULL || iter_values[0] == NULL) && error == NULL) {
+ /* broken protocol */
+ error = t_strdup_printf("dict client (%s) sent broken iterate reply: %c%s",
+ dict->conn.conn.name, reply, value);
+ client_dict_disconnect(dict, error);
+ }
+
+ if (error != NULL) {
+ if (ctx->error == NULL)
+ ctx->error = i_strdup(error);
+ i_assert(!ctx->finished);
+ ctx->finished = TRUE;
+ client_dict_iter_api_callback(ctx, cmd, extra_args);
+ client_dict_iterate_unref(ctx);
+ return;
+ }
+ cmd->unfinished = TRUE;
+
+ if (ctx->deinit) {
+ /* iterator was already deinitialized */
+ return;
+ }
+
+ result = array_append_space(&ctx->results);
+ result->key = p_strdup(ctx->results_pool, iter_key);
+ result->values = p_strarray_dup(ctx->results_pool, iter_values);
+
+ client_dict_iter_api_callback(ctx, cmd, NULL);
+}
+
+static struct dict_iterate_context *
+client_dict_iterate_init(struct dict *_dict,
+ const struct dict_op_settings *set ATTR_UNUSED,
+ const char *path, enum dict_iterate_flags flags)
+{
+ struct client_dict_iterate_context *ctx;
+
+ ctx = i_new(struct client_dict_iterate_context, 1);
+ ctx->ctx.dict = _dict;
+ ctx->results_pool = pool_alloconly_create("client dict iteration", 512);
+ ctx->flags = flags;
+ ctx->path = i_strdup(path);
+ ctx->refcount = 1;
+ i_array_init(&ctx->results, 64);
+ return &ctx->ctx;
+}
+
+static void
+client_dict_iterate_cmd_send(struct client_dict_iterate_context *ctx)
+{
+ struct client_dict *dict = (struct client_dict *)ctx->ctx.dict;
+ const struct dict_op_settings_private *set = &ctx->ctx.set;
+ struct client_dict_cmd *cmd;
+ string_t *query = t_str_new(256);
+
+ /* we can't do this query in _iterate_init(), because
+ _set_limit() hasn't been called yet at that point. */
+ str_printfa(query, "%c%d\t%"PRIu64, DICT_PROTOCOL_CMD_ITERATE,
+ ctx->flags, ctx->ctx.max_rows);
+ str_append_c(query, '\t');
+ str_append_tabescaped(query, ctx->path);
+ str_append_c(query, '\t');
+ str_append_tabescaped(query, set->username == NULL ? "" : set->username);
+
+ cmd = client_dict_cmd_init(dict, str_c(query));
+ cmd->iter = ctx;
+ cmd->callback = client_dict_iter_async_callback;
+ cmd->retry_errors = TRUE;
+
+ ctx->refcount++;
+ client_dict_cmd_send(dict, &cmd, NULL);
+}
+
+static bool client_dict_iterate(struct dict_iterate_context *_ctx,
+ const char **key_r, const char *const **values_r)
+{
+ struct client_dict_iterate_context *ctx =
+ (struct client_dict_iterate_context *)_ctx;
+ const struct client_dict_iter_result *results;
+ unsigned int count;
+
+ if (ctx->error != NULL) {
+ ctx->ctx.has_more = FALSE;
+ return FALSE;
+ }
+
+ results = array_get(&ctx->results, &count);
+ if (ctx->result_idx < count) {
+ *key_r = results[ctx->result_idx].key;
+ *values_r = results[ctx->result_idx].values;
+ ctx->ctx.has_more = TRUE;
+ ctx->result_idx++;
+ ctx->seen_results = TRUE;
+ return TRUE;
+ }
+ if (!ctx->cmd_sent) {
+ ctx->cmd_sent = TRUE;
+ client_dict_iterate_cmd_send(ctx);
+ return client_dict_iterate(_ctx, key_r, values_r);
+ }
+ ctx->ctx.has_more = !ctx->finished;
+ ctx->result_idx = 0;
+ array_clear(&ctx->results);
+ p_clear(ctx->results_pool);
+
+ if ((ctx->flags & DICT_ITERATE_FLAG_ASYNC) == 0 && ctx->ctx.has_more) {
+ client_dict_wait(_ctx->dict);
+ return client_dict_iterate(_ctx, key_r, values_r);
+ }
+ return FALSE;
+}
+
+static int client_dict_iterate_deinit(struct dict_iterate_context *_ctx,
+ const char **error_r)
+{
+ struct client_dict *dict = (struct client_dict *)_ctx->dict;
+ struct client_dict_iterate_context *ctx =
+ (struct client_dict_iterate_context *)_ctx;
+ int ret = ctx->error != NULL ? -1 : 0;
+
+ i_assert(!ctx->deinit);
+ ctx->deinit = TRUE;
+
+ *error_r = t_strdup(ctx->error);
+ array_free(&ctx->results);
+ pool_unref(&ctx->results_pool);
+ i_free(ctx->path);
+ client_dict_iterate_unref(ctx);
+
+ client_dict_add_timeout(dict);
+ return ret;
+}
+
+static struct dict_transaction_context *
+client_dict_transaction_init(struct dict *_dict)
+{
+ struct client_dict *dict = (struct client_dict *)_dict;
+ struct client_dict_transaction_context *ctx;
+
+ ctx = i_new(struct client_dict_transaction_context, 1);
+ ctx->ctx.dict = _dict;
+ ctx->id = ++dict->transaction_id_counter;
+
+ DLLIST_PREPEND(&dict->transactions, ctx);
+ return &ctx->ctx;
+}
+
+static void
+client_dict_transaction_free(struct client_dict_transaction_context **_ctx)
+{
+ struct client_dict_transaction_context *ctx = *_ctx;
+
+ *_ctx = NULL;
+ i_free(ctx->first_query);
+ i_free(ctx->error);
+ i_free(ctx);
+}
+
+static void
+client_dict_transaction_commit_callback(struct client_dict_cmd *cmd,
+ enum dict_protocol_reply reply,
+ const char *value,
+ const char *const *extra_args,
+ const char *error, bool disconnected)
+{
+ struct client_dict *dict = cmd->dict;
+ struct dict_commit_result result = {
+ .ret = DICT_COMMIT_RET_FAILED, .error = NULL
+ };
+
+ i_assert(cmd->trans != NULL);
+
+ if (error != NULL) {
+ /* failed */
+ if (disconnected)
+ result.ret = DICT_COMMIT_RET_WRITE_UNCERTAIN;
+ result.error = error;
+ } else switch (reply) {
+ case DICT_PROTOCOL_REPLY_OK:
+ result.ret = DICT_COMMIT_RET_OK;
+ break;
+ case DICT_PROTOCOL_REPLY_NOTFOUND:
+ result.ret = DICT_COMMIT_RET_NOTFOUND;
+ break;
+ case DICT_PROTOCOL_REPLY_WRITE_UNCERTAIN:
+ result.ret = DICT_COMMIT_RET_WRITE_UNCERTAIN;
+ /* fallthrough */
+ case DICT_PROTOCOL_REPLY_FAIL: {
+ /* value contains the obsolete trans_id */
+ const char *error = extra_args[0];
+
+ result.error = t_strdup_printf("dict-server returned failure: %s",
+ error != NULL ? t_str_tabunescape(error) : "");
+ if (error != NULL)
+ extra_args++;
+ break;
+ }
+ default:
+ result.ret = DICT_COMMIT_RET_FAILED;
+ result.error = t_strdup_printf(
+ "dict-client: Invalid commit reply: %c%s",
+ reply, value);
+ client_dict_disconnect(dict, result.error);
+ break;
+ }
+
+ int diff = timeval_diff_msecs(&ioloop_timeval, &cmd->start_time);
+ if (result.error != NULL) {
+ /* include timing info always in error messages */
+ result.error = t_strdup_printf("%s (reply took %s)",
+ result.error, dict_warnings_sec(cmd, diff, extra_args));
+ } else if (!cmd->background && !cmd->trans->ctx.no_slowness_warning &&
+ diff >= (int)dict->warn_slow_msecs) {
+ e_warning(dict->conn.conn.event, "dict commit took %s: "
+ "%s (%u commands, first: %s)",
+ dict_warnings_sec(cmd, diff, extra_args),
+ cmd->query, cmd->trans->query_count,
+ cmd->trans->first_query);
+ }
+ client_dict_transaction_free(&cmd->trans);
+
+ dict_pre_api_callback(&dict->dict);
+ cmd->api_callback.commit(&result, cmd->api_callback.context);
+ dict_post_api_callback(&dict->dict);
+}
+
+
+static void
+client_dict_transaction_commit(struct dict_transaction_context *_ctx,
+ bool async,
+ dict_transaction_commit_callback_t *callback,
+ void *context)
+{
+ struct client_dict_transaction_context *ctx =
+ (struct client_dict_transaction_context *)_ctx;
+ struct client_dict *dict = (struct client_dict *)_ctx->dict;
+ struct client_dict_cmd *cmd;
+ const char *query;
+
+ DLLIST_REMOVE(&dict->transactions, ctx);
+
+ if (ctx->sent_begin && ctx->error == NULL) {
+ query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_COMMIT, ctx->id);
+ cmd = client_dict_cmd_init(dict, query);
+ cmd->trans = ctx;
+
+ cmd->callback = client_dict_transaction_commit_callback;
+ cmd->api_callback.commit = callback;
+ cmd->api_callback.context = context;
+ if (callback == dict_transaction_commit_async_noop_callback)
+ cmd->background = TRUE;
+ if (client_dict_cmd_send(dict, &cmd, NULL)) {
+ if (!async)
+ client_dict_wait(_ctx->dict);
+ }
+ } else if (ctx->error != NULL) {
+ /* already failed */
+ struct dict_commit_result result = {
+ .ret = DICT_COMMIT_RET_FAILED, .error = ctx->error
+ };
+ callback(&result, context);
+ client_dict_transaction_free(&ctx);
+ } else {
+ /* nothing changed */
+ struct dict_commit_result result = {
+ .ret = DICT_COMMIT_RET_OK, .error = NULL
+ };
+ callback(&result, context);
+ client_dict_transaction_free(&ctx);
+ }
+
+ client_dict_add_timeout(dict);
+}
+
+static void
+client_dict_transaction_rollback(struct dict_transaction_context *_ctx)
+{
+ struct client_dict_transaction_context *ctx =
+ (struct client_dict_transaction_context *)_ctx;
+ struct client_dict *dict = (struct client_dict *)_ctx->dict;
+
+ if (ctx->sent_begin) {
+ const char *query;
+
+ query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_ROLLBACK,
+ ctx->id);
+ client_dict_send_transaction_query(ctx, query);
+ }
+
+ DLLIST_REMOVE(&dict->transactions, ctx);
+ client_dict_transaction_free(&ctx);
+ client_dict_add_timeout(dict);
+}
+
+static void client_dict_set(struct dict_transaction_context *_ctx,
+ const char *key, const char *value)
+{
+ struct client_dict_transaction_context *ctx =
+ (struct client_dict_transaction_context *)_ctx;
+ const char *query;
+
+ query = t_strdup_printf("%c%u\t%s\t%s",
+ DICT_PROTOCOL_CMD_SET, ctx->id,
+ str_tabescape(key),
+ str_tabescape(value));
+ client_dict_send_transaction_query(ctx, query);
+}
+
+static void client_dict_unset(struct dict_transaction_context *_ctx,
+ const char *key)
+{
+ struct client_dict_transaction_context *ctx =
+ (struct client_dict_transaction_context *)_ctx;
+ const char *query;
+
+ query = t_strdup_printf("%c%u\t%s",
+ DICT_PROTOCOL_CMD_UNSET, ctx->id,
+ str_tabescape(key));
+ client_dict_send_transaction_query(ctx, query);
+}
+
+static void client_dict_atomic_inc(struct dict_transaction_context *_ctx,
+ const char *key, long long diff)
+{
+ struct client_dict_transaction_context *ctx =
+ (struct client_dict_transaction_context *)_ctx;
+ const char *query;
+
+ query = t_strdup_printf("%c%u\t%s\t%lld",
+ DICT_PROTOCOL_CMD_ATOMIC_INC,
+ ctx->id, str_tabescape(key), diff);
+ client_dict_send_transaction_query(ctx, query);
+}
+
+static void client_dict_set_timestamp(struct dict_transaction_context *_ctx,
+ const struct timespec *ts)
+{
+ struct client_dict_transaction_context *ctx =
+ (struct client_dict_transaction_context *)_ctx;
+ const char *query;
+
+ query = t_strdup_printf("%c%u\t%s\t%u",
+ DICT_PROTOCOL_CMD_TIMESTAMP,
+ ctx->id, dec2str(ts->tv_sec),
+ (unsigned int)ts->tv_nsec);
+ client_dict_send_transaction_query(ctx, query);
+}
+
+struct dict dict_driver_client = {
+ .name = "proxy",
+
+ {
+ .init = client_dict_init,
+ .deinit = client_dict_deinit,
+ .wait = client_dict_wait,
+ .lookup = client_dict_lookup,
+ .iterate_init = client_dict_iterate_init,
+ .iterate = client_dict_iterate,
+ .iterate_deinit = client_dict_iterate_deinit,
+ .transaction_init = client_dict_transaction_init,
+ .transaction_commit = client_dict_transaction_commit,
+ .transaction_rollback = client_dict_transaction_rollback,
+ .set = client_dict_set,
+ .unset = client_dict_unset,
+ .atomic_inc = client_dict_atomic_inc,
+ .lookup_async = client_dict_lookup_async,
+ .switch_ioloop = client_dict_switch_ioloop,
+ .set_timestamp = client_dict_set_timestamp,
+ }
+};