diff options
Diffstat (limited to 'src/replication/replicator/replicator-queue.c')
-rw-r--r-- | src/replication/replicator/replicator-queue.c | 488 |
1 files changed, 488 insertions, 0 deletions
diff --git a/src/replication/replicator/replicator-queue.c b/src/replication/replicator/replicator-queue.c new file mode 100644 index 0000000..430716a --- /dev/null +++ b/src/replication/replicator/replicator-queue.c @@ -0,0 +1,488 @@ +/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "array.h" +#include "ioloop.h" +#include "istream.h" +#include "ostream.h" +#include "str.h" +#include "strescape.h" +#include "hash.h" +#include "replicator-queue.h" + +#include <unistd.h> +#include <fcntl.h> + +struct replicator_sync_lookup { + struct replicator_user *user; + + replicator_sync_callback_t *callback; + void *context; + + bool wait_for_next_push; +}; + +struct replicator_queue { + struct priorityq *user_queue; + /* username => struct replicator_user* */ + HASH_TABLE(char *, struct replicator_user *) user_hash; + + ARRAY(struct replicator_sync_lookup) sync_lookups; + + unsigned int full_sync_interval; + unsigned int failure_resync_interval; + + void (*change_callback)(void *context); + void *change_context; +}; + +struct replicator_queue_iter { + struct replicator_queue *queue; + struct hash_iterate_context *iter; +}; + +static int user_priority_cmp(const void *p1, const void *p2) +{ + const struct replicator_user *user1 = p1, *user2 = p2; + + if (user1->priority > user2->priority) + return -1; + if (user1->priority < user2->priority) + return 1; + + if (user1->priority != REPLICATION_PRIORITY_NONE) { + /* there is something to replicate */ + if (user1->last_fast_sync < user2->last_fast_sync) + return -1; + if (user1->last_fast_sync > user2->last_fast_sync) + return 1; + } else if (user1->last_sync_failed != user2->last_sync_failed) { + /* resync failures first */ + if (user1->last_sync_failed) + return -1; + else + return 1; + } else if (user1->last_sync_failed) { + /* both have failed. resync failures with fast-sync timestamp */ + if (user1->last_fast_sync < user2->last_fast_sync) + return -1; + if (user1->last_fast_sync > user2->last_fast_sync) + return 1; + } else { + /* nothing to replicate, but do still periodic full syncs */ + if (user1->last_full_sync < user2->last_full_sync) + return -1; + if (user1->last_full_sync > user2->last_full_sync) + return 1; + } + return 0; +} + +struct replicator_queue * +replicator_queue_init(unsigned int full_sync_interval, + unsigned int failure_resync_interval) +{ + struct replicator_queue *queue; + + queue = i_new(struct replicator_queue, 1); + queue->full_sync_interval = full_sync_interval; + queue->failure_resync_interval = failure_resync_interval; + queue->user_queue = priorityq_init(user_priority_cmp, 1024); + hash_table_create(&queue->user_hash, default_pool, 1024, + str_hash, strcmp); + i_array_init(&queue->sync_lookups, 32); + return queue; +} + +void replicator_queue_deinit(struct replicator_queue **_queue) +{ + struct replicator_queue *queue = *_queue; + struct priorityq_item *item; + + *_queue = NULL; + + queue->change_callback = NULL; + + while ((item = priorityq_pop(queue->user_queue)) != NULL) { + struct replicator_user *user = (struct replicator_user *)item; + + user->popped = TRUE; + replicator_queue_remove(queue, &user); + } + + priorityq_deinit(&queue->user_queue); + hash_table_destroy(&queue->user_hash); + i_assert(array_count(&queue->sync_lookups) == 0); + array_free(&queue->sync_lookups); + i_free(queue); +} + +void replicator_queue_set_change_callback(struct replicator_queue *queue, + void (*callback)(void *context), + void *context) +{ + queue->change_callback = callback; + queue->change_context = context; +} + +void replicator_user_ref(struct replicator_user *user) +{ + i_assert(user->refcount > 0); + user->refcount++; +} + +bool replicator_user_unref(struct replicator_user **_user) +{ + struct replicator_user *user = *_user; + + i_assert(user->refcount > 0); + *_user = NULL; + if (--user->refcount > 0) + return TRUE; + + i_free(user->state); + i_free(user->username); + i_free(user); + return FALSE; +} + +struct replicator_user * +replicator_queue_lookup(struct replicator_queue *queue, const char *username) +{ + return hash_table_lookup(queue->user_hash, username); +} + +static struct replicator_user * +replicator_queue_add_int(struct replicator_queue *queue, const char *username, + enum replication_priority priority) +{ + struct replicator_user *user; + + user = replicator_queue_lookup(queue, username); + if (user == NULL) { + user = i_new(struct replicator_user, 1); + user->refcount = 1; + user->username = i_strdup(username); + hash_table_insert(queue->user_hash, user->username, user); + } else { + if (user->priority > priority) { + /* user already has a higher priority than this */ + return user; + } + if (!user->popped) + priorityq_remove(queue->user_queue, &user->item); + } + user->priority = priority; + user->last_update = ioloop_time; + + if (!user->popped) + priorityq_add(queue->user_queue, &user->item); + return user; +} + +struct replicator_user * +replicator_queue_add(struct replicator_queue *queue, const char *username, + enum replication_priority priority) +{ + struct replicator_user *user; + + user = replicator_queue_add_int(queue, username, priority); + if (queue->change_callback != NULL) + queue->change_callback(queue->change_context); + return user; +} + +void replicator_queue_add_sync(struct replicator_queue *queue, + const char *username, + replicator_sync_callback_t *callback, + void *context) +{ + struct replicator_user *user; + struct replicator_sync_lookup *lookup; + + user = replicator_queue_add_int(queue, username, + REPLICATION_PRIORITY_SYNC); + + lookup = array_append_space(&queue->sync_lookups); + lookup->user = user; + lookup->callback = callback; + lookup->context = context; + lookup->wait_for_next_push = user->popped; + + if (queue->change_callback != NULL) + queue->change_callback(queue->change_context); +} + +void replicator_queue_remove(struct replicator_queue *queue, + struct replicator_user **_user) +{ + struct replicator_user *user = *_user; + + *_user = NULL; + if (!user->popped) + priorityq_remove(queue->user_queue, &user->item); + hash_table_remove(queue->user_hash, user->username); + replicator_user_unref(&user); + + if (queue->change_callback != NULL) + queue->change_callback(queue->change_context); +} + +bool replicator_queue_want_sync_now(struct replicator_queue *queue, + struct replicator_user *user, + unsigned int *next_secs_r) +{ + time_t next_sync; + + if (user->priority != REPLICATION_PRIORITY_NONE) + return TRUE; + + if (user->last_sync_failed) { + next_sync = user->last_fast_sync + + queue->failure_resync_interval; + } else { + next_sync = user->last_full_sync + queue->full_sync_interval; + } + if (next_sync <= ioloop_time) + return TRUE; + + *next_secs_r = next_sync - ioloop_time; + return FALSE; +} + +struct replicator_user * +replicator_queue_pop(struct replicator_queue *queue, + unsigned int *next_secs_r) +{ + struct priorityq_item *item; + struct replicator_user *user; + + item = priorityq_peek(queue->user_queue); + if (item == NULL) { + /* no users defined. we shouldn't normally get here */ + *next_secs_r = 3600; + return NULL; + } + user = (struct replicator_user *)item; + if (!replicator_queue_want_sync_now(queue, user, next_secs_r)) { + /* we don't want to sync the user yet */ + return NULL; + } + priorityq_remove(queue->user_queue, &user->item); + user->popped = TRUE; + return user; +} + +static void +replicator_queue_handle_sync_lookups(struct replicator_queue *queue, + struct replicator_user *user) +{ + struct replicator_sync_lookup *lookups; + ARRAY(struct replicator_sync_lookup) callbacks; + unsigned int i, count; + bool success = !user->last_sync_failed; + + t_array_init(&callbacks, 8); + lookups = array_get_modifiable(&queue->sync_lookups, &count); + for (i = 0; i < count; ) { + if (lookups[i].user != user) + i++; + else if (lookups[i].wait_for_next_push) { + /* another sync request came while user was being + replicated */ + i_assert(user->priority == REPLICATION_PRIORITY_SYNC); + lookups[i].wait_for_next_push = FALSE; + i++; + } else { + array_push_back(&callbacks, &lookups[i]); + array_delete(&queue->sync_lookups, i, 1); + } + } + + array_foreach_modifiable(&callbacks, lookups) + lookups->callback(success, lookups->context); +} + +void replicator_queue_push(struct replicator_queue *queue, + struct replicator_user *user) +{ + i_assert(user->popped); + + priorityq_add(queue->user_queue, &user->item); + user->popped = FALSE; + + T_BEGIN { + replicator_queue_handle_sync_lookups(queue, user); + } T_END; +} + +static int +replicator_queue_import_line(struct replicator_queue *queue, const char *line) +{ + const char *const *args, *username, *state; + unsigned int priority; + struct replicator_user *user, tmp_user; + + /* <user> <priority> <last update> <last fast sync> <last full sync> + <last failed> <state> <last successful sync>*/ + args = t_strsplit_tabescaped(line); + if (str_array_length(args) < 7) + return -1; + + i_zero(&tmp_user); + username = args[0]; + state = t_strdup_noconst(args[6]); + if (username[0] == '\0' || + str_to_uint(args[1], &priority) < 0 || + str_to_time(args[2], &tmp_user.last_update) < 0 || + str_to_time(args[3], &tmp_user.last_fast_sync) < 0 || + str_to_time(args[4], &tmp_user.last_full_sync) < 0) + return -1; + tmp_user.priority = priority; + tmp_user.last_sync_failed = args[5][0] != '0'; + + if (str_array_length(args) >= 8) { + if (str_to_time(args[7], &tmp_user.last_successful_sync) < 0) + return -1; + } else { + tmp_user.last_successful_sync = 0; + /* On-disk format didn't have this yet */ + } + + user = hash_table_lookup(queue->user_hash, username); + if (user != NULL) { + if (user->last_update > tmp_user.last_update) { + /* we already have a newer state */ + return 0; + } + if (user->last_update == tmp_user.last_update) { + /* either one of these could be newer. use the one + with higher priority. */ + if (user->priority > tmp_user.priority) + return 0; + } + } + user = replicator_queue_add(queue, username, + tmp_user.priority); + user->last_update = tmp_user.last_update; + user->last_fast_sync = tmp_user.last_fast_sync; + user->last_full_sync = tmp_user.last_full_sync; + user->last_successful_sync = tmp_user.last_successful_sync; + user->last_sync_failed = tmp_user.last_sync_failed; + i_free(user->state); + user->state = i_strdup(state); + return 0; +} + +int replicator_queue_import(struct replicator_queue *queue, const char *path) +{ + struct istream *input; + const char *line; + int fd, ret = 0; + + fd = open(path, O_RDONLY); + if (fd == -1) { + if (errno == ENOENT) + return 0; + i_error("open(%s) failed: %m", path); + return -1; + } + + input = i_stream_create_fd_autoclose(&fd, SIZE_MAX); + while ((line = i_stream_read_next_line(input)) != NULL) { + T_BEGIN { + ret = replicator_queue_import_line(queue, line); + } T_END; + if (ret < 0) { + i_error("Corrupted replicator record in %s: %s", + path, line); + break; + } + } + if (input->stream_errno != 0) { + i_error("read(%s) failed: %s", path, i_stream_get_error(input)); + ret = -1; + } + i_stream_destroy(&input); + return ret; +} + +static void +replicator_queue_export_user(struct replicator_user *user, string_t *str) +{ + str_append_tabescaped(str, user->username); + str_printfa(str, "\t%d\t%lld\t%lld\t%lld\t%d\t", (int)user->priority, + (long long)user->last_update, + (long long)user->last_fast_sync, + (long long)user->last_full_sync, + user->last_sync_failed ? 1 : 0); + if (user->state != NULL) + str_append_tabescaped(str, user->state); + str_printfa(str, "\t%lld\n", (long long)user->last_successful_sync); +} + +int replicator_queue_export(struct replicator_queue *queue, const char *path) +{ + struct replicator_queue_iter *iter; + struct replicator_user *user; + struct ostream *output; + string_t *str; + int fd, ret = 0; + + fd = creat(path, 0600); + if (fd == -1) { + i_error("creat(%s) failed: %m", path); + return -1; + } + output = o_stream_create_fd_file_autoclose(&fd, 0); + o_stream_cork(output); + + str = t_str_new(128); + iter = replicator_queue_iter_init(queue); + while ((user = replicator_queue_iter_next(iter)) != NULL) { + str_truncate(str, 0); + replicator_queue_export_user(user, str); + if (o_stream_send(output, str_data(str), str_len(str)) < 0) + break; + } + replicator_queue_iter_deinit(&iter); + if (o_stream_finish(output) < 0) { + i_error("write(%s) failed: %s", path, o_stream_get_error(output)); + ret = -1; + } + o_stream_destroy(&output); + return ret; +} + +struct replicator_queue_iter * +replicator_queue_iter_init(struct replicator_queue *queue) +{ + struct replicator_queue_iter *iter; + + iter = i_new(struct replicator_queue_iter, 1); + iter->queue = queue; + iter->iter = hash_table_iterate_init(queue->user_hash); + return iter; +} + +struct replicator_user * +replicator_queue_iter_next(struct replicator_queue_iter *iter) +{ + struct replicator_user *user; + char *username; + + if (!hash_table_iterate(iter->iter, iter->queue->user_hash, + &username, &user)) + return NULL; + return user; +} + +void replicator_queue_iter_deinit(struct replicator_queue_iter **_iter) +{ + struct replicator_queue_iter *iter = *_iter; + + *_iter = NULL; + + hash_table_iterate_deinit(&iter->iter); + i_free(iter); +} |