From 347c164c35eddab388009470e6848cb361ac93f8 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 18 May 2024 15:22:53 +0200 Subject: Merging upstream version 1:10.11.8. Signed-off-by: Daniel Baumann --- sql/wsrep_schema.cc | 301 +++++++++++++++++++++++++++++++++------------------- 1 file changed, 190 insertions(+), 111 deletions(-) (limited to 'sql/wsrep_schema.cc') diff --git a/sql/wsrep_schema.cc b/sql/wsrep_schema.cc index c6e45340..d9baf699 100644 --- a/sql/wsrep_schema.cc +++ b/sql/wsrep_schema.cc @@ -272,25 +272,17 @@ static void finish_stmt(THD* thd) { close_thread_tables(thd); } -static int open_table(THD* thd, - const LEX_CSTRING *schema_name, - const LEX_CSTRING *table_name, - enum thr_lock_type const lock_type, - TABLE** table) { - assert(table); - *table= NULL; - +static int open_table(THD *thd, const LEX_CSTRING *schema_name, + const LEX_CSTRING *table_name, + enum thr_lock_type const lock_type, + TABLE_LIST *table_list) +{ + assert(table_list); DBUG_ENTER("Wsrep_schema::open_table()"); - - TABLE_LIST tables; - uint flags= (MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK | - MYSQL_LOCK_IGNORE_GLOBAL_READ_ONLY | - MYSQL_OPEN_IGNORE_FLUSH | - MYSQL_LOCK_IGNORE_TIMEOUT); - - tables.init_one_table(schema_name, - table_name, - NULL, lock_type); + const uint flags= (MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK | + MYSQL_LOCK_IGNORE_GLOBAL_READ_ONLY | + MYSQL_OPEN_IGNORE_FLUSH | MYSQL_LOCK_IGNORE_TIMEOUT); + table_list->init_one_table(schema_name, table_name, NULL, lock_type); thd->lex->query_tables_own_last= 0; // No need to open table if the query was bf aborted, @@ -300,37 +292,39 @@ static int open_table(THD* thd, (thd->get_stmt_da()->sql_errno() == ER_QUERY_INTERRUPTED)); if (interrupted || - !open_n_lock_single_table(thd, &tables, tables.lock_type, flags)) { + !open_n_lock_single_table(thd, table_list, table_list->lock_type, flags)) + { close_thread_tables(thd); DBUG_RETURN(1); } - *table= tables.table; - (*table)->use_all_columns(); + table_list->table->use_all_columns(); DBUG_RETURN(0); } - -static int open_for_write(THD* thd, const char* table_name, TABLE** table) { +static int open_for_write(THD* thd, const char* table_name, TABLE_LIST* table_list) +{ LEX_CSTRING schema_str= { wsrep_schema_str.c_str(), wsrep_schema_str.length() }; LEX_CSTRING table_str= { table_name, strlen(table_name) }; if (Wsrep_schema_impl::open_table(thd, &schema_str, &table_str, TL_WRITE, - table)) { + table_list)) + { // No need to log an error if the query was bf aborted, // thd client will get ER_LOCK_DEADLOCK in the end. const bool interrupted= thd->killed || (thd->is_error() && (thd->get_stmt_da()->sql_errno() == ER_QUERY_INTERRUPTED)); - if (!interrupted) { + if (!interrupted) + { WSREP_ERROR("Failed to open table %s.%s for writing", schema_str.str, table_name); } return 1; } - empty_record(*table); - (*table)->use_all_columns(); - restore_record(*table, s->default_values); + empty_record(table_list->table); + table_list->table->use_all_columns(); + restore_record(table_list->table, s->default_values); return 0; } @@ -479,19 +473,21 @@ static int delete_row(TABLE* table) { return 0; } -static int open_for_read(THD* thd, const char* table_name, TABLE** table) { - +static int open_for_read(THD *thd, const char *table_name, + TABLE_LIST *table_list) +{ LEX_CSTRING schema_str= { wsrep_schema_str.c_str(), wsrep_schema_str.length() }; LEX_CSTRING table_str= { table_name, strlen(table_name) }; if (Wsrep_schema_impl::open_table(thd, &schema_str, &table_str, TL_READ, - table)) { + table_list)) + { WSREP_ERROR("Failed to open table %s.%s for reading", schema_str.str, table_name); return 1; } - empty_record(*table); - (*table)->use_all_columns(); - restore_record(*table, s->default_values); + empty_record(table_list->table); + table_list->table->use_all_columns(); + restore_record(table_list->table, s->default_values); return 0; } @@ -752,8 +748,10 @@ int Wsrep_schema::store_view(THD* thd, const Wsrep_view& view) assert(view.status() == Wsrep_view::primary); int ret= 1; int error; + TABLE_LIST cluster_table_l; TABLE* cluster_table= 0; - TABLE* members_table= 0; + TABLE_LIST members_table_l; + TABLE* members_table = 0; #ifdef WSREP_SCHEMA_MEMBERS_HISTORY TABLE* members_history_table= 0; #endif /* WSREP_SCHEMA_MEMBERS_HISTORY */ @@ -778,11 +776,13 @@ int Wsrep_schema::store_view(THD* thd, const Wsrep_view& view) Store cluster view info */ Wsrep_schema_impl::init_stmt(thd); - if (Wsrep_schema_impl::open_for_write(thd, cluster_table_str.c_str(), &cluster_table)) + if (Wsrep_schema_impl::open_for_write(thd, cluster_table_str.c_str(), &cluster_table_l)) { goto out; } + cluster_table= cluster_table_l.table; + Wsrep_schema_impl::store(cluster_table, 0, view.state_id().id()); Wsrep_schema_impl::store(cluster_table, 1, view.view_seqno().get()); Wsrep_schema_impl::store(cluster_table, 2, view.state_id().seqno().get()); @@ -802,12 +802,14 @@ int Wsrep_schema::store_view(THD* thd, const Wsrep_view& view) */ Wsrep_schema_impl::init_stmt(thd); if (Wsrep_schema_impl::open_for_write(thd, members_table_str.c_str(), - &members_table)) + &members_table_l)) { WSREP_ERROR("failed to open wsrep.members table"); goto out; } + members_table= members_table_l.table; + for (size_t i= 0; i < view.members().size(); ++i) { Wsrep_schema_impl::store(members_table, 0, view.members()[i].id()); @@ -861,8 +863,10 @@ Wsrep_view Wsrep_schema::restore_view(THD* thd, const Wsrep_id& own_id) const { int ret= 1; int error; + TABLE_LIST cluster_table_l; TABLE* cluster_table= 0; bool end_cluster_scan= false; + TABLE_LIST members_table_l; TABLE* members_table= 0; bool end_members_scan= false; @@ -888,8 +892,12 @@ Wsrep_view Wsrep_schema::restore_view(THD* thd, const Wsrep_id& own_id) const { Read cluster info from cluster table */ Wsrep_schema_impl::init_stmt(thd); - if (Wsrep_schema_impl::open_for_read(thd, cluster_table_str.c_str(), &cluster_table) || - Wsrep_schema_impl::init_for_scan(cluster_table)) { + if (Wsrep_schema_impl::open_for_read(thd, cluster_table_str.c_str(), &cluster_table_l)) { + goto out; + } + cluster_table = cluster_table_l.table; + + if (Wsrep_schema_impl::init_for_scan(cluster_table)) { goto out; } @@ -913,8 +921,14 @@ Wsrep_view Wsrep_schema::restore_view(THD* thd, const Wsrep_id& own_id) const { Read members from members table */ Wsrep_schema_impl::init_stmt(thd); - if (Wsrep_schema_impl::open_for_read(thd, members_table_str.c_str(), &members_table) || - Wsrep_schema_impl::init_for_scan(members_table)) { + if (Wsrep_schema_impl::open_for_read(thd, members_table_str.c_str(), + &members_table_l)) + { + goto out; + } + + members_table= members_table_l.table; + if (Wsrep_schema_impl::init_for_scan(members_table)) { goto out; } end_members_scan= true; @@ -1018,14 +1032,15 @@ int Wsrep_schema::append_fragment(THD* thd, Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd); Wsrep_schema_impl::init_stmt(thd); - TABLE* frag_table= 0; - if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table)) + TABLE_LIST frag_table_l; + if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table_l)) { trans_rollback_stmt(thd); thd->lex->restore_backup_query_tables_list(&query_tables_list_backup); DBUG_RETURN(1); } + TABLE* frag_table= frag_table_l.table; Wsrep_schema_impl::store(frag_table, 0, server_id); Wsrep_schema_impl::store(frag_table, 1, transaction_id.get()); Wsrep_schema_impl::store(frag_table, 2, seqno.get()); @@ -1069,13 +1084,15 @@ int Wsrep_schema::update_fragment_meta(THD* thd, uchar *key=NULL; key_part_map key_map= 0; TABLE* frag_table= 0; + TABLE_LIST frag_table_l; Wsrep_schema_impl::init_stmt(thd); - if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table)) + if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table_l)) { thd->lex->restore_backup_query_tables_list(&query_tables_list_backup); DBUG_RETURN(1); } + frag_table= frag_table_l.table; /* Find record with the given uuid, trx id, and seqno -1 */ Wsrep_schema_impl::store(frag_table, 0, ws_meta.server_id()); @@ -1163,7 +1180,10 @@ static int remove_fragment(THD* thd, seqno.get(), error); } - ret= error; + else + { + ret= error; + } } else if (Wsrep_schema_impl::delete_row(frag_table)) { @@ -1195,12 +1215,14 @@ int Wsrep_schema::remove_fragments(THD* thd, thd->reset_n_backup_open_tables_state(&open_tables_backup); TABLE* frag_table= 0; - if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table)) + TABLE_LIST frag_table_l; + if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table_l)) { ret= 1; } else { + frag_table= frag_table_l.table; for (std::vector::const_iterator i= fragments.begin(); i != fragments.end(); ++i) { @@ -1243,40 +1265,35 @@ int Wsrep_schema::remove_fragments(THD* thd, DBUG_RETURN(ret); } -int Wsrep_schema::replay_transaction(THD* orig_thd, - Relay_log_info* rli, - const wsrep::ws_meta& ws_meta, - const std::vector& fragments) +static int replay_transaction(THD* thd, + THD* orig_thd, + Relay_log_info* rli, + const wsrep::ws_meta& ws_meta, + const std::vector& fragments) { - DBUG_ENTER("Wsrep_schema::replay_transaction"); - DBUG_ASSERT(!fragments.empty()); - - THD thd(next_thread_id(), true); - thd.thread_stack= (orig_thd ? orig_thd->thread_stack : - (char*) &thd); - wsrep_assign_from_threadvars(&thd); - - Wsrep_schema_impl::wsrep_off wsrep_off(&thd); - Wsrep_schema_impl::binlog_off binlog_off(&thd); - Wsrep_schema_impl::sql_safe_updates sql_safe_updates(&thd); - Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd, &thd); + Wsrep_schema_impl::wsrep_off wsrep_off(thd); + Wsrep_schema_impl::binlog_off binlog_off(thd); + Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd); + Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd, thd); int ret= 1; int error; TABLE* frag_table= 0; + TABLE_LIST frag_table_l; uchar *key=NULL; key_part_map key_map= 0; for (std::vector::const_iterator i= fragments.begin(); i != fragments.end(); ++i) { - Wsrep_schema_impl::init_stmt(&thd); - if ((error= Wsrep_schema_impl::open_for_read(&thd, sr_table_str.c_str(), &frag_table))) + Wsrep_schema_impl::init_stmt(thd); + if ((error= Wsrep_schema_impl::open_for_read(thd, sr_table_str.c_str(), &frag_table_l))) { WSREP_WARN("Could not open SR table for read: %d", error); - Wsrep_schema_impl::finish_stmt(&thd); - DBUG_RETURN(1); + Wsrep_schema_impl::finish_stmt(thd); + return 1; } + frag_table= frag_table_l.table; Wsrep_schema_impl::store(frag_table, 0, ws_meta.server_id()); Wsrep_schema_impl::store(frag_table, 1, ws_meta.transaction_id().get()); @@ -1305,7 +1322,7 @@ int Wsrep_schema::replay_transaction(THD* orig_thd, frag_table->field[4]->val_str(&buf); { - Wsrep_schema_impl::thd_context_switch thd_context_switch(&thd, orig_thd); + Wsrep_schema_impl::thd_context_switch thd_context_switch(thd, orig_thd); ret= wsrep_apply_events(orig_thd, rli, buf.ptr(), buf.length()); if (ret) @@ -1316,18 +1333,20 @@ int Wsrep_schema::replay_transaction(THD* orig_thd, } Wsrep_schema_impl::end_index_scan(frag_table); - Wsrep_schema_impl::finish_stmt(&thd); + Wsrep_schema_impl::finish_stmt(thd); - Wsrep_schema_impl::init_stmt(&thd); + Wsrep_schema_impl::init_stmt(thd); - if ((error= Wsrep_schema_impl::open_for_write(&thd, + if ((error= Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), - &frag_table))) + &frag_table_l))) { WSREP_WARN("Could not open SR table for write: %d", error); - Wsrep_schema_impl::finish_stmt(&thd); - DBUG_RETURN(1); + Wsrep_schema_impl::finish_stmt(thd); + ret= 1; + break; } + frag_table= frag_table_l.table; error= Wsrep_schema_impl::init_for_index_scan(frag_table, key, @@ -1351,86 +1370,120 @@ int Wsrep_schema::replay_transaction(THD* orig_thd, break; } Wsrep_schema_impl::end_index_scan(frag_table); - Wsrep_schema_impl::finish_stmt(&thd); + Wsrep_schema_impl::finish_stmt(thd); my_free(key); key= NULL; } if (key) my_free(key); + + return ret; +} + +int Wsrep_schema::replay_transaction(THD* orig_thd, + Relay_log_info* rli, + const wsrep::ws_meta& ws_meta, + const std::vector& fragments) +{ + DBUG_ENTER("Wsrep_schema::replay_transaction"); + DBUG_ASSERT(!fragments.empty()); + + THD *thd= new THD(next_thread_id(), true); + if (!thd) + { + WSREP_WARN("Could not allocate memory for THD"); + DBUG_RETURN(1); + } + + thd->thread_stack= (orig_thd ? orig_thd->thread_stack : (char *) &thd); + wsrep_assign_from_threadvars(thd); + + int ret= ::replay_transaction(thd, orig_thd, rli, ws_meta, fragments); + + delete thd; DBUG_RETURN(ret); } -int Wsrep_schema::recover_sr_transactions(THD *orig_thd) +static int recover_sr_transactions(THD* storage_thd, THD* orig_thd) { - DBUG_ENTER("Wsrep_schema::recover_sr_transactions"); - THD storage_thd(next_thread_id(), true); - storage_thd.thread_stack= (orig_thd ? orig_thd->thread_stack : - (char*) &storage_thd); - wsrep_assign_from_threadvars(&storage_thd); TABLE* frag_table= 0; + TABLE_LIST frag_table_l; TABLE* cluster_table= 0; - Wsrep_storage_service storage_service(&storage_thd); - Wsrep_schema_impl::binlog_off binlog_off(&storage_thd); - Wsrep_schema_impl::wsrep_off wsrep_off(&storage_thd); - Wsrep_schema_impl::sql_safe_updates sql_safe_updates(&storage_thd); + TABLE_LIST cluster_table_l; + Wsrep_storage_service storage_service(storage_thd); + Wsrep_schema_impl::binlog_off binlog_off(storage_thd); + Wsrep_schema_impl::wsrep_off wsrep_off(storage_thd); + Wsrep_schema_impl::sql_safe_updates sql_safe_updates(storage_thd); Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd, - &storage_thd); + storage_thd); Wsrep_server_state& server_state(Wsrep_server_state::instance()); int ret= 1; int error; wsrep::id cluster_id; - Wsrep_schema_impl::init_stmt(&storage_thd); - storage_thd.wsrep_skip_locking= FALSE; - if (Wsrep_schema_impl::open_for_read(&storage_thd, - cluster_table_str.c_str(), - &cluster_table) || - Wsrep_schema_impl::init_for_scan(cluster_table)) + Wsrep_schema_impl::init_stmt(storage_thd); + storage_thd->wsrep_skip_locking= FALSE; + if (Wsrep_schema_impl::open_for_read(storage_thd, cluster_table_str.c_str(), + &cluster_table_l)) { - Wsrep_schema_impl::finish_stmt(&storage_thd); - DBUG_RETURN(1); + Wsrep_schema_impl::finish_stmt(storage_thd); + return 1; + } + cluster_table= cluster_table_l.table; + + if (Wsrep_schema_impl::init_for_scan(cluster_table)) + { + Wsrep_schema_impl::finish_stmt(storage_thd); + return 1; } if ((error= Wsrep_schema_impl::next_record(cluster_table))) { Wsrep_schema_impl::end_scan(cluster_table); - Wsrep_schema_impl::finish_stmt(&storage_thd); - trans_commit(&storage_thd); + Wsrep_schema_impl::finish_stmt(storage_thd); + trans_commit(storage_thd); if (error == HA_ERR_END_OF_FILE) { WSREP_INFO("Cluster table is empty, not recovering transactions"); - DBUG_RETURN(0); + return 0; } else { WSREP_ERROR("Failed to read cluster table: %d", error); - DBUG_RETURN(1); + return 1; } } Wsrep_schema_impl::scan(cluster_table, 0, cluster_id); Wsrep_schema_impl::end_scan(cluster_table); - Wsrep_schema_impl::finish_stmt(&storage_thd); + Wsrep_schema_impl::finish_stmt(storage_thd); std::ostringstream os; os << cluster_id; WSREP_INFO("Recovered cluster id %s", os.str().c_str()); - storage_thd.wsrep_skip_locking= TRUE; - Wsrep_schema_impl::init_stmt(&storage_thd); + storage_thd->wsrep_skip_locking= TRUE; + Wsrep_schema_impl::init_stmt(storage_thd); /* Open the table for reading and writing so that fragments without valid seqno can be deleted. */ - if (Wsrep_schema_impl::open_for_write(&storage_thd, sr_table_str.c_str(), &frag_table) || - Wsrep_schema_impl::init_for_scan(frag_table)) + if (Wsrep_schema_impl::open_for_write(storage_thd, sr_table_str.c_str(), + &frag_table_l)) { WSREP_ERROR("Failed to open SR table for write"); goto out; } + frag_table= frag_table_l.table; + + if (Wsrep_schema_impl::init_for_scan(frag_table)) + { + WSREP_ERROR("Failed to init for index scan"); + goto out; + } while (0 == error) { @@ -1474,7 +1527,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) transaction_id))) { DBUG_ASSERT(wsrep::starts_transaction(flags)); - applier = wsrep_create_streaming_applier(&storage_thd, "recovery"); + applier = wsrep_create_streaming_applier(storage_thd, "recovery"); server_state.start_streaming_applier(server_id, transaction_id, applier); applier->start_transaction(wsrep::ws_handle(transaction_id, 0), @@ -1502,10 +1555,30 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) } } Wsrep_schema_impl::end_scan(frag_table); - Wsrep_schema_impl::finish_stmt(&storage_thd); - trans_commit(&storage_thd); - storage_thd.set_mysys_var(0); + Wsrep_schema_impl::finish_stmt(storage_thd); + trans_commit(storage_thd); + storage_thd->set_mysys_var(0); out: + return ret; +} + +int Wsrep_schema::recover_sr_transactions(THD *orig_thd) +{ + DBUG_ENTER("Wsrep_schema::recover_sr_transactions"); + + THD *storage_thd= new THD(next_thread_id(), true); + if (!storage_thd) + { + WSREP_WARN("Could not allocate memory for THD"); + DBUG_RETURN(1); + } + storage_thd->thread_stack= + (orig_thd ? orig_thd->thread_stack : (char *) &storage_thd); + wsrep_assign_from_threadvars(storage_thd); + + int ret= ::recover_sr_transactions(storage_thd, orig_thd); + + delete storage_thd; DBUG_RETURN(ret); } @@ -1521,13 +1594,15 @@ void Wsrep_schema::clear_allowlist() thd->thread_stack= (char*)&thd; wsrep_init_thd_for_schema(thd); TABLE* allowlist_table= 0; + TABLE_LIST allowlist_table_l; int error= 0; Wsrep_schema_impl::init_stmt(thd); if (Wsrep_schema_impl::open_for_write(thd, allowlist_table_str.c_str(), - &allowlist_table) || - Wsrep_schema_impl::init_for_scan(allowlist_table)) + &allowlist_table_l) || + (allowlist_table= allowlist_table_l.table, + Wsrep_schema_impl::init_for_scan(allowlist_table))) { WSREP_ERROR("Failed to open mysql.wsrep_allowlist table"); goto out; @@ -1567,14 +1642,16 @@ void Wsrep_schema::store_allowlist(std::vector& ip_allowlist) thd->thread_stack= (char*)&thd; wsrep_init_thd_for_schema(thd); TABLE* allowlist_table= 0; + TABLE_LIST allowlist_table_l; int error; Wsrep_schema_impl::init_stmt(thd); if (Wsrep_schema_impl::open_for_write(thd, allowlist_table_str.c_str(), - &allowlist_table)) + &allowlist_table_l)) { WSREP_ERROR("Failed to open mysql.wsrep_allowlist table"); goto out; } + allowlist_table= allowlist_table_l.table; for (size_t i= 0; i < ip_allowlist.size(); ++i) { Wsrep_schema_impl::store(allowlist_table, 0, ip_allowlist[i]); @@ -1618,6 +1695,7 @@ static void *allowlist_check_thread(void *param) int error; TABLE *allowlist_table= 0; + TABLE_LIST allowlist_table_l; bool match_found_or_empty= false; bool table_have_rows= false; char row[64]= { @@ -1629,8 +1707,9 @@ static void *allowlist_check_thread(void *param) */ Wsrep_schema_impl::init_stmt(&thd); if (Wsrep_schema_impl::open_for_read(&thd, allowlist_table_str.c_str(), - &allowlist_table) || - Wsrep_schema_impl::init_for_scan(allowlist_table)) + &allowlist_table_l) || + (allowlist_table= allowlist_table_l.table, + Wsrep_schema_impl::init_for_scan(allowlist_table))) { goto out; } -- cgit v1.2.3