diff options
Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r-- | sql/rpl_gtid.cc | 4062 |
1 files changed, 4062 insertions, 0 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc new file mode 100644 index 00000000..11629059 --- /dev/null +++ b/sql/rpl_gtid.cc @@ -0,0 +1,4062 @@ +/* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab. + Copyright (c) 2020, 2022, MariaDB Corporation. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */ + + +/* Definitions for MariaDB global transaction ID (GTID). */ +#include <type_traits> + +#ifndef MYSQL_CLIENT +#include "mariadb.h" +#include "sql_priv.h" +#include "unireg.h" +#include "mariadb.h" +#include "sql_base.h" +#include "sql_parse.h" +#include "key.h" +#include "rpl_rli.h" +#include "slave.h" +#include "log_event.h" +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" // wsrep_thd_is_local +#include "wsrep_trans_observer.h" // wsrep_start_trx_if_not_started +#endif + +const LEX_CSTRING rpl_gtid_slave_state_table_name= + { STRING_WITH_LEN("gtid_slave_pos") }; + + +void +rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton, + rpl_group_info *rgi) +{ + int err; + /* + Add the gtid to the HASH in the replication slave state. + + We must do this only _after_ commit, so that for parallel replication, + there will not be an attempt to delete the corresponding table row before + it is even committed. + */ + err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, hton, rgi); + if (err) + { + sql_print_warning("Slave: Out of memory during slave state maintenance. " + "Some no longer necessary rows in table " + "mysql.%s may be left undeleted.", + rpl_gtid_slave_state_table_name.str); + /* + Such failure is not fatal. We will fail to delete the row for this + GTID, but it will do no harm and will be removed automatically on next + server restart. + */ + } +} + + +int +rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi) +{ + DBUG_ENTER("rpl_slave_state::record_and_update_gtid"); + + /* + Update the GTID position, if we have it and did not already update + it in a GTID transaction. + */ + if (rgi->gtid_pending) + { + uint64 sub_id= rgi->gtid_sub_id; + void *hton= NULL; + + rgi->gtid_pending= false; + if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE) + { + if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false, &hton)) + DBUG_RETURN(1); + update_state_hash(sub_id, &rgi->current_gtid, hton, rgi); + } + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL; + } + DBUG_RETURN(0); +} + + +/* + Check GTID event execution when --gtid-ignore-duplicates. + + The idea with --gtid-ignore-duplicates is that we allow multiple master + connections (in multi-source replication) to all receive the same GTIDs and + event groups. Only one instance of each is applied; we use the sequence + number in the GTID to decide whether a GTID has already been applied. + + So if the seq_no of a GTID (or a higher sequence number) has already been + applied, then the event should be skipped. If not then the event should be + applied. + + To avoid two master connections tring to apply the same event + simultaneously, only one is allowed to work in any given domain at any point + in time. The associated Relay_log_info object is called the owner of the + domain (and there can be multiple parallel worker threads working in that + domain for that Relay_log_info). Any other Relay_log_info/master connection + must wait for the domain to become free, or for their GTID to have been + applied, before being allowed to proceed. + + Returns: + 0 This GTID is already applied, it should be skipped. + 1 The GTID is not yet applied; this rli is now the owner, and must apply + the event and release the domain afterwards. + -1 Error (out of memory to allocate a new element for the domain). +*/ +int +rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi) +{ + uint32 domain_id= gtid->domain_id; + uint64 seq_no= gtid->seq_no; + rpl_slave_state::element *elem; + int res; + bool did_enter_cond= false; + PSI_stage_info old_stage; + THD *UNINIT_VAR(thd); + Relay_log_info *rli= rgi->rli; + + mysql_mutex_lock(&LOCK_slave_state); + if (!(elem= get_element(domain_id))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + res= -1; + goto err; + } + /* + Note that the elem pointer does not change once inserted in the hash. So + we can re-use the pointer without looking it up again in the hash after + each lock release and re-take. + */ + + for (;;) + { + if (elem->highest_seq_no >= seq_no) + { + /* This sequence number is already applied, ignore it. */ + res= 0; + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_IGNORE; + break; + } + if (!elem->owner_rli) + { + /* The domain became free, grab it and apply the event. */ + elem->owner_rli= rli; + elem->owner_count= 1; + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER; + res= 1; + break; + } + if (elem->owner_rli == rli) + { + /* Already own this domain, increment reference count and apply event. */ + ++elem->owner_count; + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER; + res= 1; + break; + } + thd= rgi->thd; + if (unlikely(thd->check_killed())) + { + res= -1; + break; + } + /* + Someone else is currently processing this GTID (or an earlier one). + Wait for them to complete (or fail), and then check again. + */ + if (!did_enter_cond) + { + thd->ENTER_COND(&elem->COND_gtid_ignore_duplicates, &LOCK_slave_state, + &stage_gtid_wait_other_connection, &old_stage); + did_enter_cond= true; + } + mysql_cond_wait(&elem->COND_gtid_ignore_duplicates, + &LOCK_slave_state); + } + +err: + if (did_enter_cond) + thd->EXIT_COND(&old_stage); + else + mysql_mutex_unlock(&LOCK_slave_state); + return res; +} + + +void +rpl_slave_state::release_domain_owner(rpl_group_info *rgi) +{ + element *elem= NULL; + + mysql_mutex_lock(&LOCK_slave_state); + if (!(elem= get_element(rgi->current_gtid.domain_id))) + { + /* + We cannot really deal with error here, as we are already called in an + error handling case (transaction failure and rollback). + + However, get_element() only fails if the element did not exist already + and could not be allocated due to out-of-memory - and if it did not + exist, then we would not get here in the first place. + */ + mysql_mutex_unlock(&LOCK_slave_state); + return; + } + + if (rgi->gtid_ignore_duplicate_state == rpl_group_info::GTID_DUPLICATE_OWNER) + { + uint32 count= elem->owner_count; + DBUG_ASSERT(count > 0); + DBUG_ASSERT(elem->owner_rli == rgi->rli); + --count; + elem->owner_count= count; + if (count == 0) + { + elem->owner_rli= NULL; + mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates); + } + } + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL; + mysql_mutex_unlock(&LOCK_slave_state); +} + + +static void +rpl_slave_state_free_element(void *arg) +{ + struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg; + mysql_cond_destroy(&elem->COND_wait_gtid); + mysql_cond_destroy(&elem->COND_gtid_ignore_duplicates); + my_free(elem); +} + + +rpl_slave_state::rpl_slave_state() + : pending_gtid_count(0), last_sub_id(0), gtid_pos_tables(0), loaded(false) +{ + mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state, + MY_MUTEX_INIT_SLOW); + my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32, + offsetof(element, domain_id), sizeof(element::domain_id), + NULL, rpl_slave_state_free_element, HASH_UNIQUE); + my_init_dynamic_array(PSI_INSTRUMENT_ME, >id_sort_array, sizeof(rpl_gtid), + 8, 8, MYF(0)); +} + + +rpl_slave_state::~rpl_slave_state() +{ + free_gtid_pos_tables(gtid_pos_tables.load(std::memory_order_relaxed)); + truncate_hash(); + my_hash_free(&hash); + delete_dynamic(>id_sort_array); + mysql_mutex_destroy(&LOCK_slave_state); +} + + +void +rpl_slave_state::truncate_hash() +{ + uint32 i; + + for (i= 0; i < hash.records; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + list_element *l= e->list; + list_element *next; + while (l) + { + next= l->next; + my_free(l); + l= next; + } + /* The element itself is freed by the hash element free function. */ + } + my_hash_reset(&hash); +} + + +int +rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, + uint64 seq_no, void *hton, rpl_group_info *rgi) +{ + int res; + mysql_mutex_lock(&LOCK_slave_state); + res= update_nolock(domain_id, server_id, sub_id, seq_no, hton, rgi); + mysql_mutex_unlock(&LOCK_slave_state); + return res; +} + + +int +rpl_slave_state::update_nolock(uint32 domain_id, uint32 server_id, uint64 sub_id, + uint64 seq_no, void *hton, rpl_group_info *rgi) +{ + element *elem= NULL; + list_element *list_elem= NULL; + + DBUG_ASSERT(hton || !loaded); + mysql_mutex_assert_owner(&LOCK_slave_state); + if (!(elem= get_element(domain_id))) + return 1; + + if (seq_no > elem->highest_seq_no) + elem->highest_seq_no= seq_no; + if (elem->gtid_waiter && elem->min_wait_seq_no <= seq_no) + { + /* + Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear. + Signal (and remove) them. The waiter will handle all the processing + of all pending MASTER_GTID_WAIT(), so we do not slow down the + replication SQL thread. + */ + elem->gtid_waiter= NULL; + mysql_cond_broadcast(&elem->COND_wait_gtid); + } + + if (rgi) + { + if (rgi->gtid_ignore_duplicate_state==rpl_group_info::GTID_DUPLICATE_OWNER) + { +#ifdef DBUG_ASSERT_EXISTS + Relay_log_info *rli= rgi->rli; +#endif + uint32 count= elem->owner_count; + DBUG_ASSERT(count > 0); + DBUG_ASSERT(elem->owner_rli == rli); + --count; + elem->owner_count= count; + if (count == 0) + { + elem->owner_rli= NULL; + mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates); + } + } + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL; + } + + if (!(list_elem= (list_element *)my_malloc(PSI_INSTRUMENT_ME, + sizeof(*list_elem), MYF(MY_WME)))) + return 1; + list_elem->domain_id= domain_id; + list_elem->server_id= server_id; + list_elem->sub_id= sub_id; + list_elem->seq_no= seq_no; + list_elem->hton= hton; + + elem->add(list_elem); + if (last_sub_id < sub_id) + last_sub_id= sub_id; + +#ifdef HAVE_REPLICATION + ++pending_gtid_count; + if (pending_gtid_count >= opt_gtid_cleanup_batch_size) + { + pending_gtid_count = 0; + slave_background_gtid_pending_delete_request(); + } +#endif + + return 0; +} + + +struct rpl_slave_state::element * +rpl_slave_state::get_element(uint32 domain_id) +{ + struct element *elem; + + elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, + sizeof(domain_id)); + if (elem) + return elem; + + if (!(elem= (element *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*elem), MYF(MY_WME)))) + return NULL; + elem->list= NULL; + elem->domain_id= domain_id; + elem->highest_seq_no= 0; + elem->gtid_waiter= NULL; + elem->owner_rli= NULL; + elem->owner_count= 0; + mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0); + mysql_cond_init(key_COND_gtid_ignore_duplicates, + &elem->COND_gtid_ignore_duplicates, 0); + if (my_hash_insert(&hash, (uchar *)elem)) + { + my_free(elem); + return NULL; + } + return elem; +} + + +int +rpl_slave_state::put_back_list(list_element *list) +{ + element *e= NULL; + int err= 0; + + mysql_mutex_lock(&LOCK_slave_state); + while (list) + { + list_element *next= list->next; + + if ((!e || e->domain_id != list->domain_id) && + !(e= (element *)my_hash_search(&hash, (const uchar *)&list->domain_id, + sizeof(list->domain_id)))) + { + err= 1; + goto end; + } + e->add(list); + list= next; + } + +end: + mysql_mutex_unlock(&LOCK_slave_state); + return err; +} + + +int +rpl_slave_state::truncate_state_table(THD *thd) +{ + TABLE_LIST tlist; + int err= 0; + + tlist.init_one_table(&MYSQL_SCHEMA_NAME, &rpl_gtid_slave_state_table_name, + NULL, TL_WRITE); + tlist.mdl_request.set_type(MDL_EXCLUSIVE); + if (!(err= open_and_lock_tables(thd, &tlist, FALSE, + MYSQL_OPEN_IGNORE_LOGGING_FORMAT))) + { + DBUG_ASSERT(!tlist.table->file->row_logging); + tlist.table->s->tdc->flush(thd, true); + err= tlist.table->file->ha_truncate(); + + if (err) + { + ha_rollback_trans(thd, FALSE); + close_thread_tables(thd); + ha_rollback_trans(thd, TRUE); + } + else + { + ha_commit_trans(thd, FALSE); + close_thread_tables(thd); + ha_commit_trans(thd, TRUE); + } + thd->release_transactional_locks(); + } + return err; +} + + +static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= { + { { STRING_WITH_LEN("domain_id") }, + { STRING_WITH_LEN("int(10) unsigned") }, + {NULL, 0} }, + { { STRING_WITH_LEN("sub_id") }, + { STRING_WITH_LEN("bigint(20) unsigned") }, + {NULL, 0} }, + { { STRING_WITH_LEN("server_id") }, + { STRING_WITH_LEN("int(10) unsigned") }, + {NULL, 0} }, + { { STRING_WITH_LEN("seq_no") }, + { STRING_WITH_LEN("bigint(20) unsigned") }, + {NULL, 0} }, +}; + +static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1}; + +static const TABLE_FIELD_DEF mysql_gtid_slave_pos_tabledef= { + array_elements(mysql_rpl_slave_state_coltypes), + mysql_rpl_slave_state_coltypes, + array_elements(mysql_rpl_slave_state_pk_parts), + mysql_rpl_slave_state_pk_parts +}; + +static Table_check_intact_log_error gtid_table_intact; + +/* + Check that the mysql.gtid_slave_pos table has the correct definition. +*/ +int +gtid_check_rpl_slave_state_table(TABLE *table) +{ + int err; + + if ((err= gtid_table_intact.check(table, &mysql_gtid_slave_pos_tabledef))) + my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql", + rpl_gtid_slave_state_table_name.str); + return err; +} + + +/* + Attempt to find a mysql.gtid_slave_posXXX table that has a storage engine + that is already in use by the current transaction, if any. +*/ +void +rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename) +{ + /* + See comments on rpl_slave_state::gtid_pos_tables for rules around proper + access to the list. + */ + auto list= gtid_pos_tables.load(std::memory_order_acquire); + + Ha_trx_info *ha_info; + uint count = 0; + for (ha_info= thd->transaction->all.ha_list; ha_info; ha_info= ha_info->next()) + { + void *trx_hton= ha_info->ht(); + auto table_entry= list; + + if (!ha_info->is_trx_read_write() || trx_hton == binlog_hton) + continue; + while (table_entry) + { + if (table_entry->table_hton == trx_hton) + { + if (likely(table_entry->state == GTID_POS_AVAILABLE)) + { + *out_tablename= table_entry->table_name; + /* + Check if this is a cross-engine transaction, so we can correctly + maintain the rpl_transactions_multi_engine status variable. + */ + if (count >= 1) + statistic_increment(rpl_transactions_multi_engine, LOCK_status); + else + { + for (;;) + { + ha_info= ha_info->next(); + if (!ha_info) + break; + if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton) + { + statistic_increment(rpl_transactions_multi_engine, LOCK_status); + break; + } + } + } + return; + } + /* + This engine is marked to automatically create the table. + We cannot easily do this here (possibly in the middle of a + transaction). But we can request the slave background thread + to create it, and in a short while it should become available + for following transactions. + */ +#ifdef HAVE_REPLICATION + slave_background_gtid_pos_create_request(table_entry); +#endif + break; + } + table_entry= table_entry->next; + } + ++count; + } + /* + If we cannot find any table whose engine matches an engine that is + already active in the transaction, or if there is no current transaction + engines available, we return the default gtid_slave_pos table. + */ + *out_tablename= + default_gtid_pos_table.load(std::memory_order_acquire)->table_name; + /* Record in status that we failed to find a suitable gtid_pos table. */ + if (count > 0) + { + statistic_increment(transactions_gtid_foreign_engine, LOCK_status); + if (count > 1) + statistic_increment(rpl_transactions_multi_engine, LOCK_status); + } +} + + +/* + Write a gtid to the replication slave state table. + + Do it as part of the transaction, to get slave crash safety, or as a separate + transaction if !in_transaction (eg. MyISAM or DDL). + + gtid The global transaction id for this event group. + sub_id Value allocated within the sub_id when the event group was + read (sub_id must be consistent with commit order in master binlog). + + Note that caller must later ensure that the new gtid and sub_id is inserted + into the appropriate HASH element with rpl_slave_state.add(), so that it can + be deleted later. But this must only be done after COMMIT if in transaction. +*/ +int +rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, + bool in_transaction, bool in_statement, + void **out_hton) +{ + TABLE_LIST tlist; + int err= 0, not_sql_thread; + bool table_opened= false; + TABLE *table; + ulonglong thd_saved_option= thd->variables.option_bits; + Query_tables_list lex_backup; + wait_for_commit* suspended_wfc; + void *hton= NULL; + LEX_CSTRING gtid_pos_table_name; + TABLE *tbl= nullptr; + MDL_savepoint m_start_of_statement_svp(thd->mdl_context.mdl_savepoint()); + DBUG_ENTER("record_gtid"); + + *out_hton= NULL; + if (unlikely(!loaded)) + { + /* + Probably the mysql.gtid_slave_pos table is missing (eg. upgrade) or + corrupt. + + We already complained loudly about this, but we can try to continue + until the DBA fixes it. + */ + DBUG_RETURN(0); + } + + if (!in_statement) + thd->reset_for_next_command(); + + if (thd->rgi_slave && (thd->rgi_slave->gtid_ev_flags_extra & + Gtid_log_event::FL_START_ALTER_E1)) + { + /* + store the open table table list in ptr, so that is close_thread_tables + is called start alter tables are not closed + */ + mysql_mutex_lock(&thd->LOCK_thd_data); + tbl= thd->open_tables; + thd->open_tables= nullptr; + mysql_mutex_unlock(&thd->LOCK_thd_data); + } + /* + Only the SQL thread can call select_gtid_pos_table without a mutex + Other threads needs to use a mutex and take into account that the + result may change during execution, so we have to make a copy. + */ + + if ((not_sql_thread= (thd->system_thread != SYSTEM_THREAD_SLAVE_SQL))) + mysql_mutex_lock(&LOCK_slave_state); + select_gtid_pos_table(thd, >id_pos_table_name); + if (not_sql_thread) + { + LEX_CSTRING *tmp= thd->make_clex_string(gtid_pos_table_name.str, + gtid_pos_table_name.length); + mysql_mutex_unlock(&LOCK_slave_state); + if (!tmp) + DBUG_RETURN(1); + gtid_pos_table_name= *tmp; + } + + DBUG_EXECUTE_IF("gtid_inject_record_gtid", + { + my_error(ER_CANNOT_UPDATE_GTID_STATE, MYF(0)); + DBUG_RETURN(1); + } ); + + /* + If we are applying a non-transactional event group, we will be committing + here a transaction, but that does not imply that the event group has + completed or has been binlogged. So we should not trigger + wakeup_subsequent_commits() here. + + Note: An alternative here could be to put a call to mark_start_commit() in + stmt_done() before the call to record_and_update_gtid(). This would + prevent later calling mark_start_commit() after we have run + wakeup_subsequent_commits() from committing the GTID update transaction + (which must be avoided to avoid accessing freed group_commit_orderer + object). It would also allow following event groups to start slightly + earlier. And in the cases where record_gtid() is called without an active + transaction, the current statement should have been binlogged already, so + binlog order is preserved. + + But this is rather subtle, and potentially fragile. And it does not really + seem worth it; non-transactional loads are unlikely to benefit much from + parallel replication in any case. So for now, we go with the simple + suspend/resume of wakeup_subsequent_commits() here in record_gtid(). + */ + suspended_wfc= thd->suspend_subsequent_commits(); + thd->lex->reset_n_backup_query_tables_list(&lex_backup); + tlist.init_one_table(&MYSQL_SCHEMA_NAME, >id_pos_table_name, NULL, TL_WRITE); + if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) + goto end; + table_opened= true; + table= tlist.table; + hton= table->s->db_type(); + table->file->row_logging= 0; // No binary logging + + if ((err= gtid_check_rpl_slave_state_table(table))) + goto end; + +#ifdef WITH_WSREP + /* + We should replicate local gtid_slave_pos updates to other nodes. + In applier we should not append them to galera writeset. + */ + if (WSREP_ON_ && wsrep_thd_is_local(thd)) + { + thd->wsrep_ignore_table= false; + wsrep_start_trx_if_not_started(thd); + } + else + { + thd->wsrep_ignore_table= true; + } +#endif + + if (!in_transaction) + { + DBUG_PRINT("info", ("resetting OPTION_BEGIN")); + thd->variables.option_bits&= + ~(ulonglong)(OPTION_NOT_AUTOCOMMIT |OPTION_BEGIN |OPTION_BIN_LOG | + OPTION_GTID_BEGIN); + } + else + thd->variables.option_bits&= ~(ulonglong)OPTION_BIN_LOG; + + bitmap_set_all(table->write_set); + table->rpl_write_set= table->write_set; + + table->field[0]->store((ulonglong)gtid->domain_id, true); + table->field[1]->store(sub_id, true); + table->field[2]->store((ulonglong)gtid->server_id, true); + table->field[3]->store(gtid->seq_no, true); + DBUG_EXECUTE_IF("inject_crash_before_write_rpl_slave_state", DBUG_SUICIDE();); + if ((err= table->file->ha_write_row(table->record[0]))) + { + table->file->print_error(err, MYF(0)); + goto end; + } + *out_hton= hton; + + if(opt_bin_log && + (err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id, + gtid->seq_no))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + goto end; + } +end: + +#ifdef WITH_WSREP + thd->wsrep_ignore_table= false; +#endif + + if (table_opened) + { + if (err || (err= ha_commit_trans(thd, FALSE))) + ha_rollback_trans(thd, FALSE); + close_thread_tables(thd); + if (!thd->rgi_slave || !(thd->rgi_slave->gtid_ev_flags_extra & + Gtid_log_event::FL_START_ALTER_E1)) + { + if (in_transaction) + thd->mdl_context.release_statement_locks(); + else + thd->release_transactional_locks(); + } + } + if (thd->rgi_slave && + thd->rgi_slave->gtid_ev_flags_extra & Gtid_log_event::FL_START_ALTER_E1) + { + mysql_mutex_lock(&thd->LOCK_thd_data); + thd->open_tables= tbl; + mysql_mutex_unlock(&thd->LOCK_thd_data); + thd->mdl_context.rollback_to_savepoint(m_start_of_statement_svp); + } + thd->lex->restore_backup_query_tables_list(&lex_backup); + thd->variables.option_bits= thd_saved_option; + thd->resume_subsequent_commits(suspended_wfc); + DBUG_EXECUTE_IF("inject_record_gtid_serverid_100_sleep", + { + if (gtid->server_id == 100) + my_sleep(500000); + }); + DBUG_RETURN(err); +} + + +/* + Return a list of all old GTIDs in any mysql.gtid_slave_pos* table that are + no longer needed and can be deleted from the table. + + Within each domain, we need to keep around the latest GTID (the one with the + highest sub_id), but any others in that domain can be deleted. +*/ +rpl_slave_state::list_element * +rpl_slave_state::gtid_grab_pending_delete_list() +{ + uint32 i; + list_element *full_list; + + mysql_mutex_lock(&LOCK_slave_state); + full_list= NULL; + for (i= 0; i < hash.records; ++i) + { + element *elem= (element *)my_hash_element(&hash, i); + list_element *elist= elem->list; + list_element *last_elem, **best_ptr_ptr, *cur, *next; + uint64 best_sub_id; + + if (!elist) + continue; /* Nothing here */ + + /* Delete any old stuff, but keep around the most recent one. */ + cur= elist; + best_sub_id= cur->sub_id; + best_ptr_ptr= &elist; + last_elem= cur; + while ((next= cur->next)) { + last_elem= next; + if (next->sub_id > best_sub_id) + { + best_sub_id= next->sub_id; + best_ptr_ptr= &cur->next; + } + cur= next; + } + /* + Append the new elements to the full list. Note the order is important; + we do it here so that we do not break the list if best_sub_id is the + last of the new elements. + */ + last_elem->next= full_list; + /* + Delete the highest sub_id element from the old list, and put it back as + the single-element new list. + */ + cur= *best_ptr_ptr; + *best_ptr_ptr= cur->next; + cur->next= NULL; + elem->list= cur; + + /* + Collect the full list so far here. Note that elist may have moved if we + deleted the first element, so order is again important. + */ + full_list= elist; + } + mysql_mutex_unlock(&LOCK_slave_state); + + return full_list; +} + + +/* Find the mysql.gtid_slave_posXXX table associated with a given hton. */ +LEX_CSTRING * +rpl_slave_state::select_gtid_pos_table(void *hton) +{ + /* + See comments on rpl_slave_state::gtid_pos_tables for rules around proper + access to the list. + */ + auto table_entry= gtid_pos_tables.load(std::memory_order_acquire); + + while (table_entry) + { + if (table_entry->table_hton == hton) + { + if (likely(table_entry->state == GTID_POS_AVAILABLE)) + return &table_entry->table_name; + } + table_entry= table_entry->next; + } + + return &default_gtid_pos_table.load(std::memory_order_acquire)->table_name; +} + + +void +rpl_slave_state::gtid_delete_pending(THD *thd, + rpl_slave_state::list_element **list_ptr) +{ + int err= 0; + ulonglong thd_saved_option; + + if (unlikely(!loaded)) + return; + +#ifdef WITH_WSREP + /* + We should replicate local gtid_slave_pos updates to other nodes. + In applier we should not append them to galera writeset. + */ + if (WSREP_ON_ && wsrep_thd_is_local(thd) && + thd->wsrep_cs().state() != wsrep::client_state::s_none) + { + if (thd->wsrep_trx().active() == false) + { + if (thd->wsrep_next_trx_id() == WSREP_UNDEFINED_TRX_ID) + thd->set_query_id(next_query_id()); + wsrep_start_transaction(thd, thd->wsrep_next_trx_id()); + } + thd->wsrep_ignore_table= false; + } + thd->wsrep_ignore_table= true; +#endif + + thd_saved_option= thd->variables.option_bits; + thd->variables.option_bits&= + ~(ulonglong)(OPTION_NOT_AUTOCOMMIT |OPTION_BEGIN |OPTION_BIN_LOG | + OPTION_GTID_BEGIN); + + while (*list_ptr) + { + LEX_CSTRING *gtid_pos_table_name, *tmp_table_name; + Query_tables_list lex_backup; + TABLE_LIST tlist; + TABLE *table; + handler::Table_flags direct_pos= 0; + list_element *cur, **cur_ptr_ptr; + bool table_opened= false; + bool index_inited= false; + void *hton= (*list_ptr)->hton; + + thd->reset_for_next_command(); + + /* + Only the SQL thread can call select_gtid_pos_table without a mutex + Other threads needs to use a mutex and take into account that the + result may change during execution, so we have to make a copy. + */ + mysql_mutex_lock(&LOCK_slave_state); + tmp_table_name= select_gtid_pos_table(hton); + gtid_pos_table_name= thd->make_clex_string(tmp_table_name->str, + tmp_table_name->length); + mysql_mutex_unlock(&LOCK_slave_state); + if (!gtid_pos_table_name) + { + /* Out of memory - we can try again later. */ + break; + } + + thd->lex->reset_n_backup_query_tables_list(&lex_backup); + tlist.init_one_table(&MYSQL_SCHEMA_NAME, gtid_pos_table_name, NULL, TL_WRITE); + if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) + goto end; + table_opened= true; + table= tlist.table; + + if ((err= gtid_check_rpl_slave_state_table(table))) + goto end; + + direct_pos= table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION; + bitmap_set_all(table->write_set); + table->rpl_write_set= table->write_set; + + /* Now delete any already committed GTIDs. */ + bitmap_set_bit(table->read_set, table->field[0]->field_index); + bitmap_set_bit(table->read_set, table->field[1]->field_index); + + if (!direct_pos) + { + if ((err= table->file->ha_index_init(0, 0))) + { + table->file->print_error(err, MYF(0)); + goto end; + } + index_inited= true; + } + + cur = *list_ptr; + cur_ptr_ptr = list_ptr; + do + { + uchar key_buffer[4+8]; + list_element *next= cur->next; + + if (cur->hton == hton) + { + int res; + + table->field[0]->store((ulonglong)cur->domain_id, true); + table->field[1]->store(cur->sub_id, true); + if (direct_pos) + { + res= table->file->ha_rnd_pos_by_record(table->record[0]); + } + else + { + key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false); + res= table->file->ha_index_read_map(table->record[0], key_buffer, + HA_WHOLE_KEY, HA_READ_KEY_EXACT); + } + DBUG_EXECUTE_IF("gtid_slave_pos_simulate_failed_delete", + { res= 1; + err= ENOENT; + sql_print_error("<DEBUG> Error deleting old GTID row"); + }); + if (res) + /* We cannot find the row, assume it is already deleted. */ + ; + else if ((err= table->file->ha_delete_row(table->record[0]))) + { + sql_print_error("Error deleting old GTID row: %s", + thd->get_stmt_da()->message()); + /* + In case of error, we still discard the element from the list. We do + not want to endlessly error on the same element in case of table + corruption or such. + */ + } + *cur_ptr_ptr= next; + my_free(cur); + } + else + { + /* Leave this one in the list until we get to the table for its hton. */ + cur_ptr_ptr= &cur->next; + } + cur= next; + if (err) + break; + } while (cur); +end: + if (table_opened) + { + DBUG_ASSERT(direct_pos || index_inited || err); + /* + Index may not be initialized if there was a failure during + 'ha_index_init'. Hence check if index initialization is successful and + then invoke ha_index_end(). Ending an index which is not initialized + will lead to assert. + */ + if (index_inited) + table->file->ha_index_end(); + + if (err || (err= ha_commit_trans(thd, FALSE))) + ha_rollback_trans(thd, FALSE); + } + close_thread_tables(thd); + thd->release_transactional_locks(); + thd->lex->restore_backup_query_tables_list(&lex_backup); + + if (err) + break; + } + thd->variables.option_bits= thd_saved_option; + +#ifdef WITH_WSREP + thd->wsrep_ignore_table= false; +#endif +} + + +uint64 +rpl_slave_state::next_sub_id(uint32 domain_id) +{ + uint64 sub_id= 0; + + mysql_mutex_lock(&LOCK_slave_state); + sub_id= ++last_sub_id; + mysql_mutex_unlock(&LOCK_slave_state); + + return sub_id; +} + +/* A callback used in sorting of gtid list based on domain_id. */ +static int rpl_gtid_cmp_cb(const void *id1, const void *id2) +{ + uint32 d1= ((rpl_gtid *)id1)->domain_id; + uint32 d2= ((rpl_gtid *)id2)->domain_id; + + if (d1 < d2) + return -1; + else if (d1 > d2) + return 1; + return 0; +} + +/* Format the specified gtid and store it in the given string buffer. */ +bool +rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, bool *first) +{ + if (*first) + *first= false; + else + if (dest->append(',')) + return true; + return + dest->append_ulonglong(gtid->domain_id) || + dest->append('-') || + dest->append_ulonglong(gtid->server_id) || + dest->append('-') || + dest->append_ulonglong(gtid->seq_no); +} + +/* + Sort the given gtid list based on domain_id and store them in the specified + string. +*/ +static bool +rpl_slave_state_tostring_helper(DYNAMIC_ARRAY *gtid_dynarr, String *str) +{ + bool first= true, res= true; + + sort_dynamic(gtid_dynarr, rpl_gtid_cmp_cb); + + for (uint i= 0; i < gtid_dynarr->elements; i ++) + { + rpl_gtid *gtid= dynamic_element(gtid_dynarr, i, rpl_gtid *); + if (rpl_slave_state_tostring_helper(str, gtid, &first)) + goto err; + } + res= false; + +err: + return res; +} + + +/* Sort the given gtid list based on domain_id and call cb for each gtid. */ +static bool +rpl_slave_state_tostring_helper(DYNAMIC_ARRAY *gtid_dynarr, + int (*cb)(rpl_gtid *, void *), + void *data) +{ + rpl_gtid *gtid; + bool res= true; + + sort_dynamic(gtid_dynarr, rpl_gtid_cmp_cb); + + for (uint i= 0; i < gtid_dynarr->elements; i ++) + { + gtid= dynamic_element(gtid_dynarr, i, rpl_gtid *); + if ((*cb)(gtid, data)) + goto err; + } + res= false; + +err: + return res; +} + +int +rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data, + rpl_gtid *extra_gtids, uint32 num_extra, + bool sort) +{ + uint32 i; + HASH gtid_hash; + uchar *rec; + rpl_gtid *gtid; + int res= 1; + bool locked= false; + + my_hash_init(PSI_INSTRUMENT_ME, >id_hash, &my_charset_bin, 32, + offsetof(rpl_gtid, domain_id), sizeof(rpl_gtid::domain_id), + NULL, NULL, HASH_UNIQUE); + for (i= 0; i < num_extra; ++i) + if (extra_gtids[i].server_id == global_system_variables.server_id && + my_hash_insert(>id_hash, (uchar *)(&extra_gtids[i]))) + goto err; + + mysql_mutex_lock(&LOCK_slave_state); + locked= true; + reset_dynamic(>id_sort_array); + + for (i= 0; i < hash.records; ++i) + { + uint64 best_sub_id; + rpl_gtid best_gtid; + element *e= (element *)my_hash_element(&hash, i); + list_element *l= e->list; + + if (!l) + continue; /* Nothing here */ + + best_gtid.domain_id= e->domain_id; + best_gtid.server_id= l->server_id; + best_gtid.seq_no= l->seq_no; + best_sub_id= l->sub_id; + while ((l= l->next)) + { + if (l->sub_id > best_sub_id) + { + best_sub_id= l->sub_id; + best_gtid.server_id= l->server_id; + best_gtid.seq_no= l->seq_no; + } + } + + /* Check if we have something newer in the extra list. */ + rec= my_hash_search(>id_hash, (const uchar *)&best_gtid.domain_id, + sizeof(best_gtid.domain_id)); + if (rec) + { + gtid= (rpl_gtid *)rec; + if (gtid->seq_no > best_gtid.seq_no) + memcpy(&best_gtid, gtid, sizeof(best_gtid)); + if (my_hash_delete(>id_hash, rec)) + { + goto err; + } + } + + if ((res= sort ? insert_dynamic(>id_sort_array, + (const void *) &best_gtid) : + (*cb)(&best_gtid, data))) + { + goto err; + } + } + + /* Also add any remaining extra domain_ids. */ + for (i= 0; i < gtid_hash.records; ++i) + { + gtid= (rpl_gtid *)my_hash_element(>id_hash, i); + if ((res= sort ? insert_dynamic(>id_sort_array, (const void *) gtid) : + (*cb)(gtid, data))) + { + goto err; + } + } + + if (sort && rpl_slave_state_tostring_helper(>id_sort_array, cb, data)) + { + goto err; + } + + res= 0; + +err: + if (locked) mysql_mutex_unlock(&LOCK_slave_state); + my_hash_free(>id_hash); + + return res; +} + + +struct rpl_slave_state_tostring_data { + String *dest; + bool first; +}; +static int +rpl_slave_state_tostring_cb(rpl_gtid *gtid, void *data) +{ + rpl_slave_state_tostring_data *p= (rpl_slave_state_tostring_data *)data; + return rpl_slave_state_tostring_helper(p->dest, gtid, &p->first); +} + + +/* + Prepare the current slave state as a string, suitable for sending to the + master to request to receive binlog events starting from that GTID state. + + The state consists of the most recently applied GTID for each domain_id, + ie. the one with the highest sub_id within each domain_id. + + Optinally, extra_gtids is a list of GTIDs from the binlog. This is used when + a server was previously a master and now needs to connect to a new master as + a slave. For each domain_id, if the GTID in the binlog was logged with our + own server_id _and_ has a higher seq_no than what is in the slave state, + then this should be used as the position to start replicating at. This + allows to promote a slave as new master, and connect the old master as a + slave with MASTER_GTID_POS=AUTO. +*/ +int +rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra) +{ + struct rpl_slave_state_tostring_data data; + data.first= true; + data.dest= dest; + + return iterate(rpl_slave_state_tostring_cb, &data, extra_gtids, + num_extra, true); +} + + +/* + Lookup a domain_id in the current replication slave state. + + Returns false if the domain_id has no entries in the slave state. + Otherwise returns true, and fills in out_gtid with the corresponding + GTID. +*/ +bool +rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid) +{ + element *elem; + list_element *list; + uint64 best_sub_id; + + mysql_mutex_lock(&LOCK_slave_state); + elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, + sizeof(domain_id)); + if (!elem || !(list= elem->list)) + { + mysql_mutex_unlock(&LOCK_slave_state); + return false; + } + + out_gtid->domain_id= domain_id; + out_gtid->server_id= list->server_id; + out_gtid->seq_no= list->seq_no; + best_sub_id= list->sub_id; + + while ((list= list->next)) + { + if (best_sub_id > list->sub_id) + continue; + best_sub_id= list->sub_id; + out_gtid->server_id= list->server_id; + out_gtid->seq_no= list->seq_no; + } + + mysql_mutex_unlock(&LOCK_slave_state); + return true; +} + +#endif + +/* + Parse a GTID at the start of a string, and update the pointer to point + at the first character after the parsed GTID. + + Returns 0 on ok, non-zero on parse error. +*/ +static int +gtid_parser_helper(const char **ptr, const char *end, rpl_gtid *out_gtid) +{ + char *q; + const char *p= *ptr; + uint64 v1, v2, v3; + int err= 0; + + q= (char*) end; + v1= (uint64)my_strtoll10(p, &q, &err); + if (err != 0 || v1 > (uint32)0xffffffff || q == end || *q != '-') + return 1; + p= q+1; + q= (char*) end; + v2= (uint64)my_strtoll10(p, &q, &err); + if (err != 0 || v2 > (uint32)0xffffffff || q == end || *q != '-') + return 1; + p= q+1; + q= (char*) end; + v3= (uint64)my_strtoll10(p, &q, &err); + if (err != 0) + return 1; + + out_gtid->domain_id= (uint32) v1; + out_gtid->server_id= (uint32) v2; + out_gtid->seq_no= v3; + *ptr= q; + return 0; +} + +rpl_gtid * +gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len) +{ + const char *p= const_cast<char *>(str); + const char *end= p + str_len; + uint32 len= 0, alloc_len= 5; + rpl_gtid *list= NULL; + + for (;;) + { + rpl_gtid gtid; + + if (len >= (((uint32)1 << 28)-1) || gtid_parser_helper(&p, end, >id)) + { + my_free(list); + return NULL; + } + if ((!list || len >= alloc_len) && + !(list= + (rpl_gtid *)my_realloc(PSI_INSTRUMENT_ME, list, + (alloc_len= alloc_len*2) * sizeof(rpl_gtid), + MYF(MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR)))) + return NULL; + list[len++]= gtid; + + if (p == end) + break; + if (*p != ',') + { + my_free(list); + return NULL; + } + ++p; + } + *out_len= len; + return list; +} + +#ifndef MYSQL_CLIENT + +/* + Update the slave replication state with the GTID position obtained from + master when connecting with old-style (filename,offset) position. + + If RESET is true then all existing entries are removed. Otherwise only + domain_ids mentioned in the STATE_FROM_MASTER are changed. + + Returns 0 if ok, non-zero if error. +*/ +int +rpl_slave_state::load(THD *thd, const char *state_from_master, size_t len, + bool reset, bool in_statement) +{ + const char *end= state_from_master + len; + + mysql_mutex_assert_not_owner(&LOCK_slave_state); + if (reset) + { + if (truncate_state_table(thd)) + return 1; + truncate_hash(); + } + if (state_from_master == end) + return 0; + for (;;) + { + rpl_gtid gtid; + uint64 sub_id; + void *hton= NULL; + + if (gtid_parser_helper(&state_from_master, end, >id) || + !(sub_id= next_sub_id(gtid.domain_id)) || + record_gtid(thd, >id, sub_id, false, in_statement, &hton) || + update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL)) + return 1; + if (state_from_master == end) + break; + if (*state_from_master != ',') + return 1; + ++state_from_master; + } + return 0; +} + + +bool +rpl_slave_state::is_empty() +{ + uint32 i; + bool result= true; + + mysql_mutex_lock(&LOCK_slave_state); + for (i= 0; i < hash.records; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + if (e->list) + { + result= false; + break; + } + } + mysql_mutex_unlock(&LOCK_slave_state); + + return result; +} + + +void +rpl_slave_state::free_gtid_pos_tables(struct rpl_slave_state::gtid_pos_table *list) +{ + struct gtid_pos_table *cur, *next; + + cur= list; + while (cur) + { + next= cur->next; + my_free(cur); + cur= next; + } +} + + +/* + Replace the list of available mysql.gtid_slave_posXXX tables with a new list. + The caller must be holding LOCK_slave_state. Additionally, this function + must only be called while all SQL threads are stopped. +*/ +void +rpl_slave_state::set_gtid_pos_tables_list(rpl_slave_state::gtid_pos_table *new_list, + rpl_slave_state::gtid_pos_table *default_entry) +{ + mysql_mutex_assert_owner(&LOCK_slave_state); + auto old_list= gtid_pos_tables.load(std::memory_order_relaxed); + gtid_pos_tables.store(new_list, std::memory_order_release); + default_gtid_pos_table.store(default_entry, std::memory_order_release); + free_gtid_pos_tables(old_list); +} + + +void +rpl_slave_state::add_gtid_pos_table(rpl_slave_state::gtid_pos_table *entry) +{ + mysql_mutex_assert_owner(&LOCK_slave_state); + entry->next= gtid_pos_tables.load(std::memory_order_relaxed); + gtid_pos_tables.store(entry, std::memory_order_release); +} + + +struct rpl_slave_state::gtid_pos_table * +rpl_slave_state::alloc_gtid_pos_table(LEX_CSTRING *table_name, void *hton, + rpl_slave_state::gtid_pos_table_state state) +{ + struct gtid_pos_table *p; + char *allocated_str; + + if (!my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME), &p, sizeof(*p), + &allocated_str, table_name->length+1, NULL)) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*p) + table_name->length+1)); + return NULL; + } + memcpy(allocated_str, table_name->str, table_name->length+1); // Also copy '\0' + p->next = NULL; + p->table_hton= hton; + p->table_name.str= allocated_str; + p->table_name.length= table_name->length; + p->state= state; + return p; +} + + +void rpl_binlog_state::init() +{ + my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32, + offsetof(element, domain_id), sizeof(element::domain_id), + NULL, my_free, HASH_UNIQUE); + my_init_dynamic_array(PSI_INSTRUMENT_ME, >id_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0)); + mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state, + MY_MUTEX_INIT_SLOW); + initialized= 1; +} + +void +rpl_binlog_state::reset_nolock() +{ + uint32 i; + + for (i= 0; i < hash.records; ++i) + my_hash_free(&((element *)my_hash_element(&hash, i))->hash); + my_hash_reset(&hash); +} + + +void +rpl_binlog_state::reset() +{ + mysql_mutex_lock(&LOCK_binlog_state); + reset_nolock(); + mysql_mutex_unlock(&LOCK_binlog_state); +} + + +void rpl_binlog_state::free() +{ + if (initialized) + { + initialized= 0; + reset_nolock(); + my_hash_free(&hash); + delete_dynamic(>id_sort_array); + mysql_mutex_destroy(&LOCK_binlog_state); + } +} + + +bool +rpl_binlog_state::load(struct rpl_gtid *list, uint32 count) +{ + uint32 i; + bool res= false; + + mysql_mutex_lock(&LOCK_binlog_state); + reset_nolock(); + for (i= 0; i < count; ++i) + { + if (update_nolock(&(list[i]), false)) + { + res= true; + break; + } + } + mysql_mutex_unlock(&LOCK_binlog_state); + return res; +} + + +static int rpl_binlog_state_load_cb(rpl_gtid *gtid, void *data) +{ + rpl_binlog_state *self= (rpl_binlog_state *)data; + return self->update_nolock(gtid, false); +} + + +bool +rpl_binlog_state::load(rpl_slave_state *slave_pos) +{ + bool res= false; + + mysql_mutex_lock(&LOCK_binlog_state); + reset_nolock(); + if (slave_pos->iterate(rpl_binlog_state_load_cb, this, NULL, 0, false)) + res= true; + mysql_mutex_unlock(&LOCK_binlog_state); + return res; +} + + +rpl_binlog_state::~rpl_binlog_state() +{ + free(); +} + + +/* + Update replication state with a new GTID. + + If the (domain_id, server_id) pair already exists, then the new GTID replaces + the old one for that domain id. Else a new entry is inserted. + + Returns 0 for ok, 1 for error. +*/ +int +rpl_binlog_state::update_nolock(const struct rpl_gtid *gtid, bool strict) +{ + element *elem; + + if ((elem= (element *)my_hash_search(&hash, + (const uchar *)(>id->domain_id), + sizeof(gtid->domain_id)))) + { + if (strict && elem->last_gtid && elem->last_gtid->seq_no >= gtid->seq_no) + { + my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), gtid->domain_id, + gtid->server_id, gtid->seq_no, elem->last_gtid->domain_id, + elem->last_gtid->server_id, elem->last_gtid->seq_no); + return 1; + } + if (elem->seq_no_counter < gtid->seq_no) + elem->seq_no_counter= gtid->seq_no; + if (!elem->update_element(gtid)) + return 0; + } + else if (!alloc_element_nolock(gtid)) + return 0; + + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; +} + + +int +rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict) +{ + int res; + mysql_mutex_lock(&LOCK_binlog_state); + res= update_nolock(gtid, strict); + mysql_mutex_unlock(&LOCK_binlog_state); + return res; +} + + +/* + Fill in a new GTID, allocating next sequence number, and update state + accordingly. +*/ +int +rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id, + rpl_gtid *gtid) +{ + element *elem; + int res= 0; + + gtid->domain_id= domain_id; + gtid->server_id= server_id; + + mysql_mutex_lock(&LOCK_binlog_state); + if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), + sizeof(domain_id)))) + { + gtid->seq_no= ++elem->seq_no_counter; + if (!elem->update_element(gtid)) + goto end; + } + else + { + gtid->seq_no= 1; + if (!alloc_element_nolock(gtid)) + goto end; + } + + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + res= 1; +end: + mysql_mutex_unlock(&LOCK_binlog_state); + return res; +} + + +/* Helper functions for update. */ +int +rpl_binlog_state::element::update_element(const rpl_gtid *gtid) +{ + rpl_gtid *lookup_gtid; + + /* + By far the most common case is that successive events within same + replication domain have the same server id (it changes only when + switching to a new master). So save a hash lookup in this case. + */ + if (likely(last_gtid && last_gtid->server_id == gtid->server_id)) + { + last_gtid->seq_no= gtid->seq_no; + return 0; + } + + lookup_gtid= (rpl_gtid *) + my_hash_search(&hash, (const uchar *)>id->server_id, + sizeof(gtid->server_id)); + if (lookup_gtid) + { + lookup_gtid->seq_no= gtid->seq_no; + last_gtid= lookup_gtid; + return 0; + } + + /* Allocate a new GTID and insert it. */ + lookup_gtid= (rpl_gtid *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*lookup_gtid), + MYF(MY_WME)); + if (!lookup_gtid) + return 1; + memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid)); + if (my_hash_insert(&hash, (const uchar *)lookup_gtid)) + { + my_free(lookup_gtid); + return 1; + } + last_gtid= lookup_gtid; + return 0; +} + + +int +rpl_binlog_state::alloc_element_nolock(const rpl_gtid *gtid) +{ + element *elem; + rpl_gtid *lookup_gtid; + + /* First time we see this domain_id; allocate a new element. */ + elem= (element *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*elem), MYF(MY_WME)); + lookup_gtid= (rpl_gtid *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*lookup_gtid), + MYF(MY_WME)); + if (elem && lookup_gtid) + { + elem->domain_id= gtid->domain_id; + my_hash_init(PSI_INSTRUMENT_ME, &elem->hash, &my_charset_bin, 32, + offsetof(rpl_gtid, server_id), sizeof(rpl_gtid::domain_id), + NULL, my_free, HASH_UNIQUE); + elem->last_gtid= lookup_gtid; + elem->seq_no_counter= gtid->seq_no; + memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid)); + if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid)) + { + lookup_gtid= NULL; /* Do not free. */ + if (0 == my_hash_insert(&hash, (const uchar *)elem)) + return 0; + } + my_hash_free(&elem->hash); + } + + /* An error. */ + if (elem) + my_free(elem); + if (lookup_gtid) + my_free(lookup_gtid); + return 1; +} + + +/* + Check that a new GTID can be logged without creating an out-of-order + sequence number with existing GTIDs. +*/ +bool +rpl_binlog_state::check_strict_sequence(uint32 domain_id, uint32 server_id, + uint64 seq_no, bool no_error) +{ + element *elem; + bool res= 0; + + mysql_mutex_lock(&LOCK_binlog_state); + if ((elem= (element *)my_hash_search(&hash, + (const uchar *)(&domain_id), + sizeof(domain_id))) && + elem->last_gtid && elem->last_gtid->seq_no >= seq_no) + { + if (!no_error) + my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), domain_id, server_id, seq_no, + elem->last_gtid->domain_id, elem->last_gtid->server_id, + elem->last_gtid->seq_no); + res= 1; + } + mysql_mutex_unlock(&LOCK_binlog_state); + return res; +} + + +/* + When we see a new GTID that will not be binlogged (eg. slave thread + with --log-slave-updates=0), then we need to remember to allocate any + GTID seq_no of our own within that domain starting from there. + + Returns 0 if ok, non-zero if out-of-memory. +*/ +int +rpl_binlog_state::bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no) +{ + element *elem; + int res; + + mysql_mutex_lock(&LOCK_binlog_state); + if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), + sizeof(domain_id)))) + { + if (elem->seq_no_counter < seq_no) + elem->seq_no_counter= seq_no; + res= 0; + goto end; + } + + /* We need to allocate a new, empty element to remember the next seq_no. */ + if (!(elem= (element *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*elem), + MYF(MY_WME)))) + { + res= 1; + goto end; + } + + elem->domain_id= domain_id; + my_hash_init(PSI_INSTRUMENT_ME, &elem->hash, &my_charset_bin, 32, + offsetof(rpl_gtid, server_id), sizeof(rpl_gtid::server_id), + NULL, my_free, HASH_UNIQUE); + elem->last_gtid= NULL; + elem->seq_no_counter= seq_no; + if (0 == my_hash_insert(&hash, (const uchar *)elem)) + { + res= 0; + goto end; + } + + my_hash_free(&elem->hash); + my_free(elem); + res= 1; + +end: + mysql_mutex_unlock(&LOCK_binlog_state); + return res; +} + + +/* + Write binlog state to text file, so we can read it in again without having + to scan last binlog file (normal shutdown/startup, not crash recovery). + + The most recent GTID within each domain_id is written after any other GTID + within this domain. +*/ +int +rpl_binlog_state::write_to_iocache(IO_CACHE *dest) +{ + ulong i, j; + char buf[21]; + int res= 0; + + mysql_mutex_lock(&LOCK_binlog_state); + for (i= 0; i < hash.records; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + if (!e->last_gtid) + { + DBUG_ASSERT(e->hash.records == 0); + continue; + } + for (j= 0; j <= e->hash.records; ++j) + { + const rpl_gtid *gtid; + if (j < e->hash.records) + { + gtid= (const rpl_gtid *)my_hash_element(&e->hash, j); + if (gtid == e->last_gtid) + continue; + } + else + gtid= e->last_gtid; + + longlong10_to_str(gtid->seq_no, buf, 10); + if (my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, + buf)) + { + res= 1; + goto end; + } + } + } + +end: + mysql_mutex_unlock(&LOCK_binlog_state); + return res; +} + + +int +rpl_binlog_state::read_from_iocache(IO_CACHE *src) +{ + /* 10-digit - 10-digit - 20-digit \n \0 */ + char buf[10+1+10+1+20+1+1]; + const char *p, *end; + rpl_gtid gtid; + int res= 0; + + mysql_mutex_lock(&LOCK_binlog_state); + reset_nolock(); + for (;;) + { + size_t len= my_b_gets(src, buf, sizeof(buf)); + if (!len) + break; + p= buf; + end= buf + len; + if (gtid_parser_helper(&p, end, >id) || + update_nolock(>id, false)) + { + res= 1; + break; + } + } + mysql_mutex_unlock(&LOCK_binlog_state); + return res; +} + + +rpl_gtid * +rpl_binlog_state::find_nolock(uint32 domain_id, uint32 server_id) +{ + element *elem; + if (!(elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, + sizeof(domain_id)))) + return NULL; + return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id, + sizeof(server_id)); +} + +rpl_gtid * +rpl_binlog_state::find(uint32 domain_id, uint32 server_id) +{ + rpl_gtid *p; + mysql_mutex_lock(&LOCK_binlog_state); + p= find_nolock(domain_id, server_id); + mysql_mutex_unlock(&LOCK_binlog_state); + return p; +} + +rpl_gtid * +rpl_binlog_state::find_most_recent(uint32 domain_id) +{ + element *elem; + rpl_gtid *gtid= NULL; + + mysql_mutex_lock(&LOCK_binlog_state); + elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, + sizeof(domain_id)); + if (elem && elem->last_gtid) + gtid= elem->last_gtid; + mysql_mutex_unlock(&LOCK_binlog_state); + + return gtid; +} + + +uint32 +rpl_binlog_state::count() +{ + uint32 c= 0; + uint32 i; + + mysql_mutex_lock(&LOCK_binlog_state); + for (i= 0; i < hash.records; ++i) + c+= ((element *)my_hash_element(&hash, i))->hash.records; + mysql_mutex_unlock(&LOCK_binlog_state); + + return c; +} + + +int +rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size) +{ + uint32 i, j, pos; + int res= 0; + + mysql_mutex_lock(&LOCK_binlog_state); + pos= 0; + for (i= 0; i < hash.records; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + if (!e->last_gtid) + { + DBUG_ASSERT(e->hash.records==0); + continue; + } + for (j= 0; j <= e->hash.records; ++j) + { + const rpl_gtid *gtid; + if (j < e->hash.records) + { + gtid= (rpl_gtid *)my_hash_element(&e->hash, j); + if (gtid == e->last_gtid) + continue; + } + else + gtid= e->last_gtid; + + if (pos >= list_size) + { + res= 1; + goto end; + } + memcpy(>id_list[pos++], gtid, sizeof(*gtid)); + } + } + +end: + mysql_mutex_unlock(&LOCK_binlog_state); + return res; +} + + +/* + Get a list of the most recently binlogged GTID, for each domain_id. + + This can be used when switching from being a master to being a slave, + to know where to start replicating from the new master. + + The returned list must be de-allocated with my_free(). + + Returns 0 for ok, non-zero for out-of-memory. +*/ +int +rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size) +{ + uint32 i; + uint32 alloc_size, out_size; + int res= 0; + + out_size= 0; + mysql_mutex_lock(&LOCK_binlog_state); + alloc_size= hash.records; + if (!(*list= (rpl_gtid *)my_malloc(PSI_INSTRUMENT_ME, + alloc_size * sizeof(rpl_gtid), MYF(MY_WME)))) + { + res= 1; + goto end; + } + for (i= 0; i < alloc_size; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + if (!e->last_gtid) + continue; + memcpy(&((*list)[out_size++]), e->last_gtid, sizeof(rpl_gtid)); + } + +end: + mysql_mutex_unlock(&LOCK_binlog_state); + *size= out_size; + return res; +} + +bool +rpl_binlog_state::append_pos(String *str) +{ + uint32 i; + + mysql_mutex_lock(&LOCK_binlog_state); + reset_dynamic(>id_sort_array); + + for (i= 0; i < hash.records; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + if (e->last_gtid && + insert_dynamic(>id_sort_array, (const void *) e->last_gtid)) + { + mysql_mutex_unlock(&LOCK_binlog_state); + return true; + } + } + rpl_slave_state_tostring_helper(>id_sort_array, str); + mysql_mutex_unlock(&LOCK_binlog_state); + + return false; +} + + +bool +rpl_binlog_state::append_state(String *str) +{ + uint32 i, j; + bool res= false; + + mysql_mutex_lock(&LOCK_binlog_state); + reset_dynamic(>id_sort_array); + + for (i= 0; i < hash.records; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + if (!e->last_gtid) + { + DBUG_ASSERT(e->hash.records==0); + continue; + } + for (j= 0; j <= e->hash.records; ++j) + { + const rpl_gtid *gtid; + if (j < e->hash.records) + { + gtid= (rpl_gtid *)my_hash_element(&e->hash, j); + if (gtid == e->last_gtid) + continue; + } + else + gtid= e->last_gtid; + + if (insert_dynamic(>id_sort_array, (const void *) gtid)) + { + res= true; + goto end; + } + } + } + + rpl_slave_state_tostring_helper(>id_sort_array, str); + +end: + mysql_mutex_unlock(&LOCK_binlog_state); + return res; +} + +/** + Remove domains supplied by the first argument from binlog state. + Removal is done for any domain whose last gtids (from all its servers) match + ones in Gtid list event of the 2nd argument. + + @param ids gtid domain id sequence, may contain dups + @param glev pointer to Gtid list event describing + the match condition + @param errbuf [out] pointer to possible error message array + + @retval NULL as success when at least one domain is removed + @retval "" empty string to indicate ineffective call + when no domains removed + @retval NOT EMPTY string otherwise an error message +*/ +const char* +rpl_binlog_state::drop_domain(DYNAMIC_ARRAY *ids, + Gtid_list_log_event *glev, + char* errbuf) +{ + DYNAMIC_ARRAY domain_unique; // sequece (unsorted) of unique element*:s + rpl_binlog_state::element* domain_unique_buffer[16]; + ulong k, l; + const char* errmsg= NULL; + + DBUG_ENTER("rpl_binlog_state::drop_domain"); + + my_init_dynamic_array2(PSI_INSTRUMENT_ME, &domain_unique, + sizeof(element*), domain_unique_buffer, + sizeof(domain_unique_buffer) / sizeof(element*), 4, 0); + + mysql_mutex_lock(&LOCK_binlog_state); + + /* + Gtid list is supposed to come from a binlog's Gtid_list event and + therefore should be a subset of the current binlog state. That is + for every domain in the list the binlog state contains a gtid with + sequence number not less than that of the list. + Exceptions of this inclusion rule are: + A. the list may still refer to gtids from already deleted domains. + Files containing them must have been purged whereas the file + with the list is not yet. + B. out of order groups were injected + C. manually build list of binlog files violating the inclusion + constraint. + While A is a normal case (not necessarily distinguishable from C though), + B and C may require the user's attention so any (incl the A's suspected) + inconsistency is diagnosed and *warned*. + */ + for (l= 0, errbuf[0]= 0; l < glev->count; l++, errbuf[0]= 0) + { + rpl_gtid* rb_state_gtid= find_nolock(glev->list[l].domain_id, + glev->list[l].server_id); + if (!rb_state_gtid) + sprintf(errbuf, + "missing gtids from the '%u-%u' domain-server pair which is " + "referred to in the gtid list describing an earlier state. Ignore " + "if the domain ('%u') was already explicitly deleted", + glev->list[l].domain_id, glev->list[l].server_id, + glev->list[l].domain_id); + else if (rb_state_gtid->seq_no < glev->list[l].seq_no) + sprintf(errbuf, + "having a gtid '%u-%u-%llu' which is less than " + "the '%u-%u-%llu' of the gtid list describing an earlier state. " + "The state may have been affected by manually injecting " + "a lower sequence number gtid or via replication", + rb_state_gtid->domain_id, rb_state_gtid->server_id, + rb_state_gtid->seq_no, glev->list[l].domain_id, + glev->list[l].server_id, glev->list[l].seq_no); + if (strlen(errbuf)) // use strlen() as cheap flag + push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN, + ER_BINLOG_CANT_DELETE_GTID_DOMAIN, + "The current gtid binlog state is incompatible with " + "a former one %s.", errbuf); + } + + /* + For each domain_id from ids + If the domain is already absent from the binlog state + Warn && continue + If any GTID with that domain in binlog state is missing from glev.list + Error out binlog state can't change + */ + for (ulong i= 0; i < ids->elements; i++) + { + rpl_binlog_state::element *elem= NULL; + uint32 *ptr_domain_id; + bool all_found; + + ptr_domain_id= (uint32*) dynamic_array_ptr(ids, i); + elem= (rpl_binlog_state::element *) + my_hash_search(&hash, (const uchar *) ptr_domain_id, + sizeof(ptr_domain_id[0])); + if (!elem) + { + push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN, + ER_BINLOG_CANT_DELETE_GTID_DOMAIN, + "The gtid domain being deleted ('%lu') is not in " + "the current binlog state", (unsigned long) *ptr_domain_id); + continue; + } + + all_found= true; + for (k= 0; k < elem->hash.records && all_found; k++) + { + rpl_gtid *d_gtid= (rpl_gtid *)my_hash_element(&elem->hash, k); + bool match_found= false; + for (ulong l= 0; l < glev->count && !match_found; l++) + match_found= match_found || (*d_gtid == glev->list[l]); + if (!match_found) + all_found= false; + } + + if (!all_found) + { + sprintf(errbuf, "binlog files may contain gtids from the domain ('%u') " + "being deleted. Make sure to first purge those files", + *ptr_domain_id); + errmsg= errbuf; + goto end; + } + // compose a sequence of unique pointers to domain object + for (k= 0; k < domain_unique.elements; k++) + { + if ((rpl_binlog_state::element*) dynamic_array_ptr(&domain_unique, k) + == elem) + break; // domain_id's elem has been already in + } + if (k == domain_unique.elements) // proven not to have duplicates + insert_dynamic(&domain_unique, (uchar*) &elem); + } + + // Domain removal from binlog state + for (k= 0; k < domain_unique.elements; k++) + { + rpl_binlog_state::element *elem= *(rpl_binlog_state::element**) + dynamic_array_ptr(&domain_unique, k); + my_hash_free(&elem->hash); + my_hash_delete(&hash, (uchar*) elem); + } + + DBUG_ASSERT(strlen(errbuf) == 0); + + if (domain_unique.elements == 0) + errmsg= ""; + +end: + mysql_mutex_unlock(&LOCK_binlog_state); + delete_dynamic(&domain_unique); + + DBUG_RETURN(errmsg); +} + +slave_connection_state::slave_connection_state() +{ + my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32, + offsetof(entry, gtid) + offsetof(rpl_gtid, domain_id), + sizeof(rpl_gtid::domain_id), NULL, my_free, HASH_UNIQUE); + my_init_dynamic_array(PSI_INSTRUMENT_ME, >id_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0)); +} + + +slave_connection_state::~slave_connection_state() +{ + my_hash_free(&hash); + delete_dynamic(>id_sort_array); +} + + +/* + Create a hash from the slave GTID state that is sent to master when slave + connects to start replication. + + The state is sent as <GTID>,<GTID>,...,<GTID>, for example: + + 0-2-112,1-4-1022 + + The state gives for each domain_id the GTID to start replication from for + the corresponding replication stream. So domain_id must be unique. + + Returns 0 if ok, non-zero if error due to malformed input. + + Note that input string is built by slave server, so it will not be incorrect + unless bug/corruption/malicious server. So we just need basic sanity check, + not fancy user-friendly error message. +*/ + +int +slave_connection_state::load(const char *slave_request, size_t len) +{ + const char *p, *end; + uchar *rec; + rpl_gtid *gtid; + const entry *e; + + reset(); + p= slave_request; + end= slave_request + len; + if (p == end) + return 0; + for (;;) + { + if (!(rec= (uchar *)my_malloc(PSI_INSTRUMENT_ME, sizeof(entry), MYF(MY_WME)))) + return 1; + gtid= &((entry *)rec)->gtid; + if (gtid_parser_helper(&p, end, gtid)) + { + my_free(rec); + my_error(ER_INCORRECT_GTID_STATE, MYF(0)); + return 1; + } + if ((e= (const entry *) + my_hash_search(&hash, (const uchar *)(>id->domain_id), + sizeof(gtid->domain_id)))) + { + my_error(ER_DUPLICATE_GTID_DOMAIN, MYF(0), gtid->domain_id, + gtid->server_id, (ulonglong)gtid->seq_no, e->gtid.domain_id, + e->gtid.server_id, (ulonglong)e->gtid.seq_no, gtid->domain_id); + my_free(rec); + return 1; + } + ((entry *)rec)->flags= 0; + if (my_hash_insert(&hash, rec)) + { + my_free(rec); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; + } + if (p == end) + break; /* Finished. */ + if (*p != ',') + { + my_error(ER_INCORRECT_GTID_STATE, MYF(0)); + return 1; + } + ++p; + } + + return 0; +} + + +int +slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count) +{ + uint32 i; + + reset(); + for (i= 0; i < count; ++i) + if (update(>id_list[i])) + return 1; + return 0; +} + + +static int +slave_connection_state_load_cb(rpl_gtid *gtid, void *data) +{ + slave_connection_state *state= (slave_connection_state *)data; + return state->update(gtid); +} + + +/* + Same as rpl_slave_state::tostring(), but populates a slave_connection_state + instead. +*/ +int +slave_connection_state::load(rpl_slave_state *state, + rpl_gtid *extra_gtids, uint32 num_extra) +{ + reset(); + return state->iterate(slave_connection_state_load_cb, this, + extra_gtids, num_extra, false); +} + + +slave_connection_state::entry * +slave_connection_state::find_entry(uint32 domain_id) +{ + return (entry *) my_hash_search(&hash, (const uchar *)(&domain_id), + sizeof(domain_id)); +} + + +rpl_gtid * +slave_connection_state::find(uint32 domain_id) +{ + entry *e= find_entry(domain_id); + if (!e) + return NULL; + return &e->gtid; +} + + +int +slave_connection_state::update(const rpl_gtid *in_gtid) +{ + entry *e; + uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), + sizeof(in_gtid->domain_id)); + if (rec) + { + e= (entry *)rec; + e->gtid= *in_gtid; + return 0; + } + + if (!(e= (entry *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*e), MYF(MY_WME)))) + return 1; + e->gtid= *in_gtid; + e->flags= 0; + if (my_hash_insert(&hash, (uchar *)e)) + { + my_free(e); + return 1; + } + + return 0; +} + + +void +slave_connection_state::remove(const rpl_gtid *in_gtid) +{ + uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), + sizeof(in_gtid->domain_id)); +#ifdef DBUG_ASSERT_EXISTS + bool err; + rpl_gtid *slave_gtid= &((entry *)rec)->gtid; + DBUG_ASSERT(rec /* We should never try to remove not present domain_id. */); + DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id); + DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no); + err= +#endif + my_hash_delete(&hash, rec); + DBUG_ASSERT(!err); +} + + +void +slave_connection_state::remove_if_present(const rpl_gtid *in_gtid) +{ + uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), + sizeof(in_gtid->domain_id)); + if (rec) + my_hash_delete(&hash, rec); +} + + +int +slave_connection_state::to_string(String *out_str) +{ + out_str->length(0); + return append_to_string(out_str); +} + + +int +slave_connection_state::append_to_string(String *out_str) +{ + uint32 i; + bool first; + + first= true; + for (i= 0; i < hash.records; ++i) + { + const entry *e= (const entry *)my_hash_element(&hash, i); + if (rpl_slave_state_tostring_helper(out_str, &e->gtid, &first)) + return 1; + } + return 0; +} + + +int +slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size) +{ + uint32 i, pos; + + pos= 0; + for (i= 0; i < hash.records; ++i) + { + entry *e; + if (pos >= list_size) + return 1; + e= (entry *)my_hash_element(&hash, i); + memcpy(>id_list[pos++], &e->gtid, sizeof(e->gtid)); + } + + return 0; +} + + +/* + Check if the GTID position has been reached, for mysql_binlog_send(). + + The position has not been reached if we have anything in the state, unless + it has either the START_ON_EMPTY_DOMAIN flag set (which means it does not + belong to this master at all), or the START_OWN_SLAVE_POS (which means that + we start on an old position from when the server was a slave with + --log-slave-updates=0). +*/ +bool +slave_connection_state::is_pos_reached() +{ + uint32 i; + + for (i= 0; i < hash.records; ++i) + { + entry *e= (entry *)my_hash_element(&hash, i); + if (!(e->flags & (START_OWN_SLAVE_POS|START_ON_EMPTY_DOMAIN))) + return false; + } + + return true; +} + + +/* + Execute a MASTER_GTID_WAIT(). + The position to wait for is in gtid_str in string form. + The timeout in microseconds is in timeout_us, zero means no timeout. + + Returns: + 1 for error. + 0 for wait completed. + -1 for wait timed out. +*/ +int +gtid_waiting::wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us) +{ + int err; + rpl_gtid *wait_pos; + uint32 count, i; + struct timespec wait_until, *wait_until_ptr; + ulonglong before; + + /* Wait for the empty position returns immediately. */ + if (gtid_str->length() == 0) + { + status_var_increment(thd->status_var.master_gtid_wait_count); + return 0; + } + + if (!(wait_pos= gtid_parse_string_to_list(gtid_str->ptr(), gtid_str->length(), + &count))) + { + my_error(ER_INCORRECT_GTID_STATE, MYF(0)); + return 1; + } + status_var_increment(thd->status_var.master_gtid_wait_count); + before= microsecond_interval_timer(); + + if (timeout_us >= 0) + { + set_timespec_nsec(wait_until, (ulonglong)1000*timeout_us); + wait_until_ptr= &wait_until; + } + else + wait_until_ptr= NULL; + err= 0; + for (i= 0; i < count; ++i) + { + if ((err= wait_for_gtid(thd, &wait_pos[i], wait_until_ptr))) + break; + } + switch (err) + { + case -1: + status_var_increment(thd->status_var.master_gtid_wait_timeouts); + /* fall through */ + case 0: + status_var_add(thd->status_var.master_gtid_wait_time, + static_cast<ulong> + (microsecond_interval_timer() - before)); + } + my_free(wait_pos); + return err; +} + + +void +gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he) +{ + queue_element *qe; + + mysql_mutex_assert_owner(&LOCK_gtid_waiting); + if (queue_empty(&he->queue)) + return; + qe= (queue_element *)queue_top(&he->queue); + qe->do_small_wait= true; + mysql_cond_signal(&qe->thd->COND_wakeup_ready); +} + +void +gtid_waiting::process_wait_hash(uint64 wakeup_seq_no, + gtid_waiting::hash_element *he) +{ + mysql_mutex_assert_owner(&LOCK_gtid_waiting); + + for (;;) + { + queue_element *qe; + + if (queue_empty(&he->queue)) + break; + qe= (queue_element *)queue_top(&he->queue); + if (qe->wait_seq_no > wakeup_seq_no) + break; + DBUG_ASSERT(!qe->done); + queue_remove_top(&he->queue); + qe->done= true;; + mysql_cond_signal(&qe->thd->COND_wakeup_ready); + } +} + + +/* + Execute a MASTER_GTID_WAIT() for one specific domain. + + The implementation is optimised primarily for (1) minimal performance impact + on the slave replication threads, and secondarily for (2) quick performance + of MASTER_GTID_WAIT() on a single GTID, which can be useful for consistent + read to clients in an async replication read-scaleout scenario. + + To achieve (1), we have a "small" wait and a "large" wait. The small wait + contends with the replication threads on the lock on the gtid_slave_pos, so + only minimal processing is done under that lock, and only a single waiter at + a time does the small wait. + + If there is already a small waiter, a new thread will either replace the + small waiter (if it needs to wait for an earlier sequence number), or + instead do a "large" wait. + + Once awoken on the small wait, the waiting thread releases the lock shared + with the SQL threads quickly, and then processes all waiters currently doing + the large wait using a different lock that does not impact replication. + + This way, the SQL threads only need to do a single check + possibly a + pthread_cond_signal() when updating the gtid_slave_state, and the time that + non-SQL threads contend for the lock on gtid_slave_state is minimized. + + There is always at least one thread that has the responsibility to ensure + that there is a small waiter; this thread has queue_element::do_small_wait + set to true. This thread will do the small wait until it is done, at which + point it will make sure to pass on the responsibility to another thread. + Normally only one thread has do_small_wait==true, but it can occasionally + happen that there is more than one, when threads race one another for the + lock on the small wait (this results in slightly increased activity on the + small lock but is otherwise harmless). + + Returns: + 0 Wait completed normally + -1 Wait completed due to timeout + 1 An error (my_error() will have been called to set the error in the da) +*/ +int +gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, + struct timespec *wait_until) +{ + bool timed_out= false; +#ifdef HAVE_REPLICATION + queue_element elem; + uint32 domain_id= wait_gtid->domain_id; + uint64 seq_no= wait_gtid->seq_no; + hash_element *he; + rpl_slave_state::element *slave_state_elem= NULL; + PSI_stage_info old_stage; + bool did_enter_cond= false; + + elem.wait_seq_no= seq_no; + elem.thd= thd; + elem.done= false; + + mysql_mutex_lock(&LOCK_gtid_waiting); + if (!(he= get_entry(wait_gtid->domain_id))) + { + mysql_mutex_unlock(&LOCK_gtid_waiting); + return 1; + } + /* + If there is already another waiter with seq_no no larger than our own, + we are sure that there is already a small waiter that will wake us up + (or later pass the small wait responsibility to us). So in this case, we + do not need to touch the small wait lock at all. + */ + elem.do_small_wait= + (queue_empty(&he->queue) || + ((queue_element *)queue_top(&he->queue))->wait_seq_no > seq_no); + + if (register_in_wait_queue(thd, wait_gtid, he, &elem)) + { + mysql_mutex_unlock(&LOCK_gtid_waiting); + return 1; + } + /* + Loop, doing either the small or large wait as appropriate, until either + the position waited for is reached, or we get a kill or timeout. + */ + for (;;) + { + mysql_mutex_assert_owner(&LOCK_gtid_waiting); + + if (elem.do_small_wait) + { + uint64 wakeup_seq_no; + queue_element *cur_waiter; + + mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); + /* + The elements in the gtid_slave_state_hash are never re-allocated once + they enter the hash, so we do not need to re-do the lookup after releasing + and re-aquiring the lock. + */ + if (!slave_state_elem && + !(slave_state_elem= rpl_global_gtid_slave_state->get_element(domain_id))) + { + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + remove_from_wait_queue(he, &elem); + promote_new_waiter(he); + if (did_enter_cond) + thd->EXIT_COND(&old_stage); + else + mysql_mutex_unlock(&LOCK_gtid_waiting); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; + } + + if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no) + { + /* + We do not have to wait. (We will be removed from the wait queue when + we call process_wait_hash() below. + */ + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + } + else if ((cur_waiter= slave_state_elem->gtid_waiter) && + slave_state_elem->min_wait_seq_no <= seq_no) + { + /* + There is already a suitable small waiter, go do the large wait. + (Normally we would not have needed to check the small wait in this + case, but it can happen if we race with another thread for the small + lock). + */ + elem.do_small_wait= false; + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + } + else + { + /* + We have to do the small wait ourselves (stealing it from any thread + that might already be waiting for a later seq_no). + */ + slave_state_elem->gtid_waiter= &elem; + slave_state_elem->min_wait_seq_no= seq_no; + if (cur_waiter) + { + /* We stole the wait, so wake up the old waiting thread. */ + mysql_cond_signal(&slave_state_elem->COND_wait_gtid); + } + + /* Release the large lock, and do the small wait. */ + if (did_enter_cond) + { + thd->EXIT_COND(&old_stage); + did_enter_cond= false; + } + else + mysql_mutex_unlock(&LOCK_gtid_waiting); + thd->ENTER_COND(&slave_state_elem->COND_wait_gtid, + &rpl_global_gtid_slave_state->LOCK_slave_state, + &stage_master_gtid_wait_primary, &old_stage); + do + { + if (unlikely(thd->check_killed(1))) + break; + else if (wait_until) + { + int err= + mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid, + &rpl_global_gtid_slave_state->LOCK_slave_state, + wait_until); + if (err == ETIMEDOUT || err == ETIME) + { + timed_out= true; + break; + } + } + else + mysql_cond_wait(&slave_state_elem->COND_wait_gtid, + &rpl_global_gtid_slave_state->LOCK_slave_state); + } while (slave_state_elem->gtid_waiter == &elem); + wakeup_seq_no= slave_state_elem->highest_seq_no; + /* + If we aborted due to timeout or kill, remove us as waiter. + + If we were replaced by another waiter with a smaller seq_no, then we + no longer have responsibility for the small wait. + */ + if ((cur_waiter= slave_state_elem->gtid_waiter)) + { + if (cur_waiter == &elem) + slave_state_elem->gtid_waiter= NULL; + else if (slave_state_elem->min_wait_seq_no <= seq_no) + elem.do_small_wait= false; + } + thd->EXIT_COND(&old_stage); + + mysql_mutex_lock(&LOCK_gtid_waiting); + } + + /* + Note that hash_entry pointers do not change once allocated, so we do + not need to lookup `he' again after re-aquiring LOCK_gtid_waiting. + */ + process_wait_hash(wakeup_seq_no, he); + } + else + { + /* Do the large wait. */ + if (!did_enter_cond) + { + thd->ENTER_COND(&thd->COND_wakeup_ready, &LOCK_gtid_waiting, + &stage_master_gtid_wait, &old_stage); + did_enter_cond= true; + } + while (!elem.done && likely(!thd->check_killed(1))) + { + thd_wait_begin(thd, THD_WAIT_BINLOG); + if (wait_until) + { + int err= mysql_cond_timedwait(&thd->COND_wakeup_ready, + &LOCK_gtid_waiting, wait_until); + if (err == ETIMEDOUT || err == ETIME) + timed_out= true; + } + else + mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting); + thd_wait_end(thd); + if (elem.do_small_wait || timed_out) + break; + } + } + + if ((thd->killed || timed_out) && !elem.done) + { + /* Aborted, so remove ourselves from the hash. */ + remove_from_wait_queue(he, &elem); + elem.done= true; + } + if (elem.done) + { + /* + If our wait is done, but we have (or were passed) responsibility for + the small wait, then we need to pass on that task to someone else. + */ + if (elem.do_small_wait) + promote_new_waiter(he); + break; + } + } + + if (did_enter_cond) + thd->EXIT_COND(&old_stage); + else + mysql_mutex_unlock(&LOCK_gtid_waiting); + if (thd->killed) + thd->send_kill_message(); +#endif /* HAVE_REPLICATION */ + return timed_out ? -1 : 0; +} + + +static void +free_hash_element(void *p) +{ + gtid_waiting::hash_element *e= (gtid_waiting::hash_element *)p; + delete_queue(&e->queue); + my_free(e); +} + + +void +gtid_waiting::init() +{ + my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32, + offsetof(hash_element, domain_id), + sizeof(hash_element::domain_id), NULL, + free_hash_element, HASH_UNIQUE); + mysql_mutex_init(key_LOCK_gtid_waiting, &LOCK_gtid_waiting, 0); +} + + +void +gtid_waiting::destroy() +{ + mysql_mutex_destroy(&LOCK_gtid_waiting); + my_hash_free(&hash); +} + + +static int +cmp_queue_elem(void *, uchar *a, uchar *b) +{ + uint64 seq_no_a= *(uint64 *)a; + uint64 seq_no_b= *(uint64 *)b; + if (seq_no_a < seq_no_b) + return -1; + else if (seq_no_a == seq_no_b) + return 0; + else + return 1; +} + + +gtid_waiting::hash_element * +gtid_waiting::get_entry(uint32 domain_id) +{ + hash_element *e; + + if ((e= (hash_element *)my_hash_search(&hash, (const uchar *)&domain_id, + sizeof(domain_id)))) + return e; + + if (!(e= (hash_element *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*e), MYF(MY_WME)))) + return NULL; + + if (init_queue(&e->queue, 8, offsetof(queue_element, wait_seq_no), 0, + cmp_queue_elem, NULL, 1+offsetof(queue_element, queue_idx), 1)) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + my_free(e); + return NULL; + } + e->domain_id= domain_id; + if (my_hash_insert(&hash, (uchar *)e)) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + delete_queue(&e->queue); + my_free(e); + return NULL; + } + return e; +} + + +int +gtid_waiting::register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid, + gtid_waiting::hash_element *he, + gtid_waiting::queue_element *elem) +{ + mysql_mutex_assert_owner(&LOCK_gtid_waiting); + + if (queue_insert_safe(&he->queue, (uchar *)elem)) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; + } + + return 0; +} + + +void +gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he, + gtid_waiting::queue_element *elem) +{ + mysql_mutex_assert_owner(&LOCK_gtid_waiting); + + queue_remove(&he->queue, elem->queue_idx); +} + +#endif + +void free_domain_lookup_element(void *p) +{ + struct Binlog_gtid_state_validator::audit_elem *audit_elem= + (struct Binlog_gtid_state_validator::audit_elem *) p; + delete_dynamic(&audit_elem->late_gtids_previous); + delete_dynamic(&audit_elem->late_gtids_real); + my_free(audit_elem); +} + +Binlog_gtid_state_validator::Binlog_gtid_state_validator() +{ + my_hash_init(PSI_INSTRUMENT_ME, &m_audit_elem_domain_lookup, &my_charset_bin, 32, + offsetof(struct audit_elem, domain_id), sizeof(uint32), + NULL, free_domain_lookup_element, HASH_UNIQUE); +} + +Binlog_gtid_state_validator::~Binlog_gtid_state_validator() +{ + my_hash_free(&m_audit_elem_domain_lookup); +} + +void Binlog_gtid_state_validator::initialize_start_gtids(rpl_gtid *start_gtids, + size_t n_gtids) +{ + size_t i; + for(i= 0; i < n_gtids; i++) + { + rpl_gtid *domain_state_gtid= &start_gtids[i]; + + /* + If we are initializing from a GLLE, we can have repeat domain ids from + differing servers, so we want to ensure our start gtid matches the last + known position + */ + struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search( + &m_audit_elem_domain_lookup, + (const uchar *) &(domain_state_gtid->domain_id), 0); + if (audit_elem) + { + /* + We have this domain already specified, so try to overwrite with the + more recent GTID + */ + if (domain_state_gtid->seq_no > audit_elem->start_gtid.seq_no) + audit_elem->start_gtid = *domain_state_gtid; + continue; + } + + /* Initialize a new domain */ + audit_elem= (struct audit_elem *) my_malloc( + PSI_NOT_INSTRUMENTED, sizeof(struct audit_elem), MYF(MY_WME)); + if (!audit_elem) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return; + } + + audit_elem->domain_id= start_gtids[i].domain_id; + audit_elem->start_gtid= start_gtids[i]; + audit_elem->last_gtid= {audit_elem->domain_id, 0, 0}; + + my_init_dynamic_array(PSI_INSTRUMENT_ME, &audit_elem->late_gtids_real, + sizeof(rpl_gtid), 8, 8, MYF(0)); + my_init_dynamic_array(PSI_INSTRUMENT_ME, &audit_elem->late_gtids_previous, + sizeof(rpl_gtid), 8, 8, MYF(0)); + + if (my_hash_insert(&m_audit_elem_domain_lookup, (uchar *) audit_elem)) + { + my_free(audit_elem); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return; + } + } +} + +my_bool Binlog_gtid_state_validator::initialize_gtid_state(FILE *out, + rpl_gtid *gtids, + size_t n_gtids) +{ + size_t i; + my_bool err= FALSE; + + /* + We weren't initialized with starting positions explicitly, so assume the + starting positions of the current gtid state + */ + if (!m_audit_elem_domain_lookup.records) + initialize_start_gtids(gtids, n_gtids); + + for(i= 0; i < n_gtids; i++) + { + rpl_gtid *domain_state_gtid= >ids[i]; + + struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search( + &m_audit_elem_domain_lookup, + (const uchar *) &(domain_state_gtid->domain_id), 0); + + if (!audit_elem) + { + Binlog_gtid_state_validator::error( + out, + "Starting GTID position list does not specify an initial value " + "for domain %u, whose events may be present in the requested binlog " + "file(s). The last known position for this domain was %u-%u-%llu.", + domain_state_gtid->domain_id, PARAM_GTID((*domain_state_gtid))); + err= TRUE; + continue; + } + + if (audit_elem->start_gtid.seq_no < domain_state_gtid->seq_no) + { + Binlog_gtid_state_validator::error( + out, + "Binary logs are missing data for domain %u. Expected data to " + "start from state %u-%u-%llu; however, the initial GTID state of " + "the logs was %u-%u-%llu.", + domain_state_gtid->domain_id, PARAM_GTID(audit_elem->start_gtid), + PARAM_GTID((*domain_state_gtid))); + err= TRUE; + continue; + } + + if (domain_state_gtid->seq_no > audit_elem->last_gtid.seq_no) + audit_elem->last_gtid= *domain_state_gtid; + } + return err; +} + +my_bool Binlog_gtid_state_validator::verify_stop_state(FILE *out, + rpl_gtid *stop_gtids, + size_t n_stop_gtids) +{ + size_t i; + for(i= 0; i < n_stop_gtids; i++) + { + rpl_gtid *stop_gtid= &stop_gtids[i]; + + struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search( + &m_audit_elem_domain_lookup, + (const uchar *) &(stop_gtid->domain_id), 0); + + /* + It is okay if stop gtid doesn't exist in current state because it will be treated + as a new domain + */ + if (audit_elem && stop_gtid->seq_no <= audit_elem->start_gtid.seq_no) + { + Binlog_gtid_state_validator::error( + out, + "--stop-position GTID %u-%u-%llu does not exist in the " + "specified binlog files. The current GTID state of domain %u in the " + "specified binary logs is %u-%u-%llu", + PARAM_GTID((*stop_gtid)), stop_gtid->domain_id, + PARAM_GTID(audit_elem->start_gtid)); + return TRUE; + } + } + + /* No issues with any GTIDs */ + return FALSE; +} + +my_bool +Binlog_gtid_state_validator::verify_gtid_state(FILE *out, + rpl_gtid *domain_state_gtid) +{ + struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search( + &m_audit_elem_domain_lookup, + (const uchar *) &(domain_state_gtid->domain_id), 0); + + if (!audit_elem) + { + Binlog_gtid_state_validator::warn( + out, + "Binary logs are missing data for domain %u. The current binary log " + "specified its " + "current state for this domain as %u-%u-%llu, but neither the " + "starting GTID position list nor any processed events have " + "mentioned " + "this domain.", + domain_state_gtid->domain_id, PARAM_GTID((*domain_state_gtid))); + return TRUE; + } + + if (audit_elem->last_gtid.seq_no < domain_state_gtid->seq_no) + { + Binlog_gtid_state_validator::warn( + out, + "Binary logs are missing data for domain %u. The current binary log " + "state is %u-%u-%llu, but the last seen event was %u-%u-%llu.", + domain_state_gtid->domain_id, PARAM_GTID((*domain_state_gtid)), + PARAM_GTID(audit_elem->last_gtid)); + return TRUE; + } + + return FALSE; +} + +my_bool Binlog_gtid_state_validator::record(rpl_gtid *gtid) +{ + struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search( + &m_audit_elem_domain_lookup, (const uchar *) &(gtid->domain_id), 0); + + if (!audit_elem) + { + /* + We haven't seen any GTIDs in this domian yet. Perform initial set up for + this domain so we can monitor its events. + */ + audit_elem= (struct audit_elem *) my_malloc( + PSI_NOT_INSTRUMENTED, sizeof(struct audit_elem), MYF(MY_WME)); + if (!audit_elem) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return TRUE; + } + + audit_elem->domain_id= gtid->domain_id; + audit_elem->last_gtid= *gtid; + audit_elem->start_gtid= {gtid->domain_id, 0, 0}; + + my_init_dynamic_array(PSI_INSTRUMENT_ME, &audit_elem->late_gtids_real, + sizeof(rpl_gtid), 8, 8, MYF(0)); + my_init_dynamic_array(PSI_INSTRUMENT_ME, &audit_elem->late_gtids_previous, + sizeof(rpl_gtid), 8, 8, MYF(0)); + + if (my_hash_insert(&m_audit_elem_domain_lookup, (uchar *) audit_elem)) + { + my_free(audit_elem); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return TRUE; + } + } + else + { + /* Out of order check */ + if (gtid->seq_no <= audit_elem->last_gtid.seq_no && + gtid->seq_no >= audit_elem->start_gtid.seq_no) + { + /* GTID is out of order */ + insert_dynamic(&audit_elem->late_gtids_real, (const void *) gtid); + insert_dynamic(&audit_elem->late_gtids_previous, + (const void *) &(audit_elem->last_gtid)); + + return TRUE; + } + else + { + /* GTID is valid */ + audit_elem->last_gtid= *gtid; + } + } + + return FALSE; +} + +/* + Data structure used to help pass data into report_audit_findings because + my_hash_iterate only passes one parameter +*/ +struct gtid_report_ctx +{ + FILE *out_file; + my_bool is_strict_mode; + my_bool contains_err; +}; + +static my_bool report_audit_findings(void *entry, void *report_ctx_arg) +{ + struct Binlog_gtid_state_validator::audit_elem *audit_el= + (struct Binlog_gtid_state_validator::audit_elem *) entry; + + struct gtid_report_ctx *report_ctx= + (struct gtid_report_ctx *) report_ctx_arg; + FILE *out= report_ctx->out_file; + my_bool is_strict_mode= report_ctx->is_strict_mode; + size_t i; + void (*report_f)(FILE*, const char*, ...); + + if (is_strict_mode) + report_f= Binlog_gtid_state_validator::error; + else + report_f= Binlog_gtid_state_validator::warn; + + if (audit_el) + { + if (audit_el->last_gtid.seq_no < audit_el->start_gtid.seq_no) + { + report_f(out, + "Binary logs never reached expected GTID state of %u-%u-%llu", + PARAM_GTID(audit_el->start_gtid)); + report_ctx->contains_err= TRUE; + } + + /* Report any out of order GTIDs */ + for(i= 0; i < audit_el->late_gtids_real.elements; i++) + { + rpl_gtid *real_gtid= + (rpl_gtid *) dynamic_array_ptr(&(audit_el->late_gtids_real), i); + rpl_gtid *last_gtid= (rpl_gtid *) dynamic_array_ptr( + &(audit_el->late_gtids_previous), i); + DBUG_ASSERT(real_gtid && last_gtid); + + report_f(out, + "Found out of order GTID. Got %u-%u-%llu after %u-%u-%llu", + PARAM_GTID((*real_gtid)), PARAM_GTID((*last_gtid))); + report_ctx->contains_err= TRUE; + } + } + + return FALSE; +} + +my_bool Binlog_gtid_state_validator::report(FILE *out, my_bool is_strict_mode) +{ + struct gtid_report_ctx report_ctx; + report_ctx.out_file= out; + report_ctx.is_strict_mode= is_strict_mode; + report_ctx.contains_err= FALSE; + my_hash_iterate(&m_audit_elem_domain_lookup, report_audit_findings, &report_ctx); + fflush(out); + return is_strict_mode ? report_ctx.contains_err : FALSE; +} + +Window_gtid_event_filter::Window_gtid_event_filter() + : m_has_start(FALSE), m_has_stop(FALSE), m_is_active(FALSE), + m_has_passed(FALSE) +{ + // m_start and m_stop do not need initial values if unused +} + +int Window_gtid_event_filter::set_start_gtid(rpl_gtid *start) +{ + if (m_has_start) + { + sql_print_error( + "Start position cannot have repeated domain " + "ids (found %u-%u-%llu when %u-%u-%llu was previously specified)", + PARAM_GTID((*start)), PARAM_GTID(m_start)); + return 1; + } + + m_has_start= TRUE; + m_start= *start; + return 0; +} + +int Window_gtid_event_filter::set_stop_gtid(rpl_gtid *stop) +{ + if (m_has_stop) + { + sql_print_error( + "Stop position cannot have repeated domain " + "ids (found %u-%u-%llu when %u-%u-%llu was previously specified)", + PARAM_GTID((*stop)), PARAM_GTID(m_stop)); + return 1; + } + + m_has_stop= TRUE; + m_stop= *stop; + return 0; +} + +my_bool Window_gtid_event_filter::is_range_invalid() +{ + if (m_has_start && m_has_stop && m_start.seq_no > m_stop.seq_no) + { + sql_print_error( + "Queried GTID range is invalid in strict mode. Stop position " + "%u-%u-%llu is not greater than or equal to start %u-%u-%llu.", + PARAM_GTID(m_stop), PARAM_GTID(m_start)); + return TRUE; + } + return FALSE; +} + +static inline my_bool is_gtid_at_or_after(rpl_gtid *boundary, + rpl_gtid *test_gtid) +{ + return test_gtid->domain_id == boundary->domain_id && + test_gtid->seq_no >= boundary->seq_no; +} + +static inline my_bool is_gtid_at_or_before(rpl_gtid *boundary, + rpl_gtid *test_gtid) +{ + return test_gtid->domain_id == boundary->domain_id && + test_gtid->seq_no <= boundary->seq_no; +} + +my_bool Window_gtid_event_filter::exclude(rpl_gtid *gtid) +{ + /* Assume result should be excluded to start */ + my_bool should_exclude= TRUE; + + DBUG_ASSERT((m_has_start && gtid->domain_id == m_start.domain_id) || + (m_has_stop && gtid->domain_id == m_stop.domain_id)); + + if (!m_is_active && !m_has_passed) + { + /* + This filter has not yet been activated. Check if the gtid is within the + bounds of this window. + */ + + if (!m_has_start && is_gtid_at_or_before(&m_stop, gtid)) + { + /* + Start GTID was not provided, so we want to include everything from here + up to m_stop + */ + m_is_active= TRUE; + should_exclude= FALSE; + } + else if ((m_has_start && is_gtid_at_or_after(&m_start, gtid)) && + (!m_has_stop || is_gtid_at_or_before(&m_stop, gtid))) + { + m_is_active= TRUE; + + DBUG_PRINT("gtid-event-filter", + ("Window: Begin (%d-%d-%llu, %d-%d-%llu]", + PARAM_GTID(m_start), PARAM_GTID(m_stop))); + + /* + As the start of the range is exclusive, if this gtid is the start of + the range, exclude it + */ + if (gtid->seq_no == m_start.seq_no) + should_exclude= TRUE; + else + should_exclude= FALSE; + + if (m_has_stop && gtid->seq_no == m_stop.seq_no) + { + m_has_passed= TRUE; + DBUG_PRINT("gtid-event-filter", + ("Window: End (%d-%d-%llu, %d-%d-%llu]", + PARAM_GTID(m_start), PARAM_GTID(m_stop))); + } + } + } /* if (!m_is_active && !m_has_passed) */ + else if (m_is_active && !m_has_passed) + { + /* + This window is currently active so we want the event group to be included + in the results. Additionally check if we are at the end of the window. + If no end of the window is provided, go indefinitely + */ + should_exclude= FALSE; + + if (m_has_stop && is_gtid_at_or_after(&m_stop, gtid)) + { + DBUG_PRINT("gtid-event-filter", + ("Window: End (%d-%d-%llu, %d-%d-%llu]", + PARAM_GTID(m_start), PARAM_GTID(m_stop))); + m_is_active= FALSE; + m_has_passed= TRUE; + + if (!is_gtid_at_or_before(&m_stop, gtid)) + { + /* + The GTID is after the finite stop of the window, don't let it pass + through + */ + should_exclude= TRUE; + } + } + } + + return should_exclude; +} + +my_bool Window_gtid_event_filter::has_finished() +{ + return m_has_stop ? m_has_passed : FALSE; +} + +void free_u32_gtid_filter_element(void *p) +{ + gtid_filter_element<uint32> *gfe= (gtid_filter_element<uint32> *) p; + if (gfe->filter) + delete gfe->filter; + my_free(gfe); +} + +template <typename T> +Id_delegating_gtid_event_filter<T>::Id_delegating_gtid_event_filter() + : m_num_stateful_filters(0), m_num_completed_filters(0), + m_id_restriction_mode(id_restriction_mode::MODE_NOT_SET) +{ + void (*free_func)(void *); + if (std::is_same<T,uint32>::value) + free_func= free_u32_gtid_filter_element; + else + DBUG_ASSERT(0); + + my_hash_init(PSI_INSTRUMENT_ME, &m_filters_by_id_hash, &my_charset_bin, 32, + offsetof(gtid_filter_element<T>, identifier), + sizeof(T), NULL, free_func, + HASH_UNIQUE); + + m_default_filter= new Accept_all_gtid_filter(); +} + +template <typename T> +Id_delegating_gtid_event_filter<T>::~Id_delegating_gtid_event_filter() +{ + my_hash_free(&m_filters_by_id_hash); + delete m_default_filter; +} + +template <typename T> +void Id_delegating_gtid_event_filter<T>::set_default_filter( + Gtid_event_filter *filter) +{ + if (m_default_filter) + delete m_default_filter; + + m_default_filter= filter; +} + +template <typename T> +gtid_filter_element<T> * +Id_delegating_gtid_event_filter<T>::find_or_create_filter_element_for_id( + T filter_id) +{ + gtid_filter_element<T> *fe= + (gtid_filter_element<T> *) my_hash_search( + &m_filters_by_id_hash, (const uchar *) &filter_id, 0); + + if (!fe) + { + gtid_filter_element<T> *new_fe= (gtid_filter_element<T> *) my_malloc( + PSI_NOT_INSTRUMENTED, sizeof(gtid_filter_element<T>), MYF(MY_WME)); + new_fe->filter= NULL; + new_fe->identifier= filter_id; + if (my_hash_insert(&m_filters_by_id_hash, (uchar*) new_fe)) + { + my_free(new_fe); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return NULL; + } + fe= new_fe; + } + + return fe; +} + +template <typename T> +my_bool Id_delegating_gtid_event_filter<T>::has_finished() +{ + /* + If all user-defined filters have deactivated, we are effectively + deactivated + */ + return m_num_stateful_filters && + m_num_completed_filters == m_num_stateful_filters; +} + +template <typename T> +my_bool Id_delegating_gtid_event_filter<T>::exclude(rpl_gtid *gtid) +{ + T filter_id= get_id_from_gtid(gtid); + gtid_filter_element<T> *filter_element= + (gtid_filter_element<T> *) my_hash_search(&m_filters_by_id_hash, + (const uchar *) &filter_id, 0); + Gtid_event_filter *filter= + (filter_element ? filter_element->filter : m_default_filter); + my_bool ret= TRUE; + + if(!filter_element || !filter->has_finished()) + { + ret= filter->exclude(gtid); + + /* + If this is an explicitly defined filter, e.g. Window-based filter, check + if it has completed, and update the counter accordingly if so. + */ + if (filter_element && filter->has_finished()) + m_num_completed_filters++; + } + + return ret; +} + + +template <typename F> Gtid_event_filter* create_event_filter() +{ + return new F(); +} + +template <typename T> +int Id_delegating_gtid_event_filter<T>::set_id_restrictions( + T *id_list, size_t n_ids, id_restriction_mode mode) +{ + static const char *WHITELIST_NAME= "do", *BLACKLIST_NAME= "ignore"; + + size_t id_ctr; + int err; + const char *filter_name, *opposite_filter_name; + Gtid_event_filter *(*construct_filter)(void); + Gtid_event_filter *(*construct_default_filter)(void); + + /* + Set up variables which help this filter either be in whitelist or blacklist + mode + */ + if (mode == Gtid_event_filter::id_restriction_mode::WHITELIST_MODE) + { + filter_name= WHITELIST_NAME; + opposite_filter_name= BLACKLIST_NAME; + construct_filter= + create_event_filter<Accept_all_gtid_filter>; + construct_default_filter= + create_event_filter<Reject_all_gtid_filter>; + } + else + { + DBUG_ASSERT(mode == + Gtid_event_filter::id_restriction_mode::BLACKLIST_MODE); + filter_name= BLACKLIST_NAME; + opposite_filter_name= WHITELIST_NAME; + construct_filter= + create_event_filter<Reject_all_gtid_filter>; + construct_default_filter= + create_event_filter<Accept_all_gtid_filter>; + } + + if (m_id_restriction_mode != + Gtid_event_filter::id_restriction_mode::MODE_NOT_SET) + { + if (mode != m_id_restriction_mode) + { + /* + If a rule specifying the opposite version of this has already been set, + error. + */ + sql_print_error("Cannot create %s filtering rule for %s id because " + "%s rule already exists", + filter_name, get_id_type_name(), + opposite_filter_name); + err= 1; + goto err; + } + + /* This filter is specified more than once, only use the latest values */ + my_hash_reset(&m_filters_by_id_hash); + } + + for (id_ctr= 0; id_ctr < n_ids; id_ctr++) + { + T filter_id= id_list[id_ctr]; + gtid_filter_element<T> *map_element= + find_or_create_filter_element_for_id(filter_id); + + if(map_element == NULL) + { + /* + If map_element is NULL, find_or_create_filter_element_for_id failed and + has already written the error message + */ + err= 1; + goto err; + } + else if (map_element->filter == NULL) + { + map_element->filter= construct_filter(); + m_num_stateful_filters++; + } + else + { + DBUG_ASSERT(map_element->filter->get_filter_type() == + (mode == + Gtid_event_filter::id_restriction_mode::WHITELIST_MODE + ? Gtid_event_filter::ACCEPT_ALL_GTID_FILTER_TYPE + : Gtid_event_filter::REJECT_ALL_GTID_FILTER_TYPE)); + } + } + + /* + With a whitelist, we by only want to accept the ids which are specified. + Everything else should be denied. + + With a blacklist, we by default want to accept everything that is not + specified in the list + */ + set_default_filter(construct_default_filter()); + m_id_restriction_mode= mode; + err= 0; + +err: + return err; +} + +Window_gtid_event_filter * +Domain_gtid_event_filter::find_or_create_window_filter_for_id( + decltype(rpl_gtid::domain_id) domain_id) +{ + gtid_filter_element<decltype(rpl_gtid::domain_id)> *filter_element= + find_or_create_filter_element_for_id(domain_id); + Window_gtid_event_filter *wgef= NULL; + + if (filter_element->filter == NULL) + { + /* New filter */ + wgef= new Window_gtid_event_filter(); + filter_element->filter= wgef; + } + else if (filter_element->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE) + { + /* We have an existing window filter here */ + wgef= (Window_gtid_event_filter *) filter_element->filter; + } + else + { + /* + We have an existing filter but it is not of window type so propogate NULL + filter + */ + sql_print_error("cannot subset domain id %d by position, another rule " + "exists on that domain", + domain_id); + } + + return wgef; +} + +static my_bool check_filter_entry_validity(void *entry, + void *are_filters_invalid_arg) +{ + gtid_filter_element<decltype(rpl_gtid::domain_id)> *fe= + (gtid_filter_element<decltype(rpl_gtid::domain_id)> *) entry; + + if (fe) + { + Gtid_event_filter *gef= fe->filter; + if (gef->get_filter_type() == Gtid_event_filter::WINDOW_GTID_FILTER_TYPE) + { + Window_gtid_event_filter *wgef= (Window_gtid_event_filter *) gef; + if (wgef->is_range_invalid()) + { + *((int *) are_filters_invalid_arg)= 1; + return TRUE; + } + } + } + return FALSE; +} + +int Domain_gtid_event_filter::validate_window_filters() +{ + int are_filters_invalid= 0; + my_hash_iterate(&m_filters_by_id_hash, check_filter_entry_validity, + &are_filters_invalid); + return are_filters_invalid; +} + +int Domain_gtid_event_filter::add_start_gtid(rpl_gtid *gtid) +{ + int err= 0; + Window_gtid_event_filter *filter_to_update= + find_or_create_window_filter_for_id(gtid->domain_id); + + if (filter_to_update == NULL) + { + err= 1; + } + else if (!(err= filter_to_update->set_start_gtid(gtid))) + { + gtid_filter_element<decltype(rpl_gtid::domain_id)> *fe= + (gtid_filter_element<decltype(rpl_gtid::domain_id)> *) my_hash_search( + &m_filters_by_id_hash, (const uchar *) &(gtid->domain_id), 0); + insert_dynamic(&m_start_filters, (const void *) &fe); + } + + return err; +} + +int Domain_gtid_event_filter::add_stop_gtid(rpl_gtid *gtid) +{ + int err= 0; + Window_gtid_event_filter *filter_to_update= + find_or_create_window_filter_for_id(gtid->domain_id); + + if (filter_to_update == NULL) + { + err= 1; + } + else if (!(err= filter_to_update->set_stop_gtid(gtid))) + { + gtid_filter_element<decltype(rpl_gtid::domain_id)> *fe= + (gtid_filter_element<decltype(rpl_gtid::domain_id)> *) my_hash_search( + &m_filters_by_id_hash, (const uchar *) &(gtid->domain_id), 0); + insert_dynamic(&m_stop_filters, (const void *) &fe); + + /* + A window with a stop position can be disabled, and is therefore stateful. + */ + m_num_stateful_filters++; + + /* + Default filtering behavior changes with GTID stop positions, where we + exclude all domains not present in the stop list + */ + if (m_default_filter->get_filter_type() == ACCEPT_ALL_GTID_FILTER_TYPE) + { + delete m_default_filter; + m_default_filter= new Reject_all_gtid_filter(); + } + } + + return err; +} + +rpl_gtid *Domain_gtid_event_filter::get_start_gtids() +{ + rpl_gtid *gtid_list; + uint32 i; + size_t n_start_gtids= get_num_start_gtids(); + + gtid_list= (rpl_gtid *) my_malloc( + PSI_INSTRUMENT_ME, n_start_gtids * sizeof(rpl_gtid), MYF(MY_WME)); + + for (i = 0; i < n_start_gtids; i++) + { + gtid_filter_element<decltype(rpl_gtid::domain_id)> *fe= + *(gtid_filter_element<decltype(rpl_gtid::domain_id)> **) + dynamic_array_ptr(&m_start_filters, i); + DBUG_ASSERT(fe->filter && + fe->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE); + Window_gtid_event_filter *wgef= + (Window_gtid_event_filter *) fe->filter; + + rpl_gtid win_start_gtid= wgef->get_start_gtid(); + gtid_list[i]= win_start_gtid; + } + + return gtid_list; +} + +rpl_gtid *Domain_gtid_event_filter::get_stop_gtids() +{ + rpl_gtid *gtid_list; + uint32 i; + size_t n_stop_gtids= get_num_stop_gtids(); + + gtid_list= (rpl_gtid *) my_malloc( + PSI_INSTRUMENT_ME, n_stop_gtids * sizeof(rpl_gtid), MYF(MY_WME)); + + for (i = 0; i < n_stop_gtids; i++) + { + gtid_filter_element<decltype(rpl_gtid::domain_id)> *fe= + *(gtid_filter_element<decltype(rpl_gtid::domain_id)> **) + dynamic_array_ptr(&m_stop_filters, i); + DBUG_ASSERT(fe->filter && + fe->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE); + Window_gtid_event_filter *wgef= + (Window_gtid_event_filter *) fe->filter; + + rpl_gtid win_stop_gtid= wgef->get_stop_gtid(); + gtid_list[i]= win_stop_gtid; + } + + return gtid_list; +} + +void Domain_gtid_event_filter::clear_start_gtids() +{ + uint32 i; + for (i = 0; i < get_num_start_gtids(); i++) + { + gtid_filter_element<decltype(rpl_gtid::domain_id)> *fe= + *(gtid_filter_element<decltype(rpl_gtid::domain_id)> **) + dynamic_array_ptr(&m_start_filters, i); + DBUG_ASSERT(fe->filter && + fe->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE); + Window_gtid_event_filter *wgef= + (Window_gtid_event_filter *) fe->filter; + + if (wgef->has_stop()) + { + /* + Don't delete the whole filter if it already has a stop position attached + */ + wgef->clear_start_pos(); + } + else + { + /* + This domain only has a stop, so delete the whole filter + */ + my_hash_delete(&m_filters_by_id_hash, (uchar *) fe); + } + } + + reset_dynamic(&m_start_filters); +} + +void Domain_gtid_event_filter::clear_stop_gtids() +{ + uint32 i; + + for (i = 0; i < get_num_stop_gtids(); i++) + { + gtid_filter_element<decltype(rpl_gtid::domain_id)> *fe= + *(gtid_filter_element<decltype(rpl_gtid::domain_id)> **) + dynamic_array_ptr(&m_stop_filters, i); + DBUG_ASSERT(fe->filter && + fe->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE); + Window_gtid_event_filter *wgef= + (Window_gtid_event_filter *) fe->filter; + + if (wgef->has_start()) + { + /* + Don't delete the whole filter if it already has a start position + attached + */ + wgef->clear_stop_pos(); + } + else + { + /* + This domain only has a start, so delete the whole filter + */ + my_hash_delete(&m_filters_by_id_hash, (uchar *) fe); + } + m_num_stateful_filters--; + } + + /* + Stop positions were cleared and we want to be inclusive again of other + domains again + */ + if (m_default_filter->get_filter_type() == REJECT_ALL_GTID_FILTER_TYPE) + { + delete m_default_filter; + m_default_filter= new Accept_all_gtid_filter(); + } + + reset_dynamic(&m_stop_filters); +} + +my_bool Domain_gtid_event_filter::exclude(rpl_gtid *gtid) +{ + my_bool include_domain= TRUE; + /* + If GTID stop positions are provided, we limit the domains which are output + to only be those specified with stop positions + */ + if (get_num_stop_gtids()) + { + decltype(rpl_gtid::domain_id) filter_id= get_id_from_gtid(gtid); + gtid_filter_element<decltype(rpl_gtid::domain_id)> *filter_element= + (gtid_filter_element<decltype(rpl_gtid::domain_id)> *) my_hash_search( + &m_filters_by_id_hash, (const uchar *) &filter_id, 0); + if (filter_element) + { + Gtid_event_filter *filter= filter_element->filter; + if (filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE) + { + Window_gtid_event_filter *wgef= (Window_gtid_event_filter *) filter; + include_domain= wgef->has_stop(); + } + } + } + + return include_domain ? Id_delegating_gtid_event_filter::exclude(gtid) + : TRUE; +} + +Intersecting_gtid_event_filter::Intersecting_gtid_event_filter( + Gtid_event_filter *filter1, Gtid_event_filter *filter2) +{ + my_init_dynamic_array(PSI_INSTRUMENT_ME, &m_filters, + sizeof(Gtid_event_filter *), 3, 3, MYF(0)); + insert_dynamic(&m_filters, (void *) &filter1); + insert_dynamic(&m_filters, (void *) &filter2); +} + +Intersecting_gtid_event_filter::~Intersecting_gtid_event_filter() +{ + Gtid_event_filter *tmp_filter= NULL; + ulong i; + for (i= 0; i < m_filters.elements; i++) + { + tmp_filter= *(Gtid_event_filter **) dynamic_array_ptr(&m_filters, i); + delete tmp_filter; + } + delete_dynamic(&m_filters); +} + +my_bool Intersecting_gtid_event_filter::exclude(rpl_gtid *gtid) +{ + Gtid_event_filter *tmp_filter= NULL; + ulong i; + for (i= 0; i < m_filters.elements; i++) + { + tmp_filter= *(Gtid_event_filter **) dynamic_array_ptr(&m_filters, i); + DBUG_ASSERT(tmp_filter); + if (tmp_filter->exclude(gtid)) + return TRUE; + } + return FALSE; +} + +my_bool Intersecting_gtid_event_filter::has_finished() +{ + Gtid_event_filter *tmp_filter= NULL; + ulong i; + for (i= 0; i < m_filters.elements; i++) + { + tmp_filter= *(Gtid_event_filter **) dynamic_array_ptr(&m_filters, i); + DBUG_ASSERT(tmp_filter); + if (tmp_filter->has_finished()) + return TRUE; + } + return FALSE; +} |