diff options
Diffstat (limited to 'src/replication/replicator/doveadm-connection.c')
-rw-r--r-- | src/replication/replicator/doveadm-connection.c | 354 |
1 files changed, 354 insertions, 0 deletions
diff --git a/src/replication/replicator/doveadm-connection.c b/src/replication/replicator/doveadm-connection.c new file mode 100644 index 0000000..1932bc6 --- /dev/null +++ b/src/replication/replicator/doveadm-connection.c @@ -0,0 +1,354 @@ +/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "array.h" +#include "connection.h" +#include "ostream.h" +#include "str.h" +#include "strescape.h" +#include "wildcard-match.h" +#include "master-service.h" +#include "replicator-brain.h" +#include "replicator-queue.h" +#include "replicator-settings.h" +#include "dsync-client.h" +#include "doveadm-connection.h" + +#include <unistd.h> + +#define REPLICATOR_DOVEADM_MAJOR_VERSION 1 +#define REPLICATOR_DOVEADM_MINOR_VERSION 0 + +struct doveadm_connection { + struct connection conn; + struct replicator_brain *brain; +}; +static struct connection_list *doveadm_connections; + +static int client_input_status_overview(struct doveadm_connection *client) +{ + struct replicator_queue *queue = + replicator_brain_get_queue(client->brain); + struct replicator_queue_iter *iter; + struct replicator_user *user; + enum replication_priority priority; + unsigned int pending_counts[REPLICATION_PRIORITY_SYNC+1]; + unsigned int user_count, next_secs, pending_failed_count; + unsigned int pending_full_resync_count, waiting_failed_count; + string_t *str = t_str_new(256); + + memset(pending_counts, 0, sizeof(pending_counts)); + pending_failed_count = 0; waiting_failed_count = 0; + pending_full_resync_count = 0; + + user_count = 0; + iter = replicator_queue_iter_init(queue); + while ((user = replicator_queue_iter_next(iter)) != NULL) { + if (user->priority != REPLICATION_PRIORITY_NONE) + pending_counts[user->priority]++; + else if (replicator_queue_want_sync_now(user, &next_secs)) { + if (user->last_sync_failed) + pending_failed_count++; + else + pending_full_resync_count++; + } else { + if (user->last_sync_failed) + waiting_failed_count++; + } + user_count++; + } + replicator_queue_iter_deinit(&iter); + + for (priority = REPLICATION_PRIORITY_SYNC; priority > 0; priority--) { + str_printfa(str, "Queued '%s' requests\t%u\n", + replicator_priority_to_str(priority), + pending_counts[priority]); + } + str_printfa(str, "Queued 'failed' requests\t%u\n", + pending_failed_count); + str_printfa(str, "Queued 'full resync' requests\t%u\n", + pending_full_resync_count); + str_printfa(str, "Waiting 'failed' requests\t%u\n", + waiting_failed_count); + str_printfa(str, "Total number of known users\t%u\n", user_count); + str_append_c(str, '\n'); + o_stream_nsend(client->conn.output, str_data(str), str_len(str)); + return 0; +} + +static int +client_input_status(struct doveadm_connection *client, const char *const *args) +{ + struct replicator_queue *queue = + replicator_brain_get_queue(client->brain); + struct replicator_queue_iter *iter; + struct replicator_user *user; + const char *mask = args[0]; + unsigned int next_secs; + string_t *str = t_str_new(128); + + if (mask == NULL) + return client_input_status_overview(client); + + iter = replicator_queue_iter_init(queue); + while ((user = replicator_queue_iter_next(iter)) != NULL) { + if (!wildcard_match(user->username, mask)) + continue; + + str_truncate(str, 0); + str_append_tabescaped(str, user->username); + str_append_c(str, '\t'); + str_append(str, replicator_priority_to_str(user->priority)); + if (replicator_queue_want_sync_now(user, &next_secs)) + next_secs = 0; + str_printfa(str, "\t%lld\t%lld\t%d\t%lld\t%u\n", + (long long)user->last_fast_sync, + (long long)user->last_full_sync, + user->last_sync_failed ? 1 : 0, + (long long)user->last_successful_sync, + next_secs); + o_stream_nsend(client->conn.output, str_data(str), str_len(str)); + } + replicator_queue_iter_deinit(&iter); + o_stream_nsend(client->conn.output, "\n", 1); + return 0; +} + +static int +client_input_status_dsyncs(struct doveadm_connection *client) +{ + string_t *str = t_str_new(256); + const ARRAY_TYPE(dsync_client) *clients; + struct dsync_client *dsync_client; + const char *username; + + clients = replicator_brain_get_dsync_clients(client->brain); + array_foreach_elem(clients, dsync_client) { + username = dsync_client_get_username(dsync_client); + if (username != NULL) { + str_append_tabescaped(str, username); + str_append_c(str, '\t'); + switch (dsync_client_get_type(dsync_client)) { + case DSYNC_TYPE_FULL: + str_append(str, "full"); + break; + case DSYNC_TYPE_NORMAL: + str_append(str, "normal"); + break; + case DSYNC_TYPE_INCREMENTAL: + str_append(str, "incremental"); + break; + } + } else { + str_append(str, "\t-"); + } + str_append_c(str, '\t'); + str_append_tabescaped(str, dsync_client_get_state(dsync_client)); + str_append_c(str, '\n'); + } + + str_append_c(str, '\n'); + o_stream_nsend(client->conn.output, str_data(str), str_len(str)); + return 0; +} + +static int +client_input_replicate(struct doveadm_connection *client, const char *const *args) +{ + struct replicator_queue *queue = + replicator_brain_get_queue(client->brain); + struct replicator_queue_iter *iter; + struct replicator_user *user; + const char *usermask; + enum replication_priority priority; + unsigned int match_count; + bool full; + + /* <priority> <flags> <username>|<mask> */ + if (str_array_length(args) != 3) { + i_error("%s: REPLICATE: Invalid parameters", client->conn.name); + return -1; + } + if (replication_priority_parse(args[0], &priority) < 0) { + o_stream_nsend_str(client->conn.output, "-Invalid priority\n"); + return 0; + } + full = strchr(args[1], 'f') != NULL; + usermask = args[2]; + if (strchr(usermask, '*') == NULL && strchr(usermask, '?') == NULL) { + struct replicator_user *user = + replicator_queue_get(queue, usermask); + if (full) + user->force_full_sync = TRUE; + replicator_queue_update(queue, user, priority); + replicator_queue_add(queue, user); + o_stream_nsend_str(client->conn.output, "+1\n"); + return 0; + } + + match_count = 0; + iter = replicator_queue_iter_init(queue); + while ((user = replicator_queue_iter_next(iter)) != NULL) { + if (!wildcard_match(user->username, usermask)) + continue; + if (full) + user->force_full_sync = TRUE; + replicator_queue_update(queue, user, priority); + replicator_queue_add(queue, user); + match_count++; + } + replicator_queue_iter_deinit(&iter); + o_stream_nsend_str(client->conn.output, + t_strdup_printf("+%u\n", match_count)); + return 0; +} + +static int +client_input_add(struct doveadm_connection *client, const char *const *args) +{ + struct replicator_queue *queue = + replicator_brain_get_queue(client->brain); + const struct replicator_settings *set = + replicator_brain_get_settings(client->brain); + + /* <usermask> */ + if (str_array_length(args) != 1) { + i_error("%s: ADD: Invalid parameters", client->conn.name); + return -1; + } + + if (strchr(args[0], '*') == NULL && strchr(args[0], '?') == NULL) { + struct replicator_user *user = + replicator_queue_get(queue, args[0]); + replicator_queue_add(queue, user); + } else { + replicator_queue_add_auth_users(queue, set->auth_socket_path, + args[0], ioloop_time); + } + o_stream_nsend_str(client->conn.output, "+\n"); + return 0; +} + +static int +client_input_remove(struct doveadm_connection *client, const char *const *args) +{ + struct replicator_queue *queue = + replicator_brain_get_queue(client->brain); + struct replicator_user *user; + + /* <username> */ + if (str_array_length(args) != 1) { + i_error("%s: REMOVE: Invalid parameters", client->conn.name); + return -1; + } + user = replicator_queue_lookup(queue, args[0]); + if (user == NULL) + o_stream_nsend_str(client->conn.output, "-User not found\n"); + else { + replicator_queue_remove(queue, &user); + o_stream_nsend_str(client->conn.output, "+\n"); + } + return 0; +} + +static int +client_input_notify(struct doveadm_connection *client, const char *const *args) +{ + struct replicator_queue *queue = + replicator_brain_get_queue(client->brain); + struct replicator_user *user; + + /* <username> <flags> <state> */ + if (str_array_length(args) < 3) { + i_error("%s: NOTIFY: Invalid parameters", client->conn.name); + return -1; + } + + user = replicator_queue_get(queue, args[0]); + if (args[1][0] == 'f') + user->last_full_sync = ioloop_time; + user->last_fast_sync = ioloop_time; + user->last_update = ioloop_time; + replicator_queue_add(queue, user); + + if (args[2][0] != '\0') { + i_free(user->state); + user->state = i_strdup(args[2]); + } + o_stream_nsend_str(client->conn.output, "+\n"); + return 0; +} + +static int client_input_args(struct connection *conn, const char *const *args) +{ + struct doveadm_connection *client = (struct doveadm_connection *)conn; + const char *cmd = args[0]; + + if (cmd == NULL) { + i_error("%s: Empty command", conn->name); + return 0; + } + args++; + + if (strcmp(cmd, "STATUS") == 0) + return client_input_status(client, args); + else if (strcmp(cmd, "STATUS-DSYNC") == 0) + return client_input_status_dsyncs(client); + else if (strcmp(cmd, "REPLICATE") == 0) + return client_input_replicate(client, args); + else if (strcmp(cmd, "ADD") == 0) + return client_input_add(client, args); + else if (strcmp(cmd, "REMOVE") == 0) + return client_input_remove(client, args); + else if (strcmp(cmd, "NOTIFY") == 0) + return client_input_notify(client, args); + i_error("%s: Unknown command: %s", conn->name, cmd); + return -1; +} + +static void client_destroy(struct connection *conn) +{ + struct doveadm_connection *client = (struct doveadm_connection *)conn; + + connection_deinit(&client->conn); + i_free(client); + + master_service_client_connection_destroyed(master_service); +} + +void doveadm_connection_create(struct replicator_brain *brain, int fd) +{ + struct doveadm_connection *client; + + client = i_new(struct doveadm_connection, 1); + client->brain = brain; + connection_init_server(doveadm_connections, &client->conn, + "doveadm-client", fd, fd); +} + +static struct connection_settings doveadm_conn_set = { + .service_name_in = "replicator-doveadm-client", + .service_name_out = "replicator-doveadm-server", + .major_version = REPLICATOR_DOVEADM_MAJOR_VERSION, + .minor_version = REPLICATOR_DOVEADM_MINOR_VERSION, + + .input_max_size = SIZE_MAX, + .output_max_size = SIZE_MAX, + .client = FALSE +}; + +static const struct connection_vfuncs doveadm_conn_vfuncs = { + .destroy = client_destroy, + .input_args = client_input_args +}; + +void doveadm_connections_init(void) +{ + doveadm_connections = connection_list_init(&doveadm_conn_set, + &doveadm_conn_vfuncs); +} + +void doveadm_connections_deinit(void) +{ + connection_list_deinit(&doveadm_connections); +} |