/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */

#include "lib.h"
#include "ioloop.h"
#include "net.h"
#include "istream.h"
#include "ostream.h"
#include "buffer.h"
#include "hash.h"
#include "llist.h"
#include "strescape.h"
#include "replicator-connection.h"

/* Maximum size of the input stream buffer (i.e. longest accepted reply line) */
#define MAX_INBUF_SIZE 1024
/* Delay before retrying a failed connect */
#define REPLICATOR_RECONNECT_MSECS 5000
/* Per-priority limit for the internal queue. Parenthesized so the macro
   expands safely inside any expression. */
#define REPLICATOR_MEMBUF_MAX_SIZE (1024*1024)
/* Sent once to the replicator right after the connection is established */
#define REPLICATOR_HANDSHAKE "VERSION\treplicator-notify\t1\t0\n"

struct replicator_connection {
	/* UNIX socket path; used for connecting when port == 0 */
	char *path;
	/* IP addresses to try in round-robin order when port != 0 */
	struct ip_addr *ips;
	unsigned int ips_count, ip_idx;
	in_port_t port;

	/* -1 while disconnected */
	int fd;
	struct io *io;
	struct istream *input;
	struct ostream *output;
	/* reconnect timeout, set while a retry is scheduled */
	struct timeout *to;

	/* one pending-data queue per priority, indexed by
	   enum replication_priority */
	buffer_t *queue[REPLICATION_PRIORITY_SYNC + 1];

	/* request id -> caller's context for pending sync requests */
	HASH_TABLE(void *, void *) requests;
	unsigned int request_id_counter;
	replicator_sync_callback_t *callback;
};

static void replicator_connection_disconnect(struct replicator_connection *conn);

/* Handle one reply line from the replicator. On a valid reply the matching
   pending request is removed and the callback is called with TRUE for '+'
   and FALSE for '-'. Returns 0 on success, -1 if the line is malformed or
   refers to an unknown request id. */
static int
replicator_input_line(struct replicator_connection *conn, const char *line)
{
	void *context;
	unsigned int id;

	/* <+|-> \t <id> */
	if ((line[0] != '+' && line[0] != '-') || line[1] != '\t' ||
	    str_to_uint(line+2, &id) < 0 || id == 0) {
		i_error("Replicator sent invalid input: %s", line);
		return -1;
	}

	context = hash_table_lookup(conn->requests, POINTER_CAST(id));
	if (context == NULL) {
		i_error("Replicator sent invalid ID: %u", id);
		return -1;
	}
	hash_table_remove(conn->requests, POINTER_CAST(id));
	conn->callback(line[0] == '+', context);
	return 0;
}

/* IO_READ handler: read whatever is available and process all complete
   lines. Disconnects if the stream reports a full buffer or a disconnect. */
static void replicator_input(struct replicator_connection *conn)
{
	const char *line;

	switch (i_stream_read(conn->input)) {
	case -2:
		/* buffer full */
		i_error("Replicator sent too long line");
		replicator_connection_disconnect(conn);
		return;
	case -1:
		/* disconnected */
		replicator_connection_disconnect(conn);
		return;
	}

	/* invalid lines are logged by replicator_input_line() and skipped */
	while ((line = i_stream_next_line(conn->input)) != NULL)
		(void)replicator_input_line(conn, line);
}

/* Send roughly IO_BLOCK_SIZE bytes from the beginning of buf to the output
   stream, extended forward to the next LF so that only full lines are sent.
   The sent bytes are removed from buf. buf must be non-empty and must end
   with LF. Returns TRUE on success, FALSE if sending failed (in which case
   the connection has been disconnected). */
static bool
replicator_send_buf(struct replicator_connection *conn, buffer_t *buf)
{
	const unsigned char *data = buf->data;
	size_t len = IO_BLOCK_SIZE;

	i_assert(buf->used > 0);

	/* try to send about IO_BLOCK_SIZE amount of data,
	   but only full lines. If the buffer isn't larger than that, start
	   looking for the LF at the last byte: starting at buf->used would be
	   one past the end and trip the assert below. */
	if (len >= buf->used)
		len = buf->used - 1;
	for (;; len++) {
		i_assert(len < buf->used); /* there is always LF */
		if (data[len] == '\n') {
			len++;
			break;
		}
	}

	if (o_stream_send(conn->output, data, len) < 0) {
		replicator_connection_disconnect(conn);
		return FALSE;
	}
	buffer_delete(buf, 0, len);
	return TRUE;
}

