diff options
Diffstat (limited to 'src/plugins/fts/fts-parser-tika.c')
-rw-r--r-- | src/plugins/fts/fts-parser-tika.c | 278 |
1 files changed, 278 insertions, 0 deletions
diff --git a/src/plugins/fts/fts-parser-tika.c b/src/plugins/fts/fts-parser-tika.c new file mode 100644 index 0000000..bb6379c --- /dev/null +++ b/src/plugins/fts/fts-parser-tika.c @@ -0,0 +1,278 @@ +/* Copyright (c) 2014-2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "ioloop.h" +#include "istream.h" +#include "module-context.h" +#include "iostream-ssl.h" +#include "http-url.h" +#include "http-client.h" +#include "message-parser.h" +#include "mail-user.h" +#include "fts-parser.h" + +#define TIKA_USER_CONTEXT(obj) \ + MODULE_CONTEXT(obj, fts_parser_tika_user_module) + +struct fts_parser_tika_user { + union mail_user_module_context module_ctx; + struct http_url *http_url; +}; + +struct tika_fts_parser { + struct fts_parser parser; + struct mail_user *user; + struct http_client_request *http_req; + + struct ioloop *ioloop; + struct io *io; + struct istream *payload; + + bool failed; +}; + +static struct http_client *tika_http_client = NULL; +static MODULE_CONTEXT_DEFINE_INIT(fts_parser_tika_user_module, + &mail_user_module_register); + +static int +tika_get_http_client_url(struct mail_user *user, struct http_url **http_url_r) +{ + struct fts_parser_tika_user *tuser = TIKA_USER_CONTEXT(user); + struct http_client_settings http_set; + struct ssl_iostream_settings ssl_set; + const char *url, *error; + + url = mail_user_plugin_getenv(user, "fts_tika"); + if (url == NULL) { + /* fts_tika disabled */ + return -1; + } + + if (tuser != NULL) { + *http_url_r = tuser->http_url; + return *http_url_r == NULL ? -1 : 0; + } + + tuser = p_new(user->pool, struct fts_parser_tika_user, 1); + MODULE_CONTEXT_SET(user, fts_parser_tika_user_module, tuser); + + if (http_url_parse(url, NULL, 0, user->pool, + &tuser->http_url, &error) < 0) { + i_error("fts_tika: Failed to parse HTTP url %s: %s", url, error); + return -1; + } + + if (tika_http_client == NULL) { + mail_user_init_ssl_client_settings(user, &ssl_set); + + i_zero(&http_set); + http_set.max_idle_time_msecs = 100; + http_set.max_parallel_connections = 1; + http_set.max_pipelined_requests = 1; + http_set.max_redirects = 1; + http_set.max_attempts = 3; + http_set.connect_timeout_msecs = 5*1000; + http_set.request_timeout_msecs = 60*1000; + http_set.ssl = &ssl_set; + http_set.debug = user->mail_debug; + http_set.event_parent = user->event; + + /* FIXME: We should initialize a shared client instead. However, + this is currently not possible due to an obscure bug + in the blocking HTTP payload API, which causes + conflicts with other HTTP applications like FTS Solr. + Using a private client will provide a quick fix for + now. */ + tika_http_client = http_client_init_private(&http_set); + } + *http_url_r = tuser->http_url; + return 0; +} + +static void +fts_tika_parser_response(const struct http_response *response, + struct tika_fts_parser *parser) +{ + i_assert(parser->payload == NULL); + + switch (response->status) { + case 200: + /* read response */ + if (response->payload == NULL) + parser->payload = i_stream_create_from_data("", 0); + else { + i_stream_ref(response->payload); + parser->payload = response->payload; + } + break; + case 204: /* empty response */ + case 415: /* Unsupported Media Type */ + case 422: /* Unprocessable Entity */ + e_debug(parser->user->event, "fts_tika: PUT %s failed: %s", + mail_user_plugin_getenv(parser->user, "fts_tika"), + http_response_get_message(response)); + parser->payload = i_stream_create_from_data("", 0); + break; + default: + if (response->status / 100 == 5) { + /* Server Error - the problem could be anything (in Tika or + HTTP server or proxy) and might be retriable, but Tika has + trouble processing some documents and throws up this error + every time for those documents. */ + parser->parser.may_need_retry = TRUE; + i_free(parser->parser.retriable_error_msg); + parser->parser.retriable_error_msg = + i_strdup_printf("fts_tika: PUT %s failed: %s", + mail_user_plugin_getenv(parser->user, "fts_tika"), + http_response_get_message(response)); + parser->payload = i_stream_create_from_data("", 0); + } else { + i_error("fts_tika: PUT %s failed: %s", + mail_user_plugin_getenv(parser->user, "fts_tika"), + http_response_get_message(response)); + parser->failed = TRUE; + } + break; + } + parser->http_req = NULL; + io_loop_stop(current_ioloop); +} + +static struct fts_parser * +fts_parser_tika_try_init(struct fts_parser_context *parser_context) +{ + struct tika_fts_parser *parser; + struct http_url *http_url; + struct http_client_request *http_req; + + if (tika_get_http_client_url(parser_context->user, &http_url) < 0) + return NULL; + if (http_url->path == NULL) + http_url->path = "/"; + + parser = i_new(struct tika_fts_parser, 1); + parser->parser.v = fts_parser_tika; + parser->user = parser_context->user; + + http_req = http_client_request(tika_http_client, "PUT", + http_url->host.name, + t_strconcat(http_url->path, http_url->enc_query, NULL), + fts_tika_parser_response, parser); + http_client_request_set_port(http_req, http_url->port); + http_client_request_set_ssl(http_req, http_url->have_ssl); + if (parser_context->content_type != NULL) + http_client_request_add_header(http_req, "Content-Type", + parser_context->content_type); + if (parser_context->content_disposition != NULL) + http_client_request_add_header(http_req, "Content-Disposition", + parser_context->content_disposition); + http_client_request_add_header(http_req, "Accept", "text/plain"); + + parser->http_req = http_req; + return &parser->parser; +} + +static void fts_parser_tika_more(struct fts_parser *_parser, + struct message_block *block) +{ + struct tika_fts_parser *parser = (struct tika_fts_parser *)_parser; + struct ioloop *prev_ioloop = current_ioloop; + const unsigned char *data; + size_t size; + ssize_t ret; + + if (block->size > 0) { + /* first we'll send everything to Tika */ + if (!parser->failed && + http_client_request_send_payload(&parser->http_req, + block->data, + block->size) < 0) + parser->failed = TRUE; + block->size = 0; + return; + } + + if (parser->payload == NULL) { + /* read the result from Tika */ + if (!parser->failed && + http_client_request_finish_payload(&parser->http_req) < 0) + parser->failed = TRUE; + if (!parser->failed && parser->payload == NULL) + http_client_wait(tika_http_client); + if (parser->failed) + return; + i_assert(parser->payload != NULL); + } + /* continue returning data from Tika. we'll create a new ioloop just + for reading this one payload. */ + while ((ret = i_stream_read_more(parser->payload, &data, &size)) == 0) { + if (parser->failed) + break; + /* wait for more input from Tika */ + if (parser->ioloop == NULL) { + parser->ioloop = io_loop_create(); + parser->io = io_add_istream(parser->payload, io_loop_stop, + current_ioloop); + } else { + io_loop_set_current(parser->ioloop); + } + io_loop_run(current_ioloop); + } + /* switch back to original ioloop. */ + io_loop_set_current(prev_ioloop); + + if (parser->failed) + ; + else if (size > 0) { + i_assert(ret > 0); + block->data = data; + block->size = size; + i_stream_skip(parser->payload, size); + } else { + /* finished */ + i_assert(ret == -1); + if (parser->payload->stream_errno != 0) { + i_error("read(%s) failed: %s", + i_stream_get_name(parser->payload), + i_stream_get_error(parser->payload)); + parser->failed = TRUE; + } + } +} + +static int fts_parser_tika_deinit(struct fts_parser *_parser, const char **retriable_err_msg_r) +{ + struct tika_fts_parser *parser = (struct tika_fts_parser *)_parser; + int ret = _parser->may_need_retry ? 0: (parser->failed ? -1 : 1); + + i_assert(ret != 0 || _parser->retriable_error_msg != NULL); + if (retriable_err_msg_r != NULL) + *retriable_err_msg_r = t_strdup(_parser->retriable_error_msg); + i_free(_parser->retriable_error_msg); + + /* remove io before unrefing payload - otherwise lib-http adds another + timeout to ioloop unnecessarily */ + i_stream_unref(&parser->payload); + io_remove(&parser->io); + http_client_request_abort(&parser->http_req); + if (parser->ioloop != NULL) { + io_loop_set_current(parser->ioloop); + io_loop_destroy(&parser->ioloop); + } + i_free(parser); + return ret; +} + +static void fts_parser_tika_unload(void) +{ + if (tika_http_client != NULL) + http_client_deinit(&tika_http_client); +} + +struct fts_parser_vfuncs fts_parser_tika = { + fts_parser_tika_try_init, + fts_parser_tika_more, + fts_parser_tika_deinit, + fts_parser_tika_unload +}; |