diff options
Diffstat (limited to 'src/replication/aggregator/notify-connection.c')
-rw-r--r-- | src/replication/aggregator/notify-connection.c | 154 |
1 files changed, 154 insertions, 0 deletions
diff --git a/src/replication/aggregator/notify-connection.c b/src/replication/aggregator/notify-connection.c new file mode 100644 index 0000000..f9587fe --- /dev/null +++ b/src/replication/aggregator/notify-connection.c @@ -0,0 +1,154 @@ +/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "ioloop.h" +#include "net.h" +#include "istream.h" +#include "ostream.h" +#include "llist.h" +#include "strescape.h" +#include "master-service.h" +#include "replication-common.h" +#include "replicator-connection.h" +#include "notify-connection.h" + +#define MAX_INBUF_SIZE 8192 + +#define CONNECTION_IS_FIFO(conn) \ + ((conn)->output == NULL) + +struct notify_connection { + struct notify_connection *prev, *next; + int refcount; + + int fd; + struct io *io; + struct istream *input; + struct ostream *output; +}; + +static struct notify_connection *conns = NULL; + +static void notify_connection_unref(struct notify_connection *conn); +static void notify_connection_destroy(struct notify_connection *conn); + +static bool notify_input_error(struct notify_connection *conn) +{ + if (CONNECTION_IS_FIFO(conn)) + return TRUE; + notify_connection_destroy(conn); + return FALSE; +} + +void notify_connection_sync_callback(bool success, void *context) +{ + struct notify_connection *conn = context; + + o_stream_nsend_str(conn->output, success ? "+\n" : "-\n"); + notify_connection_unref(conn); +} + +static int +notify_input_line(struct notify_connection *conn, const char *line) +{ + const char *const *args; + enum replication_priority priority; + + /* <username> \t <priority> */ + args = t_strsplit_tabescaped(line); + if (str_array_length(args) < 2) { + i_error("Client sent invalid input"); + return -1; + } + if (replication_priority_parse(args[1], &priority) < 0) { + i_error("Client sent invalid priority: %s", args[1]); + return -1; + } + if (priority != REPLICATION_PRIORITY_SYNC) + replicator_connection_notify(replicator, args[0], priority); + else { + conn->refcount++; + replicator_connection_notify_sync(replicator, args[0], conn); + } + return 0; +} + +static void notify_input(struct notify_connection *conn) +{ + const char *line; + int ret; + + switch (i_stream_read(conn->input)) { + case -2: + /* buffer full */ + i_error("Client sent too long line"); + (void)notify_input_error(conn); + return; + case -1: + /* disconnected */ + notify_connection_destroy(conn); + return; + } + + while ((line = i_stream_next_line(conn->input)) != NULL) { + T_BEGIN { + ret = notify_input_line(conn, line); + } T_END; + if (ret < 0) { + if (!notify_input_error(conn)) + return; + } + } +} + +void notify_connection_create(int fd, bool fifo) +{ + struct notify_connection *conn; + + conn = i_new(struct notify_connection, 1); + conn->refcount = 1; + conn->fd = fd; + conn->io = io_add(fd, IO_READ, notify_input, conn); + conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE); + if (!fifo) { + conn->output = o_stream_create_fd(fd, SIZE_MAX); + o_stream_set_no_error_handling(conn->output, TRUE); + } + + DLLIST_PREPEND(&conns, conn); +} + +static void notify_connection_unref(struct notify_connection *conn) +{ + i_assert(conn->refcount > 0); + if (--conn->refcount > 0) + return; + + i_stream_destroy(&conn->input); + o_stream_destroy(&conn->output); + i_free(conn); +} + +static void notify_connection_destroy(struct notify_connection *conn) +{ + i_assert(conn->fd != -1); + + if (!CONNECTION_IS_FIFO(conn)) + master_service_client_connection_destroyed(master_service); + + DLLIST_REMOVE(&conns, conn); + + io_remove(&conn->io); + i_stream_close(conn->input); + o_stream_close(conn->output); + net_disconnect(conn->fd); + conn->fd = -1; + + notify_connection_unref(conn); +} + +void notify_connections_destroy_all(void) +{ + while (conns != NULL) + notify_connection_destroy(conns); +} |