summaryrefslogtreecommitdiffstats
path: root/src/plugins/fts/fts-indexer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/fts/fts-indexer.c')
-rw-r--r--src/plugins/fts/fts-indexer.c300
1 files changed, 300 insertions, 0 deletions
diff --git a/src/plugins/fts/fts-indexer.c b/src/plugins/fts/fts-indexer.c
new file mode 100644
index 0000000..aca23c9
--- /dev/null
+++ b/src/plugins/fts/fts-indexer.c
@@ -0,0 +1,300 @@
+/* Copyright (c) 2011-2018 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "ioloop.h"
+#include "connection.h"
+#include "write-full.h"
+#include "istream.h"
+#include "ostream.h"
+#include "strescape.h"
+#include "time-util.h"
+#include "settings-parser.h"
+#include "mail-user.h"
+#include "mail-storage-private.h"
+#include "fts-api.h"
+#include "fts-indexer.h"
+
+#define INDEXER_NOTIFY_INTERVAL_SECS 10
+#define INDEXER_SOCKET_NAME "indexer"
+#define INDEXER_WAIT_MSECS 250
+
+struct fts_indexer_context {
+ struct connection conn;
+
+ struct mailbox *box;
+ struct ioloop *ioloop;
+
+ struct timeval search_start_time, last_notify;
+ unsigned int percentage;
+ struct connection_list *connection_list;
+
+ bool notified:1;
+ bool failed:1;
+ bool completed:1;
+};
+
+static void fts_indexer_notify(struct fts_indexer_context *ctx)
+{
+ unsigned long long elapsed_msecs, est_total_msecs;
+ unsigned int eta_secs;
+
+ if (ioloop_time - ctx->last_notify.tv_sec < INDEXER_NOTIFY_INTERVAL_SECS)
+ return;
+ ctx->last_notify = ioloop_timeval;
+
+ if (ctx->box->storage->callbacks.notify_ok == NULL ||
+ ctx->percentage == 0)
+ return;
+
+ elapsed_msecs = timeval_diff_msecs(&ioloop_timeval,
+ &ctx->search_start_time);
+ est_total_msecs = elapsed_msecs * 100 / ctx->percentage;
+ eta_secs = (est_total_msecs - elapsed_msecs) / 1000;
+
+ T_BEGIN {
+ const char *text;
+
+ text = t_strdup_printf("Indexed %d%% of the mailbox, "
+ "ETA %d:%02d", ctx->percentage,
+ eta_secs/60, eta_secs%60);
+ ctx->box->storage->callbacks.
+ notify_ok(ctx->box, text,
+ ctx->box->storage->callback_context);
+ ctx->notified = TRUE;
+ } T_END;
+}
+
+static int fts_indexer_more_int(struct fts_indexer_context *ctx)
+{
+ struct ioloop *prev_ioloop = current_ioloop;
+ struct timeout *to;
+
+ if (ctx->failed)
+ return -1;
+ if (ctx->completed)
+ return 1;
+
+ /* wait for a while for the reply. FIXME: once search API supports
+ asynchronous waits, get rid of this wait and use the mail IO loop */
+ io_loop_set_current(ctx->ioloop);
+ to = timeout_add_short(INDEXER_WAIT_MSECS, io_loop_stop, ctx->ioloop);
+ io_loop_run(ctx->ioloop);
+ timeout_remove(&to);
+ io_loop_set_current(prev_ioloop);
+
+ if (ctx->failed)
+ return -1;
+ if (ctx->completed)
+ return 1;
+ return 0;
+}
+
+int fts_indexer_more(struct fts_indexer_context *ctx)
+{
+ int ret;
+
+ if ((ret = fts_indexer_more_int(ctx)) < 0) {
+ /* If failed is already set, the code has had a chance to
+ * set an internal error already, i.e. MAIL_ERROR_INUSE. */
+ if (!ctx->failed)
+ mail_storage_set_internal_error(ctx->box->storage);
+ ctx->failed = TRUE;
+ return -1;
+ }
+
+ if (ret == 0)
+ fts_indexer_notify(ctx);
+
+ return ret;
+}
+
+static void fts_indexer_destroy(struct connection *conn)
+{
+ struct fts_indexer_context *ctx =
+ container_of(conn, struct fts_indexer_context, conn);
+ connection_deinit(conn);
+ if (!ctx->completed)
+ ctx->failed = TRUE;
+ ctx->completed = TRUE;
+}
+
+int fts_indexer_deinit(struct fts_indexer_context **_ctx)
+{
+ struct fts_indexer_context *ctx = *_ctx;
+ i_assert(ctx != NULL);
+ *_ctx = NULL;
+ if (!ctx->completed)
+ ctx->failed = TRUE;
+ int ret = ctx->failed ? -1 : 0;
+ if (ctx->notified) {
+ /* we notified at least once */
+ ctx->box->storage->callbacks.
+ notify_ok(ctx->box, "Mailbox indexing finished",
+ ctx->box->storage->callback_context);
+ }
+ connection_list_deinit(&ctx->connection_list);
+ io_loop_set_current(ctx->ioloop);
+ io_loop_destroy(&ctx->ioloop);
+ i_free(ctx);
+ return ret;
+}
+
+static int
+fts_indexer_input_args(struct connection *conn, const char *const *args)
+{
+ struct fts_indexer_context *ctx =
+ container_of(conn, struct fts_indexer_context, conn);
+ int percentage;
+ if (args[1] == NULL) {
+ e_error(conn->event, "indexer sent invalid reply");
+ return -1;
+ }
+ if (strcmp(args[0], "1") != 0) {
+ e_error(conn->event, "indexer sent invalid reply");
+ return -1;
+ }
+ if (strcmp(args[1], "OK") == 0)
+ return 1;
+ if (str_to_int(args[1], &percentage) < 0) {
+ e_error(conn->event, "indexer sent invalid progress: %s", args[1]);
+ ctx->failed = TRUE;
+ return -1;
+ }
+ if (percentage < 0) {
+ e_error(ctx->box->event, "indexer failed to index mailbox");
+ ctx->failed = TRUE;
+ return -1;
+ }
+ ctx->percentage = percentage;
+ if (ctx->percentage == 100)
+ ctx->completed = TRUE;
+ return 1;
+}
+
+static void fts_indexer_client_connected(struct connection *conn, bool success)
+{
+ struct fts_indexer_context *ctx =
+ container_of(conn, struct fts_indexer_context, conn);
+ if (!success) {
+ ctx->completed = TRUE;
+ ctx->failed = TRUE;
+ return;
+ }
+ ctx->failed = ctx->completed = FALSE;
+ const char *cmd = t_strdup_printf("PREPEND\t1\t%s\t%s\t0\t%s\n",
+ str_tabescape(ctx->box->storage->user->username),
+ str_tabescape(ctx->box->vname),
+ str_tabescape(ctx->box->storage->user->session_id));
+ o_stream_nsend_str(conn->output, cmd);
+}
+
+static void fts_indexer_idle_timeout(struct connection *conn)
+{
+ struct fts_indexer_context *ctx =
+ container_of(conn, struct fts_indexer_context, conn);
+ mail_storage_set_error(ctx->box->storage, MAIL_ERROR_INUSE,
+ "Timeout while waiting for indexing to finish");
+ ctx->failed = TRUE;
+ connection_disconnect(conn);
+}
+
+static const struct connection_settings indexer_client_set =
+{
+ .service_name_in = "indexer",
+ .service_name_out = "indexer",
+ .major_version = 1,
+ .minor_version = 0,
+ .client_connect_timeout_msecs = 2000,
+ .input_max_size = SIZE_MAX,
+ .output_max_size = IO_BLOCK_SIZE,
+ .client = TRUE,
+};
+
+static const struct connection_vfuncs indexer_client_vfuncs =
+{
+ .destroy = fts_indexer_destroy,
+ .client_connected = fts_indexer_client_connected,
+ .input_args = fts_indexer_input_args,
+ .idle_timeout = fts_indexer_idle_timeout,
+};
+
+int fts_indexer_init(struct fts_backend *backend, struct mailbox *box,
+ struct fts_indexer_context **ctx_r)
+{
+ struct ioloop *prev_ioloop = current_ioloop;
+ struct fts_indexer_context *ctx;
+ struct mailbox_status status;
+ uint32_t last_uid, seq1, seq2;
+ const char *path, *value, *error;
+ unsigned int timeout_secs = 0;
+ int ret;
+
+ value = mail_user_plugin_getenv(box->storage->user, "fts_index_timeout");
+ if (value != NULL) {
+ if (settings_get_time(value, &timeout_secs, &error) < 0) {
+ e_error(box->storage->user->event,
+ "Invalid fts_index_timeout setting: %s",
+ error);
+ return -1;
+ }
+ }
+
+ if (fts_backend_get_last_uid(backend, box, &last_uid) < 0)
+ return -1;
+
+ mailbox_get_open_status(box, STATUS_UIDNEXT, &status);
+ if (status.uidnext == last_uid+1) {
+ /* everything is already indexed */
+ return 0;
+ }
+
+ mailbox_get_seq_range(box, last_uid+1, (uint32_t)-1, &seq1, &seq2);
+ if (seq1 == 0) {
+ /* no new messages (last messages in mailbox were expunged) */
+ return 0;
+ }
+
+ path = t_strconcat(box->storage->user->set->base_dir,
+ "/"INDEXER_SOCKET_NAME, NULL);
+
+ ctx = i_new(struct fts_indexer_context, 1);
+ ctx->box = box;
+ ctx->search_start_time = ioloop_timeval;
+ ctx->conn.event_parent = box->event;
+ ctx->ioloop = io_loop_create();
+ ctx->connection_list = connection_list_init(&indexer_client_set,
+ &indexer_client_vfuncs);
+ ctx->conn.input_idle_timeout_secs = timeout_secs;
+ connection_init_client_unix(ctx->connection_list, &ctx->conn,
+ path);
+ ret = connection_client_connect(&ctx->conn);
+ io_loop_set_current(prev_ioloop);
+ *ctx_r = ctx;
+ return ctx->failed || ret < 0 ? -1 : 1;
+}
+
+#define INDEXER_HANDSHAKE "1\t0\tindexer\tindexer\n"
+
+int fts_indexer_cmd(struct mail_user *user, const char *cmd,
+ const char **path_r)
+{
+ const char *path;
+ int fd;
+
+ path = t_strconcat(user->set->base_dir,
+ "/"INDEXER_SOCKET_NAME, NULL);
+ fd = net_connect_unix_with_retries(path, 1000);
+ if (fd == -1) {
+ i_error("net_connect_unix(%s) failed: %m", path);
+ return -1;
+ }
+
+ cmd = t_strconcat(INDEXER_HANDSHAKE, cmd, NULL);
+ if (write_full(fd, cmd, strlen(cmd)) < 0) {
+ i_error("write(%s) failed: %m", path);
+ i_close_fd(&fd);
+ return -1;
+ }
+ *path_r = path;
+ return fd;
+}