/* Output stream flush callback: flush the stream and, whenever its buffer
   is empty, feed it more data from the internal queues, going from
   REPLICATION_PRIORITY_SYNC down to REPLICATION_PRIORITY_LOW.
   Always returns 1. */
static int replicator_output(struct replicator_connection *conn)
{
	enum replication_priority p;

	if (o_stream_flush(conn->output) < 0) {
		replicator_connection_disconnect(conn);
		return 1;
	}

	for (p = REPLICATION_PRIORITY_SYNC;;) {
		if (o_stream_get_buffer_used_size(conn->output) > 0) {
			/* stream still has unsent data - continue once
			   it has been flushed */
			o_stream_set_flush_pending(conn->output, TRUE);
			break;
		}
		/* output buffer is empty, send more data */
		if (conn->queue[p]->used > 0) {
			if (!replicator_send_buf(conn, conn->queue[p]))
				break;
		} else {
			if (p == REPLICATION_PRIORITY_LOW)
				break;
			p--;
		}
	}
	return 1;
}

/* Connect to the replicator unless already connected. With port == 0 the
   UNIX socket path is used, otherwise each IP is tried once starting from
   ip_idx. On failure a reconnect timeout is scheduled (unless one already
   exists). On success the I/O handler and streams are created and the
   handshake is sent. */
static void replicator_connection_connect(struct replicator_connection *conn)
{
	unsigned int n;
	int fd = -1;

	if (conn->fd != -1)
		return;

	if (conn->port == 0) {
		fd = net_connect_unix(conn->path);
		if (fd == -1)
			i_error("net_connect_unix(%s) failed: %m", conn->path);
	} else {
		for (n = 0; n < conn->ips_count; n++) {
			unsigned int idx = conn->ip_idx;

			/* advance first, so the next attempt starts from
			   the following address */
			conn->ip_idx = (conn->ip_idx + 1) % conn->ips_count;
			fd = net_connect_ip(&conn->ips[idx], conn->port, NULL);
			if (fd != -1)
				break;
			i_error("connect(%s, %u) failed: %m",
				net_ip2addr(&conn->ips[idx]), conn->port);
		}
	}

	if (fd == -1) {
		if (conn->to == NULL) {
			conn->to = timeout_add(REPLICATOR_RECONNECT_MSECS,
					       replicator_connection_connect,
					       conn);
		}
		return;
	}

	timeout_remove(&conn->to);
	conn->fd = fd;
	conn->io = io_add(fd, IO_READ, replicator_input, conn);
	conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE);
	conn->output = o_stream_create_fd(fd, SIZE_MAX);
	o_stream_set_no_error_handling(conn->output, TRUE);
	o_stream_nsend_str(conn->output, REPLICATOR_HANDSHAKE);
	o_stream_set_flush_callback(conn->output, replicator_output, conn);
}

/* Call the callback with FALSE for every pending sync request and then
   empty the request table. */
static void replicator_abort_all_requests(struct replicator_connection *conn)
{
	struct hash_iterate_context *iter;
	void *key, *value;

	iter = hash_table_iterate_init(conn->requests);
	while (hash_table_iterate(iter, conn->requests, &key, &value))
		conn->callback(FALSE, value);
	hash_table_iterate_deinit(&iter);
	hash_table_clear(conn->requests, TRUE);
}

/* Close the connection if it is open: pending requests are aborted and the
   I/O handler, streams and fd are released. No-op when already
   disconnected. */
static void replicator_connection_disconnect(struct replicator_connection *conn)
{
	if (conn->fd == -1)
		return;

	replicator_abort_all_requests(conn);
	io_remove(&conn->io);
	i_stream_destroy(&conn->input);
	o_stream_destroy(&conn->output);
	net_disconnect(conn->fd);
	conn->fd = -1;
}

/* Allocate a disconnected connection with an empty request table and
   empty queues. The caller fills in the address and the callback. */
static struct replicator_connection *replicator_connection_create(void)
{
	struct replicator_connection *conn;
	unsigned int i;

	conn = i_new(struct replicator_connection, 1);
	conn->fd = -1;
	hash_table_create_direct(&conn->requests, default_pool, 0);
	for (i = REPLICATION_PRIORITY_LOW; i <= REPLICATION_PRIORITY_SYNC; i++)
		conn->queue[i] = buffer_create_dynamic(default_pool, 1024);
	return conn;
}

/* Create a connection that connects to the given UNIX socket path.
   The path is copied. The connection isn't opened yet. */
