summaryrefslogtreecommitdiffstats
path: root/storage/innobase/fts/fts0opt.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:07:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:07:14 +0000
commita175314c3e5827eb193872241446f2f8f5c9d33c (patch)
treecd3d60ca99ae00829c52a6ca79150a5b6e62528b /storage/innobase/fts/fts0opt.cc
parentInitial commit. (diff)
downloadmariadb-10.5-9e4947182e0b875da38088fdd168e775f473b8ad.tar.xz
mariadb-10.5-9e4947182e0b875da38088fdd168e775f473b8ad.zip
Adding upstream version 1:10.5.12.upstream/1%10.5.12upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'storage/innobase/fts/fts0opt.cc')
-rw-r--r--storage/innobase/fts/fts0opt.cc3053
1 files changed, 3053 insertions, 0 deletions
diff --git a/storage/innobase/fts/fts0opt.cc b/storage/innobase/fts/fts0opt.cc
new file mode 100644
index 00000000..e3c0f8f5
--- /dev/null
+++ b/storage/innobase/fts/fts0opt.cc
@@ -0,0 +1,3053 @@
+/*****************************************************************************
+
+Copyright (c) 2007, 2018, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2016, 2020, MariaDB Corporation.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
+
+*****************************************************************************/
+
+/******************************************************************//**
+@file fts/fts0opt.cc
+Full Text Search optimize thread
+
+Created 2007/03/27 Sunny Bains
+Completed 2011/7/10 Sunny and Jimmy Yang
+
+***********************************************************************/
+
+#include "fts0fts.h"
+#include "row0sel.h"
+#include "que0types.h"
+#include "fts0priv.h"
+#include "fts0types.h"
+#include "ut0wqueue.h"
+#include "srv0start.h"
+#include "ut0list.h"
+#include "zlib.h"
+#include "fts0opt.h"
+
+/** The FTS optimize thread's work queue. */
+ib_wqueue_t* fts_optimize_wq;
+static void fts_optimize_callback(void *);
+static void timer_callback(void*);
+static tpool::timer* timer;
+
+static tpool::task_group task_group(1);
+static tpool::task task(fts_optimize_callback,0, &task_group);
+
+/** FTS optimize thread, for MDL acquisition */
+static THD *fts_opt_thd;
+
+/** The FTS vector to store fts_slot_t */
+static ib_vector_t* fts_slots;
+
+/** Default optimize interval in secs. */
+static const ulint FTS_OPTIMIZE_INTERVAL_IN_SECS = 300;
+
+/** Server is shutting down, so does we exiting the optimize thread */
+static bool fts_opt_start_shutdown = false;
+
+/** Event to wait for shutdown of the optimize thread */
+static os_event_t fts_opt_shutdown_event = NULL;
+
+/** Initial size of nodes in fts_word_t. */
+static const ulint FTS_WORD_NODES_INIT_SIZE = 64;
+
+/** Last time we did check whether system need a sync */
+static time_t last_check_sync_time;
+
+/** FTS optimize thread message types. */
+enum fts_msg_type_t {
+ FTS_MSG_STOP, /*!< Stop optimizing and exit thread */
+
+ FTS_MSG_ADD_TABLE, /*!< Add table to the optimize thread's
+ work queue */
+
+ FTS_MSG_DEL_TABLE, /*!< Remove a table from the optimize
+ threads work queue */
+ FTS_MSG_SYNC_TABLE /*!< Sync fts cache of a table */
+};
+
+/** Compressed list of words that have been read from FTS INDEX
+that needs to be optimized. */
+struct fts_zip_t {
+ lint status; /*!< Status of (un)/zip operation */
+
+ ulint n_words; /*!< Number of words compressed */
+
+ ulint block_sz; /*!< Size of a block in bytes */
+
+ ib_vector_t* blocks; /*!< Vector of compressed blocks */
+
+ ib_alloc_t* heap_alloc; /*!< Heap to use for allocations */
+
+ ulint pos; /*!< Offset into blocks */
+
+ ulint last_big_block; /*!< Offset of last block in the
+ blocks array that is of size
+ block_sz. Blocks beyond this offset
+ are of size FTS_MAX_WORD_LEN */
+
+ z_streamp zp; /*!< ZLib state */
+
+ /*!< The value of the last word read
+ from the FTS INDEX table. This is
+ used to discard duplicates */
+
+ fts_string_t word; /*!< UTF-8 string */
+
+ ulint max_words; /*!< maximum number of words to read
+ in one pase */
+};
+
+/** Prepared statemets used during optimize */
+struct fts_optimize_graph_t {
+ /*!< Delete a word from FTS INDEX */
+ que_t* delete_nodes_graph;
+ /*!< Insert a word into FTS INDEX */
+ que_t* write_nodes_graph;
+ /*!< COMMIT a transaction */
+ que_t* commit_graph;
+ /*!< Read the nodes from FTS_INDEX */
+ que_t* read_nodes_graph;
+};
+
+/** Used by fts_optimize() to store state. */
+struct fts_optimize_t {
+ trx_t* trx; /*!< The transaction used for all SQL */
+
+ ib_alloc_t* self_heap; /*!< Heap to use for allocations */
+
+ char* name_prefix; /*!< FTS table name prefix */
+
+ fts_table_t fts_index_table;/*!< Common table definition */
+
+ /*!< Common table definition */
+ fts_table_t fts_common_table;
+
+ dict_table_t* table; /*!< Table that has to be queried */
+
+ dict_index_t* index; /*!< The FTS index to be optimized */
+
+ fts_doc_ids_t* to_delete; /*!< doc ids to delete, we check against
+ this vector and purge the matching
+ entries during the optimizing
+ process. The vector entries are
+ sorted on doc id */
+
+ ulint del_pos; /*!< Offset within to_delete vector,
+ this is used to keep track of where
+ we are up to in the vector */
+
+ ibool done; /*!< TRUE when optimize finishes */
+
+ ib_vector_t* words; /*!< Word + Nodes read from FTS_INDEX,
+ it contains instances of fts_word_t */
+
+ fts_zip_t* zip; /*!< Words read from the FTS_INDEX */
+
+ fts_optimize_graph_t /*!< Prepared statements used during */
+ graph; /*optimize */
+
+ ulint n_completed; /*!< Number of FTS indexes that have
+ been optimized */
+ ibool del_list_regenerated;
+ /*!< BEING_DELETED list regenarated */
+};
+
+/** Used by the optimize, to keep state during compacting nodes. */
+struct fts_encode_t {
+ doc_id_t src_last_doc_id;/*!< Last doc id read from src node */
+ byte* src_ilist_ptr; /*!< Current ptr within src ilist */
+};
+
+/** We use this information to determine when to start the optimize
+cycle for a table. */
+struct fts_slot_t {
+ /** table, or NULL if the slot is unused */
+ dict_table_t* table;
+
+ /** whether this slot is being processed */
+ bool running;
+
+ ulint added; /*!< Number of doc ids added since the
+ last time this table was optimized */
+
+ ulint deleted; /*!< Number of doc ids deleted since the
+ last time this table was optimized */
+
+ /** time(NULL) of completing fts_optimize_table_bk() */
+ time_t last_run;
+
+ /** time(NULL) of latest successful fts_optimize_table() */
+ time_t completed;
+};
+
+/** A table remove message for the FTS optimize thread. */
+struct fts_msg_del_t {
+ dict_table_t* table; /*!< The table to remove */
+
+ os_event_t event; /*!< Event to synchronize acknowledgement
+ of receipt and processing of the
+ this message by the consumer */
+};
+
+/** The FTS optimize message work queue message type. */
+struct fts_msg_t {
+ fts_msg_type_t type; /*!< Message type */
+
+ void* ptr; /*!< The message contents */
+
+ mem_heap_t* heap; /*!< The heap used to allocate this
+ message, the message consumer will
+ free the heap. */
+};
+
+/** The number of words to read and optimize in a single pass. */
+ulong fts_num_word_optimize;
+
+/** Whether to enable additional FTS diagnostic printout. */
+char fts_enable_diag_print;
+
+/** ZLib compressed block size.*/
+static ulint FTS_ZIP_BLOCK_SIZE = 1024;
+
+/** The amount of time optimizing in a single pass, in seconds. */
+static ulint fts_optimize_time_limit;
+
+/** It's defined in fts0fts.cc */
+extern const char* fts_common_tables[];
+
+/** SQL Statement for changing state of rows to be deleted from FTS Index. */
+static const char* fts_init_delete_sql =
+ "BEGIN\n"
+ "\n"
+ "INSERT INTO $BEING_DELETED\n"
+ "SELECT doc_id FROM $DELETED;\n"
+ "\n"
+ "INSERT INTO $BEING_DELETED_CACHE\n"
+ "SELECT doc_id FROM $DELETED_CACHE;\n";
+
+static const char* fts_delete_doc_ids_sql =
+ "BEGIN\n"
+ "\n"
+ "DELETE FROM $DELETED WHERE doc_id = :doc_id1;\n"
+ "DELETE FROM $DELETED_CACHE WHERE doc_id = :doc_id2;\n";
+
+static const char* fts_end_delete_sql =
+ "BEGIN\n"
+ "\n"
+ "DELETE FROM $BEING_DELETED;\n"
+ "DELETE FROM $BEING_DELETED_CACHE;\n";
+
+/**********************************************************************//**
+Initialize fts_zip_t. */
+static
+void
+fts_zip_initialize(
+/*===============*/
+ fts_zip_t* zip) /*!< out: zip instance to initialize */
+{
+ zip->pos = 0;
+ zip->n_words = 0;
+
+ zip->status = Z_OK;
+
+ zip->last_big_block = 0;
+
+ zip->word.f_len = 0;
+ *zip->word.f_str = 0;
+
+ ib_vector_reset(zip->blocks);
+
+ memset(zip->zp, 0, sizeof(*zip->zp));
+}
+
+/**********************************************************************//**
+Create an instance of fts_zip_t.
+@return a new instance of fts_zip_t */
+static
+fts_zip_t*
+fts_zip_create(
+/*===========*/
+ mem_heap_t* heap, /*!< in: heap */
+ ulint block_sz, /*!< in: size of a zip block.*/
+ ulint max_words) /*!< in: max words to read */
+{
+ fts_zip_t* zip;
+
+ zip = static_cast<fts_zip_t*>(mem_heap_zalloc(heap, sizeof(*zip)));
+
+ zip->word.f_str = static_cast<byte*>(
+ mem_heap_zalloc(heap, FTS_MAX_WORD_LEN + 1));
+
+ zip->block_sz = block_sz;
+
+ zip->heap_alloc = ib_heap_allocator_create(heap);
+
+ zip->blocks = ib_vector_create(zip->heap_alloc, sizeof(void*), 128);
+
+ zip->max_words = max_words;
+
+ zip->zp = static_cast<z_stream*>(
+ mem_heap_zalloc(heap, sizeof(*zip->zp)));
+
+ return(zip);
+}
+
+/**********************************************************************//**
+Initialize an instance of fts_zip_t. */
+static
+void
+fts_zip_init(
+/*=========*/
+
+ fts_zip_t* zip) /*!< in: zip instance to init */
+{
+ memset(zip->zp, 0, sizeof(*zip->zp));
+
+ zip->word.f_len = 0;
+ *zip->word.f_str = '\0';
+}
+
+/**********************************************************************//**
+Create a fts_optimizer_word_t instance.
+@return new instance */
+static
+fts_word_t*
+fts_word_init(
+/*==========*/
+ fts_word_t* word, /*!< in: word to initialize */
+ byte* utf8, /*!< in: UTF-8 string */
+ ulint len) /*!< in: length of string in bytes */
+{
+ mem_heap_t* heap = mem_heap_create(sizeof(fts_node_t));
+
+ memset(word, 0, sizeof(*word));
+
+ word->text.f_len = len;
+ word->text.f_str = static_cast<byte*>(mem_heap_alloc(heap, len + 1));
+
+ /* Need to copy the NUL character too. */
+ memcpy(word->text.f_str, utf8, word->text.f_len);
+ word->text.f_str[word->text.f_len] = 0;
+
+ word->heap_alloc = ib_heap_allocator_create(heap);
+
+ word->nodes = ib_vector_create(
+ word->heap_alloc, sizeof(fts_node_t), FTS_WORD_NODES_INIT_SIZE);
+
+ return(word);
+}
+
+/**********************************************************************//**
+Read the FTS INDEX row.
+@return fts_node_t instance */
+static
+fts_node_t*
+fts_optimize_read_node(
+/*===================*/
+ fts_word_t* word, /*!< in: */
+ que_node_t* exp) /*!< in: */
+{
+ int i;
+ fts_node_t* node = static_cast<fts_node_t*>(
+ ib_vector_push(word->nodes, NULL));
+
+ /* Start from 1 since the first node has been read by the caller */
+ for (i = 1; exp; exp = que_node_get_next(exp), ++i) {
+
+ dfield_t* dfield = que_node_get_val(exp);
+ byte* data = static_cast<byte*>(
+ dfield_get_data(dfield));
+ ulint len = dfield_get_len(dfield);
+
+ ut_a(len != UNIV_SQL_NULL);
+
+ /* Note: The column numbers below must match the SELECT */
+ switch (i) {
+ case 1: /* DOC_COUNT */
+ node->doc_count = mach_read_from_4(data);
+ break;
+
+ case 2: /* FIRST_DOC_ID */
+ node->first_doc_id = fts_read_doc_id(data);
+ break;
+
+ case 3: /* LAST_DOC_ID */
+ node->last_doc_id = fts_read_doc_id(data);
+ break;
+
+ case 4: /* ILIST */
+ node->ilist_size_alloc = node->ilist_size = len;
+ node->ilist = static_cast<byte*>(ut_malloc_nokey(len));
+ memcpy(node->ilist, data, len);
+ break;
+
+ default:
+ ut_error;
+ }
+ }
+
+ /* Make sure all columns were read. */
+ ut_a(i == 5);
+
+ return(node);
+}
+
+/**********************************************************************//**
+Callback function to fetch the rows in an FTS INDEX record.
+@return always returns non-NULL */
+ibool
+fts_optimize_index_fetch_node(
+/*==========================*/
+ void* row, /*!< in: sel_node_t* */
+ void* user_arg) /*!< in: pointer to ib_vector_t */
+{
+ fts_word_t* word;
+ sel_node_t* sel_node = static_cast<sel_node_t*>(row);
+ fts_fetch_t* fetch = static_cast<fts_fetch_t*>(user_arg);
+ ib_vector_t* words = static_cast<ib_vector_t*>(fetch->read_arg);
+ que_node_t* exp = sel_node->select_list;
+ dfield_t* dfield = que_node_get_val(exp);
+ void* data = dfield_get_data(dfield);
+ ulint dfield_len = dfield_get_len(dfield);
+ fts_node_t* node;
+ bool is_word_init = false;
+
+ ut_a(dfield_len <= FTS_MAX_WORD_LEN);
+
+ if (ib_vector_size(words) == 0) {
+
+ word = static_cast<fts_word_t*>(ib_vector_push(words, NULL));
+ fts_word_init(word, (byte*) data, dfield_len);
+ is_word_init = true;
+ }
+
+ word = static_cast<fts_word_t*>(ib_vector_last(words));
+
+ if (dfield_len != word->text.f_len
+ || memcmp(word->text.f_str, data, dfield_len)) {
+
+ word = static_cast<fts_word_t*>(ib_vector_push(words, NULL));
+ fts_word_init(word, (byte*) data, dfield_len);
+ is_word_init = true;
+ }
+
+ node = fts_optimize_read_node(word, que_node_get_next(exp));
+
+ fetch->total_memory += node->ilist_size;
+ if (is_word_init) {
+ fetch->total_memory += sizeof(fts_word_t)
+ + sizeof(ib_alloc_t) + sizeof(ib_vector_t) + dfield_len
+ + sizeof(fts_node_t) * FTS_WORD_NODES_INIT_SIZE;
+ } else if (ib_vector_size(words) > FTS_WORD_NODES_INIT_SIZE) {
+ fetch->total_memory += sizeof(fts_node_t);
+ }
+
+ if (fetch->total_memory >= fts_result_cache_limit) {
+ return(FALSE);
+ }
+
+ return(TRUE);
+}
+
+/**********************************************************************//**
+Read the rows from the FTS inde.
+@return DB_SUCCESS or error code */
+dberr_t
+fts_index_fetch_nodes(
+/*==================*/
+ trx_t* trx, /*!< in: transaction */
+ que_t** graph, /*!< in: prepared statement */
+ fts_table_t* fts_table, /*!< in: table of the FTS INDEX */
+ const fts_string_t*
+ word, /*!< in: the word to fetch */
+ fts_fetch_t* fetch) /*!< in: fetch callback.*/
+{
+ pars_info_t* info;
+ dberr_t error;
+ char table_name[MAX_FULL_NAME_LEN];
+
+ trx->op_info = "fetching FTS index nodes";
+
+ if (*graph) {
+ info = (*graph)->info;
+ } else {
+ ulint selected;
+
+ info = pars_info_create();
+
+ ut_a(fts_table->type == FTS_INDEX_TABLE);
+
+ selected = fts_select_index(fts_table->charset,
+ word->f_str, word->f_len);
+
+ fts_table->suffix = fts_get_suffix(selected);
+
+ fts_get_table_name(fts_table, table_name);
+
+ pars_info_bind_id(info, true, "table_name", table_name);
+ }
+
+ pars_info_bind_function(info, "my_func", fetch->read_record, fetch);
+ pars_info_bind_varchar_literal(info, "word", word->f_str, word->f_len);
+
+ if (!*graph) {
+
+ *graph = fts_parse_sql(
+ fts_table,
+ info,
+ "DECLARE FUNCTION my_func;\n"
+ "DECLARE CURSOR c IS"
+ " SELECT word, doc_count, first_doc_id, last_doc_id,"
+ " ilist\n"
+ " FROM $table_name\n"
+ " WHERE word LIKE :word\n"
+ " ORDER BY first_doc_id;\n"
+ "BEGIN\n"
+ "\n"
+ "OPEN c;\n"
+ "WHILE 1 = 1 LOOP\n"
+ " FETCH c INTO my_func();\n"
+ " IF c % NOTFOUND THEN\n"
+ " EXIT;\n"
+ " END IF;\n"
+ "END LOOP;\n"
+ "CLOSE c;");
+ }
+
+ for (;;) {
+ error = fts_eval_sql(trx, *graph);
+
+ if (UNIV_LIKELY(error == DB_SUCCESS)) {
+ fts_sql_commit(trx);
+
+ break; /* Exit the loop. */
+ } else {
+ fts_sql_rollback(trx);
+
+ if (error == DB_LOCK_WAIT_TIMEOUT) {
+ ib::warn() << "lock wait timeout reading"
+ " FTS index. Retrying!";
+
+ trx->error_state = DB_SUCCESS;
+ } else {
+ ib::error() << "(" << error
+ << ") while reading FTS index.";
+
+ break; /* Exit the loop. */
+ }
+ }
+ }
+
+ return(error);
+}
+
+/**********************************************************************//**
+Read a word */
+static
+byte*
+fts_zip_read_word(
+/*==============*/
+ fts_zip_t* zip, /*!< in: Zip state + data */
+ fts_string_t* word) /*!< out: uncompressed word */
+{
+ short len = 0;
+ void* null = NULL;
+ byte* ptr = word->f_str;
+ int flush = Z_NO_FLUSH;
+
+ /* Either there was an error or we are at the Z_STREAM_END. */
+ if (zip->status != Z_OK) {
+ return(NULL);
+ }
+
+ zip->zp->next_out = reinterpret_cast<byte*>(&len);
+ zip->zp->avail_out = sizeof(len);
+
+ while (zip->status == Z_OK && zip->zp->avail_out > 0) {
+
+ /* Finished decompressing block. */
+ if (zip->zp->avail_in == 0) {
+
+ /* Free the block that's been decompressed. */
+ if (zip->pos > 0) {
+ ulint prev = zip->pos - 1;
+
+ ut_a(zip->pos < ib_vector_size(zip->blocks));
+
+ ut_free(ib_vector_getp(zip->blocks, prev));
+ ib_vector_set(zip->blocks, prev, &null);
+ }
+
+ /* Any more blocks to decompress. */
+ if (zip->pos < ib_vector_size(zip->blocks)) {
+
+ zip->zp->next_in = static_cast<byte*>(
+ ib_vector_getp(
+ zip->blocks, zip->pos));
+
+ if (zip->pos > zip->last_big_block) {
+ zip->zp->avail_in =
+ FTS_MAX_WORD_LEN;
+ } else {
+ zip->zp->avail_in =
+ static_cast<uInt>(zip->block_sz);
+ }
+
+ ++zip->pos;
+ } else {
+ flush = Z_FINISH;
+ }
+ }
+
+ switch (zip->status = inflate(zip->zp, flush)) {
+ case Z_OK:
+ if (zip->zp->avail_out == 0 && len > 0) {
+
+ ut_a(len <= FTS_MAX_WORD_LEN);
+ ptr[len] = 0;
+
+ zip->zp->next_out = ptr;
+ zip->zp->avail_out = uInt(len);
+
+ word->f_len = ulint(len);
+ len = 0;
+ }
+ break;
+
+ case Z_BUF_ERROR: /* No progress possible. */
+ case Z_STREAM_END:
+ inflateEnd(zip->zp);
+ break;
+
+ case Z_STREAM_ERROR:
+ default:
+ ut_error;
+ }
+ }
+
+ /* All blocks must be freed at end of inflate. */
+ if (zip->status != Z_OK) {
+ for (ulint i = 0; i < ib_vector_size(zip->blocks); ++i) {
+ if (ib_vector_getp(zip->blocks, i)) {
+ ut_free(ib_vector_getp(zip->blocks, i));
+ ib_vector_set(zip->blocks, i, &null);
+ }
+ }
+ }
+
+ if (ptr != NULL) {
+ ut_ad(word->f_len == strlen((char*) ptr));
+ }
+
+ return(zip->status == Z_OK || zip->status == Z_STREAM_END ? ptr : NULL);
+}
+
+/**********************************************************************//**
+Callback function to fetch and compress the word in an FTS
+INDEX record.
+@return FALSE on EOF */
+static
+ibool
+fts_fetch_index_words(
+/*==================*/
+ void* row, /*!< in: sel_node_t* */
+ void* user_arg) /*!< in: pointer to ib_vector_t */
+{
+ sel_node_t* sel_node = static_cast<sel_node_t*>(row);
+ fts_zip_t* zip = static_cast<fts_zip_t*>(user_arg);
+ que_node_t* exp = sel_node->select_list;
+ dfield_t* dfield = que_node_get_val(exp);
+
+ ut_a(dfield_get_len(dfield) <= FTS_MAX_WORD_LEN);
+
+ uint16 len = uint16(dfield_get_len(dfield));
+ void* data = dfield_get_data(dfield);
+
+ /* Skip the duplicate words. */
+ if (zip->word.f_len == len && !memcmp(zip->word.f_str, data, len)) {
+ return(TRUE);
+ }
+
+ memcpy(zip->word.f_str, data, len);
+ zip->word.f_len = len;
+
+ ut_a(zip->zp->avail_in == 0);
+ ut_a(zip->zp->next_in == NULL);
+
+ /* The string is prefixed by len. */
+ /* FIXME: This is not byte order agnostic (InnoDB data files
+ with FULLTEXT INDEX are not portable between little-endian and
+ big-endian systems!) */
+ zip->zp->next_in = reinterpret_cast<byte*>(&len);
+ zip->zp->avail_in = sizeof(len);
+
+ /* Compress the word, create output blocks as necessary. */
+ while (zip->zp->avail_in > 0) {
+
+ /* No space left in output buffer, create a new one. */
+ if (zip->zp->avail_out == 0) {
+ byte* block;
+
+ block = static_cast<byte*>(
+ ut_malloc_nokey(zip->block_sz));
+
+ ib_vector_push(zip->blocks, &block);
+
+ zip->zp->next_out = block;
+ zip->zp->avail_out = static_cast<uInt>(zip->block_sz);
+ }
+
+ switch (zip->status = deflate(zip->zp, Z_NO_FLUSH)) {
+ case Z_OK:
+ if (zip->zp->avail_in == 0) {
+ zip->zp->next_in = static_cast<byte*>(data);
+ zip->zp->avail_in = uInt(len);
+ ut_a(len <= FTS_MAX_WORD_LEN);
+ len = 0;
+ }
+ continue;
+
+ case Z_STREAM_END:
+ case Z_BUF_ERROR:
+ case Z_STREAM_ERROR:
+ default:
+ ut_error;
+ }
+ }
+
+ /* All data should have been compressed. */
+ ut_a(zip->zp->avail_in == 0);
+ zip->zp->next_in = NULL;
+
+ ++zip->n_words;
+
+ return(zip->n_words >= zip->max_words ? FALSE : TRUE);
+}
+
+/**********************************************************************//**
+Finish Zip deflate. */
+static
+void
+fts_zip_deflate_end(
+/*================*/
+ fts_zip_t* zip) /*!< in: instance that should be closed*/
+{
+ ut_a(zip->zp->avail_in == 0);
+ ut_a(zip->zp->next_in == NULL);
+
+ zip->status = deflate(zip->zp, Z_FINISH);
+
+ ut_a(ib_vector_size(zip->blocks) > 0);
+ zip->last_big_block = ib_vector_size(zip->blocks) - 1;
+
+ /* Allocate smaller block(s), since this is trailing data. */
+ while (zip->status == Z_OK) {
+ byte* block;
+
+ ut_a(zip->zp->avail_out == 0);
+
+ block = static_cast<byte*>(
+ ut_malloc_nokey(FTS_MAX_WORD_LEN + 1));
+
+ ib_vector_push(zip->blocks, &block);
+
+ zip->zp->next_out = block;
+ zip->zp->avail_out = FTS_MAX_WORD_LEN;
+
+ zip->status = deflate(zip->zp, Z_FINISH);
+ }
+
+ ut_a(zip->status == Z_STREAM_END);
+
+ zip->status = deflateEnd(zip->zp);
+ ut_a(zip->status == Z_OK);
+
+ /* Reset the ZLib data structure. */
+ memset(zip->zp, 0, sizeof(*zip->zp));
+}
+
+/**********************************************************************//**
+Read the words from the FTS INDEX.
+@return DB_SUCCESS if all OK, DB_TABLE_NOT_FOUND if no more indexes
+ to search else error code */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+fts_index_fetch_words(
+/*==================*/
+ fts_optimize_t* optim, /*!< in: optimize scratch pad */
+ const fts_string_t* word, /*!< in: get words greater than this
+ word */
+ ulint n_words)/*!< in: max words to read */
+{
+ pars_info_t* info;
+ que_t* graph;
+ ulint selected;
+ fts_zip_t* zip = NULL;
+ dberr_t error = DB_SUCCESS;
+ mem_heap_t* heap = static_cast<mem_heap_t*>(optim->self_heap->arg);
+ ibool inited = FALSE;
+
+ optim->trx->op_info = "fetching FTS index words";
+
+ if (optim->zip == NULL) {
+ optim->zip = fts_zip_create(heap, FTS_ZIP_BLOCK_SIZE, n_words);
+ } else {
+ fts_zip_initialize(optim->zip);
+ }
+
+ for (selected = fts_select_index(
+ optim->fts_index_table.charset, word->f_str, word->f_len);
+ selected < FTS_NUM_AUX_INDEX;
+ selected++) {
+
+ char table_name[MAX_FULL_NAME_LEN];
+
+ optim->fts_index_table.suffix = fts_get_suffix(selected);
+
+ info = pars_info_create();
+
+ pars_info_bind_function(
+ info, "my_func", fts_fetch_index_words, optim->zip);
+
+ pars_info_bind_varchar_literal(
+ info, "word", word->f_str, word->f_len);
+
+ fts_get_table_name(&optim->fts_index_table, table_name);
+ pars_info_bind_id(info, true, "table_name", table_name);
+
+ graph = fts_parse_sql(
+ &optim->fts_index_table,
+ info,
+ "DECLARE FUNCTION my_func;\n"
+ "DECLARE CURSOR c IS"
+ " SELECT word\n"
+ " FROM $table_name\n"
+ " WHERE word > :word\n"
+ " ORDER BY word;\n"
+ "BEGIN\n"
+ "\n"
+ "OPEN c;\n"
+ "WHILE 1 = 1 LOOP\n"
+ " FETCH c INTO my_func();\n"
+ " IF c % NOTFOUND THEN\n"
+ " EXIT;\n"
+ " END IF;\n"
+ "END LOOP;\n"
+ "CLOSE c;");
+
+ zip = optim->zip;
+
+ for (;;) {
+ int err;
+
+ if (!inited && ((err = deflateInit(zip->zp, 9))
+ != Z_OK)) {
+ ib::error() << "ZLib deflateInit() failed: "
+ << err;
+
+ error = DB_ERROR;
+ break;
+ } else {
+ inited = TRUE;
+ error = fts_eval_sql(optim->trx, graph);
+ }
+
+ if (UNIV_LIKELY(error == DB_SUCCESS)) {
+ //FIXME fts_sql_commit(optim->trx);
+ break;
+ } else {
+ //FIXME fts_sql_rollback(optim->trx);
+
+ if (error == DB_LOCK_WAIT_TIMEOUT) {
+ ib::warn() << "Lock wait timeout"
+ " reading document. Retrying!";
+
+ /* We need to reset the ZLib state. */
+ inited = FALSE;
+ deflateEnd(zip->zp);
+ fts_zip_init(zip);
+
+ optim->trx->error_state = DB_SUCCESS;
+ } else {
+ ib::error() << "(" << error
+ << ") while reading document.";
+
+ break; /* Exit the loop. */
+ }
+ }
+ }
+
+ fts_que_graph_free(graph);
+
+ /* Check if max word to fetch is exceeded */
+ if (optim->zip->n_words >= n_words) {
+ break;
+ }
+ }
+
+ if (error == DB_SUCCESS && zip->status == Z_OK && zip->n_words > 0) {
+
+ /* All data should have been read. */
+ ut_a(zip->zp->avail_in == 0);
+
+ fts_zip_deflate_end(zip);
+ } else {
+ deflateEnd(zip->zp);
+ }
+
+ return(error);
+}
+
+/**********************************************************************//**
+Callback function to fetch the doc id from the record.
+@return always returns TRUE */
+static
+ibool
+fts_fetch_doc_ids(
+/*==============*/
+ void* row, /*!< in: sel_node_t* */
+ void* user_arg) /*!< in: pointer to ib_vector_t */
+{
+ que_node_t* exp;
+ int i = 0;
+ sel_node_t* sel_node = static_cast<sel_node_t*>(row);
+ fts_doc_ids_t* fts_doc_ids = static_cast<fts_doc_ids_t*>(user_arg);
+ doc_id_t* update = static_cast<doc_id_t*>(
+ ib_vector_push(fts_doc_ids->doc_ids, NULL));
+
+ for (exp = sel_node->select_list;
+ exp;
+ exp = que_node_get_next(exp), ++i) {
+
+ dfield_t* dfield = que_node_get_val(exp);
+ void* data = dfield_get_data(dfield);
+ ulint len = dfield_get_len(dfield);
+
+ ut_a(len != UNIV_SQL_NULL);
+
+ /* Note: The column numbers below must match the SELECT. */
+ switch (i) {
+ case 0: /* DOC_ID */
+ *update = fts_read_doc_id(
+ static_cast<byte*>(data));
+ break;
+
+ default:
+ ut_error;
+ }
+ }
+
+ return(TRUE);
+}
+
+/**********************************************************************//**
+Read the rows from a FTS common auxiliary table.
+@return DB_SUCCESS or error code */
+dberr_t
+fts_table_fetch_doc_ids(
+/*====================*/
+ trx_t* trx, /*!< in: transaction */
+ fts_table_t* fts_table, /*!< in: table */
+ fts_doc_ids_t* doc_ids) /*!< in: For collecting doc ids */
+{
+ dberr_t error;
+ que_t* graph;
+ pars_info_t* info = pars_info_create();
+ ibool alloc_bk_trx = FALSE;
+ char table_name[MAX_FULL_NAME_LEN];
+
+ ut_a(fts_table->suffix != NULL);
+ ut_a(fts_table->type == FTS_COMMON_TABLE);
+
+ if (!trx) {
+ trx = trx_create();
+ alloc_bk_trx = TRUE;
+ }
+
+ trx->op_info = "fetching FTS doc ids";
+
+ pars_info_bind_function(info, "my_func", fts_fetch_doc_ids, doc_ids);
+
+ fts_get_table_name(fts_table, table_name);
+ pars_info_bind_id(info, true, "table_name", table_name);
+
+ graph = fts_parse_sql(
+ fts_table,
+ info,
+ "DECLARE FUNCTION my_func;\n"
+ "DECLARE CURSOR c IS"
+ " SELECT doc_id FROM $table_name;\n"
+ "BEGIN\n"
+ "\n"
+ "OPEN c;\n"
+ "WHILE 1 = 1 LOOP\n"
+ " FETCH c INTO my_func();\n"
+ " IF c % NOTFOUND THEN\n"
+ " EXIT;\n"
+ " END IF;\n"
+ "END LOOP;\n"
+ "CLOSE c;");
+
+ error = fts_eval_sql(trx, graph);
+ fts_sql_commit(trx);
+
+ mutex_enter(&dict_sys.mutex);
+ que_graph_free(graph);
+ mutex_exit(&dict_sys.mutex);
+
+ if (error == DB_SUCCESS) {
+ ib_vector_sort(doc_ids->doc_ids, fts_doc_id_cmp);
+ }
+
+ if (alloc_bk_trx) {
+ trx->free();
+ }
+
+ return(error);
+}
+
+/**********************************************************************//**
+Do a binary search for a doc id in the array
+@return +ve index if found -ve index where it should be inserted
+ if not found */
+int
+fts_bsearch(
+/*========*/
+ doc_id_t* array, /*!< in: array to sort */
+ int lower, /*!< in: the array lower bound */
+ int upper, /*!< in: the array upper bound */
+ doc_id_t doc_id) /*!< in: the doc id to search for */
+{
+ int orig_size = upper;
+
+ if (upper == 0) {
+ /* Nothing to search */
+ return(-1);
+ } else {
+ while (lower < upper) {
+ int i = (lower + upper) >> 1;
+
+ if (doc_id > array[i]) {
+ lower = i + 1;
+ } else if (doc_id < array[i]) {
+ upper = i - 1;
+ } else {
+ return(i); /* Found. */
+ }
+ }
+ }
+
+ if (lower == upper && lower < orig_size) {
+ if (doc_id == array[lower]) {
+ return(lower);
+ } else if (lower == 0) {
+ return(-1);
+ }
+ }
+
+ /* Not found. */
+ return( (lower == 0) ? -1 : -(lower));
+}
+
+/**********************************************************************//**
+Search in the to delete array whether any of the doc ids within
+the [first, last] range are to be deleted
+@return +ve index if found -ve index where it should be inserted
+ if not found */
+static
+int
+fts_optimize_lookup(
+/*================*/
+ ib_vector_t* doc_ids, /*!< in: array to search */
+ ulint lower, /*!< in: lower limit of array */
+ doc_id_t first_doc_id, /*!< in: doc id to lookup */
+ doc_id_t last_doc_id) /*!< in: doc id to lookup */
+{
+ int pos;
+ int upper = static_cast<int>(ib_vector_size(doc_ids));
+ doc_id_t* array = (doc_id_t*) doc_ids->data;
+
+ pos = fts_bsearch(array, static_cast<int>(lower), upper, first_doc_id);
+
+ ut_a(abs(pos) <= upper + 1);
+
+ if (pos < 0) {
+
+ int i = abs(pos);
+
+ /* If i is 1, it could be first_doc_id is less than
+ either the first or second array item, do a
+ double check */
+ if (i == 1 && array[0] <= last_doc_id
+ && first_doc_id < array[0]) {
+ pos = 0;
+ } else if (i < upper && array[i] <= last_doc_id) {
+
+ /* Check if the "next" doc id is within the
+ first & last doc id of the node. */
+ pos = i;
+ }
+ }
+
+ return(pos);
+}
+
+/**********************************************************************//**
+Encode the word pos list into the node
+@return DB_SUCCESS or error code*/
+static MY_ATTRIBUTE((nonnull))
+dberr_t
+fts_optimize_encode_node(
+/*=====================*/
+ fts_node_t* node, /*!< in: node to fill*/
+ doc_id_t doc_id, /*!< in: doc id to encode */
+ fts_encode_t* enc) /*!< in: encoding state.*/
+{
+ byte* dst;
+ ulint enc_len;
+ ulint pos_enc_len;
+ doc_id_t doc_id_delta;
+ dberr_t error = DB_SUCCESS;
+ byte* src = enc->src_ilist_ptr;
+
+ if (node->first_doc_id == 0) {
+ ut_a(node->last_doc_id == 0);
+
+ node->first_doc_id = doc_id;
+ }
+
+ /* Calculate the space required to store the ilist. */
+ ut_ad(doc_id > node->last_doc_id);
+ doc_id_delta = doc_id - node->last_doc_id;
+ enc_len = fts_get_encoded_len(static_cast<ulint>(doc_id_delta));
+
+ /* Calculate the size of the encoded pos array. */
+ while (*src) {
+ fts_decode_vlc(&src);
+ }
+
+ /* Skip the 0x00 byte at the end of the word positions list. */
+ ++src;
+
+ /* Number of encoded pos bytes to copy. */
+ pos_enc_len = ulint(src - enc->src_ilist_ptr);
+
+ /* Total number of bytes required for copy. */
+ enc_len += pos_enc_len;
+
+ /* Check we have enough space in the destination buffer for
+ copying the document word list. */
+ if (!node->ilist) {
+ ulint new_size;
+
+ ut_a(node->ilist_size == 0);
+
+ new_size = enc_len > FTS_ILIST_MAX_SIZE
+ ? enc_len : FTS_ILIST_MAX_SIZE;
+
+ node->ilist = static_cast<byte*>(ut_malloc_nokey(new_size));
+ node->ilist_size_alloc = new_size;
+
+ } else if ((node->ilist_size + enc_len) > node->ilist_size_alloc) {
+ ulint new_size = node->ilist_size + enc_len;
+ byte* ilist = static_cast<byte*>(ut_malloc_nokey(new_size));
+
+ memcpy(ilist, node->ilist, node->ilist_size);
+
+ ut_free(node->ilist);
+
+ node->ilist = ilist;
+ node->ilist_size_alloc = new_size;
+ }
+
+ src = enc->src_ilist_ptr;
+ dst = node->ilist + node->ilist_size;
+
+ /* Encode the doc id. Cast to ulint, the delta should be small and
+ therefore no loss of precision. */
+ dst += fts_encode_int((ulint) doc_id_delta, dst);
+
+ /* Copy the encoded pos array. */
+ memcpy(dst, src, pos_enc_len);
+
+ node->last_doc_id = doc_id;
+
+ /* Data copied upto here. */
+ node->ilist_size += enc_len;
+ enc->src_ilist_ptr += pos_enc_len;
+
+ ut_a(node->ilist_size <= node->ilist_size_alloc);
+
+ return(error);
+}
+
+/**********************************************************************//**
+Optimize the data contained in a node.
+@return DB_SUCCESS or error code*/
+static MY_ATTRIBUTE((nonnull))
+dberr_t
+fts_optimize_node(
+/*==============*/
+ ib_vector_t* del_vec, /*!< in: vector of doc ids to delete*/
+ int* del_pos, /*!< in: offset into above vector */
+ fts_node_t* dst_node, /*!< in: node to fill*/
+ fts_node_t* src_node, /*!< in: source node for data*/
+ fts_encode_t* enc) /*!< in: encoding state */
+{
+ ulint copied;
+ dberr_t error = DB_SUCCESS;
+ doc_id_t doc_id = enc->src_last_doc_id;
+
+ if (!enc->src_ilist_ptr) {
+ enc->src_ilist_ptr = src_node->ilist;
+ }
+
+ copied = ulint(enc->src_ilist_ptr - src_node->ilist);
+
+ /* While there is data in the source node and space to copy
+ into in the destination node. */
+ while (copied < src_node->ilist_size
+ && dst_node->ilist_size < FTS_ILIST_MAX_SIZE) {
+
+ doc_id_t delta;
+ doc_id_t del_doc_id = FTS_NULL_DOC_ID;
+
+ delta = fts_decode_vlc(&enc->src_ilist_ptr);
+
+test_again:
+ /* Check whether the doc id is in the delete list, if
+ so then we skip the entries but we need to track the
+ delta for decoding the entries following this document's
+ entries. */
+ if (*del_pos >= 0 && *del_pos < (int) ib_vector_size(del_vec)) {
+ doc_id_t* update;
+
+ update = (doc_id_t*) ib_vector_get(
+ del_vec, ulint(*del_pos));
+
+ del_doc_id = *update;
+ }
+
+ if (enc->src_ilist_ptr == src_node->ilist && doc_id == 0) {
+ ut_a(delta == src_node->first_doc_id);
+ }
+
+ doc_id += delta;
+
+ if (del_doc_id > 0 && doc_id == del_doc_id) {
+
+ ++*del_pos;
+
+ /* Skip the entries for this document. */
+ while (*enc->src_ilist_ptr) {
+ fts_decode_vlc(&enc->src_ilist_ptr);
+ }
+
+ /* Skip the end of word position marker. */
+ ++enc->src_ilist_ptr;
+
+ } else {
+
+ /* DOC ID already becomes larger than
+ del_doc_id, check the next del_doc_id */
+ if (del_doc_id > 0 && doc_id > del_doc_id) {
+ del_doc_id = 0;
+ ++*del_pos;
+ delta = 0;
+ goto test_again;
+ }
+
+ /* Decode and copy the word positions into
+ the dest node. */
+ fts_optimize_encode_node(dst_node, doc_id, enc);
+
+ ++dst_node->doc_count;
+
+ ut_a(dst_node->last_doc_id == doc_id);
+ }
+
+ /* Bytes copied so for from source. */
+ copied = ulint(enc->src_ilist_ptr - src_node->ilist);
+ }
+
+ if (copied >= src_node->ilist_size) {
+ ut_a(doc_id == src_node->last_doc_id);
+ }
+
+ enc->src_last_doc_id = doc_id;
+
+ return(error);
+}
+
+/**********************************************************************//**
+Determine the starting pos within the deleted doc id vector for a word.
+@return delete position */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+int
+fts_optimize_deleted_pos(
+/*=====================*/
+ fts_optimize_t* optim, /*!< in: optimize state data */
+ fts_word_t* word) /*!< in: the word data to check */
+{
+ int del_pos;
+ ib_vector_t* del_vec = optim->to_delete->doc_ids;
+
+ /* Get the first and last dict ids for the word, we will use
+ these values to determine which doc ids need to be removed
+ when we coalesce the nodes. This way we can reduce the numer
+ of elements that need to be searched in the deleted doc ids
+ vector and secondly we can remove the doc ids during the
+ coalescing phase. */
+ if (ib_vector_size(del_vec) > 0) {
+ fts_node_t* node;
+ doc_id_t last_id;
+ doc_id_t first_id;
+ ulint size = ib_vector_size(word->nodes);
+
+ node = (fts_node_t*) ib_vector_get(word->nodes, 0);
+ first_id = node->first_doc_id;
+
+ node = (fts_node_t*) ib_vector_get(word->nodes, size - 1);
+ last_id = node->last_doc_id;
+
+ ut_a(first_id <= last_id);
+
+ del_pos = fts_optimize_lookup(
+ del_vec, optim->del_pos, first_id, last_id);
+ } else {
+
+ del_pos = -1; /* Note that there is nothing to delete. */
+ }
+
+ return(del_pos);
+}
+
+#define FTS_DEBUG_PRINT
+/**********************************************************************//**
+Compact the nodes for a word, we also remove any doc ids during the
+compaction pass.
+@return DB_SUCCESS or error code.*/
+static
+ib_vector_t*
+fts_optimize_word(
+/*==============*/
+ fts_optimize_t* optim, /*!< in: optimize state data */
+ fts_word_t* word) /*!< in: the word to optimize */
+{
+ fts_encode_t enc;
+ ib_vector_t* nodes;
+ ulint i = 0;
+ int del_pos;
+ fts_node_t* dst_node = NULL;
+ ib_vector_t* del_vec = optim->to_delete->doc_ids;
+ ulint size = ib_vector_size(word->nodes);
+
+ del_pos = fts_optimize_deleted_pos(optim, word);
+ nodes = ib_vector_create(word->heap_alloc, sizeof(*dst_node), 128);
+
+ enc.src_last_doc_id = 0;
+ enc.src_ilist_ptr = NULL;
+
+ while (i < size) {
+ ulint copied;
+ fts_node_t* src_node;
+
+ src_node = (fts_node_t*) ib_vector_get(word->nodes, i);
+
+ if (dst_node == NULL
+ || dst_node->last_doc_id > src_node->first_doc_id) {
+
+ dst_node = static_cast<fts_node_t*>(
+ ib_vector_push(nodes, NULL));
+ memset(dst_node, 0, sizeof(*dst_node));
+ }
+
+ /* Copy from the src to the dst node. */
+ fts_optimize_node(del_vec, &del_pos, dst_node, src_node, &enc);
+
+ ut_a(enc.src_ilist_ptr != NULL);
+
+ /* Determine the numer of bytes copied to dst_node. */
+ copied = ulint(enc.src_ilist_ptr - src_node->ilist);
+
+ /* Can't copy more than whats in the vlc array. */
+ ut_a(copied <= src_node->ilist_size);
+
+ /* We are done with this node release the resources. */
+ if (copied == src_node->ilist_size) {
+
+ enc.src_last_doc_id = 0;
+ enc.src_ilist_ptr = NULL;
+
+ ut_free(src_node->ilist);
+
+ src_node->ilist = NULL;
+ src_node->ilist_size = src_node->ilist_size_alloc = 0;
+
+ src_node = NULL;
+
+ ++i; /* Get next source node to OPTIMIZE. */
+ }
+
+ if (dst_node->ilist_size >= FTS_ILIST_MAX_SIZE || i >= size) {
+
+ dst_node = NULL;
+ }
+ }
+
+ /* All dst nodes created should have been added to the vector. */
+ ut_a(dst_node == NULL);
+
+ /* Return the OPTIMIZED nodes. */
+ return(nodes);
+}
+
+/**********************************************************************//**
+Update the FTS index table. This is a delete followed by an insert.
+@return DB_SUCCESS or error code */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+fts_optimize_write_word(
+/*====================*/
+ trx_t* trx, /*!< in: transaction */
+ fts_table_t* fts_table, /*!< in: table of FTS index */
+ fts_string_t* word, /*!< in: word data to write */
+ ib_vector_t* nodes) /*!< in: the nodes to write */
+{
+ ulint i;
+ pars_info_t* info;
+ que_t* graph;
+ ulint selected;
+ dberr_t error = DB_SUCCESS;
+ char table_name[MAX_FULL_NAME_LEN];
+
+ info = pars_info_create();
+
+ ut_ad(fts_table->charset);
+
+ pars_info_bind_varchar_literal(
+ info, "word", word->f_str, word->f_len);
+
+ selected = fts_select_index(fts_table->charset,
+ word->f_str, word->f_len);
+
+ fts_table->suffix = fts_get_suffix(selected);
+ fts_get_table_name(fts_table, table_name);
+ pars_info_bind_id(info, true, "table_name", table_name);
+
+ graph = fts_parse_sql(
+ fts_table,
+ info,
+ "BEGIN DELETE FROM $table_name WHERE word = :word;");
+
+ error = fts_eval_sql(trx, graph);
+
+ if (UNIV_UNLIKELY(error != DB_SUCCESS)) {
+ ib::error() << "(" << error << ") during optimize,"
+ " when deleting a word from the FTS index.";
+ }
+
+ fts_que_graph_free(graph);
+ graph = NULL;
+
+ /* Even if the operation needs to be rolled back and redone,
+ we iterate over the nodes in order to free the ilist. */
+ for (i = 0; i < ib_vector_size(nodes); ++i) {
+
+ fts_node_t* node = (fts_node_t*) ib_vector_get(nodes, i);
+
+ if (error == DB_SUCCESS) {
+ /* Skip empty node. */
+ if (node->ilist == NULL) {
+ ut_ad(node->ilist_size == 0);
+ continue;
+ }
+
+ error = fts_write_node(
+ trx, &graph, fts_table, word, node);
+
+ if (UNIV_UNLIKELY(error != DB_SUCCESS)) {
+ ib::error() << "(" << error << ")"
+ " during optimize, while adding a"
+ " word to the FTS index.";
+ }
+ }
+
+ ut_free(node->ilist);
+ node->ilist = NULL;
+ node->ilist_size = node->ilist_size_alloc = 0;
+ }
+
+ if (graph != NULL) {
+ fts_que_graph_free(graph);
+ }
+
+ return(error);
+}
+
+/**********************************************************************//**
+Free fts_optimizer_word_t instanace.*/
+void
+fts_word_free(
+/*==========*/
+ fts_word_t* word) /*!< in: instance to free.*/
+{
+ mem_heap_t* heap = static_cast<mem_heap_t*>(word->heap_alloc->arg);
+
+#ifdef UNIV_DEBUG
+ memset(word, 0, sizeof(*word));
+#endif /* UNIV_DEBUG */
+
+ mem_heap_free(heap);
+}
+
+/**********************************************************************//**
+Optimize the word ilist and rewrite data to the FTS index.
+@return status one of RESTART, EXIT, ERROR */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+fts_optimize_compact(
+/*=================*/
+ fts_optimize_t* optim, /*!< in: optimize state data */
+ dict_index_t* index, /*!< in: current FTS being optimized */
+ time_t start_time) /*!< in: optimize start time */
+{
+ ulint i;
+ dberr_t error = DB_SUCCESS;
+ ulint size = ib_vector_size(optim->words);
+
+ for (i = 0; i < size && error == DB_SUCCESS && !optim->done; ++i) {
+ fts_word_t* word;
+ ib_vector_t* nodes;
+ trx_t* trx = optim->trx;
+
+ word = (fts_word_t*) ib_vector_get(optim->words, i);
+
+ /* nodes is allocated from the word heap and will be destroyed
+ when the word is freed. We however have to be careful about
+ the ilist, that needs to be freed explicitly. */
+ nodes = fts_optimize_word(optim, word);
+
+ /* Update the data on disk. */
+ error = fts_optimize_write_word(
+ trx, &optim->fts_index_table, &word->text, nodes);
+
+ if (error == DB_SUCCESS) {
+ /* Write the last word optimized to the config table,
+ we use this value for restarting optimize. */
+ error = fts_config_set_index_value(
+ optim->trx, index,
+ FTS_LAST_OPTIMIZED_WORD, &word->text);
+ }
+
+ /* Free the word that was optimized. */
+ fts_word_free(word);
+
+ ulint interval = ulint(time(NULL) - start_time);
+
+ if (fts_optimize_time_limit > 0
+ && (lint(interval) < 0
+ || interval > fts_optimize_time_limit)) {
+
+ optim->done = TRUE;
+ }
+ }
+
+ return(error);
+}
+
+/**********************************************************************//**
+Create an instance of fts_optimize_t. Also create a new
+background transaction.*/
+static
+fts_optimize_t*
+fts_optimize_create(
+/*================*/
+ dict_table_t* table) /*!< in: table with FTS indexes */
+{
+ fts_optimize_t* optim;
+ mem_heap_t* heap = mem_heap_create(128);
+
+ optim = (fts_optimize_t*) mem_heap_zalloc(heap, sizeof(*optim));
+
+ optim->self_heap = ib_heap_allocator_create(heap);
+
+ optim->to_delete = fts_doc_ids_create();
+
+ optim->words = ib_vector_create(
+ optim->self_heap, sizeof(fts_word_t), 256);
+
+ optim->table = table;
+
+ optim->trx = trx_create();
+ trx_start_internal(optim->trx);
+
+ optim->fts_common_table.table_id = table->id;
+ optim->fts_common_table.type = FTS_COMMON_TABLE;
+ optim->fts_common_table.table = table;
+
+ optim->fts_index_table.table_id = table->id;
+ optim->fts_index_table.type = FTS_INDEX_TABLE;
+ optim->fts_index_table.table = table;
+
+ /* The common prefix for all this parent table's aux tables. */
+ optim->name_prefix = fts_get_table_name_prefix(
+ &optim->fts_common_table);
+
+ return(optim);
+}
+
+#ifdef FTS_OPTIMIZE_DEBUG
+/**********************************************************************//**
+Get optimize start time of an FTS index.
+@return DB_SUCCESS if all OK else error code */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+fts_optimize_get_index_start_time(
+/*==============================*/
+ trx_t* trx, /*!< in: transaction */
+ dict_index_t* index, /*!< in: FTS index */
+ time_t* start_time) /*!< out: time in secs */
+{
+ return(fts_config_get_index_ulint(
+ trx, index, FTS_OPTIMIZE_START_TIME,
+ (ulint*) start_time));
+}
+
+/**********************************************************************//**
+Set the optimize start time of an FTS index.
+@return DB_SUCCESS if all OK else error code */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+fts_optimize_set_index_start_time(
+/*==============================*/
+ trx_t* trx, /*!< in: transaction */
+ dict_index_t* index, /*!< in: FTS index */
+ time_t start_time) /*!< in: start time */
+{
+ return(fts_config_set_index_ulint(
+ trx, index, FTS_OPTIMIZE_START_TIME,
+ (ulint) start_time));
+}
+
+/**********************************************************************//**
+Get optimize end time of an FTS index.
+@return DB_SUCCESS if all OK else error code */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+fts_optimize_get_index_end_time(
+/*============================*/
+ trx_t* trx, /*!< in: transaction */
+ dict_index_t* index, /*!< in: FTS index */
+ time_t* end_time) /*!< out: time in secs */
+{
+ return(fts_config_get_index_ulint(
+ trx, index, FTS_OPTIMIZE_END_TIME, (ulint*) end_time));
+}
+
+/**********************************************************************//**
+Set the optimize end time of an FTS index.
+@return DB_SUCCESS if all OK else error code */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+fts_optimize_set_index_end_time(
+/*============================*/
+ trx_t* trx, /*!< in: transaction */
+ dict_index_t* index, /*!< in: FTS index */
+ time_t end_time) /*!< in: end time */
+{
+ return(fts_config_set_index_ulint(
+ trx, index, FTS_OPTIMIZE_END_TIME, (ulint) end_time));
+}
+#endif
+
+/**********************************************************************//**
+Free the optimize prepared statements.*/
+static
+void
+fts_optimize_graph_free(
+/*====================*/
+ fts_optimize_graph_t* graph) /*!< in/out: The graph instances
+ to free */
+{
+ if (graph->commit_graph) {
+ que_graph_free(graph->commit_graph);
+ graph->commit_graph = NULL;
+ }
+
+ if (graph->write_nodes_graph) {
+ que_graph_free(graph->write_nodes_graph);
+ graph->write_nodes_graph = NULL;
+ }
+
+ if (graph->delete_nodes_graph) {
+ que_graph_free(graph->delete_nodes_graph);
+ graph->delete_nodes_graph = NULL;
+ }
+
+ if (graph->read_nodes_graph) {
+ que_graph_free(graph->read_nodes_graph);
+ graph->read_nodes_graph = NULL;
+ }
+}
+
+/**********************************************************************//**
+Free all optimize resources. */
+static
+void
+fts_optimize_free(
+/*==============*/
+ fts_optimize_t* optim) /*!< in: table with on FTS index */
+{
+ mem_heap_t* heap = static_cast<mem_heap_t*>(optim->self_heap->arg);
+
+ trx_commit_for_mysql(optim->trx);
+ optim->trx->free();
+ optim->trx = NULL;
+
+ fts_doc_ids_free(optim->to_delete);
+ fts_optimize_graph_free(&optim->graph);
+
+ ut_free(optim->name_prefix);
+
+ /* This will free the heap from which optim itself was allocated. */
+ mem_heap_free(heap);
+}
+
+/**********************************************************************//**
+Get the max time optimize should run in millisecs.
+@return max optimize time limit in millisecs. */
+static
+ulint
+fts_optimize_get_time_limit(
+/*========================*/
+ trx_t* trx, /*!< in: transaction */
+ fts_table_t* fts_table) /*!< in: aux table */
+{
+ ulint time_limit = 0;
+
+ fts_config_get_ulint(
+ trx, fts_table,
+ FTS_OPTIMIZE_LIMIT_IN_SECS, &time_limit);
+
+ /* FIXME: This is returning milliseconds, while the variable
+ is being stored and interpreted as seconds! */
+ return(time_limit * 1000);
+}
+
+/**********************************************************************//**
+Run OPTIMIZE on the given table. Note: this can take a very long time
+(hours). */
+static
+void
+fts_optimize_words(
+/*===============*/
+ fts_optimize_t* optim, /*!< in: optimize instance */
+ dict_index_t* index, /*!< in: current FTS being optimized */
+ fts_string_t* word) /*!< in: the starting word to optimize */
+{
+ fts_fetch_t fetch;
+ que_t* graph = NULL;
+ CHARSET_INFO* charset = optim->fts_index_table.charset;
+
+ ut_a(!optim->done);
+
+ /* Get the time limit from the config table. */
+ fts_optimize_time_limit = fts_optimize_get_time_limit(
+ optim->trx, &optim->fts_common_table);
+
+ const time_t start_time = time(NULL);
+
+ /* Setup the callback to use for fetching the word ilist etc. */
+ fetch.read_arg = optim->words;
+ fetch.read_record = fts_optimize_index_fetch_node;
+
+ while (!optim->done) {
+ dberr_t error;
+ trx_t* trx = optim->trx;
+ ulint selected;
+
+ ut_a(ib_vector_size(optim->words) == 0);
+
+ selected = fts_select_index(charset, word->f_str, word->f_len);
+
+ /* Read the index records to optimize. */
+ fetch.total_memory = 0;
+ error = fts_index_fetch_nodes(
+ trx, &graph, &optim->fts_index_table, word,
+ &fetch);
+ ut_ad(fetch.total_memory < fts_result_cache_limit);
+
+ if (error == DB_SUCCESS) {
+ /* There must be some nodes to read. */
+ ut_a(ib_vector_size(optim->words) > 0);
+
+ /* Optimize the nodes that were read and write
+ back to DB. */
+ error = fts_optimize_compact(optim, index, start_time);
+
+ if (error == DB_SUCCESS) {
+ fts_sql_commit(optim->trx);
+ } else {
+ fts_sql_rollback(optim->trx);
+ }
+ }
+
+ ib_vector_reset(optim->words);
+
+ if (error == DB_SUCCESS) {
+ if (!optim->done) {
+ if (!fts_zip_read_word(optim->zip, word)) {
+ optim->done = TRUE;
+ } else if (selected
+ != fts_select_index(
+ charset, word->f_str,
+ word->f_len)
+ && graph) {
+ fts_que_graph_free(graph);
+ graph = NULL;
+ }
+ }
+ } else if (error == DB_LOCK_WAIT_TIMEOUT) {
+ ib::warn() << "Lock wait timeout during optimize."
+ " Retrying!";
+
+ trx->error_state = DB_SUCCESS;
+ } else if (error == DB_DEADLOCK) {
+ ib::warn() << "Deadlock during optimize. Retrying!";
+
+ trx->error_state = DB_SUCCESS;
+ } else {
+ optim->done = TRUE; /* Exit the loop. */
+ }
+ }
+
+ if (graph != NULL) {
+ fts_que_graph_free(graph);
+ }
+}
+
+/**********************************************************************//**
+Optimize is complete. Set the completion time, and reset the optimize
+start string for this FTS index to "".
+@return DB_SUCCESS if all OK */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+fts_optimize_index_completed(
+/*=========================*/
+ fts_optimize_t* optim, /*!< in: optimize instance */
+ dict_index_t* index) /*!< in: table with one FTS index */
+{
+ fts_string_t word;
+ dberr_t error;
+ byte buf[sizeof(ulint)];
+#ifdef FTS_OPTIMIZE_DEBUG
+ time_t end_time = time(NULL);
+
+ error = fts_optimize_set_index_end_time(optim->trx, index, end_time);
+#endif
+
+ /* If we've reached the end of the index then set the start
+ word to the empty string. */
+
+ word.f_len = 0;
+ word.f_str = buf;
+ *word.f_str = '\0';
+
+ error = fts_config_set_index_value(
+ optim->trx, index, FTS_LAST_OPTIMIZED_WORD, &word);
+
+ if (UNIV_UNLIKELY(error != DB_SUCCESS)) {
+ ib::error() << "(" << error << ") while updating"
+ " last optimized word!";
+ }
+
+ return(error);
+}
+
+
+/**********************************************************************//**
+Read the list of words from the FTS auxiliary index that will be
+optimized in this pass.
+@return DB_SUCCESS if all OK */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+fts_optimize_index_read_words(
+/*==========================*/
+ fts_optimize_t* optim, /*!< in: optimize instance */
+ dict_index_t* index, /*!< in: table with one FTS index */
+ fts_string_t* word) /*!< in: buffer to use */
+{
+ dberr_t error = DB_SUCCESS;
+
+ if (optim->del_list_regenerated) {
+ word->f_len = 0;
+ } else {
+
+ /* Get the last word that was optimized from
+ the config table. */
+ error = fts_config_get_index_value(
+ optim->trx, index, FTS_LAST_OPTIMIZED_WORD, word);
+ }
+
+ /* If record not found then we start from the top. */
+ if (error == DB_RECORD_NOT_FOUND) {
+ word->f_len = 0;
+ error = DB_SUCCESS;
+ }
+
+ while (error == DB_SUCCESS) {
+
+ error = fts_index_fetch_words(
+ optim, word, fts_num_word_optimize);
+
+ if (error == DB_SUCCESS) {
+ /* Reset the last optimized word to '' if no
+ more words could be read from the FTS index. */
+ if (optim->zip->n_words == 0) {
+ word->f_len = 0;
+ *word->f_str = 0;
+ }
+
+ break;
+ }
+ }
+
+ return(error);
+}
+
+/**********************************************************************//**
+Run OPTIMIZE on the given FTS index. Note: this can take a very long
+time (hours).
+@return DB_SUCCESS if all OK */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+fts_optimize_index(
+/*===============*/
+ fts_optimize_t* optim, /*!< in: optimize instance */
+ dict_index_t* index) /*!< in: table with one FTS index */
+{
+ fts_string_t word;
+ dberr_t error;
+ byte str[FTS_MAX_WORD_LEN + 1];
+
+ /* Set the current index that we have to optimize. */
+ optim->fts_index_table.index_id = index->id;
+ optim->fts_index_table.charset = fts_index_get_charset(index);
+
+ optim->done = FALSE; /* Optimize until !done */
+
+ /* We need to read the last word optimized so that we start from
+ the next word. */
+ word.f_str = str;
+
+ /* We set the length of word to the size of str since we
+ need to pass the max len info to the fts_get_config_value() function. */
+ word.f_len = sizeof(str) - 1;
+
+ memset(word.f_str, 0x0, word.f_len);
+
+ /* Read the words that will be optimized in this pass. */
+ error = fts_optimize_index_read_words(optim, index, &word);
+
+ if (error == DB_SUCCESS) {
+ int zip_error;
+
+ ut_a(optim->zip->pos == 0);
+ ut_a(optim->zip->zp->total_in == 0);
+ ut_a(optim->zip->zp->total_out == 0);
+
+ zip_error = inflateInit(optim->zip->zp);
+ ut_a(zip_error == Z_OK);
+
+ word.f_len = 0;
+ word.f_str = str;
+
+ /* Read the first word to optimize from the Zip buffer. */
+ if (!fts_zip_read_word(optim->zip, &word)) {
+
+ optim->done = TRUE;
+ } else {
+ fts_optimize_words(optim, index, &word);
+ }
+
+ /* If we couldn't read any records then optimize is
+ complete. Increment the number of indexes that have
+ been optimized and set FTS index optimize state to
+ completed. */
+ if (error == DB_SUCCESS && optim->zip->n_words == 0) {
+
+ error = fts_optimize_index_completed(optim, index);
+
+ if (error == DB_SUCCESS) {
+ ++optim->n_completed;
+ }
+ }
+ }
+
+ return(error);
+}
+
+/**********************************************************************//**
+Delete the document ids in the delete, and delete cache tables.
+@return DB_SUCCESS if all OK */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+fts_optimize_purge_deleted_doc_ids(
+/*===============================*/
+ fts_optimize_t* optim) /*!< in: optimize instance */
+{
+ ulint i;
+ pars_info_t* info;
+ que_t* graph;
+ doc_id_t* update;
+ doc_id_t write_doc_id;
+ dberr_t error = DB_SUCCESS;
+ char deleted[MAX_FULL_NAME_LEN];
+ char deleted_cache[MAX_FULL_NAME_LEN];
+
+ info = pars_info_create();
+
+ ut_a(ib_vector_size(optim->to_delete->doc_ids) > 0);
+
+ update = static_cast<doc_id_t*>(
+ ib_vector_get(optim->to_delete->doc_ids, 0));
+
+ /* Convert to "storage" byte order. */
+ fts_write_doc_id((byte*) &write_doc_id, *update);
+
+ /* This is required for the SQL parser to work. It must be able
+ to find the following variables. So we do it twice. */
+ fts_bind_doc_id(info, "doc_id1", &write_doc_id);
+ fts_bind_doc_id(info, "doc_id2", &write_doc_id);
+
+ /* Make sure the following two names are consistent with the name
+ used in the fts_delete_doc_ids_sql */
+ optim->fts_common_table.suffix = fts_common_tables[3];
+ fts_get_table_name(&optim->fts_common_table, deleted);
+ pars_info_bind_id(info, true, fts_common_tables[3], deleted);
+
+ optim->fts_common_table.suffix = fts_common_tables[4];
+ fts_get_table_name(&optim->fts_common_table, deleted_cache);
+ pars_info_bind_id(info, true, fts_common_tables[4], deleted_cache);
+
+ graph = fts_parse_sql(NULL, info, fts_delete_doc_ids_sql);
+
+ /* Delete the doc ids that were copied at the start. */
+ for (i = 0; i < ib_vector_size(optim->to_delete->doc_ids); ++i) {
+
+ update = static_cast<doc_id_t*>(ib_vector_get(
+ optim->to_delete->doc_ids, i));
+
+ /* Convert to "storage" byte order. */
+ fts_write_doc_id((byte*) &write_doc_id, *update);
+
+ fts_bind_doc_id(info, "doc_id1", &write_doc_id);
+
+ fts_bind_doc_id(info, "doc_id2", &write_doc_id);
+
+ error = fts_eval_sql(optim->trx, graph);
+
+ // FIXME: Check whether delete actually succeeded!
+ if (error != DB_SUCCESS) {
+
+ fts_sql_rollback(optim->trx);
+ break;
+ }
+ }
+
+ fts_que_graph_free(graph);
+
+ return(error);
+}
+
+/**********************************************************************//**
+Delete the document ids in the pending delete, and delete tables.
+@return DB_SUCCESS if all OK */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+fts_optimize_purge_deleted_doc_id_snapshot(
+/*=======================================*/
+ fts_optimize_t* optim) /*!< in: optimize instance */
+{
+ dberr_t error;
+ que_t* graph;
+ pars_info_t* info;
+ char being_deleted[MAX_FULL_NAME_LEN];
+ char being_deleted_cache[MAX_FULL_NAME_LEN];
+
+ info = pars_info_create();
+
+ /* Make sure the following two names are consistent with the name
+ used in the fts_end_delete_sql */
+ optim->fts_common_table.suffix = fts_common_tables[0];
+ fts_get_table_name(&optim->fts_common_table, being_deleted);
+ pars_info_bind_id(info, true, fts_common_tables[0], being_deleted);
+
+ optim->fts_common_table.suffix = fts_common_tables[1];
+ fts_get_table_name(&optim->fts_common_table, being_deleted_cache);
+ pars_info_bind_id(info, true, fts_common_tables[1],
+ being_deleted_cache);
+
+ /* Delete the doc ids that were copied to delete pending state at
+ the start of optimize. */
+ graph = fts_parse_sql(NULL, info, fts_end_delete_sql);
+
+ error = fts_eval_sql(optim->trx, graph);
+ fts_que_graph_free(graph);
+
+ return(error);
+}
+
+/**********************************************************************//**
+Copy the deleted doc ids that will be purged during this optimize run
+to the being deleted FTS auxiliary tables. The transaction is committed
+upon successfull copy and rolled back on DB_DUPLICATE_KEY error.
+@return DB_SUCCESS if all OK */
+static
+ulint
+fts_optimize_being_deleted_count(
+/*=============================*/
+ fts_optimize_t* optim) /*!< in: optimize instance */
+{
+ fts_table_t fts_table;
+
+ FTS_INIT_FTS_TABLE(&fts_table, "BEING_DELETED", FTS_COMMON_TABLE,
+ optim->table);
+
+ return(fts_get_rows_count(&fts_table));
+}
+
+/*********************************************************************//**
+Copy the deleted doc ids that will be purged during this optimize run
+to the being deleted FTS auxiliary tables. The transaction is committed
+upon successfull copy and rolled back on DB_DUPLICATE_KEY error.
+@return DB_SUCCESS if all OK */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+fts_optimize_create_deleted_doc_id_snapshot(
+/*========================================*/
+ fts_optimize_t* optim) /*!< in: optimize instance */
+{
+ dberr_t error;
+ que_t* graph;
+ pars_info_t* info;
+ char being_deleted[MAX_FULL_NAME_LEN];
+ char deleted[MAX_FULL_NAME_LEN];
+ char being_deleted_cache[MAX_FULL_NAME_LEN];
+ char deleted_cache[MAX_FULL_NAME_LEN];
+
+ info = pars_info_create();
+
+ /* Make sure the following four names are consistent with the name
+ used in the fts_init_delete_sql */
+ optim->fts_common_table.suffix = fts_common_tables[0];
+ fts_get_table_name(&optim->fts_common_table, being_deleted);
+ pars_info_bind_id(info, true, fts_common_tables[0], being_deleted);
+
+ optim->fts_common_table.suffix = fts_common_tables[3];
+ fts_get_table_name(&optim->fts_common_table, deleted);
+ pars_info_bind_id(info, true, fts_common_tables[3], deleted);
+
+ optim->fts_common_table.suffix = fts_common_tables[1];
+ fts_get_table_name(&optim->fts_common_table, being_deleted_cache);
+ pars_info_bind_id(info, true, fts_common_tables[1],
+ being_deleted_cache);
+
+ optim->fts_common_table.suffix = fts_common_tables[4];
+ fts_get_table_name(&optim->fts_common_table, deleted_cache);
+ pars_info_bind_id(info, true, fts_common_tables[4], deleted_cache);
+
+ /* Move doc_ids that are to be deleted to state being deleted. */
+ graph = fts_parse_sql(NULL, info, fts_init_delete_sql);
+
+ error = fts_eval_sql(optim->trx, graph);
+
+ fts_que_graph_free(graph);
+
+ if (error != DB_SUCCESS) {
+ fts_sql_rollback(optim->trx);
+ } else {
+ fts_sql_commit(optim->trx);
+ }
+
+ optim->del_list_regenerated = TRUE;
+
+ return(error);
+}
+
+/*********************************************************************//**
+Read in the document ids that are to be purged during optimize. The
+transaction is committed upon successfully read.
+@return DB_SUCCESS if all OK */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+fts_optimize_read_deleted_doc_id_snapshot(
+/*======================================*/
+ fts_optimize_t* optim) /*!< in: optimize instance */
+{
+ dberr_t error;
+
+ optim->fts_common_table.suffix = "BEING_DELETED";
+
+ /* Read the doc_ids to delete. */
+ error = fts_table_fetch_doc_ids(
+ optim->trx, &optim->fts_common_table, optim->to_delete);
+
+ if (error == DB_SUCCESS) {
+
+ optim->fts_common_table.suffix = "BEING_DELETED_CACHE";
+
+ /* Read additional doc_ids to delete. */
+ error = fts_table_fetch_doc_ids(
+ optim->trx, &optim->fts_common_table, optim->to_delete);
+ }
+
+ if (error != DB_SUCCESS) {
+
+ fts_doc_ids_free(optim->to_delete);
+ optim->to_delete = NULL;
+ }
+
+ return(error);
+}
+
+/*********************************************************************//**
+Optimze all the FTS indexes, skipping those that have already been
+optimized, since the FTS auxiliary indexes are not guaranteed to be
+of the same cardinality.
+@return DB_SUCCESS if all OK */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+fts_optimize_indexes(
+/*=================*/
+ fts_optimize_t* optim) /*!< in: optimize instance */
+{
+ ulint i;
+ dberr_t error = DB_SUCCESS;
+ fts_t* fts = optim->table->fts;
+
+ /* Optimize the FTS indexes. */
+ for (i = 0; i < ib_vector_size(fts->indexes); ++i) {
+ dict_index_t* index;
+
+#ifdef FTS_OPTIMIZE_DEBUG
+ time_t end_time;
+ time_t start_time;
+
+ /* Get the start and end optimize times for this index. */
+ error = fts_optimize_get_index_start_time(
+ optim->trx, index, &start_time);
+
+ if (error != DB_SUCCESS) {
+ break;
+ }
+
+ error = fts_optimize_get_index_end_time(
+ optim->trx, index, &end_time);
+
+ if (error != DB_SUCCESS) {
+ break;
+ }
+
+ /* Start time will be 0 only for the first time or after
+ completing the optimization of all FTS indexes. */
+ if (start_time == 0) {
+ start_time = time(NULL);
+
+ error = fts_optimize_set_index_start_time(
+ optim->trx, index, start_time);
+ }
+
+ /* Check if this index needs to be optimized or not. */
+ if (difftime(end_time, start_time) < 0) {
+ error = fts_optimize_index(optim, index);
+
+ if (error != DB_SUCCESS) {
+ break;
+ }
+ } else {
+ ++optim->n_completed;
+ }
+#endif
+ index = static_cast<dict_index_t*>(
+ ib_vector_getp(fts->indexes, i));
+ error = fts_optimize_index(optim, index);
+ }
+
+ if (error == DB_SUCCESS) {
+ fts_sql_commit(optim->trx);
+ } else {
+ fts_sql_rollback(optim->trx);
+ }
+
+ return(error);
+}
+
+/*********************************************************************//**
+Cleanup the snapshot tables and the master deleted table.
+@return DB_SUCCESS if all OK */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+fts_optimize_purge_snapshot(
+/*========================*/
+ fts_optimize_t* optim) /*!< in: optimize instance */
+{
+ dberr_t error;
+
+ /* Delete the doc ids from the master deleted tables, that were
+ in the snapshot that was taken at the start of optimize. */
+ error = fts_optimize_purge_deleted_doc_ids(optim);
+
+ if (error == DB_SUCCESS) {
+ /* Destroy the deleted doc id snapshot. */
+ error = fts_optimize_purge_deleted_doc_id_snapshot(optim);
+ }
+
+ if (error == DB_SUCCESS) {
+ fts_sql_commit(optim->trx);
+ } else {
+ fts_sql_rollback(optim->trx);
+ }
+
+ return(error);
+}
+
+/*********************************************************************//**
+Reset the start time to 0 so that a new optimize can be started.
+@return DB_SUCCESS if all OK */
+static MY_ATTRIBUTE((nonnull, warn_unused_result))
+dberr_t
+fts_optimize_reset_start_time(
+/*==========================*/
+ fts_optimize_t* optim) /*!< in: optimize instance */
+{
+ dberr_t error = DB_SUCCESS;
+#ifdef FTS_OPTIMIZE_DEBUG
+ fts_t* fts = optim->table->fts;
+
+ /* Optimization should have been completed for all indexes. */
+ ut_a(optim->n_completed == ib_vector_size(fts->indexes));
+
+ for (uint i = 0; i < ib_vector_size(fts->indexes); ++i) {
+ dict_index_t* index;
+
+ time_t start_time = 0;
+
+ /* Reset the start time to 0 for this index. */
+ error = fts_optimize_set_index_start_time(
+ optim->trx, index, start_time);
+
+ index = static_cast<dict_index_t*>(
+ ib_vector_getp(fts->indexes, i));
+ }
+#endif
+
+ if (error == DB_SUCCESS) {
+ fts_sql_commit(optim->trx);
+ } else {
+ fts_sql_rollback(optim->trx);
+ }
+
+ return(error);
+}
+
+/*********************************************************************//**
+Run OPTIMIZE on the given table by a background thread.
+@return DB_SUCCESS if all OK */
+static MY_ATTRIBUTE((nonnull))
+dberr_t
+fts_optimize_table_bk(
+/*==================*/
+ fts_slot_t* slot) /*!< in: table to optimiza */
+{
+ const time_t now = time(NULL);
+ const ulint interval = ulint(now - slot->last_run);
+
+ /* Avoid optimizing tables that were optimized recently. */
+ if (slot->last_run > 0
+ && lint(interval) >= 0
+ && interval < FTS_OPTIMIZE_INTERVAL_IN_SECS) {
+
+ return(DB_SUCCESS);
+ }
+
+ dict_table_t* table = slot->table;
+ dberr_t error;
+
+ if (table->is_accessible()
+ && table->fts && table->fts->cache
+ && table->fts->cache->deleted >= FTS_OPTIMIZE_THRESHOLD) {
+ error = fts_optimize_table(table);
+
+ slot->last_run = time(NULL);
+
+ if (error == DB_SUCCESS) {
+ slot->running = false;
+ slot->completed = slot->last_run;
+ }
+ } else {
+ /* Note time this run completed. */
+ slot->last_run = now;
+ error = DB_SUCCESS;
+ }
+
+ return(error);
+}
+/*********************************************************************//**
+Run OPTIMIZE on the given table.
+@return DB_SUCCESS if all OK */
+dberr_t
+fts_optimize_table(
+/*===============*/
+ dict_table_t* table) /*!< in: table to optimiza */
+{
+ if (srv_read_only_mode) {
+ return DB_READ_ONLY;
+ }
+
+ dberr_t error = DB_SUCCESS;
+ fts_optimize_t* optim = NULL;
+ fts_t* fts = table->fts;
+
+ if (UNIV_UNLIKELY(fts_enable_diag_print)) {
+ ib::info() << "FTS start optimize " << table->name;
+ }
+
+ optim = fts_optimize_create(table);
+
+ // FIXME: Call this only at the start of optimize, currently we
+ // rely on DB_DUPLICATE_KEY to handle corrupting the snapshot.
+
+ /* Check whether there are still records in BEING_DELETED table */
+ if (fts_optimize_being_deleted_count(optim) == 0) {
+ /* Take a snapshot of the deleted document ids, they are copied
+ to the BEING_ tables. */
+ error = fts_optimize_create_deleted_doc_id_snapshot(optim);
+ }
+
+ /* A duplicate error is OK, since we don't erase the
+ doc ids from the being deleted state until all FTS
+ indexes have been optimized. */
+ if (error == DB_DUPLICATE_KEY) {
+ error = DB_SUCCESS;
+ }
+
+ if (error == DB_SUCCESS) {
+
+ /* These document ids will be filtered out during the
+ index optimization phase. They are in the snapshot that we
+ took above, at the start of the optimize. */
+ error = fts_optimize_read_deleted_doc_id_snapshot(optim);
+
+ if (error == DB_SUCCESS) {
+
+ /* Commit the read of being deleted
+ doc ids transaction. */
+ fts_sql_commit(optim->trx);
+
+ /* We would do optimization only if there
+ are deleted records to be cleaned up */
+ if (ib_vector_size(optim->to_delete->doc_ids) > 0) {
+ error = fts_optimize_indexes(optim);
+ }
+
+ } else {
+ ut_a(optim->to_delete == NULL);
+ }
+
+ /* Only after all indexes have been optimized can we
+ delete the (snapshot) doc ids in the pending delete,
+ and master deleted tables. */
+ if (error == DB_SUCCESS
+ && optim->n_completed == ib_vector_size(fts->indexes)) {
+
+ if (UNIV_UNLIKELY(fts_enable_diag_print)) {
+ ib::info() << "FTS_OPTIMIZE: Completed"
+ " Optimize, cleanup DELETED table";
+ }
+
+ if (ib_vector_size(optim->to_delete->doc_ids) > 0) {
+
+ /* Purge the doc ids that were in the
+ snapshot from the snapshot tables and
+ the master deleted table. */
+ error = fts_optimize_purge_snapshot(optim);
+ }
+
+ if (error == DB_SUCCESS) {
+ /* Reset the start time of all the FTS indexes
+ so that optimize can be restarted. */
+ error = fts_optimize_reset_start_time(optim);
+ }
+ }
+ }
+
+ fts_optimize_free(optim);
+
+ if (UNIV_UNLIKELY(fts_enable_diag_print)) {
+ ib::info() << "FTS end optimize " << table->name;
+ }
+
+ return(error);
+}
+
+/********************************************************************//**
+Add the table to add to the OPTIMIZER's list.
+@return new message instance */
+static
+fts_msg_t*
+fts_optimize_create_msg(
+/*====================*/
+ fts_msg_type_t type, /*!< in: type of message */
+ void* ptr) /*!< in: message payload */
+{
+ mem_heap_t* heap;
+ fts_msg_t* msg;
+
+ heap = mem_heap_create(sizeof(*msg) + sizeof(ib_list_node_t) + 16);
+ msg = static_cast<fts_msg_t*>(mem_heap_alloc(heap, sizeof(*msg)));
+
+ msg->ptr = ptr;
+ msg->type = type;
+ msg->heap = heap;
+
+ return(msg);
+}
+
+/** Add message to wqueue, signal thread pool*/
+static void add_msg(fts_msg_t *msg, bool wq_locked= false)
+{
+ ib_wqueue_add(fts_optimize_wq, msg, msg->heap, wq_locked);
+ srv_thread_pool->submit_task(&task);
+}
+
+/**
+Called by "idle" timer. Submits optimize task, which
+will only recalculate is_sync_needed, in case the queue is empty.
+*/
+static void timer_callback(void*)
+{
+ srv_thread_pool->submit_task(&task);
+}
+
+/** Add the table to add to the OPTIMIZER's list.
+@param[in] table table to add */
+void fts_optimize_add_table(dict_table_t* table)
+{
+ fts_msg_t* msg;
+
+ if (!fts_optimize_wq) {
+ return;
+ }
+
+ /* Make sure table with FTS index cannot be evicted */
+ dict_table_prevent_eviction(table);
+
+ msg = fts_optimize_create_msg(FTS_MSG_ADD_TABLE, table);
+
+ mutex_enter(&fts_optimize_wq->mutex);
+
+ add_msg(msg, true);
+
+ table->fts->in_queue = true;
+
+ mutex_exit(&fts_optimize_wq->mutex);
+}
+
+/**********************************************************************//**
+Remove the table from the OPTIMIZER's list. We do wait for
+acknowledgement from the consumer of the message. */
+void
+fts_optimize_remove_table(
+/*======================*/
+ dict_table_t* table) /*!< in: table to remove */
+{
+ fts_msg_t* msg;
+ os_event_t event;
+ fts_msg_del_t* remove;
+
+ /* if the optimize system not yet initialized, return */
+ if (!fts_optimize_wq) {
+ return;
+ }
+
+ /* FTS optimizer thread is already exited */
+ if (fts_opt_start_shutdown) {
+ ib::info() << "Try to remove table " << table->name
+ << " after FTS optimize thread exiting.";
+ /* If the table can't be removed then wait till
+ fts optimize thread shuts down */
+ while (fts_optimize_wq) {
+ os_thread_sleep(10000);
+ }
+ return;
+ }
+
+ mutex_enter(&fts_optimize_wq->mutex);
+
+ if (!table->fts->in_queue) {
+ mutex_exit(&fts_optimize_wq->mutex);
+ return;
+ }
+
+ msg = fts_optimize_create_msg(FTS_MSG_DEL_TABLE, NULL);
+
+ /* We will wait on this event until signalled by the consumer. */
+ event = os_event_create(0);
+
+ remove = static_cast<fts_msg_del_t*>(
+ mem_heap_alloc(msg->heap, sizeof(*remove)));
+
+ remove->table = table;
+ remove->event = event;
+ msg->ptr = remove;
+
+ ut_ad(!mutex_own(&dict_sys.mutex));
+
+ add_msg(msg, true);
+
+ mutex_exit(&fts_optimize_wq->mutex);
+
+ os_event_wait(event);
+
+ os_event_destroy(event);
+
+#ifdef UNIV_DEBUG
+ if (!fts_opt_start_shutdown) {
+ mutex_enter(&fts_optimize_wq->mutex);
+ ut_ad(!table->fts->in_queue);
+ mutex_exit(&fts_optimize_wq->mutex);
+ }
+#endif /* UNIV_DEBUG */
+}
+
+/** Send sync fts cache for the table.
+@param[in] table table to sync */
+void
+fts_optimize_request_sync_table(
+ dict_table_t* table)
+{
+ /* if the optimize system not yet initialized, return */
+ if (!fts_optimize_wq) {
+ return;
+ }
+
+ /* FTS optimizer thread is already exited */
+ if (fts_opt_start_shutdown) {
+ ib::info() << "Try to sync table " << table->name
+ << " after FTS optimize thread exiting.";
+ return;
+ }
+
+ mutex_enter(&fts_optimize_wq->mutex);
+
+ if (table->fts->sync_message) {
+ /* If the table already has SYNC message in
+ fts_optimize_wq queue then ignore it */
+ mutex_exit(&fts_optimize_wq->mutex);
+ return;
+ }
+
+ fts_msg_t* msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, table);
+
+ add_msg(msg, true);
+
+ table->fts->sync_message = true;
+
+ mutex_exit(&fts_optimize_wq->mutex);
+}
+
+/** Add a table to fts_slots if it doesn't already exist. */
+static bool fts_optimize_new_table(dict_table_t* table)
+{
+ ut_ad(table);
+
+ ulint i;
+ fts_slot_t* slot;
+ fts_slot_t* empty = NULL;
+
+ /* Search for duplicates, also find a free slot if one exists. */
+ for (i = 0; i < ib_vector_size(fts_slots); ++i) {
+
+ slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
+
+ if (!slot->table) {
+ empty = slot;
+ } else if (slot->table == table) {
+ /* Already exists in our optimize queue. */
+ return false;
+ }
+ }
+
+ slot = empty ? empty : static_cast<fts_slot_t*>(
+ ib_vector_push(fts_slots, NULL));
+
+ memset(slot, 0x0, sizeof(*slot));
+
+ slot->table = table;
+ return true;
+}
+
+/** Remove a table from fts_slots if it exists.
+@param[in,out] table table to be removed from fts_slots */
+static bool fts_optimize_del_table(const dict_table_t* table)
+{
+ ut_ad(table);
+ for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
+ fts_slot_t* slot;
+
+ slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
+
+ if (slot->table == table) {
+ if (UNIV_UNLIKELY(fts_enable_diag_print)) {
+ ib::info() << "FTS Optimize Removing table "
+ << table->name;
+ }
+
+ mutex_enter(&fts_optimize_wq->mutex);
+ slot->table->fts->in_queue = false;
+ mutex_exit(&fts_optimize_wq->mutex);
+ slot->table = NULL;
+ return true;
+ }
+ }
+
+ return false;
+}
+
+/**********************************************************************//**
+Calculate how many tables in fts_slots need to be optimized.
+@return no. of tables to optimize */
+static ulint fts_optimize_how_many()
+{
+ ulint n_tables = 0;
+ const time_t current_time = time(NULL);
+
+ for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
+ const fts_slot_t* slot = static_cast<const fts_slot_t*>(
+ ib_vector_get_const(fts_slots, i));
+ if (!slot->table) {
+ continue;
+ }
+
+ const time_t end = slot->running
+ ? slot->last_run : slot->completed;
+ ulint interval = ulint(current_time - end);
+
+ if (lint(interval) < 0
+ || interval >= FTS_OPTIMIZE_INTERVAL_IN_SECS) {
+ ++n_tables;
+ }
+ }
+
+ return(n_tables);
+}
+
+/**********************************************************************//**
+Check if the total memory used by all FTS table exceeds the maximum limit.
+@return true if a sync is needed, false otherwise */
+static bool fts_is_sync_needed()
+{
+ ulint total_memory = 0;
+ const time_t now = time(NULL);
+ double time_diff = difftime(now, last_check_sync_time);
+
+ if (fts_need_sync || (time_diff >= 0 && time_diff < 5)) {
+ return(false);
+ }
+
+ last_check_sync_time = now;
+
+ for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
+ const fts_slot_t* slot = static_cast<const fts_slot_t*>(
+ ib_vector_get_const(fts_slots, i));
+
+ if (!slot->table) {
+ continue;
+ }
+
+ if (slot->table->fts && slot->table->fts->cache) {
+ total_memory += slot->table->fts->cache->total_size;
+ }
+
+ if (total_memory > fts_max_total_cache_size) {
+ return(true);
+ }
+ }
+
+ return(false);
+}
+
+/** Sync fts cache of a table
+@param[in,out] table table to be synced
+@param[in] process_message processing messages from fts_optimize_wq */
+static void fts_optimize_sync_table(dict_table_t *table,
+ bool process_message= false)
+{
+ MDL_ticket* mdl_ticket= nullptr;
+ dict_table_t *sync_table= dict_acquire_mdl_shared<true>(table, fts_opt_thd,
+ &mdl_ticket);
+
+ if (!sync_table)
+ return;
+
+ if (sync_table->fts && sync_table->fts->cache && sync_table->is_accessible())
+ {
+ fts_sync_table(sync_table, false);
+ if (process_message)
+ {
+ mutex_enter(&fts_optimize_wq->mutex);
+ sync_table->fts->sync_message = false;
+ mutex_exit(&fts_optimize_wq->mutex);
+ }
+ }
+
+ DBUG_EXECUTE_IF("ib_optimize_wq_hang", os_thread_sleep(6000000););
+
+ if (mdl_ticket)
+ dict_table_close(sync_table, false, false, fts_opt_thd, mdl_ticket);
+}
+
+/**********************************************************************//**
+Optimize all FTS tables.
+@return Dummy return */
+static void fts_optimize_callback(void *)
+{
+ ut_ad(!srv_read_only_mode);
+
+ if (!fts_optimize_wq) {
+ /* Possibly timer initiated callback, can come after FTS_MSG_STOP.*/
+ return;
+ }
+
+ static ulint current = 0;
+ static ibool done = FALSE;
+ static ulint n_tables = ib_vector_size(fts_slots);
+ static ulint n_optimize = 0;
+
+ while (!done && srv_shutdown_state <= SRV_SHUTDOWN_INITIATED) {
+ /* If there is no message in the queue and we have tables
+ to optimize then optimize the tables. */
+
+ if (!done
+ && ib_wqueue_is_empty(fts_optimize_wq)
+ && n_tables > 0
+ && n_optimize > 0) {
+ fts_slot_t* slot = static_cast<fts_slot_t*>(
+ ib_vector_get(fts_slots, current));
+
+ /* Handle the case of empty slots. */
+ if (slot->table) {
+ slot->running = true;
+ fts_optimize_table_bk(slot);
+ }
+
+ /* Wrap around the counter. */
+ if (++current >= ib_vector_size(fts_slots)) {
+ n_optimize = fts_optimize_how_many();
+ current = 0;
+ }
+
+ } else if (n_optimize == 0
+ || !ib_wqueue_is_empty(fts_optimize_wq)) {
+ fts_msg_t* msg = static_cast<fts_msg_t*>
+ (ib_wqueue_nowait(fts_optimize_wq));
+ /* Timeout ? */
+ if (msg == NULL) {
+ if (fts_is_sync_needed()) {
+ fts_need_sync = true;
+ }
+ if (n_tables)
+ timer->set_time(5000, 0);
+ return;
+ }
+
+ switch (msg->type) {
+ case FTS_MSG_STOP:
+ done = TRUE;
+ break;
+
+ case FTS_MSG_ADD_TABLE:
+ ut_a(!done);
+ if (fts_optimize_new_table(
+ static_cast<dict_table_t*>(
+ msg->ptr))) {
+ ++n_tables;
+ }
+ break;
+
+ case FTS_MSG_DEL_TABLE:
+ if (fts_optimize_del_table(
+ static_cast<fts_msg_del_t*>(
+ msg->ptr)->table)) {
+ --n_tables;
+ }
+
+ /* Signal the producer that we have
+ removed the table. */
+ os_event_set(
+ ((fts_msg_del_t*) msg->ptr)->event);
+ break;
+
+ case FTS_MSG_SYNC_TABLE:
+ DBUG_EXECUTE_IF(
+ "fts_instrument_msg_sync_sleep",
+ os_thread_sleep(300000););
+
+ fts_optimize_sync_table(
+ static_cast<dict_table_t*>(msg->ptr),
+ true);
+ break;
+
+ default:
+ ut_error;
+ }
+
+ mem_heap_free(msg->heap);
+ n_optimize = done ? 0 : fts_optimize_how_many();
+ }
+ }
+
+ /* Server is being shutdown, sync the data from FTS cache to disk
+ if needed */
+ if (n_tables > 0) {
+ for (ulint i = 0; i < ib_vector_size(fts_slots); i++) {
+ fts_slot_t* slot = static_cast<fts_slot_t*>(
+ ib_vector_get(fts_slots, i));
+
+ if (slot->table) {
+ fts_optimize_sync_table(slot->table);
+ }
+ }
+ }
+
+ ib_vector_free(fts_slots);
+ fts_slots = NULL;
+
+ ib_wqueue_free(fts_optimize_wq);
+ fts_optimize_wq = NULL;
+
+ innobase_destroy_background_thd(fts_opt_thd);
+ ib::info() << "FTS optimize thread exiting.";
+
+ os_event_set(fts_opt_shutdown_event);
+}
+
+/**********************************************************************//**
+Startup the optimize thread and create the work queue. */
+void
+fts_optimize_init(void)
+/*===================*/
+{
+ mem_heap_t* heap;
+ ib_alloc_t* heap_alloc;
+
+ ut_ad(!srv_read_only_mode);
+
+ /* For now we only support one optimize thread. */
+ ut_a(!fts_optimize_wq);
+
+ /* Create FTS optimize work queue */
+ fts_optimize_wq = ib_wqueue_create();
+ ut_a(fts_optimize_wq != NULL);
+ timer = srv_thread_pool->create_timer(timer_callback);
+
+ /* Create FTS vector to store fts_slot_t */
+ heap = mem_heap_create(sizeof(dict_table_t*) * 64);
+ heap_alloc = ib_heap_allocator_create(heap);
+ fts_slots = ib_vector_create(heap_alloc, sizeof(fts_slot_t), 4);
+
+ fts_opt_thd = innobase_create_background_thd("InnoDB FTS optimizer");
+ /* Add fts tables to fts_slots which could be skipped
+ during dict_load_table_one() because fts_optimize_thread
+ wasn't even started. */
+ mutex_enter(&dict_sys.mutex);
+ for (dict_table_t* table = UT_LIST_GET_FIRST(dict_sys.table_LRU);
+ table != NULL;
+ table = UT_LIST_GET_NEXT(table_LRU, table)) {
+ if (!table->fts || !dict_table_has_fts_index(table)) {
+ continue;
+ }
+
+ /* fts_optimize_thread is not started yet. So there is no
+ need to acquire fts_optimize_wq->mutex for adding the fts
+ table to the fts slots. */
+ ut_ad(!table->can_be_evicted);
+ fts_optimize_new_table(table);
+ table->fts->in_queue = true;
+ }
+ mutex_exit(&dict_sys.mutex);
+
+ fts_opt_shutdown_event = os_event_create(0);
+ last_check_sync_time = time(NULL);
+}
+
+/** Shutdown fts optimize thread. */
+void
+fts_optimize_shutdown()
+{
+ ut_ad(!srv_read_only_mode);
+
+ fts_msg_t* msg;
+
+ /* If there is an ongoing activity on dictionary, such as
+ srv_master_evict_from_table_cache(), wait for it */
+ dict_mutex_enter_for_mysql();
+
+ /* Tells FTS optimizer system that we are exiting from
+ optimizer thread, message send their after will not be
+ processed */
+ fts_opt_start_shutdown = true;
+ dict_mutex_exit_for_mysql();
+
+ /* We tell the OPTIMIZE thread to switch to state done, we
+ can't delete the work queue here because the add thread needs
+ deregister the FTS tables. */
+ timer->disarm();
+ task_group.cancel_pending(&task);
+
+ msg = fts_optimize_create_msg(FTS_MSG_STOP, NULL);
+
+ add_msg(msg);
+
+ os_event_wait(fts_opt_shutdown_event);
+
+ os_event_destroy(fts_opt_shutdown_event);
+ fts_opt_thd = NULL;
+ delete timer;
+ timer = NULL;
+}
+
+/** Sync the table during commit phase
+@param[in] table table to be synced */
+void fts_sync_during_ddl(dict_table_t* table)
+{
+ mutex_enter(&fts_optimize_wq->mutex);
+ if (!table->fts->sync_message)
+ {
+ mutex_exit(&fts_optimize_wq->mutex);
+ return;
+ }
+
+ mutex_exit(&fts_optimize_wq->mutex);
+ fts_sync_table(table, false);
+
+ mutex_enter(&fts_optimize_wq->mutex);
+ table->fts->sync_message = false;
+ mutex_exit(&fts_optimize_wq->mutex);
+}