/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ #include "lib.h" #include "llist.h" #include "istream.h" #include "ostream.h" #include "strescape.h" #include "master-service.h" #include "replicator-queue.h" #include "notify-connection.h" #include #define MAX_INBUF_SIZE (1024*64) #define NOTIFY_CLIENT_PROTOCOL_MAJOR_VERSION 1 #define NOTIFY_CLIENT_PROTOCOL_MINOR_VERSION 0 struct notify_connection { struct notify_connection *prev, *next; int refcount; int fd; struct io *io; struct istream *input; struct ostream *output; struct replicator_queue *queue; bool version_received:1; bool destroyed:1; }; struct notify_sync_request { struct notify_connection *conn; unsigned int id; }; static struct notify_connection *connections; static void notify_connection_destroy(struct notify_connection *conn); static void notify_sync_callback(bool success, void *context) { struct notify_sync_request *request = context; o_stream_nsend_str(request->conn->output, t_strdup_printf( "%c\t%u\n", success ? '+' : '-', request->id)); notify_connection_unref(&request->conn); i_free(request); } static int notify_connection_input_line(struct notify_connection *conn, const char *line) { struct notify_sync_request *request; const char *const *args; enum replication_priority priority; unsigned int id; /* U \t \t [\t ] */ args = t_strsplit_tabescaped(line); if (str_array_length(args) < 2) { i_error("notify client sent invalid input: %s", line); return -1; } if (strcmp(args[0], "U") != 0) { i_error("notify client sent unknown command: %s", args[0]); return -1; } if (replication_priority_parse(args[2], &priority) < 0) { i_error("notify client sent invalid priority: %s", args[2]); return -1; } if (priority != REPLICATION_PRIORITY_SYNC) { struct replicator_user *user = replicator_queue_get(conn->queue, args[1]); replicator_queue_update(conn->queue, user, priority); replicator_queue_add(conn->queue, user); } else if (args[3] == NULL || str_to_uint(args[3], &id) < 0) { i_error("notify client sent invalid sync id: %s", line); return -1; } else { request = i_new(struct notify_sync_request, 1); request->conn = conn; request->id = id; notify_connection_ref(conn); struct replicator_user *user = replicator_queue_get(conn->queue, args[1]); replicator_queue_update(conn->queue, user, REPLICATION_PRIORITY_SYNC); replicator_queue_add_sync_callback(conn->queue, user, notify_sync_callback, request); } return 0; } static void notify_connection_input(struct notify_connection *conn) { const char *line; int ret; switch (i_stream_read(conn->input)) { case -2: i_error("BUG: Client connection sent too much data"); notify_connection_destroy(conn); return; case -1: notify_connection_destroy(conn); return; } if (!conn->version_received) { if ((line = i_stream_next_line(conn->input)) == NULL) return; if (!version_string_verify(line, "replicator-notify", NOTIFY_CLIENT_PROTOCOL_MAJOR_VERSION)) { i_error("Notify client not compatible with this server " "(mixed old and new binaries?)"); notify_connection_destroy(conn); return; } conn->version_received = TRUE; } while ((line = i_stream_next_line(conn->input)) != NULL) { T_BEGIN { ret = notify_connection_input_line(conn, line); } T_END; if (ret < 0) { notify_connection_destroy(conn); break; } } } struct notify_connection * notify_connection_create(int fd, struct replicator_queue *queue) { struct notify_connection *conn; i_assert(fd >= 0); conn = i_new(struct notify_connection, 1); conn->refcount = 1; conn->queue = queue; conn->fd = fd; conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE); conn->output = o_stream_create_fd(fd, SIZE_MAX); o_stream_set_no_error_handling(conn->output, TRUE); conn->io = io_add(fd, IO_READ, notify_connection_input, conn); conn->queue = queue; DLLIST_PREPEND(&connections, conn); return conn; } static void notify_connection_destroy(struct notify_connection *conn) { if (conn->destroyed) return; conn->destroyed = TRUE; DLLIST_REMOVE(&connections, conn); io_remove(&conn->io); i_stream_close(conn->input); o_stream_close(conn->output); if (close(conn->fd) < 0) i_error("close(notify connection) failed: %m"); conn->fd = -1; notify_connection_unref(&conn); master_service_client_connection_destroyed(master_service); } void notify_connection_ref(struct notify_connection *conn) { i_assert(conn->refcount > 0); conn->refcount++; } void notify_connection_unref(struct notify_connection **_conn) { struct notify_connection *conn = *_conn; i_assert(conn->refcount > 0); *_conn = NULL; if (--conn->refcount > 0) return; notify_connection_destroy(conn); i_stream_unref(&conn->input); o_stream_unref(&conn->output); i_free(conn); } void notify_connections_destroy_all(void) { while (connections != NULL) notify_connection_destroy(connections); }