struct replicator_connection *
replicator_connection_create_unix(const char *path,
				  replicator_sync_callback_t *callback)
{
	struct replicator_connection *conn;

	conn = replicator_connection_create();
	conn->callback = callback;
	conn->path = i_strdup(path);
	return conn;
}

/* Create a connection that connects to one of the given IPs at the given
   port. The IP array is copied. The connection isn't opened yet. */
struct replicator_connection *
replicator_connection_create_inet(const struct ip_addr *ips,
				  unsigned int ips_count, in_port_t port,
				  replicator_sync_callback_t *callback)
{
	struct replicator_connection *conn;

	conn = replicator_connection_create();
	conn->callback = callback;
	conn->ips = i_new(struct ip_addr, ips_count);
	memcpy(conn->ips, ips, sizeof(*ips) * ips_count);
	conn->ips_count = ips_count;
	conn->port = port;
	return conn;
}

/* Disconnect (aborting pending requests) and free the connection.
   *_conn is set to NULL. */
void replicator_connection_destroy(struct replicator_connection **_conn)
{
	struct replicator_connection *conn = *_conn;
	unsigned int i;

	*_conn = NULL;
	replicator_connection_disconnect(conn);

	for (i = REPLICATION_PRIORITY_LOW; i <= REPLICATION_PRIORITY_SYNC; i++)
		buffer_free(&conn->queue[i]);

	timeout_remove(&conn->to);
	hash_table_destroy(&conn->requests);
	i_free(conn->ips);
	i_free(conn->path);
	i_free(conn);
}

/* Send one LF-terminated line to the replicator. The line is written to
   the output stream directly when connected and the stream's buffer is
   empty; otherwise it is appended to the queue of the given priority.
   If the queue would reach REPLICATOR_MEMBUF_MAX_SIZE, the line is
   silently dropped. */
static void
replicator_send(struct replicator_connection *conn,
		enum replication_priority priority, const char *data)
{
	size_t data_len = strlen(data);

	if (conn->fd != -1 &&
	    o_stream_get_buffer_used_size(conn->output) == 0) {
		/* we can send data immediately */
		o_stream_nsend(conn->output, data, data_len);
	} else if (conn->queue[priority]->used + data_len >=
		   REPLICATOR_MEMBUF_MAX_SIZE) {
		/* FIXME: compress duplicates, start writing to file */
		/* Until then the data is dropped here. NOTE: for a sync
		   request this means its entry stays in conn->requests
		   until the connection is disconnected. */
	} else {
		/* queue internally to separate queues */
		buffer_append(conn->queue[priority], data, data_len);
		if (conn->output != NULL)
			o_stream_set_flush_pending(conn->output, TRUE);
	}
}

/* Notify the replicator that the user has changed. Only
   REPLICATION_PRIORITY_LOW and REPLICATION_PRIORITY_HIGH are valid here.
   Connects first if not connected yet. */
void replicator_connection_notify(struct replicator_connection *conn,
				  const char *username,
				  enum replication_priority priority)
{
	const char *priority_str = "";

	replicator_connection_connect(conn);

	switch (priority) {
	case REPLICATION_PRIORITY_NONE:
	case REPLICATION_PRIORITY_SYNC:
		i_unreached();
	case REPLICATION_PRIORITY_LOW:
		priority_str = "low";
		break;
	case REPLICATION_PRIORITY_HIGH:
		priority_str = "high";
		break;
	}

	T_BEGIN {
		replicator_send(conn, priority, t_strdup_printf(
			"U\t%s\t%s\n", str_tabescape(username), priority_str));
	} T_END;
}

/* Send a sync request for the user. The context is remembered under a
   newly allocated non-zero request id and is passed to the callback when
   the replicator replies or when the connection is disconnected.
   Connects first if not connected yet. */
void replicator_connection_notify_sync(struct replicator_connection *conn,
				       const char *username, void *context)
{
	unsigned int id;

	replicator_connection_connect(conn);

	id = ++conn->request_id_counter;
	if (id == 0) {
		/* 0 is rejected as an id by replicator_input_line(). Advance
		   the counter itself on wrap-around, so the following
		   request doesn't get this same id again. */
		id = ++conn->request_id_counter;
	}
	hash_table_insert(conn->requests, POINTER_CAST(id), context);

	T_BEGIN {
		replicator_send(conn, REPLICATION_PRIORITY_SYNC, t_strdup_printf(
			"U\t%s\tsync\t%u\n", str_tabescape(username), id));
	} T_END;
}