diff options
Diffstat (limited to 'src/replication/aggregator/replicator-connection.c')
-rw-r--r-- | src/replication/aggregator/replicator-connection.c | 326 |
1 files changed, 326 insertions, 0 deletions
diff --git a/src/replication/aggregator/replicator-connection.c b/src/replication/aggregator/replicator-connection.c new file mode 100644 index 0000000..9275376 --- /dev/null +++ b/src/replication/aggregator/replicator-connection.c @@ -0,0 +1,326 @@ +/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "ioloop.h" +#include "net.h" +#include "istream.h" +#include "ostream.h" +#include "buffer.h" +#include "hash.h" +#include "llist.h" +#include "strescape.h" +#include "replicator-connection.h" + +#define MAX_INBUF_SIZE 1024 +#define REPLICATOR_RECONNECT_MSECS 5000 +#define REPLICATOR_MEMBUF_MAX_SIZE 1024*1024 +#define REPLICATOR_HANDSHAKE "VERSION\treplicator-notify\t1\t0\n" + +struct replicator_connection { + char *path; + struct ip_addr *ips; + unsigned int ips_count, ip_idx; + in_port_t port; + + int fd; + struct io *io; + struct istream *input; + struct ostream *output; + struct timeout *to; + + buffer_t *queue[REPLICATION_PRIORITY_SYNC + 1]; + + HASH_TABLE(void *, void *) requests; + unsigned int request_id_counter; + replicator_sync_callback_t *callback; +}; + +static void replicator_connection_disconnect(struct replicator_connection *conn); + +static int +replicator_input_line(struct replicator_connection *conn, const char *line) +{ + void *context; + unsigned int id; + + /* <+|-> \t <id> */ + if ((line[0] != '+' && line[0] != '-') || line[1] != '\t' || + str_to_uint(line+2, &id) < 0 || id == 0) { + i_error("Replicator sent invalid input: %s", line); + return -1; + } + + context = hash_table_lookup(conn->requests, POINTER_CAST(id)); + if (context == NULL) { + i_error("Replicator sent invalid ID: %u", id); + return -1; + } + hash_table_remove(conn->requests, POINTER_CAST(id)); + conn->callback(line[0] == '+', context); + return 0; +} + +static void replicator_input(struct replicator_connection *conn) +{ + const char *line; + + switch (i_stream_read(conn->input)) { + case -2: + /* buffer full */ + i_error("Replicator sent too long line"); + replicator_connection_disconnect(conn); + return; + case -1: + /* disconnected */ + replicator_connection_disconnect(conn); + return; + } + + while ((line = i_stream_next_line(conn->input)) != NULL) + (void)replicator_input_line(conn, line); +} + +static bool +replicator_send_buf(struct replicator_connection *conn, buffer_t *buf) +{ + const unsigned char *data = buf->data; + size_t len = IO_BLOCK_SIZE; + + /* try to send about IO_BLOCK_SIZE amount of data, + but only full lines */ + if (len > buf->used) + len = buf->used; + for (;; len++) { + i_assert(len < buf->used); /* there is always LF */ + if (data[len] == '\n') { + len++; + break; + } + } + + if (o_stream_send(conn->output, data, len) < 0) { + replicator_connection_disconnect(conn); + return FALSE; + } + buffer_delete(buf, 0, len); + return TRUE; +} + +static int replicator_output(struct replicator_connection *conn) +{ + enum replication_priority p; + + if (o_stream_flush(conn->output) < 0) { + replicator_connection_disconnect(conn); + return 1; + } + + for (p = REPLICATION_PRIORITY_SYNC;;) { + if (o_stream_get_buffer_used_size(conn->output) > 0) { + o_stream_set_flush_pending(conn->output, TRUE); + break; + } + /* output buffer is empty, send more data */ + if (conn->queue[p]->used > 0) { + if (!replicator_send_buf(conn, conn->queue[p])) + break; + } else { + if (p == REPLICATION_PRIORITY_LOW) + break; + p--; + } + } + return 1; +} + +static void replicator_connection_connect(struct replicator_connection *conn) +{ + unsigned int n; + int fd = -1; + + if (conn->fd != -1) + return; + + if (conn->port == 0) { + fd = net_connect_unix(conn->path); + if (fd == -1) + i_error("net_connect_unix(%s) failed: %m", conn->path); + } else { + for (n = 0; n < conn->ips_count; n++) { + unsigned int idx = conn->ip_idx; + + conn->ip_idx = (conn->ip_idx + 1) % conn->ips_count; + fd = net_connect_ip(&conn->ips[idx], conn->port, NULL); + if (fd != -1) + break; + i_error("connect(%s, %u) failed: %m", + net_ip2addr(&conn->ips[idx]), conn->port); + } + } + + if (fd == -1) { + if (conn->to == NULL) { + conn->to = timeout_add(REPLICATOR_RECONNECT_MSECS, + replicator_connection_connect, + conn); + } + return; + } + + timeout_remove(&conn->to); + conn->fd = fd; + conn->io = io_add(fd, IO_READ, replicator_input, conn); + conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE); + conn->output = o_stream_create_fd(fd, SIZE_MAX); + o_stream_set_no_error_handling(conn->output, TRUE); + o_stream_nsend_str(conn->output, REPLICATOR_HANDSHAKE); + o_stream_set_flush_callback(conn->output, replicator_output, conn); +} + +static void replicator_abort_all_requests(struct replicator_connection *conn) +{ + struct hash_iterate_context *iter; + void *key, *value; + + iter = hash_table_iterate_init(conn->requests); + while (hash_table_iterate(iter, conn->requests, &key, &value)) + conn->callback(FALSE, value); + hash_table_iterate_deinit(&iter); + hash_table_clear(conn->requests, TRUE); +} + +static void replicator_connection_disconnect(struct replicator_connection *conn) +{ + if (conn->fd == -1) + return; + + replicator_abort_all_requests(conn); + io_remove(&conn->io); + i_stream_destroy(&conn->input); + o_stream_destroy(&conn->output); + net_disconnect(conn->fd); + conn->fd = -1; +} + +static struct replicator_connection *replicator_connection_create(void) +{ + struct replicator_connection *conn; + unsigned int i; + + conn = i_new(struct replicator_connection, 1); + conn->fd = -1; + hash_table_create_direct(&conn->requests, default_pool, 0); + for (i = REPLICATION_PRIORITY_LOW; i <= REPLICATION_PRIORITY_SYNC; i++) + conn->queue[i] = buffer_create_dynamic(default_pool, 1024); + return conn; +} + +struct replicator_connection * +replicator_connection_create_unix(const char *path, + replicator_sync_callback_t *callback) +{ + struct replicator_connection *conn; + + conn = replicator_connection_create(); + conn->callback = callback; + conn->path = i_strdup(path); + return conn; +} + +struct replicator_connection * +replicator_connection_create_inet(const struct ip_addr *ips, + unsigned int ips_count, in_port_t port, + replicator_sync_callback_t *callback) +{ + struct replicator_connection *conn; + + conn = replicator_connection_create(); + conn->callback = callback; + conn->ips = i_new(struct ip_addr, ips_count); + memcpy(conn->ips, ips, sizeof(*ips) * ips_count); + conn->ips_count = ips_count; + conn->port = port; + return conn; +} + +void replicator_connection_destroy(struct replicator_connection **_conn) +{ + struct replicator_connection *conn = *_conn; + unsigned int i; + + *_conn = NULL; + replicator_connection_disconnect(conn); + + for (i = REPLICATION_PRIORITY_LOW; i <= REPLICATION_PRIORITY_SYNC; i++) + buffer_free(&conn->queue[i]); + + timeout_remove(&conn->to); + hash_table_destroy(&conn->requests); + i_free(conn->ips); + i_free(conn->path); + i_free(conn); +} + +static void +replicator_send(struct replicator_connection *conn, + enum replication_priority priority, const char *data) +{ + size_t data_len = strlen(data); + + if (conn->fd != -1 && + o_stream_get_buffer_used_size(conn->output) == 0) { + /* we can send data immediately */ + o_stream_nsend(conn->output, data, data_len); + } else if (conn->queue[priority]->used + data_len >= + REPLICATOR_MEMBUF_MAX_SIZE) { + /* FIXME: compress duplicates, start writing to file */ + } else { + /* queue internally to separate queues */ + buffer_append(conn->queue[priority], data, data_len); + if (conn->output != NULL) + o_stream_set_flush_pending(conn->output, TRUE); + } +} + +void replicator_connection_notify(struct replicator_connection *conn, + const char *username, + enum replication_priority priority) +{ + const char *priority_str = ""; + + replicator_connection_connect(conn); + + switch (priority) { + case REPLICATION_PRIORITY_NONE: + case REPLICATION_PRIORITY_SYNC: + i_unreached(); + case REPLICATION_PRIORITY_LOW: + priority_str = "low"; + break; + case REPLICATION_PRIORITY_HIGH: + priority_str = "high"; + break; + } + + T_BEGIN { + replicator_send(conn, priority, t_strdup_printf( + "U\t%s\t%s\n", str_tabescape(username), priority_str)); + } T_END; +} + +void replicator_connection_notify_sync(struct replicator_connection *conn, + const char *username, void *context) +{ + unsigned int id; + + replicator_connection_connect(conn); + + id = ++conn->request_id_counter; + if (id == 0) id++; + hash_table_insert(conn->requests, POINTER_CAST(id), context); + + T_BEGIN { + replicator_send(conn, REPLICATION_PRIORITY_SYNC, t_strdup_printf( + "U\t%s\tsync\t%u\n", str_tabescape(username), id)); + } T_END; +} |