summaryrefslogtreecommitdiffstats
path: root/src/doveadm/dsync/dsync-ibc-stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/doveadm/dsync/dsync-ibc-stream.c')
-rw-r--r--src/doveadm/dsync/dsync-ibc-stream.c2138
1 files changed, 2138 insertions, 0 deletions
diff --git a/src/doveadm/dsync/dsync-ibc-stream.c b/src/doveadm/dsync/dsync-ibc-stream.c
new file mode 100644
index 0000000..17115b0
--- /dev/null
+++ b/src/doveadm/dsync/dsync-ibc-stream.c
@@ -0,0 +1,2138 @@
+/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "array.h"
+#include "safe-mkstemp.h"
+#include "ioloop.h"
+#include "istream.h"
+#include "istream-seekable.h"
+#include "istream-dot.h"
+#include "ostream.h"
+#include "str.h"
+#include "strescape.h"
+#include "master-service.h"
+#include "mail-cache.h"
+#include "mail-storage-private.h"
+#include "dsync-serializer.h"
+#include "dsync-deserializer.h"
+#include "dsync-mail.h"
+#include "dsync-mailbox.h"
+#include "dsync-mailbox-state.h"
+#include "dsync-mailbox-tree.h"
+#include "dsync-ibc-private.h"
+
+
+#define DSYNC_IBC_STREAM_OUTBUF_THROTTLE_SIZE (1024*128)
+
+#define DSYNC_PROTOCOL_VERSION_MAJOR 3
+#define DSYNC_PROTOCOL_VERSION_MINOR 5
+#define DSYNC_HANDSHAKE_VERSION "VERSION\tdsync\t3\t5\n"
+
+#define DSYNC_PROTOCOL_MINOR_HAVE_ATTRIBUTES 1
+#define DSYNC_PROTOCOL_MINOR_HAVE_SAVE_GUID 2
+#define DSYNC_PROTOCOL_MINOR_HAVE_FINISH 3
+#define DSYNC_PROTOCOL_MINOR_HAVE_HDR_HASH_V2 4
+#define DSYNC_PROTOCOL_MINOR_HAVE_HDR_HASH_V3 5
+
+enum item_type {
+ ITEM_NONE,
+ ITEM_DONE,
+
+ ITEM_HANDSHAKE,
+ ITEM_MAILBOX_STATE,
+ ITEM_MAILBOX_TREE_NODE,
+ ITEM_MAILBOX_DELETE,
+ ITEM_MAILBOX,
+
+ ITEM_MAILBOX_ATTRIBUTE,
+ ITEM_MAIL_CHANGE,
+ ITEM_MAIL_REQUEST,
+ ITEM_MAIL,
+ ITEM_FINISH,
+
+ ITEM_MAILBOX_CACHE_FIELD,
+
+ ITEM_END_OF_LIST
+};
+
+#define END_OF_LIST_LINE "."
+static const struct {
+ /* full human readable name of the item */
+ const char *name;
+ /* unique character identifying the item */
+ char chr;
+ const char *required_keys;
+ const char *optional_keys;
+ unsigned int min_minor_version;
+} items[ITEM_END_OF_LIST+1] = {
+ { NULL, '\0', NULL, NULL, 0 },
+ { .name = "done",
+ .chr = 'X',
+ .optional_keys = ""
+ },
+ { .name = "handshake",
+ .chr = 'H',
+ .required_keys = "hostname",
+ .optional_keys = "sync_ns_prefix sync_box sync_box_guid sync_type "
+ "debug sync_visible_namespaces exclude_mailboxes "
+ "send_mail_requests backup_send backup_recv lock_timeout "
+ "no_mail_sync no_backup_overwrite purge_remote "
+ "no_notify sync_since_timestamp sync_max_size sync_flags sync_until_timestamp "
+ "virtual_all_box empty_hdr_workaround import_commit_msgs_interval "
+ "hashed_headers alt_char"
+ },
+ { .name = "mailbox_state",
+ .chr = 'S',
+ .required_keys = "mailbox_guid last_uidvalidity last_common_uid "
+ "last_common_modseq last_common_pvt_modseq",
+ .optional_keys = "last_messages_count changes_during_sync"
+ },
+ { .name = "mailbox_tree_node",
+ .chr = 'N',
+ .required_keys = "name existence",
+ .optional_keys = "mailbox_guid uid_validity uid_next "
+ "last_renamed_or_created subscribed last_subscription_change"
+ },
+ { .name = "mailbox_delete",
+ .chr = 'D',
+ .required_keys = "hierarchy_sep",
+ .optional_keys = "escape_char mailboxes dirs unsubscribes"
+ },
+ { .name = "mailbox",
+ .chr = 'B',
+ .required_keys = "mailbox_guid uid_validity uid_next messages_count "
+ "first_recent_uid highest_modseq highest_pvt_modseq",
+ .optional_keys = "mailbox_lost mailbox_ignore "
+ "cache_fields have_guids have_save_guids have_only_guid128"
+ },
+ { .name = "mailbox_attribute",
+ .chr = 'A',
+ .required_keys = "type key",
+ .optional_keys = "value stream deleted last_change modseq",
+ .min_minor_version = DSYNC_PROTOCOL_MINOR_HAVE_ATTRIBUTES
+ },
+ { .name = "mail_change",
+ .chr = 'C',
+ .required_keys = "type uid",
+ .optional_keys = "guid hdr_hash modseq pvt_modseq "
+ "add_flags remove_flags final_flags "
+ "keywords_reset keyword_changes received_timestamp virtual_size"
+ },
+ { .name = "mail_request",
+ .chr = 'R',
+ .optional_keys = "guid uid"
+ },
+ { .name = "mail",
+ .chr = 'M',
+ .optional_keys = "guid uid pop3_uidl pop3_order received_date saved_date stream"
+ },
+ { .name = "finish",
+ .chr = 'F',
+ .optional_keys = "error mail_error require_full_resync",
+ .min_minor_version = DSYNC_PROTOCOL_MINOR_HAVE_FINISH
+ },
+ { .name = "mailbox_cache_field",
+ .chr = 'c',
+ .required_keys = "name decision",
+ .optional_keys = "last_used"
+ },
+
+ { "end_of_list", '\0', NULL, NULL, 0 }
+};
+
+struct dsync_ibc_stream {
+ struct dsync_ibc ibc;
+
+ char *name, *temp_path_prefix;
+ unsigned int timeout_secs;
+ struct istream *input;
+ struct ostream *output;
+ struct io *io;
+ struct timeout *to;
+
+ unsigned int minor_version;
+ struct dsync_serializer *serializers[ITEM_END_OF_LIST];
+ struct dsync_deserializer *deserializers[ITEM_END_OF_LIST];
+
+ pool_t ret_pool;
+ struct dsync_deserializer_decoder *cur_decoder;
+
+ struct istream *value_output, *value_input;
+ struct dsync_mail *cur_mail;
+ struct dsync_mailbox_attribute *cur_attr;
+ char value_output_last;
+
+ enum item_type last_recv_item, last_sent_item;
+ bool last_recv_item_eol:1;
+ bool last_sent_item_eol:1;
+
+ bool version_received:1;
+ bool handshake_received:1;
+ bool has_pending_data:1;
+ bool finish_received:1;
+ bool done_received:1;
+ bool stopped:1;
+};
+
+static const char *dsync_ibc_stream_get_state(struct dsync_ibc_stream *ibc)
+{
+ if (!ibc->version_received)
+ return "version not received";
+ else if (!ibc->handshake_received)
+ return "handshake not received";
+
+ return t_strdup_printf("last sent=%s%s, last recv=%s%s",
+ items[ibc->last_sent_item].name,
+ ibc->last_sent_item_eol ? " (EOL)" : "",
+ items[ibc->last_recv_item].name,
+ ibc->last_recv_item_eol ? " (EOL)" : "");
+}
+
+static void dsync_ibc_stream_stop(struct dsync_ibc_stream *ibc)
+{
+ ibc->stopped = TRUE;
+ i_stream_close(ibc->input);
+ o_stream_close(ibc->output);
+ io_loop_stop(current_ioloop);
+}
+
+static int dsync_ibc_stream_read_mail_stream(struct dsync_ibc_stream *ibc)
+{
+ do {
+ i_stream_skip(ibc->value_input,
+ i_stream_get_data_size(ibc->value_input));
+ } while (i_stream_read(ibc->value_input) > 0);
+ if (ibc->value_input->eof) {
+ if (ibc->value_input->stream_errno != 0) {
+ i_error("dsync(%s): read(%s) failed: %s (%s)", ibc->name,
+ i_stream_get_name(ibc->value_input),
+ i_stream_get_error(ibc->value_input),
+ dsync_ibc_stream_get_state(ibc));
+ dsync_ibc_stream_stop(ibc);
+ return -1;
+ }
+ /* finished reading the mail stream */
+ i_assert(ibc->value_input->eof);
+ i_stream_seek(ibc->value_input, 0);
+ ibc->has_pending_data = TRUE;
+ ibc->value_input = NULL;
+ return 1;
+ }
+ return 0;
+}
+
+static void dsync_ibc_stream_input(struct dsync_ibc_stream *ibc)
+{
+ timeout_reset(ibc->to);
+ if (ibc->value_input != NULL) {
+ if (dsync_ibc_stream_read_mail_stream(ibc) == 0)
+ return;
+ }
+ o_stream_cork(ibc->output);
+ ibc->ibc.io_callback(ibc->ibc.io_context);
+ o_stream_uncork(ibc->output);
+}
+
+static int dsync_ibc_stream_send_value_stream(struct dsync_ibc_stream *ibc)
+{
+ const unsigned char *data;
+ unsigned char add;
+ size_t i, size;
+ int ret;
+
+ while ((ret = i_stream_read_more(ibc->value_output, &data, &size)) > 0) {
+ add = '\0';
+ for (i = 0; i < size; i++) {
+ if (data[i] == '.' &&
+ ((i == 0 && ibc->value_output_last == '\n') ||
+ (i > 0 && data[i-1] == '\n'))) {
+ /* escape the dot */
+ add = '.';
+ break;
+ }
+ }
+
+ if (i > 0) {
+ o_stream_nsend(ibc->output, data, i);
+ ibc->value_output_last = data[i-1];
+ i_stream_skip(ibc->value_output, i);
+ }
+
+ if (o_stream_get_buffer_used_size(ibc->output) >= 4096) {
+ if ((ret = o_stream_flush(ibc->output)) < 0) {
+ dsync_ibc_stream_stop(ibc);
+ return -1;
+ }
+ if (ret == 0) {
+ /* continue later */
+ o_stream_set_flush_pending(ibc->output, TRUE);
+ return 0;
+ }
+ }
+
+ if (add != '\0') {
+ o_stream_nsend(ibc->output, &add, 1);
+ ibc->value_output_last = add;
+ }
+ }
+ i_assert(ret == -1);
+
+ if (ibc->value_output->stream_errno != 0) {
+ i_error("dsync(%s): read(%s) failed: %s (%s)",
+ ibc->name, i_stream_get_name(ibc->value_output),
+ i_stream_get_error(ibc->value_output),
+ dsync_ibc_stream_get_state(ibc));
+ dsync_ibc_stream_stop(ibc);
+ return -1;
+ }
+
+ /* finished sending the stream. use "CRLF." instead of "LF." just in
+ case we're sending binary data that ends with CR. */
+ o_stream_nsend_str(ibc->output, "\r\n.\r\n");
+ i_stream_unref(&ibc->value_output);
+ return 1;
+}
+
+static int dsync_ibc_stream_output(struct dsync_ibc_stream *ibc)
+{
+ struct ostream *output = ibc->output;
+ int ret;
+
+ if ((ret = o_stream_flush(output)) < 0)
+ ret = 1;
+ else if (ibc->value_output != NULL) {
+ if (dsync_ibc_stream_send_value_stream(ibc) < 0)
+ ret = 1;
+ }
+ timeout_reset(ibc->to);
+
+ if (!dsync_ibc_is_send_queue_full(&ibc->ibc))
+ ibc->ibc.io_callback(ibc->ibc.io_context);
+ return ret;
+}
+
+static void dsync_ibc_stream_timeout(struct dsync_ibc_stream *ibc)
+{
+ i_error("dsync(%s): I/O has stalled, no activity for %u seconds (%s)",
+ ibc->name, ibc->timeout_secs, dsync_ibc_stream_get_state(ibc));
+ ibc->ibc.timeout = TRUE;
+ dsync_ibc_stream_stop(ibc);
+}
+
+static void dsync_ibc_stream_init(struct dsync_ibc_stream *ibc)
+{
+ unsigned int i;
+
+ ibc->io = io_add_istream(ibc->input, dsync_ibc_stream_input, ibc);
+ io_set_pending(ibc->io);
+ o_stream_set_no_error_handling(ibc->output, TRUE);
+ o_stream_set_flush_callback(ibc->output, dsync_ibc_stream_output, ibc);
+ ibc->to = timeout_add(ibc->timeout_secs * 1000,
+ dsync_ibc_stream_timeout, ibc);
+ o_stream_cork(ibc->output);
+ o_stream_nsend_str(ibc->output, DSYNC_HANDSHAKE_VERSION);
+
+ /* initialize serializers and send their headers to remote */
+ for (i = ITEM_DONE + 1; i < ITEM_END_OF_LIST; i++) T_BEGIN {
+ const char *keys;
+
+ keys = items[i].required_keys == NULL ? items[i].optional_keys :
+ t_strconcat(items[i].required_keys, " ",
+ items[i].optional_keys, NULL);
+ if (keys != NULL) {
+ i_assert(items[i].chr != '\0');
+
+ ibc->serializers[i] =
+ dsync_serializer_init(t_strsplit_spaces(keys, " "));
+ o_stream_nsend(ibc->output, &items[i].chr, 1);
+ o_stream_nsend_str(ibc->output,
+ dsync_serializer_encode_header_line(ibc->serializers[i]));
+ }
+ } T_END;
+ o_stream_nsend_str(ibc->output, ".\n");
+ o_stream_uncork(ibc->output);
+}
+
+static void dsync_ibc_stream_deinit(struct dsync_ibc *_ibc)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ unsigned int i;
+
+ for (i = ITEM_DONE + 1; i < ITEM_END_OF_LIST; i++) {
+ if (ibc->serializers[i] != NULL)
+ dsync_serializer_deinit(&ibc->serializers[i]);
+ if (ibc->deserializers[i] != NULL)
+ dsync_deserializer_deinit(&ibc->deserializers[i]);
+ }
+ if (ibc->cur_decoder != NULL)
+ dsync_deserializer_decode_finish(&ibc->cur_decoder);
+ if (ibc->value_output != NULL)
+ i_stream_unref(&ibc->value_output);
+ else {
+ /* If the remote has not told us that they are closing we
+ notify remote that we're closing. this is mainly to avoid
+ "read() failed: EOF" errors on failing dsyncs.
+
+ Avoid a race condition:
+ We do not tell the remote we are closing if they have
+ already told us because they close the
+ connection after sending ITEM_DONE and will
+ not be ever receive anything else from us unless
+ it just happens to get combined into the same packet
+ as a previous response and is already in the buffer.
+ */
+ if (!ibc->done_received && !ibc->finish_received) {
+ o_stream_nsend_str(ibc->output,
+ t_strdup_printf("%c\n", items[ITEM_DONE].chr));
+ }
+ (void)o_stream_finish(ibc->output);
+ }
+
+ timeout_remove(&ibc->to);
+ io_remove(&ibc->io);
+ i_stream_destroy(&ibc->input);
+ o_stream_destroy(&ibc->output);
+ pool_unref(&ibc->ret_pool);
+ i_free(ibc->temp_path_prefix);
+ i_free(ibc->name);
+ i_free(ibc);
+}
+
+static int dsync_ibc_stream_next_line(struct dsync_ibc_stream *ibc,
+ const char **line_r)
+{
+ string_t *error;
+ const char *line;
+ ssize_t ret;
+
+ line = i_stream_next_line(ibc->input);
+ if (line != NULL) {
+ *line_r = line;
+ return 1;
+ }
+ /* try reading some */
+ if ((ret = i_stream_read(ibc->input)) == -1) {
+ if (ibc->stopped)
+ return -1;
+ error = t_str_new(128);
+ if (ibc->input->stream_errno != 0) {
+ str_printfa(error, "read(%s) failed: %s", ibc->name,
+ i_stream_get_error(ibc->input));
+ } else {
+ i_assert(ibc->input->eof);
+ str_printfa(error, "read(%s) failed: EOF", ibc->name);
+ }
+ str_printfa(error, " (%s)", dsync_ibc_stream_get_state(ibc));
+ i_error("%s", str_c(error));
+ dsync_ibc_stream_stop(ibc);
+ return -1;
+ }
+ i_assert(ret >= 0);
+ *line_r = i_stream_next_line(ibc->input);
+ if (*line_r == NULL) {
+ ibc->has_pending_data = FALSE;
+ return 0;
+ }
+ ibc->has_pending_data = TRUE;
+ return 1;
+}
+
+static void ATTR_FORMAT(3, 4) ATTR_NULL(2)
+dsync_ibc_input_error(struct dsync_ibc_stream *ibc,
+ struct dsync_deserializer_decoder *decoder,
+ const char *fmt, ...)
+{
+ va_list args;
+ const char *error;
+
+ va_start(args, fmt);
+ error = t_strdup_vprintf(fmt, args);
+ if (decoder == NULL)
+ i_error("dsync(%s): %s", ibc->name, error);
+ else {
+ i_error("dsync(%s): %s: %s", ibc->name,
+ dsync_deserializer_decoder_get_name(decoder), error);
+ }
+ va_end(args);
+
+ dsync_ibc_stream_stop(ibc);
+}
+
+static void
+dsync_ibc_stream_send_string(struct dsync_ibc_stream *ibc,
+ const string_t *str)
+{
+ i_assert(ibc->value_output == NULL);
+ o_stream_nsend(ibc->output, str_data(str), str_len(str));
+}
+
+static int seekable_fd_callback(const char **path_r, void *context)
+{
+ struct dsync_ibc_stream *ibc = context;
+ string_t *path;
+ int fd;
+
+ path = t_str_new(128);
+ str_append(path, ibc->temp_path_prefix);
+ fd = safe_mkstemp(path, 0600, (uid_t)-1, (gid_t)-1);
+ if (fd == -1) {
+ i_error("safe_mkstemp(%s) failed: %m", str_c(path));
+ return -1;
+ }
+
+ /* we just want the fd, unlink it */
+ if (i_unlink(str_c(path)) < 0) {
+ /* shouldn't happen.. */
+ i_close_fd(&fd);
+ return -1;
+ }
+
+ *path_r = str_c(path);
+ return fd;
+}
+
+static struct istream *
+dsync_ibc_stream_input_stream(struct dsync_ibc_stream *ibc)
+{
+ struct istream *inputs[2];
+
+ inputs[0] = i_stream_create_dot(ibc->input, FALSE);
+ inputs[1] = NULL;
+ ibc->value_input = i_stream_create_seekable(inputs, MAIL_READ_FULL_BLOCK_SIZE,
+ seekable_fd_callback, ibc);
+ i_stream_unref(&inputs[0]);
+
+ return ibc->value_input;
+}
+
+static int
+dsync_ibc_check_missing_deserializers(struct dsync_ibc_stream *ibc)
+{
+ unsigned int i;
+ int ret = 0;
+
+ for (i = ITEM_DONE + 1; i < ITEM_END_OF_LIST; i++) {
+ if (ibc->deserializers[i] == NULL &&
+ ibc->minor_version >= items[i].min_minor_version &&
+ (items[i].required_keys != NULL ||
+ items[i].optional_keys != NULL)) {
+ dsync_ibc_input_error(ibc, NULL,
+ "Remote didn't handshake deserializer for %s",
+ items[i].name);
+ ret = -1;
+ }
+ }
+ return ret;
+}
+
+static bool
+dsync_ibc_stream_handshake(struct dsync_ibc_stream *ibc, const char *line)
+{
+ enum item_type item = ITEM_NONE;
+ const char *const *required_keys, *error;
+ unsigned int i;
+
+ if (ibc->handshake_received)
+ return TRUE;
+
+ if (!ibc->version_received) {
+ if (!version_string_verify_full(line, "dsync",
+ DSYNC_PROTOCOL_VERSION_MAJOR,
+ &ibc->minor_version)) {
+ dsync_ibc_input_error(ibc, NULL,
+ "Remote dsync doesn't use compatible protocol");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ ibc->version_received = TRUE;
+ return FALSE;
+ }
+
+ if (strcmp(line, END_OF_LIST_LINE) == 0) {
+ /* finished handshaking */
+ if (dsync_ibc_check_missing_deserializers(ibc) < 0)
+ return FALSE;
+ ibc->handshake_received = TRUE;
+ ibc->last_recv_item = ITEM_HANDSHAKE;
+ return FALSE;
+ }
+
+ for (i = 1; i < ITEM_END_OF_LIST; i++) {
+ if (items[i].chr == line[0]) {
+ item = i;
+ break;
+ }
+ }
+ if (item == ITEM_NONE) {
+ /* unknown deserializer, ignore */
+ return FALSE;
+ }
+
+ required_keys = items[item].required_keys == NULL ? NULL :
+ t_strsplit(items[item].required_keys, " ");
+ if (dsync_deserializer_init(items[item].name,
+ required_keys, line + 1,
+ &ibc->deserializers[item], &error) < 0) {
+ dsync_ibc_input_error(ibc, NULL,
+ "Remote sent invalid handshake for %s: %s",
+ items[item].name, error);
+ }
+ return FALSE;
+}
+
+static enum dsync_ibc_recv_ret
+dsync_ibc_stream_input_next(struct dsync_ibc_stream *ibc, enum item_type item,
+ struct dsync_deserializer_decoder **decoder_r)
+{
+ enum item_type line_item = ITEM_NONE;
+ const char *line, *error;
+ unsigned int i;
+
+ i_assert(ibc->value_input == NULL);
+
+ timeout_reset(ibc->to);
+
+ do {
+ if (dsync_ibc_stream_next_line(ibc, &line) <= 0)
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ } while (!dsync_ibc_stream_handshake(ibc, line));
+
+ ibc->last_recv_item = item;
+ ibc->last_recv_item_eol = FALSE;
+
+ if (strcmp(line, END_OF_LIST_LINE) == 0) {
+ /* end of this list */
+ ibc->last_recv_item_eol = TRUE;
+ return DSYNC_IBC_RECV_RET_FINISHED;
+ }
+ if (line[0] == items[ITEM_DONE].chr) {
+ /* remote cleanly closed the connection, possibly because of
+ some failure (which it should have logged). we don't want to
+ log any stream errors anyway after this. */
+ ibc->done_received = TRUE;
+ dsync_ibc_stream_stop(ibc);
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+
+ }
+ for (i = 1; i < ITEM_END_OF_LIST; i++) {
+ if (*line == items[i].chr) {
+ line_item = i;
+ break;
+ }
+ }
+ if (line_item != item) {
+ dsync_ibc_input_error(ibc, NULL,
+ "Received unexpected input %c != %c",
+ *line, items[item].chr);
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+
+ if (ibc->cur_decoder != NULL)
+ dsync_deserializer_decode_finish(&ibc->cur_decoder);
+ if (dsync_deserializer_decode_begin(ibc->deserializers[item],
+ line+1, &ibc->cur_decoder,
+ &error) < 0) {
+ dsync_ibc_input_error(ibc, NULL, "Invalid input to %s: %s",
+ items[item].name, error);
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ *decoder_r = ibc->cur_decoder;
+ return DSYNC_IBC_RECV_RET_OK;
+}
+
+static struct dsync_serializer_encoder *
+dsync_ibc_send_encode_begin(struct dsync_ibc_stream *ibc, enum item_type item)
+{
+ ibc->last_sent_item = item;
+ ibc->last_sent_item_eol = FALSE;
+ return dsync_serializer_encode_begin(ibc->serializers[item]);
+}
+
+static void
+dsync_ibc_stream_send_handshake(struct dsync_ibc *_ibc,
+ const struct dsync_ibc_settings *set)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ struct dsync_serializer_encoder *encoder;
+ string_t *str = t_str_new(128);
+ char sync_type[2];
+
+ str_append_c(str, items[ITEM_HANDSHAKE].chr);
+ encoder = dsync_ibc_send_encode_begin(ibc, ITEM_HANDSHAKE);
+ dsync_serializer_encode_add(encoder, "hostname", set->hostname);
+ if (set->sync_ns_prefixes != NULL) {
+ dsync_serializer_encode_add(encoder, "sync_ns_prefix",
+ set->sync_ns_prefixes);
+ }
+ if (set->sync_box != NULL)
+ dsync_serializer_encode_add(encoder, "sync_box", set->sync_box);
+ if (set->virtual_all_box != NULL) {
+ dsync_serializer_encode_add(encoder, "virtual_all_box",
+ set->virtual_all_box);
+ }
+ if (set->exclude_mailboxes != NULL) {
+ string_t *substr = t_str_new(64);
+ unsigned int i;
+
+ for (i = 0; set->exclude_mailboxes[i] != NULL; i++) {
+ if (i != 0)
+ str_append_c(substr, '\t');
+ str_append_tabescaped(substr, set->exclude_mailboxes[i]);
+ }
+ dsync_serializer_encode_add(encoder, "exclude_mailboxes",
+ str_c(substr));
+ }
+ if (!guid_128_is_empty(set->sync_box_guid)) {
+ dsync_serializer_encode_add(encoder, "sync_box_guid",
+ guid_128_to_string(set->sync_box_guid));
+ }
+
+ sync_type[0] = sync_type[1] = '\0';
+ switch (set->sync_type) {
+ case DSYNC_BRAIN_SYNC_TYPE_UNKNOWN:
+ break;
+ case DSYNC_BRAIN_SYNC_TYPE_FULL:
+ sync_type[0] = 'f';
+ break;
+ case DSYNC_BRAIN_SYNC_TYPE_CHANGED:
+ sync_type[0] = 'c';
+ break;
+ case DSYNC_BRAIN_SYNC_TYPE_STATE:
+ sync_type[0] = 's';
+ break;
+ }
+ if (sync_type[0] != '\0')
+ dsync_serializer_encode_add(encoder, "sync_type", sync_type);
+ if (set->lock_timeout > 0) {
+ dsync_serializer_encode_add(encoder, "lock_timeout",
+ t_strdup_printf("%u", set->lock_timeout));
+ }
+ if (set->import_commit_msgs_interval > 0) {
+ dsync_serializer_encode_add(encoder, "import_commit_msgs_interval",
+ t_strdup_printf("%u", set->import_commit_msgs_interval));
+ }
+ if (set->sync_since_timestamp > 0) {
+ dsync_serializer_encode_add(encoder, "sync_since_timestamp",
+ t_strdup_printf("%ld", (long)set->sync_since_timestamp));
+ }
+ if (set->sync_until_timestamp > 0) {
+ dsync_serializer_encode_add(encoder, "sync_until_timestamp",
+ t_strdup_printf("%ld", (long)set->sync_since_timestamp));
+ }
+ if (set->sync_max_size > 0) {
+ dsync_serializer_encode_add(encoder, "sync_max_size",
+ t_strdup_printf("%"PRIu64, set->sync_max_size));
+ }
+ if (set->sync_flags != NULL) {
+ dsync_serializer_encode_add(encoder, "sync_flags",
+ set->sync_flags);
+ }
+ if (set->alt_char != '\0') {
+ dsync_serializer_encode_add(encoder, "alt_char",
+ t_strdup_printf("%c", set->alt_char));
+ }
+ if ((set->brain_flags & DSYNC_BRAIN_FLAG_SEND_MAIL_REQUESTS) != 0)
+ dsync_serializer_encode_add(encoder, "send_mail_requests", "");
+ if ((set->brain_flags & DSYNC_BRAIN_FLAG_BACKUP_SEND) != 0)
+ dsync_serializer_encode_add(encoder, "backup_send", "");
+ if ((set->brain_flags & DSYNC_BRAIN_FLAG_BACKUP_RECV) != 0)
+ dsync_serializer_encode_add(encoder, "backup_recv", "");
+ if ((set->brain_flags & DSYNC_BRAIN_FLAG_DEBUG) != 0)
+ dsync_serializer_encode_add(encoder, "debug", "");
+ if ((set->brain_flags & DSYNC_BRAIN_FLAG_SYNC_VISIBLE_NAMESPACES) != 0)
+ dsync_serializer_encode_add(encoder, "sync_visible_namespaces", "");
+ if ((set->brain_flags & DSYNC_BRAIN_FLAG_NO_MAIL_SYNC) != 0)
+ dsync_serializer_encode_add(encoder, "no_mail_sync", "");
+ if ((set->brain_flags & DSYNC_BRAIN_FLAG_NO_BACKUP_OVERWRITE) != 0)
+ dsync_serializer_encode_add(encoder, "no_backup_overwrite", "");
+ if ((set->brain_flags & DSYNC_BRAIN_FLAG_PURGE_REMOTE) != 0)
+ dsync_serializer_encode_add(encoder, "purge_remote", "");
+ if ((set->brain_flags & DSYNC_BRAIN_FLAG_NO_NOTIFY) != 0)
+ dsync_serializer_encode_add(encoder, "no_notify", "");
+ if ((set->brain_flags & DSYNC_BRAIN_FLAG_EMPTY_HDR_WORKAROUND) != 0)
+ dsync_serializer_encode_add(encoder, "empty_hdr_workaround", "");
+ /* this can be NULL in slave */
+ string_t *str2 = t_str_new(32);
+ if (set->hashed_headers != NULL) {
+ for(const char *const *ptr = set->hashed_headers; *ptr != NULL; ptr++) {
+ str_append_tabescaped(str2, *ptr);
+ str_append_c(str2, '\t');
+ }
+ }
+ dsync_serializer_encode_add(encoder, "hashed_headers", str_c(str2));
+ dsync_serializer_encode_finish(&encoder, str);
+ dsync_ibc_stream_send_string(ibc, str);
+}
+
+static enum dsync_ibc_recv_ret
+dsync_ibc_stream_recv_handshake(struct dsync_ibc *_ibc,
+ const struct dsync_ibc_settings **set_r)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ struct dsync_deserializer_decoder *decoder;
+ struct dsync_ibc_settings *set;
+ const char *value;
+ pool_t pool = ibc->ret_pool;
+ enum dsync_ibc_recv_ret ret;
+
+ ret = dsync_ibc_stream_input_next(ibc, ITEM_HANDSHAKE, &decoder);
+ if (ret != DSYNC_IBC_RECV_RET_OK) {
+ if (ret != DSYNC_IBC_RECV_RET_TRYAGAIN) {
+ i_error("dsync(%s): Unexpected input in handshake",
+ ibc->name);
+ dsync_ibc_stream_stop(ibc);
+ }
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+
+ p_clear(pool);
+ set = p_new(pool, struct dsync_ibc_settings, 1);
+
+ value = dsync_deserializer_decode_get(decoder, "hostname");
+ set->hostname = p_strdup(pool, value);
+ /* now that we know the remote's hostname, use it for the
+ stream's name */
+ i_free(ibc->name);
+ ibc->name = i_strdup(set->hostname);
+
+ if (dsync_deserializer_decode_try(decoder, "sync_ns_prefix", &value))
+ set->sync_ns_prefixes = p_strdup(pool, value);
+ if (dsync_deserializer_decode_try(decoder, "sync_box", &value))
+ set->sync_box = p_strdup(pool, value);
+ if (dsync_deserializer_decode_try(decoder, "virtual_all_box", &value))
+ set->virtual_all_box = p_strdup(pool, value);
+ if (dsync_deserializer_decode_try(decoder, "sync_box_guid", &value) &&
+ guid_128_from_string(value, set->sync_box_guid) < 0) {
+ dsync_ibc_input_error(ibc, decoder,
+ "Invalid sync_box_guid: %s", value);
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ if (dsync_deserializer_decode_try(decoder, "exclude_mailboxes", &value) &&
+ *value != '\0') {
+ char **boxes = p_strsplit_tabescaped(pool, value);
+ set->exclude_mailboxes = (const void *)boxes;
+ }
+ if (dsync_deserializer_decode_try(decoder, "sync_type", &value)) {
+ switch (value[0]) {
+ case 'f':
+ set->sync_type = DSYNC_BRAIN_SYNC_TYPE_FULL;
+ break;
+ case 'c':
+ set->sync_type = DSYNC_BRAIN_SYNC_TYPE_CHANGED;
+ break;
+ case 's':
+ set->sync_type = DSYNC_BRAIN_SYNC_TYPE_STATE;
+ break;
+ default:
+ dsync_ibc_input_error(ibc, decoder,
+ "Unknown sync_type: %s", value);
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ }
+ if (dsync_deserializer_decode_try(decoder, "lock_timeout", &value)) {
+ if (str_to_uint(value, &set->lock_timeout) < 0 ||
+ set->lock_timeout == 0) {
+ dsync_ibc_input_error(ibc, decoder,
+ "Invalid lock_timeout: %s", value);
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ }
+ if (dsync_deserializer_decode_try(decoder, "import_commit_msgs_interval", &value)) {
+ if (str_to_uint(value, &set->import_commit_msgs_interval) < 0 ||
+ set->import_commit_msgs_interval == 0) {
+ dsync_ibc_input_error(ibc, decoder,
+ "Invalid import_commit_msgs_interval: %s", value);
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ }
+ if (dsync_deserializer_decode_try(decoder, "sync_since_timestamp", &value)) {
+ if (str_to_time(value, &set->sync_since_timestamp) < 0 ||
+ set->sync_since_timestamp == 0) {
+ dsync_ibc_input_error(ibc, decoder,
+ "Invalid sync_since_timestamp: %s", value);
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ }
+ if (dsync_deserializer_decode_try(decoder, "sync_until_timestamp", &value)) {
+ if (str_to_time(value, &set->sync_until_timestamp) < 0 ||
+ set->sync_until_timestamp == 0) {
+ dsync_ibc_input_error(ibc, decoder,
+ "Invalid sync_until_timestamp: %s", value);
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ }
+ if (dsync_deserializer_decode_try(decoder, "sync_max_size", &value)) {
+ if (str_to_uoff(value, &set->sync_max_size) < 0 ||
+ set->sync_max_size == 0) {
+ dsync_ibc_input_error(ibc, decoder,
+ "Invalid sync_max_size: %s", value);
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ }
+ if (dsync_deserializer_decode_try(decoder, "sync_flags", &value))
+ set->sync_flags = p_strdup(pool, value);
+ if (dsync_deserializer_decode_try(decoder, "alt_char", &value))
+ set->alt_char = value[0];
+ if (dsync_deserializer_decode_try(decoder, "send_mail_requests", &value))
+ set->brain_flags |= DSYNC_BRAIN_FLAG_SEND_MAIL_REQUESTS;
+ if (dsync_deserializer_decode_try(decoder, "backup_send", &value))
+ set->brain_flags |= DSYNC_BRAIN_FLAG_BACKUP_SEND;
+ if (dsync_deserializer_decode_try(decoder, "backup_recv", &value))
+ set->brain_flags |= DSYNC_BRAIN_FLAG_BACKUP_RECV;
+ if (dsync_deserializer_decode_try(decoder, "debug", &value))
+ set->brain_flags |= DSYNC_BRAIN_FLAG_DEBUG;
+ if (dsync_deserializer_decode_try(decoder, "sync_visible_namespaces", &value))
+ set->brain_flags |= DSYNC_BRAIN_FLAG_SYNC_VISIBLE_NAMESPACES;
+ if (dsync_deserializer_decode_try(decoder, "no_mail_sync", &value))
+ set->brain_flags |= DSYNC_BRAIN_FLAG_NO_MAIL_SYNC;
+ if (dsync_deserializer_decode_try(decoder, "no_backup_overwrite", &value))
+ set->brain_flags |= DSYNC_BRAIN_FLAG_NO_BACKUP_OVERWRITE;
+ if (dsync_deserializer_decode_try(decoder, "purge_remote", &value))
+ set->brain_flags |= DSYNC_BRAIN_FLAG_PURGE_REMOTE;
+ if (dsync_deserializer_decode_try(decoder, "no_notify", &value))
+ set->brain_flags |= DSYNC_BRAIN_FLAG_NO_NOTIFY;
+ if (dsync_deserializer_decode_try(decoder, "empty_hdr_workaround", &value))
+ set->brain_flags |= DSYNC_BRAIN_FLAG_EMPTY_HDR_WORKAROUND;
+ if (dsync_deserializer_decode_try(decoder, "hashed_headers", &value))
+ set->hashed_headers = (const char*const*)p_strsplit_tabescaped(pool, value);
+ set->hdr_hash_v2 = ibc->minor_version >= DSYNC_PROTOCOL_MINOR_HAVE_HDR_HASH_V2;
+ set->hdr_hash_v3 = ibc->minor_version >= DSYNC_PROTOCOL_MINOR_HAVE_HDR_HASH_V3;
+
+ *set_r = set;
+ return DSYNC_IBC_RECV_RET_OK;
+}
+
+static void
+dsync_ibc_stream_send_end_of_list(struct dsync_ibc *_ibc,
+ enum dsync_ibc_eol_type type)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+
+ i_assert(ibc->value_output == NULL);
+
+ switch (type) {
+ case DSYNC_IBC_EOL_MAILBOX_ATTRIBUTE:
+ if (ibc->minor_version < DSYNC_PROTOCOL_MINOR_HAVE_ATTRIBUTES)
+ return;
+ break;
+ default:
+ break;
+ }
+
+ ibc->last_sent_item_eol = TRUE;
+ o_stream_nsend_str(ibc->output, END_OF_LIST_LINE"\n");
+}
+
+static void
+dsync_ibc_stream_send_mailbox_state(struct dsync_ibc *_ibc,
+ const struct dsync_mailbox_state *state)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ struct dsync_serializer_encoder *encoder;
+ string_t *str = t_str_new(128);
+
+ str_append_c(str, items[ITEM_MAILBOX_STATE].chr);
+ encoder = dsync_ibc_send_encode_begin(ibc, ITEM_MAILBOX_STATE);
+ dsync_serializer_encode_add(encoder, "mailbox_guid",
+ guid_128_to_string(state->mailbox_guid));
+ dsync_serializer_encode_add(encoder, "last_uidvalidity",
+ dec2str(state->last_uidvalidity));
+ dsync_serializer_encode_add(encoder, "last_common_uid",
+ dec2str(state->last_common_uid));
+ dsync_serializer_encode_add(encoder, "last_common_modseq",
+ dec2str(state->last_common_modseq));
+ dsync_serializer_encode_add(encoder, "last_common_pvt_modseq",
+ dec2str(state->last_common_pvt_modseq));
+ dsync_serializer_encode_add(encoder, "last_messages_count",
+ dec2str(state->last_messages_count));
+ if (state->changes_during_sync)
+ dsync_serializer_encode_add(encoder, "changes_during_sync", "");
+
+ dsync_serializer_encode_finish(&encoder, str);
+ dsync_ibc_stream_send_string(ibc, str);
+}
+
+static enum dsync_ibc_recv_ret
+dsync_ibc_stream_recv_mailbox_state(struct dsync_ibc *_ibc,
+ struct dsync_mailbox_state *state_r)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ struct dsync_deserializer_decoder *decoder;
+ const char *value;
+ enum dsync_ibc_recv_ret ret;
+
+ i_zero(state_r);
+
+ ret = dsync_ibc_stream_input_next(ibc, ITEM_MAILBOX_STATE, &decoder);
+ if (ret != DSYNC_IBC_RECV_RET_OK)
+ return ret;
+
+ value = dsync_deserializer_decode_get(decoder, "mailbox_guid");
+ if (guid_128_from_string(value, state_r->mailbox_guid) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid mailbox_guid");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ value = dsync_deserializer_decode_get(decoder, "last_uidvalidity");
+ if (str_to_uint32(value, &state_r->last_uidvalidity) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid last_uidvalidity");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ value = dsync_deserializer_decode_get(decoder, "last_common_uid");
+ if (str_to_uint32(value, &state_r->last_common_uid) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid last_common_uid");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ value = dsync_deserializer_decode_get(decoder, "last_common_modseq");
+ if (str_to_uint64(value, &state_r->last_common_modseq) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid last_common_modseq");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ value = dsync_deserializer_decode_get(decoder, "last_common_pvt_modseq");
+ if (str_to_uint64(value, &state_r->last_common_pvt_modseq) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid last_common_pvt_modseq");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ if (dsync_deserializer_decode_try(decoder, "last_messages_count", &value) &&
+ str_to_uint32(value, &state_r->last_messages_count) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid last_messages_count");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ if (dsync_deserializer_decode_try(decoder, "changes_during_sync", &value))
+ state_r->changes_during_sync = TRUE;
+ return DSYNC_IBC_RECV_RET_OK;
+}
+
+static void
+dsync_ibc_stream_send_mailbox_tree_node(struct dsync_ibc *_ibc,
+ const char *const *name,
+ const struct dsync_mailbox_node *node)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ struct dsync_serializer_encoder *encoder;
+ string_t *str, *namestr;
+
+ i_assert(*name != NULL);
+
+ str = t_str_new(128);
+ str_append_c(str, items[ITEM_MAILBOX_TREE_NODE].chr);
+
+ /* convert all hierarchy separators to tabs. mailbox names really
+ aren't supposed to have any tabs, but escape them anyway if there
+ are. */
+ namestr = t_str_new(128);
+ for (; *name != NULL; name++) {
+ str_append_tabescaped(namestr, *name);
+ str_append_c(namestr, '\t');
+ }
+ str_truncate(namestr, str_len(namestr)-1);
+
+ encoder = dsync_ibc_send_encode_begin(ibc, ITEM_MAILBOX_TREE_NODE);
+ dsync_serializer_encode_add(encoder, "name", str_c(namestr));
+ switch (node->existence) {
+ case DSYNC_MAILBOX_NODE_NONEXISTENT:
+ dsync_serializer_encode_add(encoder, "existence", "n");
+ break;
+ case DSYNC_MAILBOX_NODE_EXISTS:
+ dsync_serializer_encode_add(encoder, "existence", "y");
+ break;
+ case DSYNC_MAILBOX_NODE_DELETED:
+ dsync_serializer_encode_add(encoder, "existence", "d");
+ break;
+ }
+
+ if (!guid_128_is_empty(node->mailbox_guid)) {
+ dsync_serializer_encode_add(encoder, "mailbox_guid",
+ guid_128_to_string(node->mailbox_guid));
+ }
+ if (node->uid_validity != 0) {
+ dsync_serializer_encode_add(encoder, "uid_validity",
+ dec2str(node->uid_validity));
+ }
+ if (node->uid_next != 0) {
+ dsync_serializer_encode_add(encoder, "uid_next",
+ dec2str(node->uid_next));
+ }
+ if (node->last_renamed_or_created != 0) {
+ dsync_serializer_encode_add(encoder, "last_renamed_or_created",
+ dec2str(node->last_renamed_or_created));
+ }
+ if (node->last_subscription_change != 0) {
+ dsync_serializer_encode_add(encoder, "last_subscription_change",
+ dec2str(node->last_subscription_change));
+ }
+ if (node->subscribed)
+ dsync_serializer_encode_add(encoder, "subscribed", "");
+ dsync_serializer_encode_finish(&encoder, str);
+ dsync_ibc_stream_send_string(ibc, str);
+}
+
+static enum dsync_ibc_recv_ret
+dsync_ibc_stream_recv_mailbox_tree_node(struct dsync_ibc *_ibc,
+ const char *const **name_r,
+ const struct dsync_mailbox_node **node_r)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ struct dsync_deserializer_decoder *decoder;
+ struct dsync_mailbox_node *node;
+ const char *value;
+ enum dsync_ibc_recv_ret ret;
+
+ ret = dsync_ibc_stream_input_next(ibc, ITEM_MAILBOX_TREE_NODE, &decoder);
+ if (ret != DSYNC_IBC_RECV_RET_OK)
+ return ret;
+
+ p_clear(ibc->ret_pool);
+ node = p_new(ibc->ret_pool, struct dsync_mailbox_node, 1);
+
+ value = dsync_deserializer_decode_get(decoder, "name");
+ if (*value == '\0') {
+ dsync_ibc_input_error(ibc, decoder, "Empty name");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ *name_r = (void *)p_strsplit_tabescaped(ibc->ret_pool, value);
+
+ value = dsync_deserializer_decode_get(decoder, "existence");
+ switch (*value) {
+ case 'n':
+ node->existence = DSYNC_MAILBOX_NODE_NONEXISTENT;
+ break;
+ case 'y':
+ node->existence = DSYNC_MAILBOX_NODE_EXISTS;
+ break;
+ case 'd':
+ node->existence = DSYNC_MAILBOX_NODE_DELETED;
+ break;
+ }
+
+ if (dsync_deserializer_decode_try(decoder, "mailbox_guid", &value) &&
+ guid_128_from_string(value, node->mailbox_guid) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid mailbox_guid");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ if (dsync_deserializer_decode_try(decoder, "uid_validity", &value) &&
+ str_to_uint32(value, &node->uid_validity) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid uid_validity");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ if (dsync_deserializer_decode_try(decoder, "uid_next", &value) &&
+ str_to_uint32(value, &node->uid_next) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid uid_next");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ if (dsync_deserializer_decode_try(decoder, "last_renamed_or_created", &value) &&
+ str_to_time(value, &node->last_renamed_or_created) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid last_renamed_or_created");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ if (dsync_deserializer_decode_try(decoder, "last_subscription_change", &value) &&
+ str_to_time(value, &node->last_subscription_change) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid last_subscription_change");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ if (dsync_deserializer_decode_try(decoder, "subscribed", &value))
+ node->subscribed = TRUE;
+
+ *node_r = node;
+ return DSYNC_IBC_RECV_RET_OK;
+}
+
+static void
+dsync_ibc_stream_encode_delete(string_t *str,
+ struct dsync_serializer_encoder *encoder,
+ const struct dsync_mailbox_delete *deletes,
+ unsigned int count, const char *key,
+ enum dsync_mailbox_delete_type type)
+{
+ unsigned int i;
+
+ str_truncate(str, 0);
+ for (i = 0; i < count; i++) {
+ if (deletes[i].type == type) {
+ str_append(str, guid_128_to_string(deletes[i].guid));
+ str_printfa(str, " %ld ", (long)deletes[i].timestamp);
+ }
+ }
+ if (str_len(str) > 0) {
+ str_truncate(str, str_len(str)-1);
+ dsync_serializer_encode_add(encoder, key, str_c(str));
+ }
+}
+
+static void
+dsync_ibc_stream_send_mailbox_deletes(struct dsync_ibc *_ibc,
+ const struct dsync_mailbox_delete *deletes,
+ unsigned int count, char hierarchy_sep,
+ char escape_char)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ struct dsync_serializer_encoder *encoder;
+ string_t *str, *substr;
+ char sep[2];
+
+ str = t_str_new(128);
+ str_append_c(str, items[ITEM_MAILBOX_DELETE].chr);
+
+ encoder = dsync_ibc_send_encode_begin(ibc, ITEM_MAILBOX_DELETE);
+ sep[0] = hierarchy_sep; sep[1] = '\0';
+ dsync_serializer_encode_add(encoder, "hierarchy_sep", sep);
+ sep[0] = escape_char; sep[1] = '\0';
+ dsync_serializer_encode_add(encoder, "escape_char", sep);
+
+ substr = t_str_new(128);
+ dsync_ibc_stream_encode_delete(substr, encoder, deletes, count,
+ "mailboxes",
+ DSYNC_MAILBOX_DELETE_TYPE_MAILBOX);
+ dsync_ibc_stream_encode_delete(substr, encoder, deletes, count,
+ "dirs",
+ DSYNC_MAILBOX_DELETE_TYPE_DIR);
+ dsync_ibc_stream_encode_delete(substr, encoder, deletes, count,
+ "unsubscribes",
+ DSYNC_MAILBOX_DELETE_TYPE_UNSUBSCRIBE);
+ dsync_serializer_encode_finish(&encoder, str);
+ dsync_ibc_stream_send_string(ibc, str);
+}
+
+ARRAY_DEFINE_TYPE(dsync_mailbox_delete, struct dsync_mailbox_delete);
+static int
+decode_mailbox_deletes(ARRAY_TYPE(dsync_mailbox_delete) *deletes,
+ const char *value, enum dsync_mailbox_delete_type type)
+{
+ struct dsync_mailbox_delete *del;
+ const char *const *tmp;
+ unsigned int i;
+
+ tmp = t_strsplit(value, " ");
+ for (i = 0; tmp[i] != NULL; i += 2) {
+ del = array_append_space(deletes);
+ del->type = type;
+ if (guid_128_from_string(tmp[i], del->guid) < 0)
+ return -1;
+ if (tmp[i+1] == NULL ||
+ str_to_time(tmp[i+1], &del->timestamp) < 0)
+ return -1;
+ }
+ return 0;
+}
+
+static enum dsync_ibc_recv_ret
+dsync_ibc_stream_recv_mailbox_deletes(struct dsync_ibc *_ibc,
+ const struct dsync_mailbox_delete **deletes_r,
+ unsigned int *count_r, char *hierarchy_sep_r,
+ char *escape_char_r)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ struct dsync_deserializer_decoder *decoder;
+ ARRAY_TYPE(dsync_mailbox_delete) deletes;
+ const char *value;
+ enum dsync_ibc_recv_ret ret;
+
+ ret = dsync_ibc_stream_input_next(ibc, ITEM_MAILBOX_DELETE, &decoder);
+ if (ret != DSYNC_IBC_RECV_RET_OK)
+ return ret;
+
+ p_clear(ibc->ret_pool);
+ p_array_init(&deletes, ibc->ret_pool, 16);
+
+ value = dsync_deserializer_decode_get(decoder, "hierarchy_sep");
+ if (strlen(value) != 1) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid hierarchy_sep");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ *hierarchy_sep_r = value[0];
+
+ if (!dsync_deserializer_decode_try(decoder, "escape_char", &value))
+ *escape_char_r = '\0';
+ else {
+ if (strlen(value) > 1) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid escape_char '%s'", value);
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ *escape_char_r = value[0];
+ }
+
+ if (dsync_deserializer_decode_try(decoder, "mailboxes", &value) &&
+ decode_mailbox_deletes(&deletes, value,
+ DSYNC_MAILBOX_DELETE_TYPE_MAILBOX) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid mailboxes");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ if (dsync_deserializer_decode_try(decoder, "dirs", &value) &&
+ decode_mailbox_deletes(&deletes, value,
+ DSYNC_MAILBOX_DELETE_TYPE_DIR) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid dirs");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ if (dsync_deserializer_decode_try(decoder, "unsubscribes", &value) &&
+ decode_mailbox_deletes(&deletes, value,
+ DSYNC_MAILBOX_DELETE_TYPE_UNSUBSCRIBE) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid dirs");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ *deletes_r = array_get(&deletes, count_r);
+ return DSYNC_IBC_RECV_RET_OK;
+}
+
+static const char *
+get_cache_fields(struct dsync_ibc_stream *ibc,
+ const struct dsync_mailbox *dsync_box)
+{
+ struct dsync_serializer_encoder *encoder;
+ string_t *str;
+ const struct mailbox_cache_field *cache_fields;
+ unsigned int i, count;
+ char decision[3];
+
+ cache_fields = array_get(&dsync_box->cache_fields, &count);
+ if (count == 0)
+ return "";
+
+ str = t_str_new(128);
+ for (i = 0; i < count; i++) {
+ const struct mailbox_cache_field *field = &cache_fields[i];
+
+ encoder = dsync_serializer_encode_begin(ibc->serializers[ITEM_MAILBOX_CACHE_FIELD]);
+ dsync_serializer_encode_add(encoder, "name", field->name);
+
+ memset(decision, 0, sizeof(decision));
+ switch (field->decision & ENUM_NEGATE(MAIL_CACHE_DECISION_FORCED)) {
+ case MAIL_CACHE_DECISION_NO:
+ decision[0] = 'n';
+ break;
+ case MAIL_CACHE_DECISION_TEMP:
+ decision[0] = 't';
+ break;
+ case MAIL_CACHE_DECISION_YES:
+ decision[0] = 'y';
+ break;
+ }
+ i_assert(decision[0] != '\0');
+ if ((field->decision & MAIL_CACHE_DECISION_FORCED) != 0)
+ decision[1] = 'F';
+ dsync_serializer_encode_add(encoder, "decision", decision);
+ if (field->last_used != 0) {
+ dsync_serializer_encode_add(encoder, "last_used",
+ dec2str(field->last_used));
+ }
+ dsync_serializer_encode_finish(&encoder, str);
+ }
+ if (i > 0) {
+ /* remove the trailing LF */
+ str_truncate(str, str_len(str)-1);
+ }
+ return str_c(str);
+}
+
+static void
+dsync_ibc_stream_send_mailbox(struct dsync_ibc *_ibc,
+ const struct dsync_mailbox *dsync_box)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ struct dsync_serializer_encoder *encoder;
+ string_t *str = t_str_new(128);
+ const char *value;
+
+ str_append_c(str, items[ITEM_MAILBOX].chr);
+ encoder = dsync_ibc_send_encode_begin(ibc, ITEM_MAILBOX);
+ dsync_serializer_encode_add(encoder, "mailbox_guid",
+ guid_128_to_string(dsync_box->mailbox_guid));
+
+ if (dsync_box->mailbox_lost)
+ dsync_serializer_encode_add(encoder, "mailbox_lost", "");
+ if (dsync_box->mailbox_ignore)
+ dsync_serializer_encode_add(encoder, "mailbox_ignore", "");
+ if (dsync_box->have_guids)
+ dsync_serializer_encode_add(encoder, "have_guids", "");
+ if (dsync_box->have_save_guids)
+ dsync_serializer_encode_add(encoder, "have_save_guids", "");
+ if (dsync_box->have_only_guid128)
+ dsync_serializer_encode_add(encoder, "have_only_guid128", "");
+ dsync_serializer_encode_add(encoder, "uid_validity",
+ dec2str(dsync_box->uid_validity));
+ dsync_serializer_encode_add(encoder, "uid_next",
+ dec2str(dsync_box->uid_next));
+ dsync_serializer_encode_add(encoder, "messages_count",
+ dec2str(dsync_box->messages_count));
+ dsync_serializer_encode_add(encoder, "first_recent_uid",
+ dec2str(dsync_box->first_recent_uid));
+ dsync_serializer_encode_add(encoder, "highest_modseq",
+ dec2str(dsync_box->highest_modseq));
+ dsync_serializer_encode_add(encoder, "highest_pvt_modseq",
+ dec2str(dsync_box->highest_pvt_modseq));
+
+ value = get_cache_fields(ibc, dsync_box);
+ if (value != NULL)
+ dsync_serializer_encode_add(encoder, "cache_fields", value);
+
+ dsync_serializer_encode_finish(&encoder, str);
+ dsync_ibc_stream_send_string(ibc, str);
+}
+
+static int
+parse_cache_field(struct dsync_ibc_stream *ibc, struct dsync_mailbox *box,
+ const char *value)
+{
+ struct dsync_deserializer_decoder *decoder;
+ struct mailbox_cache_field field;
+ const char *error;
+ int ret = 0;
+
+ if (dsync_deserializer_decode_begin(ibc->deserializers[ITEM_MAILBOX_CACHE_FIELD],
+ value, &decoder, &error) < 0) {
+ dsync_ibc_input_error(ibc, NULL,
+ "cache_field: Invalid input: %s", error);
+ return -1;
+ }
+
+ i_zero(&field);
+ value = dsync_deserializer_decode_get(decoder, "name");
+ field.name = p_strdup(ibc->ret_pool, value);
+
+ value = dsync_deserializer_decode_get(decoder, "decision");
+ switch (*value) {
+ case 'n':
+ field.decision = MAIL_CACHE_DECISION_NO;
+ break;
+ case 't':
+ field.decision = MAIL_CACHE_DECISION_TEMP;
+ break;
+ case 'y':
+ field.decision = MAIL_CACHE_DECISION_YES;
+ break;
+ default:
+ dsync_ibc_input_error(ibc, decoder, "Invalid decision: %s",
+ value);
+ ret = -1;
+ break;
+ }
+ if (value[1] == 'F')
+ field.decision |= MAIL_CACHE_DECISION_FORCED;
+
+ if (dsync_deserializer_decode_try(decoder, "last_used", &value) &&
+ str_to_time(value, &field.last_used) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid last_used");
+ ret = -1;
+ }
+ array_push_back(&box->cache_fields, &field);
+
+ dsync_deserializer_decode_finish(&decoder);
+ return ret;
+}
+
+static enum dsync_ibc_recv_ret
+dsync_ibc_stream_recv_mailbox(struct dsync_ibc *_ibc,
+ const struct dsync_mailbox **dsync_box_r)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ pool_t pool = ibc->ret_pool;
+ struct dsync_deserializer_decoder *decoder;
+ struct dsync_mailbox *box;
+ const char *value;
+ enum dsync_ibc_recv_ret ret;
+
+ p_clear(pool);
+ box = p_new(pool, struct dsync_mailbox, 1);
+
+ ret = dsync_ibc_stream_input_next(ibc, ITEM_MAILBOX, &decoder);
+ if (ret != DSYNC_IBC_RECV_RET_OK)
+ return ret;
+
+ value = dsync_deserializer_decode_get(decoder, "mailbox_guid");
+ if (guid_128_from_string(value, box->mailbox_guid) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid mailbox_guid");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+
+ if (dsync_deserializer_decode_try(decoder, "mailbox_lost", &value))
+ box->mailbox_lost = TRUE;
+ if (dsync_deserializer_decode_try(decoder, "mailbox_ignore", &value))
+ box->mailbox_ignore = TRUE;
+ if (dsync_deserializer_decode_try(decoder, "have_guids", &value))
+ box->have_guids = TRUE;
+ if (dsync_deserializer_decode_try(decoder, "have_save_guids", &value) ||
+ (box->have_guids && ibc->minor_version < DSYNC_PROTOCOL_MINOR_HAVE_SAVE_GUID))
+ box->have_save_guids = TRUE;
+ if (dsync_deserializer_decode_try(decoder, "have_only_guid128", &value))
+ box->have_only_guid128 = TRUE;
+ value = dsync_deserializer_decode_get(decoder, "uid_validity");
+ if (str_to_uint32(value, &box->uid_validity) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid uid_validity");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ value = dsync_deserializer_decode_get(decoder, "uid_next");
+ if (str_to_uint32(value, &box->uid_next) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid uid_next");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ value = dsync_deserializer_decode_get(decoder, "messages_count");
+ if (str_to_uint32(value, &box->messages_count) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid messages_count");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ value = dsync_deserializer_decode_get(decoder, "first_recent_uid");
+ if (str_to_uint32(value, &box->first_recent_uid) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid first_recent_uid");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ value = dsync_deserializer_decode_get(decoder, "highest_modseq");
+ if (str_to_uint64(value, &box->highest_modseq) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid highest_modseq");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ value = dsync_deserializer_decode_get(decoder, "highest_pvt_modseq");
+ if (str_to_uint64(value, &box->highest_pvt_modseq) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid highest_pvt_modseq");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+
+ p_array_init(&box->cache_fields, pool, 32);
+ if (dsync_deserializer_decode_try(decoder, "cache_fields", &value)) {
+ const char *const *fields = t_strsplit(value, "\n");
+ for (; *fields != NULL; fields++) {
+ if (parse_cache_field(ibc, box, *fields) < 0)
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ }
+
+ *dsync_box_r = box;
+ return DSYNC_IBC_RECV_RET_OK;
+}
+
+static void
+dsync_ibc_stream_send_mailbox_attribute(struct dsync_ibc *_ibc,
+ const struct dsync_mailbox_attribute *attr)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ struct dsync_serializer_encoder *encoder;
+ string_t *str = t_str_new(128);
+ char type[2];
+
+ if (ibc->minor_version < DSYNC_PROTOCOL_MINOR_HAVE_ATTRIBUTES)
+ return;
+
+ str_append_c(str, items[ITEM_MAILBOX_ATTRIBUTE].chr);
+ encoder = dsync_ibc_send_encode_begin(ibc, ITEM_MAILBOX_ATTRIBUTE);
+
+ type[0] = type[1] = '\0';
+ switch (attr->type) {
+ case MAIL_ATTRIBUTE_TYPE_PRIVATE:
+ type[0] = 'p';
+ break;
+ case MAIL_ATTRIBUTE_TYPE_SHARED:
+ type[0] = 's';
+ break;
+ }
+ i_assert(type[0] != '\0');
+ dsync_serializer_encode_add(encoder, "type", type);
+ dsync_serializer_encode_add(encoder, "key", attr->key);
+ if (attr->value != NULL)
+ dsync_serializer_encode_add(encoder, "value", attr->value);
+ else if (attr->value_stream != NULL)
+ dsync_serializer_encode_add(encoder, "stream", "");
+
+ if (attr->deleted)
+ dsync_serializer_encode_add(encoder, "deleted", "");
+ if (attr->last_change != 0) {
+ dsync_serializer_encode_add(encoder, "last_change",
+ dec2str(attr->last_change));
+ }
+ if (attr->modseq != 0) {
+ dsync_serializer_encode_add(encoder, "modseq",
+ dec2str(attr->modseq));
+ }
+
+ dsync_serializer_encode_finish(&encoder, str);
+ dsync_ibc_stream_send_string(ibc, str);
+
+ if (attr->value_stream != NULL) {
+ ibc->value_output_last = '\0';
+ ibc->value_output = attr->value_stream;
+ i_stream_ref(ibc->value_output);
+ (void)dsync_ibc_stream_send_value_stream(ibc);
+ }
+}
+
+static enum dsync_ibc_recv_ret
+dsync_ibc_stream_recv_mailbox_attribute(struct dsync_ibc *_ibc,
+ const struct dsync_mailbox_attribute **attr_r)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ pool_t pool = ibc->ret_pool;
+ struct dsync_deserializer_decoder *decoder;
+ struct dsync_mailbox_attribute *attr;
+ const char *value;
+ enum dsync_ibc_recv_ret ret;
+
+ if (ibc->minor_version < DSYNC_PROTOCOL_MINOR_HAVE_ATTRIBUTES)
+ return DSYNC_IBC_RECV_RET_FINISHED;
+
+ if (ibc->value_input != NULL) {
+ /* wait until the mail's stream has been read */
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+
+ if (ibc->cur_attr != NULL) {
+ /* finished reading the stream, return the mail now */
+ *attr_r = ibc->cur_attr;
+ ibc->cur_attr = NULL;
+ return DSYNC_IBC_RECV_RET_OK;
+ }
+
+ p_clear(pool);
+ attr = p_new(pool, struct dsync_mailbox_attribute, 1);
+
+ ret = dsync_ibc_stream_input_next(ibc, ITEM_MAILBOX_ATTRIBUTE, &decoder);
+ if (ret != DSYNC_IBC_RECV_RET_OK)
+ return ret;
+
+ value = dsync_deserializer_decode_get(decoder, "type");
+ switch (*value) {
+ case 'p':
+ attr->type = MAIL_ATTRIBUTE_TYPE_PRIVATE;
+ break;
+ case 's':
+ attr->type = MAIL_ATTRIBUTE_TYPE_SHARED;
+ break;
+ default:
+ dsync_ibc_input_error(ibc, decoder, "Invalid type: %s", value);
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+
+ value = dsync_deserializer_decode_get(decoder, "key");
+ attr->key = p_strdup(pool, value);
+
+ if (dsync_deserializer_decode_try(decoder, "deleted", &value))
+ attr->deleted = TRUE;
+ if (dsync_deserializer_decode_try(decoder, "last_change", &value) &&
+ str_to_time(value, &attr->last_change) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid last_change");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ if (dsync_deserializer_decode_try(decoder, "modseq", &value) &&
+ str_to_uint64(value, &attr->modseq) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid modseq");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+
+ /* NOTE: stream reading must be the last here, because reading a large
+ stream will be finished later by return TRYAGAIN. We need to
+ deserialize all the other fields before that or they'll get lost. */
+ if (dsync_deserializer_decode_try(decoder, "stream", &value)) {
+ attr->value_stream = dsync_ibc_stream_input_stream(ibc);
+ if (dsync_ibc_stream_read_mail_stream(ibc) <= 0) {
+ ibc->cur_attr = attr;
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ /* already finished reading the stream */
+ i_assert(ibc->value_input == NULL);
+ } else if (dsync_deserializer_decode_try(decoder, "value", &value))
+ attr->value = p_strdup(pool, value);
+
+ *attr_r = attr;
+ return DSYNC_IBC_RECV_RET_OK;
+}
+
+static void
+dsync_ibc_stream_send_change(struct dsync_ibc *_ibc,
+ const struct dsync_mail_change *change)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ struct dsync_serializer_encoder *encoder;
+ string_t *str = t_str_new(128);
+ char type[2];
+
+ str_append_c(str, items[ITEM_MAIL_CHANGE].chr);
+ encoder = dsync_ibc_send_encode_begin(ibc, ITEM_MAIL_CHANGE);
+
+ type[0] = type[1] = '\0';
+ switch (change->type) {
+ case DSYNC_MAIL_CHANGE_TYPE_SAVE:
+ type[0] = 's';
+ break;
+ case DSYNC_MAIL_CHANGE_TYPE_EXPUNGE:
+ type[0] = 'e';
+ break;
+ case DSYNC_MAIL_CHANGE_TYPE_FLAG_CHANGE:
+ type[0] = 'f';
+ break;
+ }
+ i_assert(type[0] != '\0');
+ dsync_serializer_encode_add(encoder, "type", type);
+ dsync_serializer_encode_add(encoder, "uid", dec2str(change->uid));
+ if (change->guid != NULL)
+ dsync_serializer_encode_add(encoder, "guid", change->guid);
+ if (change->hdr_hash != NULL) {
+ dsync_serializer_encode_add(encoder, "hdr_hash",
+ change->hdr_hash);
+ }
+ if (change->modseq != 0) {
+ dsync_serializer_encode_add(encoder, "modseq",
+ dec2str(change->modseq));
+ }
+ if (change->pvt_modseq != 0) {
+ dsync_serializer_encode_add(encoder, "pvt_modseq",
+ dec2str(change->pvt_modseq));
+ }
+ if (change->add_flags != 0) {
+ dsync_serializer_encode_add(encoder, "add_flags",
+ t_strdup_printf("%x", change->add_flags));
+ }
+ if (change->remove_flags != 0) {
+ dsync_serializer_encode_add(encoder, "remove_flags",
+ t_strdup_printf("%x", change->remove_flags));
+ }
+ if (change->final_flags != 0) {
+ dsync_serializer_encode_add(encoder, "final_flags",
+ t_strdup_printf("%x", change->final_flags));
+ }
+ if (change->keywords_reset)
+ dsync_serializer_encode_add(encoder, "keywords_reset", "");
+
+ if (array_is_created(&change->keyword_changes) &&
+ array_count(&change->keyword_changes) > 0) {
+ string_t *kw_str = t_str_new(128);
+ const char *const *changes;
+ unsigned int i, count;
+
+ changes = array_get(&change->keyword_changes, &count);
+ str_append_tabescaped(kw_str, changes[0]);
+ for (i = 1; i < count; i++) {
+ str_append_c(kw_str, '\t');
+ str_append_tabescaped(kw_str, changes[i]);
+ }
+ dsync_serializer_encode_add(encoder, "keyword_changes",
+ str_c(kw_str));
+ }
+ if (change->received_timestamp > 0) {
+ dsync_serializer_encode_add(encoder, "received_timestamp",
+ t_strdup_printf("%"PRIxTIME_T, change->received_timestamp));
+ }
+ if (change->virtual_size > 0) {
+ dsync_serializer_encode_add(encoder, "virtual_size",
+ t_strdup_printf("%llx", (unsigned long long)change->virtual_size));
+ }
+
+ dsync_serializer_encode_finish(&encoder, str);
+ dsync_ibc_stream_send_string(ibc, str);
+}
+
+static enum dsync_ibc_recv_ret
+dsync_ibc_stream_recv_change(struct dsync_ibc *_ibc,
+ const struct dsync_mail_change **change_r)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ pool_t pool = ibc->ret_pool;
+ struct dsync_deserializer_decoder *decoder;
+ struct dsync_mail_change *change;
+ const char *value;
+ unsigned int uintval;
+ unsigned long long ullongval;
+ enum dsync_ibc_recv_ret ret;
+
+ p_clear(pool);
+ change = p_new(pool, struct dsync_mail_change, 1);
+
+ ret = dsync_ibc_stream_input_next(ibc, ITEM_MAIL_CHANGE, &decoder);
+ if (ret != DSYNC_IBC_RECV_RET_OK)
+ return ret;
+
+ value = dsync_deserializer_decode_get(decoder, "type");
+ switch (*value) {
+ case 's':
+ change->type = DSYNC_MAIL_CHANGE_TYPE_SAVE;
+ break;
+ case 'e':
+ change->type = DSYNC_MAIL_CHANGE_TYPE_EXPUNGE;
+ break;
+ case 'f':
+ change->type = DSYNC_MAIL_CHANGE_TYPE_FLAG_CHANGE;
+ break;
+ default:
+ dsync_ibc_input_error(ibc, decoder, "Invalid type: %s", value);
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+
+ value = dsync_deserializer_decode_get(decoder, "uid");
+ if (str_to_uint32(value, &change->uid) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid uid");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+
+ if (dsync_deserializer_decode_try(decoder, "guid", &value))
+ change->guid = p_strdup(pool, value);
+ if (dsync_deserializer_decode_try(decoder, "hdr_hash", &value))
+ change->hdr_hash = p_strdup(pool, value);
+ if (dsync_deserializer_decode_try(decoder, "modseq", &value) &&
+ str_to_uint64(value, &change->modseq) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid modseq");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ if (dsync_deserializer_decode_try(decoder, "pvt_modseq", &value) &&
+ str_to_uint64(value, &change->pvt_modseq) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid pvt_modseq");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+
+ if (dsync_deserializer_decode_try(decoder, "add_flags", &value)) {
+ if (str_to_uint_hex(value, &uintval) < 0 ||
+ uintval > (uint8_t)-1) {
+ dsync_ibc_input_error(ibc, decoder,
+ "Invalid add_flags: %s", value);
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ change->add_flags = uintval;
+ }
+ if (dsync_deserializer_decode_try(decoder, "remove_flags", &value)) {
+ if (str_to_uint_hex(value, &uintval) < 0 ||
+ uintval > (uint8_t)-1) {
+ dsync_ibc_input_error(ibc, decoder,
+ "Invalid remove_flags: %s", value);
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ change->remove_flags = uintval;
+ }
+ if (dsync_deserializer_decode_try(decoder, "final_flags", &value)) {
+ if (str_to_uint_hex(value, &uintval) < 0 ||
+ uintval > (uint8_t)-1) {
+ dsync_ibc_input_error(ibc, decoder,
+ "Invalid final_flags: %s", value);
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ change->final_flags = uintval;
+ }
+ if (dsync_deserializer_decode_try(decoder, "keywords_reset", &value))
+ change->keywords_reset = TRUE;
+
+ if (dsync_deserializer_decode_try(decoder, "keyword_changes", &value) &&
+ *value != '\0') {
+ const char *const *changes = t_strsplit_tabescaped(value);
+ unsigned int i, count = str_array_length(changes);
+
+ p_array_init(&change->keyword_changes, pool, count);
+ for (i = 0; i < count; i++) {
+ value = p_strdup(pool, changes[i]);
+ array_push_back(&change->keyword_changes, &value);
+ }
+ }
+ if (dsync_deserializer_decode_try(decoder, "received_timestamp", &value)) {
+ if (str_to_ullong_hex(value, &ullongval) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid received_timestamp");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ change->received_timestamp = ullongval;
+ }
+ if (dsync_deserializer_decode_try(decoder, "virtual_size", &value)) {
+ if (str_to_ullong_hex(value, &ullongval) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid virtual_size");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ change->virtual_size = ullongval;
+ }
+
+ *change_r = change;
+ return DSYNC_IBC_RECV_RET_OK;
+}
+
+static void
+dsync_ibc_stream_send_mail_request(struct dsync_ibc *_ibc,
+ const struct dsync_mail_request *request)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ struct dsync_serializer_encoder *encoder;
+ string_t *str = t_str_new(128);
+
+ str_append_c(str, items[ITEM_MAIL_REQUEST].chr);
+ encoder = dsync_ibc_send_encode_begin(ibc, ITEM_MAIL_REQUEST);
+ if (request->guid != NULL)
+ dsync_serializer_encode_add(encoder, "guid", request->guid);
+ if (request->uid != 0) {
+ dsync_serializer_encode_add(encoder, "uid",
+ dec2str(request->uid));
+ }
+ dsync_serializer_encode_finish(&encoder, str);
+ dsync_ibc_stream_send_string(ibc, str);
+}
+
+static enum dsync_ibc_recv_ret
+dsync_ibc_stream_recv_mail_request(struct dsync_ibc *_ibc,
+ const struct dsync_mail_request **request_r)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ struct dsync_deserializer_decoder *decoder;
+ struct dsync_mail_request *request;
+ const char *value;
+ enum dsync_ibc_recv_ret ret;
+
+ p_clear(ibc->ret_pool);
+ request = p_new(ibc->ret_pool, struct dsync_mail_request, 1);
+
+ ret = dsync_ibc_stream_input_next(ibc, ITEM_MAIL_REQUEST, &decoder);
+ if (ret != DSYNC_IBC_RECV_RET_OK)
+ return ret;
+
+ if (dsync_deserializer_decode_try(decoder, "guid", &value))
+ request->guid = p_strdup(ibc->ret_pool, value);
+ if (dsync_deserializer_decode_try(decoder, "uid", &value) &&
+ str_to_uint32(value, &request->uid) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid uid");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+
+ *request_r = request;
+ return DSYNC_IBC_RECV_RET_OK;
+}
+
+static void
+dsync_ibc_stream_send_mail(struct dsync_ibc *_ibc,
+ const struct dsync_mail *mail)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ struct dsync_serializer_encoder *encoder;
+ string_t *str = t_str_new(128);
+
+ i_assert(!mail->minimal_fields);
+ i_assert(ibc->value_output == NULL);
+
+ str_append_c(str, items[ITEM_MAIL].chr);
+ encoder = dsync_ibc_send_encode_begin(ibc, ITEM_MAIL);
+ if (mail->guid != NULL)
+ dsync_serializer_encode_add(encoder, "guid", mail->guid);
+ if (mail->uid != 0)
+ dsync_serializer_encode_add(encoder, "uid", dec2str(mail->uid));
+ if (mail->pop3_uidl != NULL) {
+ dsync_serializer_encode_add(encoder, "pop3_uidl",
+ mail->pop3_uidl);
+ }
+ if (mail->pop3_order > 0) {
+ dsync_serializer_encode_add(encoder, "pop3_order",
+ dec2str(mail->pop3_order));
+ }
+ if (mail->received_date > 0) {
+ dsync_serializer_encode_add(encoder, "received_date",
+ dec2str(mail->received_date));
+ }
+ if (mail->saved_date != 0) {
+ dsync_serializer_encode_add(encoder, "saved_date",
+ dec2str(mail->saved_date));
+ }
+ if (mail->input != NULL)
+ dsync_serializer_encode_add(encoder, "stream", "");
+
+ dsync_serializer_encode_finish(&encoder, str);
+ dsync_ibc_stream_send_string(ibc, str);
+
+ if (mail->input != NULL) {
+ ibc->value_output_last = '\0';
+ ibc->value_output = mail->input;
+ i_stream_ref(ibc->value_output);
+ (void)dsync_ibc_stream_send_value_stream(ibc);
+ }
+}
+
+static enum dsync_ibc_recv_ret
+dsync_ibc_stream_recv_mail(struct dsync_ibc *_ibc, struct dsync_mail **mail_r)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ pool_t pool = ibc->ret_pool;
+ struct dsync_deserializer_decoder *decoder;
+ struct dsync_mail *mail;
+ const char *value;
+ enum dsync_ibc_recv_ret ret;
+
+ if (ibc->value_input != NULL) {
+ /* wait until the mail's stream has been read */
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ if (ibc->cur_mail != NULL) {
+ /* finished reading the stream, return the mail now */
+ *mail_r = ibc->cur_mail;
+ ibc->cur_mail = NULL;
+ return DSYNC_IBC_RECV_RET_OK;
+ }
+
+ p_clear(pool);
+ mail = p_new(pool, struct dsync_mail, 1);
+
+ ret = dsync_ibc_stream_input_next(ibc, ITEM_MAIL, &decoder);
+ if (ret != DSYNC_IBC_RECV_RET_OK)
+ return ret;
+
+ if (dsync_deserializer_decode_try(decoder, "guid", &value))
+ mail->guid = p_strdup(pool, value);
+ if (dsync_deserializer_decode_try(decoder, "uid", &value) &&
+ str_to_uint32(value, &mail->uid) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid uid");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ if (dsync_deserializer_decode_try(decoder, "pop3_uidl", &value))
+ mail->pop3_uidl = p_strdup(pool, value);
+ if (dsync_deserializer_decode_try(decoder, "pop3_order", &value) &&
+ str_to_uint32(value, &mail->pop3_order) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid pop3_order");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ if (dsync_deserializer_decode_try(decoder, "received_date", &value) &&
+ str_to_time(value, &mail->received_date) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid received_date");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ if (dsync_deserializer_decode_try(decoder, "saved_date", &value) &&
+ str_to_time(value, &mail->saved_date) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid saved_date");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ if (dsync_deserializer_decode_try(decoder, "stream", &value)) {
+ mail->input = dsync_ibc_stream_input_stream(ibc);
+ if (dsync_ibc_stream_read_mail_stream(ibc) <= 0) {
+ ibc->cur_mail = mail;
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ /* already finished reading the stream */
+ i_assert(ibc->value_input == NULL);
+ }
+
+ *mail_r = mail;
+ return DSYNC_IBC_RECV_RET_OK;
+}
+
+static void
+dsync_ibc_stream_send_finish(struct dsync_ibc *_ibc, const char *error,
+ enum mail_error mail_error,
+ bool require_full_resync)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ struct dsync_serializer_encoder *encoder;
+ string_t *str = t_str_new(128);
+
+ str_append_c(str, items[ITEM_FINISH].chr);
+ encoder = dsync_ibc_send_encode_begin(ibc, ITEM_FINISH);
+ if (error != NULL)
+ dsync_serializer_encode_add(encoder, "error", error);
+ if (mail_error != 0) {
+ dsync_serializer_encode_add(encoder, "mail_error",
+ dec2str(mail_error));
+ }
+ if (require_full_resync)
+ dsync_serializer_encode_add(encoder, "require_full_resync", "");
+ dsync_serializer_encode_finish(&encoder, str);
+ dsync_ibc_stream_send_string(ibc, str);
+}
+
+static enum dsync_ibc_recv_ret
+dsync_ibc_stream_recv_finish(struct dsync_ibc *_ibc, const char **error_r,
+ enum mail_error *mail_error_r,
+ bool *require_full_resync_r)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ struct dsync_deserializer_decoder *decoder;
+ const char *value;
+ enum dsync_ibc_recv_ret ret;
+ int i = 0;
+
+ *error_r = NULL;
+ *mail_error_r = 0;
+ *require_full_resync_r = FALSE;
+
+ p_clear(ibc->ret_pool);
+
+ if (ibc->minor_version < DSYNC_PROTOCOL_MINOR_HAVE_FINISH)
+ return DSYNC_IBC_RECV_RET_OK;
+
+ ret = dsync_ibc_stream_input_next(ibc, ITEM_FINISH, &decoder);
+ if (ret != DSYNC_IBC_RECV_RET_OK)
+ return ret;
+
+ if (dsync_deserializer_decode_try(decoder, "error", &value))
+ *error_r = p_strdup(ibc->ret_pool, value);
+ if (dsync_deserializer_decode_try(decoder, "mail_error", &value) &&
+ str_to_int(value, &i) < 0) {
+ dsync_ibc_input_error(ibc, decoder, "Invalid mail_error");
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ if (dsync_deserializer_decode_try(decoder, "require_full_resync", &value))
+ *require_full_resync_r = TRUE;
+ *mail_error_r = i;
+
+ ibc->finish_received = TRUE;
+ return DSYNC_IBC_RECV_RET_OK;
+}
+
+static void dsync_ibc_stream_close_mail_streams(struct dsync_ibc *_ibc)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+
+ if (ibc->value_output != NULL) {
+ i_stream_unref(&ibc->value_output);
+ dsync_ibc_stream_stop(ibc);
+ }
+}
+
+static bool dsync_ibc_stream_is_send_queue_full(struct dsync_ibc *_ibc)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+ size_t bytes;
+
+ if (ibc->value_output != NULL)
+ return TRUE;
+
+ bytes = o_stream_get_buffer_used_size(ibc->output);
+ if (bytes < DSYNC_IBC_STREAM_OUTBUF_THROTTLE_SIZE)
+ return FALSE;
+
+ o_stream_set_flush_pending(ibc->output, TRUE);
+ return TRUE;
+}
+
+static bool dsync_ibc_stream_has_pending_data(struct dsync_ibc *_ibc)
+{
+ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
+
+ return ibc->has_pending_data;
+}
+
+static const struct dsync_ibc_vfuncs dsync_ibc_stream_vfuncs = {
+ dsync_ibc_stream_deinit,
+ dsync_ibc_stream_send_handshake,
+ dsync_ibc_stream_recv_handshake,
+ dsync_ibc_stream_send_end_of_list,
+ dsync_ibc_stream_send_mailbox_state,
+ dsync_ibc_stream_recv_mailbox_state,
+ dsync_ibc_stream_send_mailbox_tree_node,
+ dsync_ibc_stream_recv_mailbox_tree_node,
+ dsync_ibc_stream_send_mailbox_deletes,
+ dsync_ibc_stream_recv_mailbox_deletes,
+ dsync_ibc_stream_send_mailbox,
+ dsync_ibc_stream_recv_mailbox,
+ dsync_ibc_stream_send_mailbox_attribute,
+ dsync_ibc_stream_recv_mailbox_attribute,
+ dsync_ibc_stream_send_change,
+ dsync_ibc_stream_recv_change,
+ dsync_ibc_stream_send_mail_request,
+ dsync_ibc_stream_recv_mail_request,
+ dsync_ibc_stream_send_mail,
+ dsync_ibc_stream_recv_mail,
+ dsync_ibc_stream_send_finish,
+ dsync_ibc_stream_recv_finish,
+ dsync_ibc_stream_close_mail_streams,
+ dsync_ibc_stream_is_send_queue_full,
+ dsync_ibc_stream_has_pending_data
+};
+
+struct dsync_ibc *
+dsync_ibc_init_stream(struct istream *input, struct ostream *output,
+ const char *name, const char *temp_path_prefix,
+ unsigned int timeout_secs)
+{
+ struct dsync_ibc_stream *ibc;
+
+ ibc = i_new(struct dsync_ibc_stream, 1);
+ ibc->ibc.v = dsync_ibc_stream_vfuncs;
+ ibc->input = input;
+ ibc->output = output;
+ i_stream_ref(ibc->input);
+ o_stream_ref(ibc->output);
+ ibc->name = i_strdup(name);
+ ibc->temp_path_prefix = i_strdup(temp_path_prefix);
+ ibc->timeout_secs = timeout_secs;
+ ibc->ret_pool = pool_alloconly_create("ibc stream data", 2048);
+ dsync_ibc_stream_init(ibc);
+ return &ibc->ibc;
+}