From 3f619478f796eddbba6e39502fe941b285dd97b1 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 4 May 2024 20:00:34 +0200 Subject: Adding upstream version 1:10.11.6. Signed-off-by: Daniel Baumann --- sql/wsrep_mysqld.cc | 3913 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 3913 insertions(+) create mode 100644 sql/wsrep_mysqld.cc (limited to 'sql/wsrep_mysqld.cc') diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc new file mode 100644 index 00000000..0a615228 --- /dev/null +++ b/sql/wsrep_mysqld.cc @@ -0,0 +1,3913 @@ +/* Copyright (c) 2008, 2023 Codership Oy + Copyright (c) 2020, 2022, MariaDB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License.x1 + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */ + +#include "sql_plugin.h" /* wsrep_plugins_pre_init() */ +#include "my_global.h" +#include "wsrep_server_state.h" +#include "wsrep_status.h" + +#include "mariadb.h" +#include +#include +#include +#include +#include /* find_temporary_table() */ +#include /* is_stat_table() */ +#include "slave.h" +#include "rpl_mi.h" +#include "sql_repl.h" +#include "rpl_filter.h" +#include "sql_callback.h" +#include "sp_head.h" +#include "sql_show.h" +#include "sp.h" +#include "handler.h" +#include "wsrep_priv.h" +#include "wsrep_thd.h" +#include "wsrep_sst.h" +#include "wsrep_utils.h" +#include "wsrep_var.h" +#include "wsrep_binlog.h" +#include "wsrep_applier.h" +#include "wsrep_schema.h" +#include "wsrep_xid.h" +#include "wsrep_trans_observer.h" +#include "mysql/service_wsrep.h" +#include +#include +#include +#include "log_event.h" +#include "sql_connect.h" +#include "thread_cache.h" +#include "debug_sync.h" + +#include + +/* wsrep-lib */ +Wsrep_server_state* Wsrep_server_state::m_instance; + +my_bool wsrep_emulate_bin_log= FALSE; // activating parts of binlog interface +my_bool wsrep_preordered_opt= FALSE; + +/* Streaming Replication */ +const char *wsrep_fragment_units[]= { "bytes", "rows", "statements", NullS }; +const char *wsrep_SR_store_types[]= { "none", "table", NullS }; + +/* + * Begin configuration options + */ + +extern my_bool plugins_are_initialized; + +/* System variables. */ +const char *wsrep_provider; +const char *wsrep_provider_options; +const char *wsrep_cluster_address; +const char *wsrep_cluster_name; +const char *wsrep_node_name; +const char *wsrep_node_address; +const char *wsrep_node_incoming_address; +const char *wsrep_start_position; +const char *wsrep_data_home_dir; +const char *wsrep_dbug_option; +const char *wsrep_notify_cmd; +const char *wsrep_status_file; +const char *wsrep_allowlist; + +ulong wsrep_debug; // Debug level logging +my_bool wsrep_convert_LOCK_to_trx; // Convert locking sessions to trx +my_bool wsrep_auto_increment_control; // Control auto increment variables +my_bool wsrep_drupal_282555_workaround; // Retry autoinc insert after dupkey +my_bool wsrep_certify_nonPK; // Certify, even when no primary key +ulong wsrep_certification_rules = WSREP_CERTIFICATION_RULES_STRICT; +my_bool wsrep_recovery; // Recovery +my_bool wsrep_log_conflicts; +my_bool wsrep_load_data_splitting= 0; // Commit load data every 10K intervals +my_bool wsrep_slave_UK_checks; // Slave thread does UK checks +my_bool wsrep_slave_FK_checks; // Slave thread does FK checks +my_bool wsrep_restart_slave; // Should mysql slave thread be + // restarted, when node joins back? +my_bool wsrep_desync; // De(re)synchronize the node from the + // cluster +ulonglong wsrep_mode; +bool wsrep_service_started; // If Galera was initialized +long wsrep_slave_threads; // No. of slave appliers threads +ulong wsrep_retry_autocommit; // Retry aborted autocommit trx +ulong wsrep_max_ws_size; // Max allowed ws (RBR buffer) size +ulong wsrep_max_ws_rows; // Max number of rows in ws +ulong wsrep_forced_binlog_format= BINLOG_FORMAT_UNSPEC; +ulong wsrep_mysql_replication_bundle; + +bool wsrep_gtid_mode; // Enable WSREP native GTID support +Wsrep_gtid_server wsrep_gtid_server; +uint wsrep_gtid_domain_id=0; // Domain id on above structure + +/* Other configuration variables and their default values. */ +my_bool wsrep_incremental_data_collection= 0; // Incremental data collection +my_bool wsrep_restart_slave_activated= 0; // Node has dropped, and slave + // restart will be needed +bool wsrep_new_cluster= false; // Bootstrap the cluster? +int wsrep_slave_count_change= 0; // No. of appliers to stop/start +int wsrep_to_isolation= 0; // No. of active TO isolation threads +long wsrep_max_protocol_version= 4; // Maximum protocol version to use +long int wsrep_protocol_version= wsrep_max_protocol_version; +ulong wsrep_trx_fragment_unit= WSREP_FRAG_BYTES; + // unit for fragment size +ulong wsrep_SR_store_type= WSREP_SR_STORE_TABLE; +uint wsrep_ignore_apply_errors= 0; + +std::atomic wsrep_thread_create_failed; + +/* + * End configuration options + */ + +/* + * Cached variables + */ + +// Whether the Galera write-set replication provider is set +// wsrep_provider && strcmp(wsrep_provider, WSREP_NONE) +bool WSREP_PROVIDER_EXISTS_; + +// Whether the Galera write-set replication is enabled +// global_system_variables.wsrep_on && WSREP_PROVIDER_EXISTS_ +bool WSREP_ON_; + +/* + * Other wsrep global variables. + */ + +mysql_mutex_t LOCK_wsrep_ready; +mysql_cond_t COND_wsrep_ready; +mysql_mutex_t LOCK_wsrep_sst; +mysql_cond_t COND_wsrep_sst; +mysql_mutex_t LOCK_wsrep_sst_init; +mysql_cond_t COND_wsrep_sst_init; +mysql_mutex_t LOCK_wsrep_replaying; +mysql_cond_t COND_wsrep_replaying; +mysql_mutex_t LOCK_wsrep_slave_threads; +mysql_cond_t COND_wsrep_slave_threads; +mysql_mutex_t LOCK_wsrep_gtid_wait_upto; +mysql_mutex_t LOCK_wsrep_cluster_config; +mysql_mutex_t LOCK_wsrep_desync; +mysql_mutex_t LOCK_wsrep_config_state; +mysql_mutex_t LOCK_wsrep_group_commit; +mysql_mutex_t LOCK_wsrep_SR_pool; +mysql_mutex_t LOCK_wsrep_SR_store; +mysql_mutex_t LOCK_wsrep_joiner_monitor; +mysql_mutex_t LOCK_wsrep_donor_monitor; +mysql_cond_t COND_wsrep_joiner_monitor; +mysql_cond_t COND_wsrep_donor_monitor; + +int wsrep_replaying= 0; +ulong wsrep_running_threads = 0; // # of currently running wsrep + // # threads +ulong wsrep_running_applier_threads = 0; // # of running applier threads +ulong wsrep_running_rollbacker_threads = 0; // # of running + // # rollbacker threads +ulong my_bind_addr; + +#ifdef HAVE_PSI_INTERFACE +PSI_mutex_key + key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst, + key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init, + key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_gtid_wait_upto, + key_LOCK_wsrep_desync, + key_LOCK_wsrep_config_state, key_LOCK_wsrep_cluster_config, + key_LOCK_wsrep_group_commit, + key_LOCK_wsrep_SR_pool, + key_LOCK_wsrep_SR_store, + key_LOCK_wsrep_thd_queue, + key_LOCK_wsrep_joiner_monitor, + key_LOCK_wsrep_donor_monitor; + +PSI_cond_key key_COND_wsrep_thd, + key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst, + key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread, + key_COND_wsrep_thd_queue, key_COND_wsrep_slave_threads, key_COND_wsrep_gtid_wait_upto, + key_COND_wsrep_joiner_monitor, key_COND_wsrep_donor_monitor; + +PSI_file_key key_file_wsrep_gra_log; + +static PSI_mutex_info wsrep_mutexes[]= +{ + { &key_LOCK_wsrep_ready, "LOCK_wsrep_ready", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_sst_thread, "wsrep_sst_thread", 0}, + { &key_LOCK_wsrep_sst_init, "LOCK_wsrep_sst_init", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_gtid_wait_upto, "LOCK_wsrep_gtid_wait_upto", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_cluster_config, "LOCK_wsrep_cluster_config", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_group_commit, "LOCK_wsrep_group_commit", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_SR_pool, "LOCK_wsrep_SR_pool", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_SR_store, "LOCK_wsrep_SR_store", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_joiner_monitor, "LOCK_wsrep_joiner_monitor", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_donor_monitor, "LOCK_wsrep_donor_monitor", PSI_FLAG_GLOBAL} +}; + +static PSI_cond_info wsrep_conds[]= +{ + { &key_COND_wsrep_ready, "COND_wsrep_ready", PSI_FLAG_GLOBAL}, + { &key_COND_wsrep_sst, "COND_wsrep_sst", PSI_FLAG_GLOBAL}, + { &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL}, + { &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0}, + { &key_COND_wsrep_thd, "THD::COND_wsrep_thd", 0}, + { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}, + { &key_COND_wsrep_slave_threads, "COND_wsrep_wsrep_slave_threads", PSI_FLAG_GLOBAL}, + { &key_COND_wsrep_gtid_wait_upto, "COND_wsrep_gtid_wait_upto", PSI_FLAG_GLOBAL}, + { &key_COND_wsrep_joiner_monitor, "COND_wsrep_joiner_monitor", PSI_FLAG_GLOBAL}, + { &key_COND_wsrep_donor_monitor, "COND_wsrep_donor_monitor", PSI_FLAG_GLOBAL} +}; + +static PSI_file_info wsrep_files[]= +{ + { &key_file_wsrep_gra_log, "wsrep_gra_log", 0} +}; + +PSI_thread_key key_wsrep_sst_joiner, key_wsrep_sst_donor, + key_wsrep_rollbacker, key_wsrep_applier, + key_wsrep_sst_joiner_monitor, key_wsrep_sst_donor_monitor; + +static PSI_thread_info wsrep_threads[]= +{ + {&key_wsrep_sst_joiner, "wsrep_sst_joiner_thread", PSI_FLAG_GLOBAL}, + {&key_wsrep_sst_donor, "wsrep_sst_donor_thread", PSI_FLAG_GLOBAL}, + {&key_wsrep_rollbacker, "wsrep_rollbacker_thread", PSI_FLAG_GLOBAL}, + {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL}, + {&key_wsrep_sst_joiner_monitor, "wsrep_sst_joiner_monitor", PSI_FLAG_GLOBAL}, + {&key_wsrep_sst_donor_monitor, "wsrep_sst_donor_monitor", PSI_FLAG_GLOBAL} +}; + +#endif /* HAVE_PSI_INTERFACE */ + +my_bool wsrep_inited= 0; // initialized ? + +static wsrep_uuid_t node_uuid= WSREP_UUID_UNDEFINED; +static char cluster_uuid_str[40]= { 0, }; + +static char provider_name[256]= { 0, }; +static char provider_version[256]= { 0, }; +static char provider_vendor[256]= { 0, }; + +/* + * Wsrep status variables. LOCK_status must be locked When modifying + * these variables, + */ +my_bool wsrep_connected = FALSE; +my_bool wsrep_ready = FALSE; +const char* wsrep_cluster_state_uuid= cluster_uuid_str; +long long wsrep_cluster_conf_id = WSREP_SEQNO_UNDEFINED; +const char* wsrep_cluster_status = "Disconnected"; +long wsrep_cluster_size = 0; +long wsrep_local_index = -1; +long long wsrep_local_bf_aborts = 0; +const char* wsrep_provider_name = provider_name; +const char* wsrep_provider_version = provider_version; +const char* wsrep_provider_vendor = provider_vendor; +char* wsrep_provider_capabilities = NULL; +char* wsrep_cluster_capabilities = NULL; +/* End wsrep status variables */ + +wsp::Config_state *wsrep_config_state; + +void WSREP_LOG(void (*fun)(const char* fmt, ...), const char* fmt, ...) +{ + /* Allocate short buffer from stack. If the vsnprintf() return value + indicates that the message was truncated, a new buffer will be allocated + dynamically and the message will be reprinted. */ + char msg[128] = {'\0'}; + va_list arglist; + va_start(arglist, fmt); + int n= vsnprintf(msg, sizeof(msg), fmt, arglist); + va_end(arglist); + if (n < 0) + { + sql_print_warning("WSREP: Printing message failed"); + } + else if (n < (int)sizeof(msg)) + { + fun("WSREP: %s", msg); + } + else + { + size_t dynbuf_size= std::max(n, 4096); + char* dynbuf= (char*) my_malloc(PSI_NOT_INSTRUMENTED, dynbuf_size, MYF(0)); + if (dynbuf) + { + va_start(arglist, fmt); + (void)vsnprintf(&dynbuf[0], dynbuf_size - 1, fmt, arglist); + va_end(arglist); + dynbuf[dynbuf_size - 1] = '\0'; + fun("WSREP: %s", &dynbuf[0]); + my_free(dynbuf); + } + else + { + /* Memory allocation for vector failed, print truncated message. */ + fun("WSREP: %s", msg); + } + } +} + + +wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED; +wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED; + +/* + */ +Wsrep_schema *wsrep_schema= 0; + +static void wsrep_log_cb(wsrep::log::level level, + const char*, const char *msg) +{ + /* + Silence all wsrep related logging from lib and provider if + wsrep is not enabled. + */ + if (!WSREP_ON) return; + + switch (level) { + case wsrep::log::info: + WSREP_INFO("%s", msg); + break; + case wsrep::log::warning: + WSREP_WARN("%s", msg); + break; + case wsrep::log::error: + WSREP_ERROR("%s", msg); + break; + case wsrep::log::debug: + WSREP_DEBUG("%s", msg); + break; + case wsrep::log::unknown: + WSREP_UNKNOWN("%s", msg); + break; + } +} + +void wsrep_init_gtid() +{ + wsrep_server_gtid_t stored_gtid= wsrep_get_SE_checkpoint(); + // Domain id may have changed, use the one + // received during state transfer. + stored_gtid.domain_id= wsrep_gtid_server.domain_id; + if (stored_gtid.server_id == 0) + { + rpl_gtid wsrep_last_gtid; + if (mysql_bin_log.is_open() && + mysql_bin_log.lookup_domain_in_binlog_state(stored_gtid.domain_id, + &wsrep_last_gtid)) + { + stored_gtid.server_id= wsrep_last_gtid.server_id; + stored_gtid.seqno= wsrep_last_gtid.seq_no; + } + else + { + stored_gtid.server_id= global_system_variables.server_id; + stored_gtid.seqno= 0; + } + } + wsrep_gtid_server.gtid(stored_gtid); +} + +bool wsrep_get_binlog_gtid_seqno(wsrep_server_gtid_t& gtid) +{ + rpl_gtid binlog_gtid; + int ret= 0; + if (mysql_bin_log.is_open() && + mysql_bin_log.find_in_binlog_state(gtid.domain_id, + gtid.server_id, + &binlog_gtid)) + { + gtid.domain_id= binlog_gtid.domain_id; + gtid.server_id= binlog_gtid.server_id; + gtid.seqno= binlog_gtid.seq_no; + ret= 1; + } + return ret; +} + +bool wsrep_check_gtid_seqno(const uint32& domain, const uint32& server, + uint64& seqno) +{ + if (domain == wsrep_gtid_server.domain_id && + server == wsrep_gtid_server.server_id) + { + if (wsrep_gtid_server.seqno_committed() < seqno) return 1; + return 0; + } + return 0; +} + +void wsrep_init_sidno(const wsrep::id& uuid) +{ + /* + Protocol versions starting from 4 use group gtid as it is. + For lesser protocol versions generate new Sid map entry from inverted + uuid. + */ + rpl_gtid sid; + if (wsrep_protocol_version >= 4) + { + memcpy((void*)&sid, (const uchar*)uuid.data(),16); + } + else + { + wsrep_uuid_t ltid_uuid; + for (size_t i= 0; i < sizeof(ltid_uuid.data); ++i) + { + ltid_uuid.data[i]= ~((const uchar*)uuid.data())[i]; + } + memcpy((void*)&sid, (const uchar*)ltid_uuid.data,16); + } +#ifdef GTID_SUPPORT + global_sid_lock->wrlock(); + wsrep_sidno= global_sid_map->add_sid(sid); + WSREP_INFO("Initialized wsrep sidno %d", wsrep_sidno); + global_sid_lock->unlock(); +#endif +} + +void wsrep_init_schema() +{ + DBUG_ASSERT(!wsrep_schema); + + WSREP_INFO("wsrep_init_schema_and_SR %p", wsrep_schema); + if (!wsrep_schema) + { + wsrep_schema= new Wsrep_schema(); + if (wsrep_schema->init()) + { + WSREP_ERROR("Failed to init wsrep schema"); + unireg_abort(1); + } + // If we are bootstraping new cluster we should + // clear allowlist table and populate it from variable + if (wsrep_new_cluster) + { + wsrep_schema->clear_allowlist(); + std::vector ip_allowlist; + if (wsrep_split_allowlist(ip_allowlist)) + { + wsrep_schema->store_allowlist(ip_allowlist); + } + } + } +} + +void wsrep_deinit_schema() +{ + delete wsrep_schema; + wsrep_schema= 0; +} + +void wsrep_recover_sr_from_storage(THD *orig_thd) +{ + switch (wsrep_SR_store_type) + { + case WSREP_SR_STORE_TABLE: + if (!wsrep_schema) + { + WSREP_ERROR("Wsrep schema not initialized when trying to recover " + "streaming transactions: wsrep_on %d", WSREP_ON); + trans_commit(orig_thd); + } + if (wsrep_schema->recover_sr_transactions(orig_thd)) + { + WSREP_ERROR("Failed to recover SR transactions from schema: wsrep_on : %d", WSREP_ON); + trans_commit(orig_thd); + } + break; + default: + /* */ + WSREP_ERROR("Unsupported wsrep SR store type: %lu wsrep_on: %d", + wsrep_SR_store_type, WSREP_ON); + trans_commit(orig_thd); + break; + } +} + +/** Export the WSREP provider's capabilities as a human readable string. + * The result is saved in a dynamically allocated string of the form: + * :cap1:cap2:cap3: + */ +static void wsrep_capabilities_export(wsrep_cap_t const cap, char** str) +{ + static const char* names[] = + { + /* Keep in sync with wsrep/wsrep_api.h WSREP_CAP_* macros. */ + "MULTI_MASTER", + "CERTIFICATION", + "PARALLEL_APPLYING", + "TRX_REPLAY", + "ISOLATION", + "PAUSE", + "CAUSAL_READS", + "CAUSAL_TRX", + "INCREMENTAL_WRITESET", + "SESSION_LOCKS", + "DISTRIBUTED_LOCKS", + "CONSISTENCY_CHECK", + "UNORDERED", + "ANNOTATION", + "PREORDERED", + "STREAMING", + "SNAPSHOT", + "NBO", + }; + + std::string s; + for (size_t i= 0; i < sizeof(names) / sizeof(names[0]); ++i) + { + if (cap & (1ULL << i)) + { + if (s.empty()) + { + s= ":"; + } + s += names[i]; + s += ":"; + } + } + + /* A read from the string pointed to by *str may be started at any time, + * so it must never point to free(3)d memory or non '\0' terminated string. */ + + char* const previous= *str; + + *str= strdup(s.c_str()); + + if (previous != NULL) + { + free(previous); + } +} + +/* Verifies that SE position is consistent with the group position + * and initializes other variables */ +void wsrep_verify_SE_checkpoint(const wsrep_uuid_t& uuid, + wsrep_seqno_t const seqno) +{ +} + +/* + Wsrep is considered ready if + 1) Provider is not loaded (native mode) + 2) Server has reached synced state + 3) Server is in joiner mode and mysqldump SST method has been + specified + See Wsrep_server_service::log_state_change() for further details. + */ +my_bool wsrep_ready_get (void) +{ + if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort(); + my_bool ret= wsrep_ready; + mysql_mutex_unlock (&LOCK_wsrep_ready); + return ret; +} + +int wsrep_show_ready(THD *thd, SHOW_VAR *var, char *buff) +{ + var->type= SHOW_MY_BOOL; + var->value= buff; + *((my_bool *)buff)= wsrep_ready_get(); + return 0; +} + +void wsrep_update_cluster_state_uuid(const char* uuid) +{ + strncpy(cluster_uuid_str, uuid, sizeof(cluster_uuid_str) - 1); +} + +static void wsrep_init_position() +{ +} + +/**************************************************************************** + Helpers for wsrep_init() + ****************************************************************************/ +static std::string wsrep_server_name() +{ + std::string ret(wsrep_node_name ? wsrep_node_name : ""); + return ret; +} + +static std::string wsrep_server_id() +{ + /* using empty server_id, which enables view change handler to + set final server_id later on + */ + std::string ret(""); + return ret; +} + +static std::string wsrep_server_node_address() +{ + + std::string ret; + if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0) + wsrep_data_home_dir= mysql_real_data_home; + + /* Initialize node address */ + if (!wsrep_node_address || !strcmp(wsrep_node_address, "")) + { + char node_addr[512]= {0, }; + const size_t node_addr_max= sizeof(node_addr) - 1; + size_t guess_ip_ret= wsrep_guess_ip(node_addr, node_addr_max); + if (!(guess_ip_ret > 0 && guess_ip_ret < node_addr_max)) + { + WSREP_WARN("Failed to guess base node address. Set it explicitly via " + "wsrep_node_address."); + } + else + { + ret= node_addr; + } + } + else + { + ret= wsrep_node_address; + } + return ret; +} + +static std::string wsrep_server_incoming_address() +{ + std::string ret; + const std::string node_addr(wsrep_server_node_address()); + char inc_addr[512]= { 0, }; + size_t const inc_addr_max= sizeof (inc_addr); + + /* + In case wsrep_node_incoming_address is either not set or set to AUTO, + we need to use mysqld's my_bind_addr_str:mysqld_port, lastly fallback + to wsrep_node_address' value if mysqld's bind-address is not set either. + */ + if ((!wsrep_node_incoming_address || + !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO))) + { + bool is_ipv6= false; + unsigned int my_bind_ip= INADDR_ANY; // default if not set + + if (my_bind_addr_str && strlen(my_bind_addr_str) && + strcmp(my_bind_addr_str, "*") != 0) + { + my_bind_ip= wsrep_check_ip(my_bind_addr_str, &is_ipv6); + } + + if (INADDR_ANY != my_bind_ip) + { + /* + If its a not a valid address, leave inc_addr as empty string. mysqld + is not listening for client connections on network interfaces. + */ + if (INADDR_NONE != my_bind_ip && INADDR_LOOPBACK != my_bind_ip) + { + const char *fmt= (is_ipv6) ? "[%s]:%u" : "%s:%u"; + snprintf(inc_addr, inc_addr_max, fmt, my_bind_addr_str, mysqld_port); + } + } + else /* mysqld binds to 0.0.0.0, try taking IP from wsrep_node_address. */ + { + if (node_addr.size()) + { + size_t const ip_len_mdb= wsrep_host_len(node_addr.c_str(), node_addr.size()); + if (ip_len_mdb + 7 /* :55555\0 */ < inc_addr_max) + { + memcpy (inc_addr, node_addr.c_str(), ip_len_mdb); + snprintf(inc_addr + ip_len_mdb, inc_addr_max - ip_len_mdb, ":%u", + (int)mysqld_port); + } + else + { + WSREP_WARN("Guessing address for incoming client connections: " + "address too long."); + inc_addr[0]= '\0'; + } + } + + if (!strlen(inc_addr)) + { + WSREP_WARN("Guessing address for incoming client connections failed. " + "Try setting wsrep_node_incoming_address explicitly."); + WSREP_INFO("Node addr: %s", node_addr.c_str()); + } + } + } + else + { + wsp::Address addr(wsrep_node_incoming_address); + + if (!addr.is_valid()) + { + WSREP_WARN("Could not parse wsrep_node_incoming_address : %s", + wsrep_node_incoming_address); + goto done; + } + + /* + In case port is not specified in wsrep_node_incoming_address, we use + mysqld_port. + Note that we might get here before we execute set_ports(). + */ + int local_port= (addr.get_port() > 0) ? addr.get_port() : (int) mysqld_port; + if (!local_port) + local_port= MYSQL_PORT; + const char *fmt= (addr.is_ipv6()) ? "[%s]:%u" : "%s:%u"; + + snprintf(inc_addr, inc_addr_max, fmt, addr.get_address(), local_port); + } + + done: + if (!strlen(inc_addr)) + ret= wsrep_node_incoming_address; + else + ret= inc_addr; + WSREP_DEBUG("wsrep_incoming_address = %s", ret.c_str()); + return ret; +} + +static std::string wsrep_server_working_dir() +{ + std::string ret; + if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0) + { + ret= mysql_real_data_home; + } + else + { + ret= wsrep_data_home_dir; + } + return ret; +} + +static wsrep::gtid wsrep_server_initial_position() +{ + wsrep::gtid ret; + WSREP_DEBUG("Server initial position: %s", wsrep_start_position); + std::istringstream is(wsrep_start_position); + is >> ret; + return ret; +} + +/* + Intitialize provider specific status variables + */ +static void wsrep_init_provider_status_variables() +{ + wsrep_inited= 1; + const wsrep::provider& provider= + Wsrep_server_state::instance().provider(); + strncpy(provider_name, + provider.name().c_str(), sizeof(provider_name) - 1); + strncpy(provider_version, + provider.version().c_str(), sizeof(provider_version) - 1); + strncpy(provider_vendor, + provider.vendor().c_str(), sizeof(provider_vendor) - 1); +} + +int wsrep_init_server() +{ + wsrep::log::logger_fn(wsrep_log_cb); + try + { + Wsrep_status::init_once(wsrep_status_file); + + std::string server_name; + std::string server_id; + std::string node_address; + std::string incoming_address; + std::string working_dir; + wsrep::gtid initial_position; + + server_name= wsrep_server_name(); + server_id= wsrep_server_id(); + node_address= wsrep_server_node_address(); + incoming_address= wsrep_server_incoming_address(); + working_dir= wsrep_server_working_dir(); + initial_position= wsrep_server_initial_position(); + + Wsrep_server_state::init_once(server_name, + incoming_address, + node_address, + working_dir, + initial_position, + wsrep_max_protocol_version); + Wsrep_server_state::instance().debug_log_level(wsrep_debug); + } + catch (const wsrep::runtime_error& e) + { + WSREP_ERROR("Failed to init wsrep server %s", e.what()); + return 1; + } + catch (const std::exception& e) + { + WSREP_ERROR("Failed to init wsrep server %s", e.what()); + } + return 0; +} + +void wsrep_init_globals() +{ + wsrep_init_sidno(Wsrep_server_state::instance().connected_gtid().id()); + /* Recover last written wsrep gtid */ + wsrep_init_gtid(); + if (wsrep_new_cluster) + { + /* Start with provided domain_id & server_id found in configuration */ + wsrep_server_gtid_t new_gtid; + new_gtid.domain_id= wsrep_gtid_domain_id; + new_gtid.server_id= global_system_variables.server_id; + new_gtid.seqno= 0; + /* Try to search for domain_id and server_id combination in binlog if found continue from last seqno */ + wsrep_get_binlog_gtid_seqno(new_gtid); + wsrep_gtid_server.gtid(new_gtid); + } + else + { + if (wsrep_gtid_mode && wsrep_gtid_server.server_id != global_system_variables.server_id) + { + WSREP_WARN("Ignoring server id for non bootstrap node."); + } + } + wsrep_init_schema(); + if (WSREP_ON) + { + Wsrep_server_state::instance().initialized(); + } +} + +void wsrep_deinit_server() +{ + wsrep_deinit_schema(); + Wsrep_server_state::destroy(); + Wsrep_status::destroy(); + wsrep_free_status_vars(); +} + +int wsrep_init() +{ + assert(wsrep_provider); + + wsrep_init_position(); + wsrep_sst_auth_init(); + + if (strlen(wsrep_provider)== 0 || + !strcmp(wsrep_provider, WSREP_NONE)) + { + // enable normal operation in case no provider is specified + global_system_variables.wsrep_on= 0; + int err= Wsrep_server_state::instance().load_provider(wsrep_provider, wsrep_provider_options ? wsrep_provider_options : ""); + if (err) + { + DBUG_PRINT("wsrep",("wsrep::init() failed: %d", err)); + WSREP_ERROR("wsrep::init() failed: %d, must shutdown", err); + } + else + wsrep_init_provider_status_variables(); + return err; + } + + if (wsrep_gtid_mode && opt_bin_log && !opt_log_slave_updates) + { + WSREP_ERROR("Option --log-slave-updates is required if " + "binlog is enabled, GTID mode is on and wsrep provider " + "is specified"); + return 1; + } + + if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0) + wsrep_data_home_dir= mysql_real_data_home; + + Wsrep_server_state::init_provider_services(); + if (Wsrep_server_state::instance().load_provider( + wsrep_provider, + wsrep_provider_options, + Wsrep_server_state::instance().provider_services())) + { + WSREP_ERROR("Failed to load provider"); + Wsrep_server_state::deinit_provider_services(); + return 1; + } + + if (!wsrep_provider_is_SR_capable() && + global_system_variables.wsrep_trx_fragment_size > 0) + { + WSREP_ERROR("The WSREP provider (%s) does not support streaming " + "replication but wsrep_trx_fragment_size is set to a " + "value other than 0 (%llu). Cannot continue. Either set " + "wsrep_trx_fragment_size to 0 or use wsrep_provider that " + "supports streaming replication.", + wsrep_provider, global_system_variables.wsrep_trx_fragment_size); + Wsrep_server_state::instance().unload_provider(); + Wsrep_server_state::deinit_provider_services(); + return 1; + } + + /* Now WSREP is fully initialized */ + global_system_variables.wsrep_on= 1; + WSREP_ON_= wsrep_provider && strcmp(wsrep_provider, WSREP_NONE); + wsrep_service_started= 1; + + wsrep_init_provider_status_variables(); + wsrep_capabilities_export(Wsrep_server_state::instance().provider().capabilities(), + &wsrep_provider_capabilities); + + WSREP_DEBUG("SR storage init for: %s", + (wsrep_SR_store_type == WSREP_SR_STORE_TABLE) ? "table" : "void"); + + return 0; +} + +/* Initialize wsrep thread LOCKs and CONDs */ +void wsrep_thr_init() +{ + DBUG_ENTER("wsrep_thr_init"); + wsrep_config_state= new wsp::Config_state; +#ifdef HAVE_PSI_INTERFACE + mysql_mutex_register("sql", wsrep_mutexes, array_elements(wsrep_mutexes)); + mysql_cond_register("sql", wsrep_conds, array_elements(wsrep_conds)); + mysql_file_register("sql", wsrep_files, array_elements(wsrep_files)); + mysql_thread_register("sql", wsrep_threads, array_elements(wsrep_threads)); +#endif + + mysql_mutex_init(key_LOCK_wsrep_ready, &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL); + mysql_mutex_init(key_LOCK_wsrep_sst, &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL); + mysql_mutex_init(key_LOCK_wsrep_sst_init, &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL); + mysql_mutex_init(key_LOCK_wsrep_replaying, &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL); + mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_slave_threads, &COND_wsrep_slave_threads, NULL); + mysql_mutex_init(key_LOCK_wsrep_gtid_wait_upto, &LOCK_wsrep_gtid_wait_upto, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_cluster_config, &LOCK_wsrep_cluster_config, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_group_commit, &LOCK_wsrep_group_commit, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_SR_pool, + &LOCK_wsrep_SR_pool, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_SR_store, + &LOCK_wsrep_SR_store, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_joiner_monitor, + &LOCK_wsrep_joiner_monitor, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_donor_monitor, + &LOCK_wsrep_donor_monitor, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_joiner_monitor, &COND_wsrep_joiner_monitor, NULL); + mysql_cond_init(key_COND_wsrep_donor_monitor, &COND_wsrep_donor_monitor, NULL); + + DBUG_VOID_RETURN; +} + +void wsrep_init_startup (bool sst_first) +{ + if (wsrep_init()) unireg_abort(1); + + /* + Pre-initialize global_system_variables.table_plugin with a dummy engine + (placeholder) required during the initialization of wsrep threads (THDs). + (see: plugin_thdvar_init()) + Note: This only needs to be done for rsync & mariabackup based SST methods. + In case of mysqldump SST method, the wsrep threads are created after the + server plugins & global system variables are initialized. + */ + if (wsrep_before_SE()) + wsrep_plugins_pre_init(); + + /* Skip replication start if dummy wsrep provider is loaded */ + if (!strcmp(wsrep_provider, WSREP_NONE)) return; + + /* Skip replication start if no cluster address */ + if (!wsrep_cluster_address_exists()) return; + + /* + Read value of wsrep_new_cluster before wsrep_start_replication(), + the value is reset to FALSE inside wsrep_start_replication. + */ + if (!wsrep_start_replication(wsrep_cluster_address)) unireg_abort(1); + + wsrep_create_rollbacker(); + wsrep_create_appliers(1); + + Wsrep_server_state& server_state= Wsrep_server_state::instance(); + /* + If the SST happens before server initialization, wait until the server + state reaches initializing. This indicates that + either SST was not necessary or SST has been delivered. + + With mysqldump SST (!sst_first) wait until the server reaches + joiner state and procedd to accepting connections. + */ + int err= 0; + if (sst_first) + { + err= server_state.wait_until_state(Wsrep_server_state::s_initializing); + } + else + { + err= server_state.wait_until_state(Wsrep_server_state::s_joiner); + } + if (err) + { + WSREP_ERROR("Wsrep startup was interrupted"); + unireg_abort(1); + } +} + + +void wsrep_deinit(bool free_options) +{ + DBUG_ASSERT(wsrep_inited == 1); + WSREP_DEBUG("wsrep_deinit"); + + Wsrep_server_state::instance().unload_provider(); + Wsrep_server_state::deinit_provider_services(); + + provider_name[0]= '\0'; + provider_version[0]= '\0'; + provider_vendor[0]= '\0'; + + wsrep_inited= 0; + + if (wsrep_provider_capabilities != NULL) + { + char* p= wsrep_provider_capabilities; + wsrep_provider_capabilities= NULL; + free(p); + } + + if (free_options) + { + wsrep_sst_auth_free(); + } +} + +/* Destroy wsrep thread LOCKs and CONDs */ +void wsrep_thr_deinit() +{ + if (!wsrep_config_state) + return; // Never initialized + WSREP_DEBUG("wsrep_thr_deinit"); + mysql_mutex_destroy(&LOCK_wsrep_ready); + mysql_cond_destroy(&COND_wsrep_ready); + mysql_mutex_destroy(&LOCK_wsrep_sst); + mysql_cond_destroy(&COND_wsrep_sst); + mysql_mutex_destroy(&LOCK_wsrep_sst_init); + mysql_cond_destroy(&COND_wsrep_sst_init); + mysql_mutex_destroy(&LOCK_wsrep_replaying); + mysql_cond_destroy(&COND_wsrep_replaying); + mysql_mutex_destroy(&LOCK_wsrep_gtid_wait_upto); + mysql_mutex_destroy(&LOCK_wsrep_slave_threads); + mysql_cond_destroy(&COND_wsrep_slave_threads); + mysql_mutex_destroy(&LOCK_wsrep_cluster_config); + mysql_mutex_destroy(&LOCK_wsrep_desync); + mysql_mutex_destroy(&LOCK_wsrep_config_state); + mysql_mutex_destroy(&LOCK_wsrep_group_commit); + mysql_mutex_destroy(&LOCK_wsrep_SR_pool); + mysql_mutex_destroy(&LOCK_wsrep_SR_store); + mysql_mutex_destroy(&LOCK_wsrep_joiner_monitor); + mysql_mutex_destroy(&LOCK_wsrep_donor_monitor); + mysql_cond_destroy(&COND_wsrep_joiner_monitor); + mysql_cond_destroy(&COND_wsrep_donor_monitor); + + delete wsrep_config_state; + wsrep_config_state= 0; // Safety + + if (wsrep_cluster_capabilities != NULL) + { + char* p= wsrep_cluster_capabilities; + wsrep_cluster_capabilities= NULL; + free(p); + } +} + +void wsrep_recover() +{ + char uuid_str[40]; + + if (wsrep_uuid_compare(&local_uuid, &WSREP_UUID_UNDEFINED) == 0 && + local_seqno == -2) + { + wsrep_uuid_print(&local_uuid, uuid_str, sizeof(uuid_str)); + WSREP_INFO("Position %s:%lld given at startup, skipping position recovery", + uuid_str, (long long)local_seqno); + return; + } + wsrep::gtid gtid= wsrep_get_SE_checkpoint(); + std::ostringstream oss; + oss << gtid; + if (wsrep_gtid_mode) + { + wsrep_server_gtid_t server_gtid= wsrep_get_SE_checkpoint(); + WSREP_INFO("Recovered position: %s,%d-%d-%llu", oss.str().c_str(), server_gtid.domain_id, + server_gtid.server_id, server_gtid.seqno); + } + else + { + WSREP_INFO("Recovered position: %s", oss.str().c_str()); + } +} + + +void wsrep_stop_replication(THD *thd) +{ + WSREP_INFO("Stop replication by %llu", (thd) ? thd->thread_id : 0); + if (Wsrep_server_state::instance().state() != + Wsrep_server_state::s_disconnected) + { + WSREP_DEBUG("Disconnect provider"); + Wsrep_server_state::instance().disconnect(); + if (Wsrep_server_state::instance().wait_until_state( + Wsrep_server_state::s_disconnected)) + { + WSREP_WARN("Wsrep interrupted while waiting for disconnected state"); + } + } + + /* my connection, should not terminate with wsrep_close_client_connection(), + make transaction to rollback + */ + if (thd && !thd->wsrep_applier) trans_rollback(thd); + wsrep_close_client_connections(TRUE, thd); + + /* wait until appliers have stopped */ + wsrep_wait_appliers_close(thd); + + node_uuid= WSREP_UUID_UNDEFINED; +} + +void wsrep_shutdown_replication() +{ + WSREP_INFO("Shutdown replication"); + if (Wsrep_server_state::instance().state() != wsrep::server_state::s_disconnected) + { + WSREP_DEBUG("Disconnect provider"); + Wsrep_server_state::instance().disconnect(); + if (Wsrep_server_state::instance().wait_until_state( + Wsrep_server_state::s_disconnected)) + { + WSREP_WARN("Wsrep interrupted while waiting for disconnected state"); + } + } + + wsrep_close_client_connections(TRUE); + + /* wait until appliers have stopped */ + wsrep_wait_appliers_close(NULL); + node_uuid= WSREP_UUID_UNDEFINED; + + /* Undocking the thread specific data. */ + set_current_thd(nullptr); +} + +bool wsrep_start_replication(const char *wsrep_cluster_address) +{ + int rcode; + WSREP_DEBUG("wsrep_start_replication"); + + /* + if provider is trivial, don't even try to connect, + but resume local node operation + */ + if (!WSREP_PROVIDER_EXISTS) + { + // enable normal operation in case no provider is specified + return true; + } + + DBUG_ASSERT(wsrep_cluster_address[0]); + + // --wsrep-new-cluster flag is not used, checking wsrep_cluster_address + // it should match gcomm:// only to be considered as bootstrap node. + // This logic is used in galera. + if (!wsrep_new_cluster && + (strlen(wsrep_cluster_address) == 8) && + !strncmp(wsrep_cluster_address, "gcomm://", 8)) + { + wsrep_new_cluster= true; + } + + bool const bootstrap(TRUE == wsrep_new_cluster); + + WSREP_INFO("Start replication"); + + if ((rcode= Wsrep_server_state::instance().connect( + wsrep_cluster_name, + wsrep_cluster_address, + wsrep_sst_donor, + bootstrap))) + { + DBUG_PRINT("wsrep",("wsrep_ptr->connect(%s) failed: %d", + wsrep_cluster_address, rcode)); + WSREP_ERROR("wsrep::connect(%s) failed: %d", + wsrep_cluster_address, rcode); + return false; + } + else + { + try + { + std::string opts= Wsrep_server_state::instance().provider().options(); + wsrep_provider_options_init(opts.c_str()); + } + catch (const wsrep::runtime_error&) + { + WSREP_WARN("Failed to get wsrep options"); + } + } + + return true; +} + +bool wsrep_check_mode (enum_wsrep_mode mask) +{ + return wsrep_mode & mask; +} + +//seconds after which the limit warnings suppression will be activated +#define WSREP_WARNING_ACTIVATION_TIMEOUT 5*60 +//number of limit warnings after which the suppression will be activated +#define WSREP_WARNING_ACTIVATION_THRESHOLD 10 + +enum wsrep_warning_type { + WSREP_DISABLED = 0, + WSREP_REQUIRE_PRIMARY_KEY= 1, + WSREP_REQUIRE_INNODB= 2, + WSREP_REQUIRE_MAX=3, +}; + +static ulonglong wsrep_warning_start_time=0; +static bool wsrep_warning_active[WSREP_REQUIRE_MAX+1]; +static ulonglong wsrep_warning_count[WSREP_REQUIRE_MAX+1]; +static ulonglong wsrep_total_warnings_count=0; + +/** + Auxiliary function to reset the limit of wsrep warnings. + This is done without mutex protection, but this should be good + enough as it doesn't matter if we loose a couple of suppressed + messages or if this is called multiple times. +*/ + +static void wsrep_reset_warnings(ulonglong now) +{ + uint i; + + wsrep_warning_start_time= now; + wsrep_total_warnings_count= 0; + + for (i= 0 ; i < WSREP_REQUIRE_MAX ; i++) + { + wsrep_warning_active[i]= false; + wsrep_warning_count[i]= 0; + } +} + +static const char* wsrep_warning_name(const enum wsrep_warning_type type) +{ + switch(type) + { + case WSREP_REQUIRE_PRIMARY_KEY: + return "WSREP_REQUIRE_PRIMARY_KEY"; break; + case WSREP_REQUIRE_INNODB: + return "WSREP_REQUIRE_INNODB"; break; + default: assert(0); return " "; break; // for compiler + } +} +/** + Auxiliary function to check if the warning statements should be + thrown or suppressed. + + Logic is: + - If we get more than WSREP_WARNING_ACTIVATION_THRESHOLD errors + of one type, that type of errors will be suppressed for + WSREP_WARNING_ACTIVATION_TIMEOUT. + - When the time limit has been reached, all suppressions are reset. + + This means that if one gets many different types of errors, some of them + may be reset less than WSREP_WARNING_ACTIVATION_TIMEOUT. However at + least one error is disabled for this time. + + SYNOPSIS: + @params + warning_type - The type of warning. + + RETURN: + 0 0k to log + 1 Message suppressed +*/ + +static bool wsrep_protect_against_warning_flood( + enum wsrep_warning_type warning_type) +{ + ulonglong count; + ulonglong now= my_interval_timer()/1000000000ULL; + + count= ++wsrep_warning_count[warning_type]; + wsrep_total_warnings_count++; + + /* + INITIALIZING: + If this is the first time this function is called with log warning + enabled, the monitoring the warnings should start. + */ + if (wsrep_warning_start_time == 0) + { + wsrep_reset_warnings(now); + return false; + } + + /* + The following is true if we got too many errors or if the error was + already suppressed + */ + if (count >= WSREP_WARNING_ACTIVATION_THRESHOLD) + { + ulonglong diff_time= (now - wsrep_warning_start_time); + + if (!wsrep_warning_active[warning_type]) + { + /* + ACTIVATION: + We got WSREP_WARNING_ACTIVATION_THRESHOLD warnings in + less than WSREP_WARNING_ACTIVATION_TIMEOUT we activate the + suppression. + */ + if (diff_time <= WSREP_WARNING_ACTIVATION_TIMEOUT) + { + wsrep_warning_active[warning_type]= true; + WSREP_INFO("Suppressing warnings of type '%s' for up to %d seconds because of flooding", + wsrep_warning_name(warning_type), + WSREP_WARNING_ACTIVATION_TIMEOUT); + } + else + { + /* + There is no flooding till now, therefore we restart the monitoring + */ + wsrep_reset_warnings(now); + } + } + else + { + /* This type of warnings was suppressed */ + if (diff_time > WSREP_WARNING_ACTIVATION_TIMEOUT) + { + ulonglong save_count= wsrep_total_warnings_count; + /* Print a suppression note and remove the suppression */ + wsrep_reset_warnings(now); + WSREP_INFO("Suppressed %lu unsafe warnings during " + "the last %d seconds", + save_count, (int) diff_time); + } + } + } + + return wsrep_warning_active[warning_type]; +} + +/** + Auxiliary function to push warning to client and to the error log +*/ +static void wsrep_push_warning(THD *thd, + enum wsrep_warning_type type, + const handlerton *hton, + const TABLE_LIST *tables) +{ + switch(type) + { + case WSREP_REQUIRE_PRIMARY_KEY: + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_OPTION_PREVENTS_STATEMENT, + "WSREP: wsrep_mode = REQUIRED_PRIMARY_KEY enabled. " + "Table '%s'.'%s' should have PRIMARY KEY defined.", + tables->db.str, tables->table_name.str); + if (global_system_variables.log_warnings > 1 && + !wsrep_protect_against_warning_flood(type)) + WSREP_WARN("wsrep_mode = REQUIRED_PRIMARY_KEY enabled. " + "Table '%s'.'%s' should have PRIMARY KEY defined", + tables->db.str, tables->table_name.str); + break; + case WSREP_REQUIRE_INNODB: + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_OPTION_PREVENTS_STATEMENT, + "WSREP: wsrep_mode = STRICT_REPLICATION enabled. " + "Storage engine %s for table '%s'.'%s' is " + "not supported in Galera", + ha_resolve_storage_engine_name(hton), + tables->db.str, tables->table_name.str); + if (global_system_variables.log_warnings > 1 && + !wsrep_protect_against_warning_flood(type)) + WSREP_WARN("wsrep_mode = STRICT_REPLICATION enabled. " + "Storage engine %s for table '%s'.'%s' is " + "not supported in Galera", + ha_resolve_storage_engine_name(hton), + tables->db.str, tables->table_name.str); + break; + + default: assert(0); break; + } +} + +bool wsrep_check_mode_after_open_table (THD *thd, + const handlerton *hton, + TABLE_LIST *tables) +{ + enum_sql_command sql_command= thd->lex->sql_command; + bool is_dml_stmt= thd->get_command() != COM_STMT_PREPARE && + (sql_command == SQLCOM_INSERT || + sql_command == SQLCOM_INSERT_SELECT || + sql_command == SQLCOM_REPLACE || + sql_command == SQLCOM_REPLACE_SELECT || + sql_command == SQLCOM_UPDATE || + sql_command == SQLCOM_UPDATE_MULTI || + sql_command == SQLCOM_LOAD || + sql_command == SQLCOM_DELETE); + + if (!is_dml_stmt) + return true; + + const legacy_db_type db_type= hton->db_type; + bool replicate= ((db_type == DB_TYPE_MYISAM && wsrep_check_mode(WSREP_MODE_REPLICATE_MYISAM)) || + (db_type == DB_TYPE_ARIA && wsrep_check_mode(WSREP_MODE_REPLICATE_ARIA))); + TABLE *tbl= tables->table; + + if (replicate) + { + /* It is not recommended to replicate MyISAM as it lacks rollback feature + but if user demands then actions are replicated using TOI. + Following code will kick-start the TOI but this has to be done only once + per statement. + Note: kick-start will take-care of creating isolation key for all tables + involved in the list (provided all of them are MYISAM or Aria tables). */ + if (!is_stat_table(&tables->db, &tables->alias)) + { + if (tbl->s->primary_key == MAX_KEY && + wsrep_check_mode(WSREP_MODE_REQUIRED_PRIMARY_KEY)) + { + /* Other replicated table doesn't have explicit primary-key defined. */ + wsrep_push_warning(thd, WSREP_REQUIRE_PRIMARY_KEY, hton, tables); + } + + wsrep_before_rollback(thd, true); + wsrep_after_rollback(thd, true); + wsrep_after_statement(thd); + WSREP_TO_ISOLATION_BEGIN(NULL, NULL, (tables)); + } + } else if (db_type != DB_TYPE_UNKNOWN && + db_type != DB_TYPE_PERFORMANCE_SCHEMA) + { + bool is_system_db= (tbl && + ((strcmp(tbl->s->db.str, "mysql") == 0) || + (strcmp(tbl->s->db.str, "information_schema") == 0))); + + if (!is_system_db && + !is_temporary_table(tables)) + { + + if (db_type != DB_TYPE_INNODB && + wsrep_check_mode(WSREP_MODE_STRICT_REPLICATION)) + { + /* Table is not an InnoDB table and strict replication is requested*/ + wsrep_push_warning(thd, WSREP_REQUIRE_INNODB, hton, tables); + } + + if (tbl->s->primary_key == MAX_KEY && + db_type == DB_TYPE_INNODB && + wsrep_check_mode(WSREP_MODE_REQUIRED_PRIMARY_KEY)) + { + /* InnoDB table doesn't have explicit primary-key defined. */ + wsrep_push_warning(thd, WSREP_REQUIRE_PRIMARY_KEY, hton, tables); + } + + if (db_type != DB_TYPE_INNODB && + thd->variables.sql_log_bin == 1 && + wsrep_check_mode(WSREP_MODE_DISALLOW_LOCAL_GTID)) + { + /* Table is not an InnoDB table and local GTIDs are disallowed */ + my_error(ER_GALERA_REPLICATION_NOT_SUPPORTED, MYF(0)); + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_OPTION_PREVENTS_STATEMENT, + "You can't execute statements that would generate local " + "GTIDs when wsrep_mode = DISALLOW_LOCAL_GTID is set. " + "Try disabling binary logging with SET sql_log_bin=0 " + "to execute this statement."); + goto wsrep_error_label; + } + } + } + + return true; + +wsrep_error_label: + return false; +} + +bool wsrep_check_mode_before_cmd_execute (THD *thd) +{ + bool ret= true; + if (wsrep_check_mode(WSREP_MODE_BINLOG_ROW_FORMAT_ONLY) && + !thd->is_current_stmt_binlog_format_row() && is_update_query(thd->lex->sql_command)) + { + my_error(ER_GALERA_REPLICATION_NOT_SUPPORTED, MYF(0)); + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_OPTION_PREVENTS_STATEMENT, + "WSREP: wsrep_mode = BINLOG_ROW_FORMAT_ONLY enabled. Only ROW binlog format is supported."); + ret= false; + } + if (wsrep_check_mode(WSREP_MODE_REQUIRED_PRIMARY_KEY) && + thd->lex->sql_command == SQLCOM_CREATE_TABLE) + { + Key *key; + List_iterator key_iterator(thd->lex->alter_info.key_list); + bool primary_key_found= false; + while ((key= key_iterator++)) + { + if (key->type == Key::PRIMARY) + { + primary_key_found= true; + break; + } + } + if (!primary_key_found) + { + my_error(ER_GALERA_REPLICATION_NOT_SUPPORTED, MYF(0)); + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_OPTION_PREVENTS_STATEMENT, + "WSREP: wsrep_mode = REQUIRED_PRIMARY_KEY enabled. Table should have PRIMARY KEY defined."); + ret= false; + } + } + return ret; +} + +bool wsrep_must_sync_wait (THD* thd, uint mask) +{ + bool ret= 0; + if (thd->variables.wsrep_on) + { + mysql_mutex_lock(&thd->LOCK_thd_data); + ret= (thd->variables.wsrep_sync_wait & mask) && + thd->wsrep_client_thread && + WSREP_ON && + !(thd->variables.wsrep_dirty_reads && + !is_update_query(thd->lex->sql_command)) && + !thd->in_active_multi_stmt_transaction() && + thd->wsrep_trx().state() != + wsrep::transaction::s_replaying && + thd->wsrep_cs().sync_wait_gtid().is_undefined(); + mysql_mutex_unlock(&thd->LOCK_thd_data); + } + return ret; +} + +bool wsrep_sync_wait (THD* thd, uint mask) +{ + if (wsrep_must_sync_wait(thd, mask)) + { + WSREP_DEBUG("wsrep_sync_wait: thd->variables.wsrep_sync_wait= %u, " + "mask= %u, thd->variables.wsrep_on= %d", + thd->variables.wsrep_sync_wait, mask, + thd->variables.wsrep_on); + /* + This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0 + TODO: modify to check if thd has locked any rows. + */ + if (thd->wsrep_cs().sync_wait(-1)) + { + const char* msg; + int err; + + /* + Possibly relevant error codes: + ER_CHECKREAD, ER_ERROR_ON_READ, ER_INVALID_DEFAULT, ER_EMPTY_QUERY, + ER_FUNCTION_NOT_DEFINED, ER_NOT_ALLOWED_COMMAND, ER_NOT_SUPPORTED_YET, + ER_FEATURE_DISABLED, ER_QUERY_INTERRUPTED + */ + + switch (thd->wsrep_cs().current_error()) + { + case wsrep::e_not_supported_error: + msg= "synchronous reads by wsrep backend. " + "Please unset wsrep_causal_reads variable."; + err= ER_NOT_SUPPORTED_YET; + break; + default: + msg= "Synchronous wait failed."; + err= ER_LOCK_WAIT_TIMEOUT; // NOTE: the above msg won't be displayed + // with ER_LOCK_WAIT_TIMEOUT + } + + my_error(err, MYF(0), msg); + + return true; + } + } + + return false; +} + +enum wsrep::provider::status +wsrep_sync_wait_upto (THD* thd, + wsrep_gtid_t* upto, + int timeout) +{ + DBUG_ASSERT(upto); + enum wsrep::provider::status ret; + if (upto) + { + wsrep::gtid upto_gtid(wsrep::id(upto->uuid.data, sizeof(upto->uuid.data)), + wsrep::seqno(upto->seqno)); + ret= Wsrep_server_state::instance().wait_for_gtid(upto_gtid, timeout); + } + else + { + ret= Wsrep_server_state::instance().causal_read(timeout).second; + } + WSREP_DEBUG("wsrep_sync_wait_upto: %d", ret); + return ret; +} + +bool wsrep_is_show_query(enum enum_sql_command command) +{ + DBUG_ASSERT(command >= 0 && command <= SQLCOM_END); + return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0; +} + +static bool wsrep_is_diagnostic_query(enum enum_sql_command command) +{ + assert(command >= 0 && command <= SQLCOM_END); + return (sql_command_flags[command] & CF_DIAGNOSTIC_STMT) != 0; +} + +static enum enum_wsrep_sync_wait +wsrep_sync_wait_mask_for_command(enum enum_sql_command command) +{ + switch (command) + { + case SQLCOM_SELECT: + case SQLCOM_CHECKSUM: + return WSREP_SYNC_WAIT_BEFORE_READ; + case SQLCOM_DELETE: + case SQLCOM_DELETE_MULTI: + case SQLCOM_UPDATE: + case SQLCOM_UPDATE_MULTI: + return WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE; + case SQLCOM_REPLACE: + case SQLCOM_INSERT: + case SQLCOM_REPLACE_SELECT: + case SQLCOM_INSERT_SELECT: + return WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE; + default: + if (wsrep_is_diagnostic_query(command)) + { + return WSREP_SYNC_WAIT_NONE; + } + if (wsrep_is_show_query(command)) + { + switch (command) + { + case SQLCOM_SHOW_PROFILE: + case SQLCOM_SHOW_PROFILES: + case SQLCOM_SHOW_SLAVE_HOSTS: + case SQLCOM_SHOW_RELAYLOG_EVENTS: + case SQLCOM_SHOW_SLAVE_STAT: + case SQLCOM_SHOW_BINLOG_STAT: + case SQLCOM_SHOW_ENGINE_STATUS: + case SQLCOM_SHOW_ENGINE_MUTEX: + case SQLCOM_SHOW_ENGINE_LOGS: + case SQLCOM_SHOW_PROCESSLIST: + case SQLCOM_SHOW_PRIVILEGES: + return WSREP_SYNC_WAIT_NONE; + default: + return WSREP_SYNC_WAIT_BEFORE_SHOW; + } + } + } + return WSREP_SYNC_WAIT_NONE; +} + +bool wsrep_sync_wait(THD* thd, enum enum_sql_command command) +{ + bool res = false; + if (WSREP_CLIENT(thd) && thd->variables.wsrep_sync_wait) + res = wsrep_sync_wait(thd, wsrep_sync_wait_mask_for_command(command)); + return res; +} + +void wsrep_keys_free(wsrep_key_arr_t* key_arr) +{ + for (size_t i= 0; i < key_arr->keys_len; ++i) + { + my_free((void*)key_arr->keys[i].key_parts); + } + my_free(key_arr->keys); + key_arr->keys= 0; + key_arr->keys_len= 0; +} + +/*! + * @param thd thread + * @param tables list of tables + * @param keys prepared keys + + * @return true if parent table append was successfull, otherwise false. +*/ +bool +wsrep_append_fk_parent_table(THD* thd, TABLE_LIST* tables, wsrep::key_array* keys) +{ + bool fail= false; + TABLE_LIST *table; + TABLE_LIST *table_last_in_list; + + for (table= tables; table; table= table->next_local) + { + if (is_temporary_table(table)) + { + WSREP_DEBUG("Temporary table %s.%s already opened query=%s", table->db.str, + table->table_name.str, wsrep_thd_query(thd)); + return false; + } + } + + thd->release_transactional_locks(); + uint counter; + MDL_savepoint mdl_savepoint= thd->mdl_context.mdl_savepoint(); + + for (table_last_in_list= tables;;table_last_in_list= table_last_in_list->next_local) { + if (!table_last_in_list->next_local) { + break; + } + } + + if (open_tables(thd, &tables, &counter, MYSQL_OPEN_FORCE_SHARED_HIGH_PRIO_MDL)) + { + WSREP_DEBUG("Unable to open table for FK checks for %s", wsrep_thd_query(thd)); + fail= true; + goto exit; + } + + for (table= tables; table; table= table->next_local) + { + if (!is_temporary_table(table) && table->table) + { + FOREIGN_KEY_INFO *f_key_info; + List f_key_list; + + table->table->file->get_foreign_key_list(thd, &f_key_list); + List_iterator_fast it(f_key_list); + while ((f_key_info=it++)) + { + WSREP_DEBUG("appended fkey %s", f_key_info->referenced_table->str); + keys->push_back(wsrep_prepare_key_for_toi(f_key_info->referenced_db->str, + f_key_info->referenced_table->str, + wsrep::key::shared)); + } + } + } + +exit: + /* close the table and release MDL locks */ + close_thread_tables(thd); + thd->mdl_context.rollback_to_savepoint(mdl_savepoint); + bool invalidate_next_global= false; + for (table= tables; table; table= table->next_local) + { + table->table= NULL; + table->mdl_request.ticket= NULL; + // We should invalidate `next_global` only for entries that are added + // in this function + if (table == table_last_in_list) { + invalidate_next_global= true; + } + if (invalidate_next_global) { + table->next_global= NULL; + } + } + + return fail; +} + +bool wsrep_reload_ssl() +{ + try + { + std::string opts= Wsrep_server_state::instance().provider().options(); + if (opts.find("socket.ssl_reload") == std::string::npos) + { + WSREP_DEBUG("Option `socket.ssl_reload` not found in parameters."); + return false; + } + const std::string reload_ssl_param("socket.ssl_reload=1"); + enum wsrep::provider::status ret= Wsrep_server_state::instance().provider().options(reload_ssl_param); + if (ret) + { + WSREP_ERROR("Set options returned %d", ret); + return true; + } + return false; + } + catch (...) + { + WSREP_ERROR("Failed to get provider options"); + return true; + } +} + +bool wsrep_split_allowlist(std::vector& allowlist) +{ + if (!wsrep_allowlist || 0 == strlen(wsrep_allowlist)) + { + return false; + } + std::istringstream ss{wsrep_allowlist}; + std::string token; + while (std::getline(ss, token, ',')) + { + if (!token.empty()) + { + struct sockaddr_in sa_4; + struct sockaddr_in6 sa_6; + if ((inet_pton(AF_INET, token.c_str(), &(sa_4.sin_addr)) != 0) || + (inet_pton(AF_INET6, token.c_str(), &(sa_6.sin6_addr)) != 0)) + { + allowlist.push_back(token); + } + else + { + WSREP_WARN("Invalid IP address %s provided in `wsrep_allowlist` variable", token.c_str()); + } + } + } + return allowlist.size(); +} + +/*! + * @param db Database string + * @param table Table string + * @param key Array of wsrep_key_t + * @param key_len In: number of elements in key array, Out: number of + * elements populated + * + * @return true if preparation was successful, otherwise false. + */ + +static bool wsrep_prepare_key_for_isolation(const char* db, + const char* table, + wsrep_buf_t* key, + size_t* key_len) +{ + if (*key_len < 2) return false; + + switch (wsrep_protocol_version) + { + case 0: + *key_len= 0; + break; + case 1: + case 2: + case 3: + case 4: + { + *key_len= 0; + if (db) + { + key[*key_len].ptr= db; + key[*key_len].len= strlen(db); + ++(*key_len); + if (table) + { + key[*key_len].ptr= table; + key[*key_len].len= strlen(table); + ++(*key_len); + } + } + break; + } + default: + assert(0); + WSREP_ERROR("Unsupported protocol version: %ld", wsrep_protocol_version); + unireg_abort(1); + return false; + } + + return true; +} + +static bool wsrep_prepare_key_for_isolation(const char* db, + const char* table, + wsrep_key_arr_t* ka) +{ + wsrep_key_t* tmp; + tmp= (wsrep_key_t*)my_realloc(PSI_INSTRUMENT_ME, ka->keys, + (ka->keys_len + 1) * sizeof(wsrep_key_t), + MYF(MY_ALLOW_ZERO_PTR)); + if (!tmp) + { + WSREP_ERROR("Can't allocate memory for key_array"); + return false; + } + ka->keys= tmp; + if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*) + my_malloc(PSI_INSTRUMENT_ME, sizeof(wsrep_buf_t)*2, MYF(0)))) + { + WSREP_ERROR("Can't allocate memory for key_parts"); + return false; + } + ka->keys[ka->keys_len].key_parts_num= 2; + ++ka->keys_len; + if (!wsrep_prepare_key_for_isolation(db, table, + (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts, + &ka->keys[ka->keys_len - 1].key_parts_num)) + { + WSREP_ERROR("Preparing keys for isolation failed"); + return false; + } + + return true; +} + +static bool wsrep_prepare_keys_for_alter_add_fk(const char* child_table_db, + const Alter_info* alter_info, + wsrep_key_arr_t* ka) +{ + Key *key; + List_iterator key_iterator(const_cast(alter_info)->key_list); + while ((key= key_iterator++)) + { + if (key->type == Key::FOREIGN_KEY) + { + Foreign_key *fk_key= (Foreign_key *)key; + const char *db_name= fk_key->ref_db.str; + const char *table_name= fk_key->ref_table.str; + if (!db_name) + { + db_name= child_table_db; + } + if (!wsrep_prepare_key_for_isolation(db_name, table_name, ka)) + { + return false; + } + } + } + return true; +} + +static bool wsrep_prepare_keys_for_isolation(THD* thd, + const char* db, + const char* table, + const TABLE_LIST* table_list, + Alter_info* alter_info, + wsrep_key_arr_t* ka) +{ + ka->keys= 0; + ka->keys_len= 0; + + if (db || table) + { + if (!wsrep_prepare_key_for_isolation(db, table, ka)) + goto err; + } + + for (const TABLE_LIST* table= table_list; table; table= table->next_global) + { + if (!wsrep_prepare_key_for_isolation(table->db.str, table->table_name.str, ka)) + goto err; + } + + if (alter_info) + { + if (!wsrep_prepare_keys_for_alter_add_fk(table_list->db.str, alter_info, ka)) + goto err; + } + return false; + +err: + wsrep_keys_free(ka); + return true; +} + +/* + * Prepare key list from db/table and table_list + * + * Return zero in case of success, 1 in case of failure. + */ + +bool wsrep_prepare_keys_for_isolation(THD* thd, + const char* db, + const char* table, + const TABLE_LIST* table_list, + wsrep_key_arr_t* ka) +{ + return wsrep_prepare_keys_for_isolation(thd, db, table, table_list, NULL, ka); +} + +bool wsrep_prepare_key(const uchar* cache_key, size_t cache_key_len, + const uchar* row_id, size_t row_id_len, + wsrep_buf_t* key, size_t* key_len) +{ + if (*key_len < 3) return false; + + *key_len= 0; + switch (wsrep_protocol_version) + { + case 0: + { + key[0].ptr= cache_key; + key[0].len= cache_key_len; + + *key_len= 1; + break; + } + case 1: + case 2: + case 3: + case 4: + { + key[0].ptr= cache_key; + key[0].len= strlen( (char*)cache_key ); + + key[1].ptr= cache_key + strlen( (char*)cache_key ) + 1; + key[1].len= strlen( (char*)(key[1].ptr) ); + + *key_len= 2; + break; + } + default: + return false; + } + + key[*key_len].ptr= row_id; + key[*key_len].len= row_id_len; + ++(*key_len); + + return true; +} + +bool wsrep_prepare_key_for_innodb(THD* thd, + const uchar* cache_key, + size_t cache_key_len, + const uchar* row_id, + size_t row_id_len, + wsrep_buf_t* key, + size_t* key_len) +{ + + return wsrep_prepare_key(cache_key, cache_key_len, row_id, row_id_len, key, key_len); +} + +wsrep::key wsrep_prepare_key_for_toi(const char* db, const char* table, + enum wsrep::key::type type) +{ + wsrep::key ret(type); + DBUG_ASSERT(db); + ret.append_key_part(db, strlen(db)); + if (table) ret.append_key_part(table, strlen(table)); + return ret; +} + +wsrep::key_array +wsrep_prepare_keys_for_alter_add_fk(const char* child_table_db, + const Alter_info* alter_info) + +{ + wsrep::key_array ret; + Key *key; + List_iterator key_iterator(const_cast(alter_info)->key_list); + while ((key= key_iterator++)) + { + if (key->type == Key::FOREIGN_KEY) + { + Foreign_key *fk_key= (Foreign_key *)key; + const char *db_name= fk_key->ref_db.str; + const char *table_name= fk_key->ref_table.str; + if (!db_name) + { + db_name= child_table_db; + } + ret.push_back(wsrep_prepare_key_for_toi(db_name, table_name, + wsrep::key::exclusive)); + } + } + return ret; +} + +wsrep::key_array wsrep_prepare_keys_for_toi(const char *db, + const char *table, + const TABLE_LIST *table_list, + const Alter_info *alter_info, + const wsrep::key_array *fk_tables) +{ + wsrep::key_array ret; + if (db || table) + { + ret.push_back(wsrep_prepare_key_for_toi(db, table, wsrep::key::exclusive)); + } + for (const TABLE_LIST* table= table_list; table; table= table->next_global) + { + ret.push_back(wsrep_prepare_key_for_toi(table->db.str, table->table_name.str, + wsrep::key::exclusive)); + } + if (alter_info) + { + wsrep::key_array fk(wsrep_prepare_keys_for_alter_add_fk(table_list->db.str, alter_info)); + if (!fk.empty()) + { + ret.insert(ret.end(), fk.begin(), fk.end()); + } + } + if (fk_tables && !fk_tables->empty()) + { + ret.insert(ret.end(), fk_tables->begin(), fk_tables->end()); + } + return ret; +} + +/* + * Construct Query_log_Event from thd query and serialize it + * into buffer. + * + * Return 0 in case of success, 1 in case of error. + */ +int wsrep_to_buf_helper( + THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len) +{ + IO_CACHE tmp_io_cache; + Log_event_writer writer(&tmp_io_cache, 0); + if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, + 65536, MYF(MY_WME))) + return 1; + int ret(0); + enum enum_binlog_checksum_alg current_binlog_check_alg= + (enum_binlog_checksum_alg) binlog_checksum_options; + + Format_description_log_event *tmp_fd= new Format_description_log_event(4); + tmp_fd->checksum_alg= current_binlog_check_alg; + writer.write(tmp_fd); + delete tmp_fd; + +#ifdef GTID_SUPPORT + if (thd->variables.gtid_next.type == GTID_GROUP) + { + Gtid_log_event gtid_ev(thd, FALSE, &thd->variables.gtid_next); + if (!gtid_ev.is_valid()) ret= 0; + if (!ret && writer.write(>id_ev)) ret= 1; + } +#endif /* GTID_SUPPORT */ + /* + * Check if this is applier thread, slave_thread or + * we have set manually WSREP GTID seqno. Add GTID event. + */ + if (thd->slave_thread || wsrep_thd_is_applying(thd) || + thd->variables.wsrep_gtid_seq_no) + { + uint64 seqno= thd->variables.gtid_seq_no; + uint32 domain_id= thd->variables.gtid_domain_id; + uint32 server_id= thd->variables.server_id; + if (!thd->variables.gtid_seq_no && thd->variables.wsrep_gtid_seq_no) + { + seqno= thd->variables.wsrep_gtid_seq_no; + domain_id= wsrep_gtid_server.domain_id; + server_id= wsrep_gtid_server.server_id; + } + Gtid_log_event gtid_event(thd, seqno, domain_id, true, + LOG_EVENT_SUPPRESS_USE_F, true, 0); + gtid_event.server_id= server_id; + if (!gtid_event.is_valid()) ret= 0; + ret= writer.write(>id_event); + } + /* + It's local DDL so in case of possible gtid seqno (SET gtid_seq_no=X) + manipulation, seqno value will be ignored. + */ + else + { + thd->variables.gtid_seq_no= 0; + } + + /* if there is prepare query, add event for it */ + if (!ret && thd->wsrep_TOI_pre_query) + { + Query_log_event ev(thd, thd->wsrep_TOI_pre_query, + thd->wsrep_TOI_pre_query_len, + FALSE, FALSE, FALSE, 0); + ev.checksum_alg= current_binlog_check_alg; + if (writer.write(&ev)) ret= 1; + } + + /* continue to append the actual query */ + Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0); + /* WSREP GTID mode, we need to change server_id */ + if (wsrep_gtid_mode && !thd->variables.gtid_seq_no) + ev.server_id= wsrep_gtid_server.server_id; + ev.checksum_alg= current_binlog_check_alg; + if (!ret && writer.write(&ev)) ret= 1; + if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1; + close_cached_file(&tmp_io_cache); + return ret; +} + +static int +wsrep_alter_query_string(THD *thd, String *buf) +{ + /* Append the "ALTER" part of the query */ + if (buf->append(STRING_WITH_LEN("ALTER "))) + return 1; + /* Append definer */ + append_definer(thd, buf, &(thd->lex->definer->user), &(thd->lex->definer->host)); + /* Append the left part of thd->query after event name part */ + if (buf->append(thd->lex->stmt_definition_begin, + thd->lex->stmt_definition_end - + thd->lex->stmt_definition_begin)) + return 1; + + return 0; +} + +static int wsrep_alter_event_query(THD *thd, uchar** buf, size_t* buf_len) +{ + String log_query; + + if (wsrep_alter_query_string(thd, &log_query)) + { + WSREP_WARN("events alter string failed: schema: %s, query: %s", + thd->get_db(), thd->query()); + return 1; + } + return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len); +} + +#include "sql_show.h" +static int +create_view_query(THD *thd, uchar** buf, size_t* buf_len) +{ + LEX *lex= thd->lex; + SELECT_LEX *select_lex= lex->first_select_lex(); + TABLE_LIST *first_table= select_lex->table_list.first; + TABLE_LIST *views= first_table; + LEX_USER *definer; + String buff; + const LEX_CSTRING command[3]= + {{ STRING_WITH_LEN("CREATE ") }, + { STRING_WITH_LEN("ALTER ") }, + { STRING_WITH_LEN("CREATE OR REPLACE ") }}; + + buff.append(&command[thd->lex->create_view->mode]); + + if (lex->definer) + definer= get_current_user(thd, lex->definer); + else + { + /* + DEFINER-clause is missing; we have to create default definer in + persistent arena to be PS/SP friendly. + If this is an ALTER VIEW then the current user should be set as + the definer. + */ + definer= create_default_definer(thd, false); + } + + if (definer) + { + views->definer.user= definer->user; + views->definer.host= definer->host; + } else { + WSREP_ERROR("Failed to get DEFINER for VIEW."); + return 1; + } + + views->algorithm = lex->create_view->algorithm; + views->view_suid = lex->create_view->suid; + views->with_check = lex->create_view->check; + + view_store_options(thd, views, &buff); + buff.append(STRING_WITH_LEN("VIEW ")); + /* Test if user supplied a db (ie: we did not use thd->db) */ + if (views->db.str && views->db.str[0] && + (thd->db.str == NULL || cmp(&views->db, &thd->db))) + { + append_identifier(thd, &buff, &views->db); + buff.append('.'); + } + append_identifier(thd, &buff, &views->table_name); + if (lex->view_list.elements) + { + List_iterator_fast names(lex->view_list); + LEX_CSTRING *name; + int i; + + buff.append('('); + for (i= 0; (name= names++); i++) + { + append_identifier(thd, &buff, name); + buff.append(", ", 2); + } + if (i) + buff.length(buff.length()-2); + buff.append(')'); + } + buff.append(STRING_WITH_LEN(" AS ")); + buff.append(thd->lex->create_view->select.str, + thd->lex->create_view->select.length); + return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len); +} + +/* + Rewrite DROP TABLE for TOI. Temporary tables are eliminated from + the query as they are visible only to client connection. + + TODO: See comments for sql_base.cc:drop_temporary_table() and refine + the function to deal with transactional locked tables. + */ +static int wsrep_drop_table_query(THD* thd, uchar** buf, size_t* buf_len) +{ + + LEX* lex= thd->lex; + SELECT_LEX* select_lex= lex->first_select_lex(); + TABLE_LIST* first_table= select_lex->table_list.first; + String buff; + + DBUG_ASSERT(!lex->create_info.tmp_table()); + + bool found_temp_table= false; + for (TABLE_LIST* table= first_table; table; table= table->next_global) + { + if (thd->find_temporary_table(table->db.str, table->table_name.str)) + { + found_temp_table= true; + break; + } + } + + if (found_temp_table) + { + buff.append(STRING_WITH_LEN("DROP TABLE ")); + if (lex->check_exists) + buff.append(STRING_WITH_LEN("IF EXISTS ")); + + for (TABLE_LIST* table= first_table; table; table= table->next_global) + { + if (!thd->find_temporary_table(table->db.str, table->table_name.str)) + { + append_identifier(thd, &buff, table->db.str, table->db.length); + buff.append('.'); + append_identifier(thd, &buff, + table->table_name.str, table->table_name.length); + buff.append(','); + } + } + + /* Chop the last comma */ + buff.chop(); + buff.append(STRING_WITH_LEN(" /* generated by wsrep */")); + + WSREP_DEBUG("Rewrote '%s' as '%s'", thd->query(), buff.ptr()); + + return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len); + } + else + { + return wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), + buf, buf_len); + } +} + + +/* Forward declarations. */ +int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len); + +bool wsrep_should_replicate_ddl_iterate(THD* thd, const TABLE_LIST* table_list) +{ + if (WSREP(thd)) + { + for (const TABLE_LIST* it= table_list; it; it= it->next_global) + { + if (it->table && + !wsrep_should_replicate_ddl(thd, it->table->s->db_type())) + return false; + } + } + return true; +} + +bool wsrep_should_replicate_ddl(THD* thd, const handlerton *hton) +{ + if (!wsrep_check_mode(WSREP_MODE_STRICT_REPLICATION)) + return true; + + if (!hton) + return true; + + switch (hton->db_type) + { + case DB_TYPE_INNODB: + return true; + break; + case DB_TYPE_MYISAM: + if (wsrep_check_mode(WSREP_MODE_REPLICATE_MYISAM)) + return true; + else + WSREP_DEBUG("wsrep OSU failed for %s", wsrep_thd_query(thd)); + break; + case DB_TYPE_ARIA: + if (wsrep_check_mode(WSREP_MODE_REPLICATE_ARIA)) + return true; + else + WSREP_DEBUG("wsrep OSU failed for %s", wsrep_thd_query(thd)); + break; + default: + WSREP_DEBUG("wsrep OSU failed for %s", wsrep_thd_query(thd)); + break; + } + + /* wsrep_mode = STRICT_REPLICATION, treat as error */ + my_error(ER_GALERA_REPLICATION_NOT_SUPPORTED, MYF(0)); + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_ILLEGAL_HA, + "WSREP: wsrep_mode = STRICT_REPLICATION enabled. " + "Storage engine %s not supported.", + ha_resolve_storage_engine_name(hton)); + return false; +} +/* + Decide if statement should run in TOI. + + Look if table or table_list contain temporary tables. If the + statement affects only temporary tables, statement should not run + in TOI. If the table list contains mix of regular and temporary tables + (DROP TABLE, OPTIMIZE, ANALYZE), statement should be run in TOI but + should be rewritten at later time for replication to contain only + non-temporary tables. + */ +bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table, + const TABLE_LIST *table_list, + const HA_CREATE_INFO* create_info) +{ + DBUG_ASSERT(!table || db); + DBUG_ASSERT(table_list || db); + + LEX* lex= thd->lex; + SELECT_LEX* select_lex= lex->first_select_lex(); + const TABLE_LIST* first_table= select_lex->table_list.first; + + switch (lex->sql_command) + { + case SQLCOM_CREATE_TABLE: + if (thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) + { + return false; + } + if (!wsrep_should_replicate_ddl(thd, create_info->db_type)) + { + return false; + } + /* + If mariadb master has replicated a CTAS, we should not replicate the create table + part separately as TOI, but to replicate both create table and following inserts + as one write set. + Howver, if CTAS creates empty table, we should replicate the create table alone + as TOI. We have to do relay log event lookup to see if row events follow the + create table event. + */ + if (thd->slave_thread && + !(thd->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_STANDALONE)) + { + /* this is CTAS, either empty or populated table */ + ulonglong event_size = 0; + enum Log_event_type ev_type= wsrep_peak_event(thd->rgi_slave, &event_size); + switch (ev_type) + { + case QUERY_EVENT: + /* CTAS with empty table, we replicate create table as TOI */ + break; + + case TABLE_MAP_EVENT: + WSREP_DEBUG("replicating CTAS of empty table as TOI"); + // fall through + case WRITE_ROWS_EVENT: + /* CTAS with populated table, we replicate later at commit time */ + WSREP_DEBUG("skipping create table of CTAS replication"); + return false; + + default: + WSREP_WARN("unexpected async replication event: %d", ev_type); + } + return true; + } + /* no next async replication event */ + return true; + break; + case SQLCOM_CREATE_VIEW: + + DBUG_ASSERT(!table_list); + DBUG_ASSERT(first_table); /* First table is view name */ + /* + If any of the remaining tables refer to temporary table error + is returned to client, so TOI can be skipped + */ + for (const TABLE_LIST* it= first_table->next_global; it; it= it->next_global) + { + if (thd->find_temporary_table(it)) + { + return false; + } + } + return true; + break; + case SQLCOM_CREATE_TRIGGER: + + DBUG_ASSERT(first_table); + + if (thd->find_temporary_table(first_table)) + { + return false; + } + return true; + break; + case SQLCOM_DROP_TRIGGER: + DBUG_ASSERT(table_list); + if (thd->find_temporary_table(table_list)) + { + return false; + } + return true; + break; + case SQLCOM_ALTER_TABLE: + if (create_info) + { + const handlerton *hton= create_info->db_type; + + if (!hton) + hton= ha_default_handlerton(thd); + if (!wsrep_should_replicate_ddl(thd, hton)) + return false; + } + /* fallthrough */ + default: + if (table && !thd->find_temporary_table(db, table)) + { + return true; + } + + if (table_list) + { + for (const TABLE_LIST* table= first_table; table; table= table->next_global) + { + if (!thd->find_temporary_table(table->db.str, table->table_name.str)) + { + return true; + } + } + } + + return !(table || table_list); + break; + case SQLCOM_CREATE_SEQUENCE: + /* No TOI for temporary sequences as they are + not replicated */ + if (thd->lex->tmp_table()) + { + return false; + } + return true; + + } +} + +static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len) +{ + String log_query; + sp_head *sp= thd->lex->sphead; + sql_mode_t saved_mode= thd->variables.sql_mode; + String retstr(64); + LEX_CSTRING returns= empty_clex_str; + retstr.set_charset(system_charset_info); + + log_query.set_charset(system_charset_info); + + if (sp->m_handler->type() == SP_TYPE_FUNCTION) + { + sp_returns_type(thd, retstr, sp); + retstr.get_value(&returns); + } + if (sp->m_handler-> + show_create_sp(thd, &log_query, + sp->m_explicit_name ? sp->m_db : null_clex_str, + sp->m_name, sp->m_params, returns, + sp->m_body, sp->chistics(), + thd->lex->definer[0], + thd->lex->create_info, + saved_mode)) + { + WSREP_WARN("SP create string failed: schema: %s, query: %s", + thd->get_db(), thd->query()); + return 1; + } + + return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len); +} + +static int wsrep_TOI_event_buf(THD* thd, uchar** buf, size_t* buf_len) +{ + int err; + switch (thd->lex->sql_command) + { + case SQLCOM_CREATE_VIEW: + err= create_view_query(thd, buf, buf_len); + break; + case SQLCOM_CREATE_PROCEDURE: + case SQLCOM_CREATE_SPFUNCTION: + err= wsrep_create_sp(thd, buf, buf_len); + break; + case SQLCOM_CREATE_TRIGGER: + err= wsrep_create_trigger_query(thd, buf, buf_len); + break; + case SQLCOM_CREATE_EVENT: + err= wsrep_create_event_query(thd, buf, buf_len); + break; + case SQLCOM_ALTER_EVENT: + err= wsrep_alter_event_query(thd, buf, buf_len); + break; + case SQLCOM_DROP_TABLE: + err= wsrep_drop_table_query(thd, buf, buf_len); + break; + case SQLCOM_CREATE_ROLE: + if (sp_process_definer(thd)) + { + WSREP_WARN("Failed to set CREATE ROLE definer for TOI."); + } + /* fallthrough */ + default: + err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), buf, + buf_len); + break; + } + + return err; +} + +static void wsrep_TOI_begin_failed(THD* thd, const wsrep_buf_t* /* const err */) +{ + if (wsrep_thd_trx_seqno(thd) > 0) + { + /* GTID was granted and TO acquired - need to log event and release TO */ + if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd); + if (wsrep_write_dummy_event(thd, "TOI begin failed")) { goto fail; } + wsrep::client_state& cs(thd->wsrep_cs()); + std::string const err(wsrep::to_c_string(cs.current_error())); + wsrep::mutable_buffer err_buf; + err_buf.push_back(err); + int const ret= cs.leave_toi_local(err_buf); + if (ret) + { + WSREP_ERROR("Leaving critical section for failed TOI failed: thd: %lld, " + "schema: %s, SQL: %s, rcode: %d wsrep_error: %s", + (long long)thd->real_id, thd->db.str, + thd->query(), ret, err.c_str()); + goto fail; + } + } + return; +fail: + WSREP_ERROR("Failed to release TOI resources. Need to abort."); + unireg_abort(1); +} + + +/* + returns: + 0: statement was replicated as TOI + 1: TOI replication was skipped + -1: TOI replication failed + */ +static int wsrep_TOI_begin(THD *thd, const char *db, const char *table, + const TABLE_LIST *table_list, + const Alter_info *alter_info, + const wsrep::key_array *fk_tables, + const HA_CREATE_INFO *create_info) +{ + DBUG_ASSERT(wsrep_OSU_method_get(thd) == WSREP_OSU_TOI); + + WSREP_DEBUG("TOI Begin: %s", wsrep_thd_query(thd)); + + if (wsrep_can_run_in_toi(thd, db, table, table_list, create_info) == false) + { + WSREP_DEBUG("No TOI for %s", wsrep_thd_query(thd)); + return 1; + } + + uchar* buf= 0; + size_t buf_len(0); + int buf_err; + int rc; + + buf_err= wsrep_TOI_event_buf(thd, &buf, &buf_len); + + if (buf_err) { + WSREP_ERROR("Failed to create TOI event buf: %d", buf_err); + my_message(ER_UNKNOWN_ERROR, + "WSREP replication failed to prepare TOI event buffer. " + "Check your query.", + MYF(0)); + return -1; + } + + struct wsrep_buf buff= { buf, buf_len }; + + wsrep::key_array key_array= + wsrep_prepare_keys_for_toi(db, table, table_list, alter_info, fk_tables); + + if (thd->has_read_only_protection()) + { + /* non replicated DDL, affecting temporary tables only */ + WSREP_DEBUG("TO isolation skipped, sql: %s." + "Only temporary tables affected.", + wsrep_thd_query(thd)); + if (buf) my_free(buf); + return -1; + } + + thd_proc_info(thd, "acquiring total order isolation"); + WSREP_DEBUG("wsrep_TOI_begin for %s", wsrep_thd_query(thd)); + THD_STAGE_INFO(thd, stage_waiting_isolation); + + wsrep::client_state& cs(thd->wsrep_cs()); + + int ret= cs.enter_toi_local(key_array, + wsrep::const_buffer(buff.ptr, buff.len)); + + if (ret) + { + DBUG_ASSERT(cs.current_error()); + WSREP_DEBUG("to_execute_start() failed for %llu: %s, seqno: %lld", + thd->thread_id, wsrep_thd_query(thd), + (long long)wsrep_thd_trx_seqno(thd)); + + /* jump to error handler in mysql_execute_command() */ + switch (cs.current_error()) + { + case wsrep::e_size_exceeded_error: + WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. " + "Maximum size exceeded.", + ret, + (thd->db.str ? thd->db.str : "(null)"), + wsrep_thd_query(thd)); + my_error(ER_UNKNOWN_ERROR, MYF(0), "Maximum writeset size exceeded"); + break; + case wsrep::e_deadlock_error: + WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. " + "Deadlock error.", + ret, + (thd->db.str ? thd->db.str : "(null)"), + wsrep_thd_query(thd)); + my_error(ER_LOCK_DEADLOCK, MYF(0)); + break; + case wsrep::e_timeout_error: + WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. " + "Operation timed out.", + ret, + (thd->db.str ? thd->db.str : "(null)"), + wsrep_thd_query(thd)); + my_error(ER_LOCK_WAIT_TIMEOUT, MYF(0)); + break; + default: + WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. " + "Check your wsrep connection state and retry the query.", + ret, + (thd->db.str ? thd->db.str : "(null)"), + wsrep_thd_query(thd)); + + if (!thd->is_error()) + { + my_error(ER_LOCK_DEADLOCK, MYF(0), "WSREP replication failed. Check " + "your wsrep connection state and retry the query."); + } + } + rc= -1; + } + else { + if (!thd->variables.gtid_seq_no) + { + uint64 seqno= 0; + if (thd->variables.wsrep_gtid_seq_no && + thd->variables.wsrep_gtid_seq_no > wsrep_gtid_server.seqno()) + { + seqno= thd->variables.wsrep_gtid_seq_no; + wsrep_gtid_server.seqno(thd->variables.wsrep_gtid_seq_no); + } + else + { + seqno= wsrep_gtid_server.seqno_inc(); + } + thd->variables.wsrep_gtid_seq_no= 0; + thd->wsrep_current_gtid_seqno= seqno; + if (mysql_bin_log.is_open() && wsrep_gtid_mode) + { + thd->variables.gtid_seq_no= seqno; + thd->variables.gtid_domain_id= wsrep_gtid_server.domain_id; + thd->variables.server_id= wsrep_gtid_server.server_id; + } + } + ++wsrep_to_isolation; + rc= 0; + } + + if (buf) my_free(buf); + + if (rc) wsrep_TOI_begin_failed(thd, NULL); + + return rc; +} + +static void wsrep_TOI_end(THD *thd) { + wsrep_to_isolation--; + wsrep::client_state& client_state(thd->wsrep_cs()); + DBUG_ASSERT(wsrep_thd_is_local_toi(thd)); + WSREP_DEBUG("TO END: %lld: %s", client_state.toi_meta().seqno().get(), + wsrep_thd_query(thd)); + + wsrep_gtid_server.signal_waiters(thd->wsrep_current_gtid_seqno, false); + + if (wsrep_thd_is_local_toi(thd)) + { + wsrep::mutable_buffer err; + + thd->wsrep_last_written_gtid_seqno= thd->wsrep_current_gtid_seqno; + wsrep_set_SE_checkpoint(client_state.toi_meta().gtid(), wsrep_gtid_server.gtid()); + + if (thd->is_error() && !wsrep_must_ignore_error(thd)) + { + wsrep_store_error(thd, err); + } + + int const ret= client_state.leave_toi_local(err); + + if (!ret) + { + WSREP_DEBUG("TO END: %lld", client_state.toi_meta().seqno().get()); + } + else + { + WSREP_WARN("TO isolation end failed for: %d, schema: %s, sql: %s", + ret, (thd->db.str ? thd->db.str : "(null)"), wsrep_thd_query(thd)); + } + } +} + +static int wsrep_RSU_begin(THD *thd, const char *db_, const char *table_) +{ + WSREP_DEBUG("RSU BEGIN: %lld, : %s", wsrep_thd_trx_seqno(thd), + wsrep_thd_query(thd)); + + /* For CREATE TEMPORARY SEQUENCE we do not start RSU because + object is local only and actually CREATE TABLE + INSERT + */ + if (thd->lex->sql_command == SQLCOM_CREATE_SEQUENCE && + thd->lex->tmp_table()) + return 1; + + if (thd->variables.wsrep_OSU_method == WSREP_OSU_RSU && + thd->variables.sql_log_bin == 1 && + wsrep_check_mode(WSREP_MODE_DISALLOW_LOCAL_GTID)) + { + /* wsrep_mode = WSREP_MODE_DISALLOW_LOCAL_GTID, treat as error */ + my_error(ER_GALERA_REPLICATION_NOT_SUPPORTED, MYF(0)); + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_OPTION_PREVENTS_STATEMENT, + "You can't execute statements that would generate local " + "GTIDs when wsrep_mode = DISALLOW_LOCAL_GTID is set. " + "Try disabling binary logging with SET sql_log_bin=0 " + "to execute this statement."); + + return -1; + } + + if (thd->wsrep_cs().begin_rsu(5000)) + { + WSREP_WARN("RSU begin failed"); + } + else + { + thd->variables.wsrep_on= 0; + } + return 0; +} + +static void wsrep_RSU_end(THD *thd) +{ + WSREP_DEBUG("RSU END: %lld : %s", wsrep_thd_trx_seqno(thd), + wsrep_thd_query(thd)); + if (thd->wsrep_cs().end_rsu()) + { + WSREP_WARN("Failed to end RSU, server may need to be restarted"); + } + thd->variables.wsrep_on= 1; +} + +static inline bool is_replaying_connection(THD *thd) +{ + bool ret; + + mysql_mutex_lock(&thd->LOCK_thd_data); + ret= (thd->wsrep_trx().state() == wsrep::transaction::s_replaying) ? true : false; + mysql_mutex_unlock(&thd->LOCK_thd_data); + + return ret; +} + +int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_, + const TABLE_LIST* table_list, + const Alter_info *alter_info, + const wsrep::key_array *fk_tables, + const HA_CREATE_INFO *create_info) +{ + /* + No isolation for applier or replaying threads. + */ + if (!wsrep_thd_is_local(thd)) + { + if (wsrep_OSU_method_get(thd) == WSREP_OSU_TOI) + WSREP_DEBUG("%s TOI Begin: %s", + is_replaying_connection(thd) ? "Replay" : "Apply", + wsrep_thd_query(thd)); + + return 0; + } + + if (thd->wsrep_parallel_slave_wait_for_prior_commit()) + { + WSREP_WARN("TOI: wait_for_prior_commit() returned error."); + return -1; + } + + int ret= 0; + + mysql_mutex_lock(&thd->LOCK_thd_data); + + if (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort) + { + WSREP_INFO("thread: %lld schema: %s query: %s has been aborted due to multi-master conflict", + (longlong) thd->thread_id, thd->get_db(), thd->query()); + mysql_mutex_unlock(&thd->LOCK_thd_data); + return WSREP_TRX_FAIL; + } + mysql_mutex_unlock(&thd->LOCK_thd_data); + + DBUG_ASSERT(wsrep_thd_is_local(thd)); + DBUG_ASSERT(thd->wsrep_trx().ws_meta().seqno().is_undefined()); + + if (Wsrep_server_state::instance().desynced_on_pause()) + { + my_message(ER_UNKNOWN_COM_ERROR, + "Aborting TOI: Replication paused on node for FTWRL/BACKUP STAGE.", MYF(0)); + WSREP_DEBUG("Aborting TOI: Replication paused on node for FTWRL/BACKUP STAGE.: %s %llu", + wsrep_thd_query(thd), thd->thread_id); + return -1; + } + + /* If we are inside LOCK TABLE we release it and give warning. */ + if (thd->variables.option_bits & OPTION_TABLE_LOCK && + thd->lex->sql_command == SQLCOM_CREATE_SEQUENCE) + { + thd->locked_tables_list.unlock_locked_tables(thd); + thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + HA_ERR_UNSUPPORTED, + "Galera cluster does not support LOCK TABLE on " + "SEQUENCES. Lock is released."); + } + if (wsrep_debug && thd->mdl_context.has_locks()) + { + WSREP_DEBUG("thread holds MDL locks at TO begin: %s %llu", + wsrep_thd_query(thd), thd->thread_id); + } + + /* + It makes sense to set auto_increment_* to defaults in TOI operations. + Must be done before wsrep_TOI_begin() since Query_log_event encapsulating + TOI statement and auto inc variables for wsrep replication is constructed + there. Variables are reset back in THD::reset_for_next_command() before + processing of next command. + */ + if (wsrep_auto_increment_control) + { + thd->variables.auto_increment_offset= 1; + thd->variables.auto_increment_increment= 1; + } + + if (thd->variables.wsrep_on && wsrep_thd_is_local(thd)) + { + switch (wsrep_OSU_method_get(thd)) { + case WSREP_OSU_TOI: + ret= wsrep_TOI_begin(thd, db_, table_, table_list, alter_info, fk_tables, + create_info); + break; + case WSREP_OSU_RSU: + ret= wsrep_RSU_begin(thd, db_, table_); + break; + default: + WSREP_ERROR("Unsupported OSU method: %lu", + wsrep_OSU_method_get(thd)); + ret= -1; + break; + } + + switch (ret) { + case 0: /* wsrep_TOI_begin should set toi mode */ + if (thd->variables.wsrep_OSU_method == WSREP_OSU_TOI) + { + /* + TOI operations ignore the provided lock_wait_timeout once replicated, + and restore it after operation is done. + */ + thd->variables.saved_lock_wait_timeout= thd->variables.lock_wait_timeout; + thd->variables.lock_wait_timeout= LONG_TIMEOUT; + } + break; + case 1: + /* TOI replication skipped, treat as success */ + ret= 0; + break; + case -1: + /* TOI replication failed, treat as error */ + break; + } + } + + return ret; +} + +void wsrep_to_isolation_end(THD *thd) +{ + if (wsrep_thd_is_local_toi(thd)) + { + thd->variables.lock_wait_timeout= thd->variables.saved_lock_wait_timeout; + DBUG_ASSERT(wsrep_OSU_method_get(thd) == WSREP_OSU_TOI); + wsrep_TOI_end(thd); + } + else if (wsrep_thd_is_in_rsu(thd)) + { + thd->variables.lock_wait_timeout= thd->variables.saved_lock_wait_timeout; + DBUG_ASSERT(wsrep_OSU_method_get(thd) == WSREP_OSU_RSU); + wsrep_RSU_end(thd); + } + else + { + /* Applier or replaying threads just output TO END */ + if (wsrep_debug) + { + wsrep::client_state& client_state(thd->wsrep_cs()); + WSREP_DEBUG("%s TO END: %lld: %s", + is_replaying_connection(thd) ? "Replay" : "Apply", + client_state.toi_meta().seqno().get(), + wsrep_thd_query(thd)); + } + return; + } + + if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd); +} + +#define WSREP_MDL_LOG(severity, msg, schema, schema_len, req, gra) \ + WSREP_##severity( \ + "%s\n" \ + "schema: %.*s\n" \ + "request: (%llu \tseqno %lld \twsrep (%s, %s, %s) cmd %d %d \t%s)\n" \ + "granted: (%llu \tseqno %lld \twsrep (%s, %s, %s) cmd %d %d \t%s)", \ + msg, schema_len, schema, \ + req->thread_id, (long long)wsrep_thd_trx_seqno(req), \ + wsrep_thd_client_mode_str(req), wsrep_thd_client_state_str(req), wsrep_thd_transaction_state_str(req), \ + req->get_command(), req->lex->sql_command, req->query(), \ + gra->thread_id, (long long)wsrep_thd_trx_seqno(gra), \ + wsrep_thd_client_mode_str(gra), wsrep_thd_client_state_str(gra), wsrep_thd_transaction_state_str(gra), \ + gra->get_command(), gra->lex->sql_command, gra->query()); + +/** + Check if request for the metadata lock should be granted to the requester. + + @param requestor_ctx The MDL context of the requestor + @param ticket MDL ticket for the requested lock + + @retval TRUE Lock request can be granted + @retval FALSE Lock request cannot be granted +*/ + +void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx, + const MDL_ticket *ticket, + const MDL_key *key) +{ + THD *request_thd= requestor_ctx->get_thd(); + THD *granted_thd= ticket->get_ctx()->get_thd(); + + /* Fallback to the non-wsrep behaviour */ + if (!WSREP(request_thd)) return; + + const char* schema= key->db_name(); + int schema_len= key->db_name_length(); + + mysql_mutex_lock(&request_thd->LOCK_thd_data); + + if (wsrep_thd_is_toi(request_thd) || + wsrep_thd_is_applying(request_thd)) + { + WSREP_DEBUG("wsrep_handle_mdl_conflict request TOI/APPLY for %s", + wsrep_thd_query(request_thd)); + THD_STAGE_INFO(request_thd, stage_waiting_isolation); + mysql_mutex_unlock(&request_thd->LOCK_thd_data); + WSREP_MDL_LOG(DEBUG, "MDL conflict ", schema, schema_len, + request_thd, granted_thd); + ticket->wsrep_report(wsrep_debug); + + DEBUG_SYNC(request_thd, "before_wsrep_thd_abort"); + DBUG_EXECUTE_IF("sync.before_wsrep_thd_abort", { + const char act[]= "now " + "SIGNAL sync.before_wsrep_thd_abort_reached " + "WAIT_FOR signal.before_wsrep_thd_abort"; + DBUG_ASSERT(!debug_sync_set_action(request_thd, STRING_WITH_LEN(act))); + };); + + /* Here we will call wsrep_abort_transaction so we should hold + THD::LOCK_thd_data to protect victim from concurrent usage + and THD::LOCK_thd_kill to protect from disconnect or delete. + + */ + mysql_mutex_lock(&granted_thd->LOCK_thd_kill); + mysql_mutex_lock(&granted_thd->LOCK_thd_data); + + if (wsrep_thd_is_toi(granted_thd) || + wsrep_thd_is_applying(granted_thd)) + { + if (wsrep_thd_is_aborting(granted_thd)) + { + WSREP_DEBUG("BF thread waiting for SR in aborting state for %s", + wsrep_thd_query(request_thd)); + THD_STAGE_INFO(request_thd, stage_waiting_isolation); + ticket->wsrep_report(wsrep_debug); + } + else if (wsrep_thd_is_SR(granted_thd) && !wsrep_thd_is_SR(request_thd)) + { + WSREP_MDL_LOG(INFO, "MDL conflict, DDL vs SR", + schema, schema_len, request_thd, granted_thd); + WSREP_DEBUG("wsrep_handle_mdl_conflict DDL vs SR for %s", + wsrep_thd_query(request_thd)); + THD_STAGE_INFO(request_thd, stage_waiting_isolation); + wsrep_abort_thd(request_thd, granted_thd, 1); + } + else + { + WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", schema, schema_len, + request_thd, granted_thd); + ticket->wsrep_report(true); + mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + mysql_mutex_unlock(&granted_thd->LOCK_thd_kill); + unireg_abort(1); + } + } + else if (granted_thd->lex->sql_command == SQLCOM_FLUSH || + granted_thd->mdl_context.has_explicit_locks()) + { + WSREP_DEBUG("BF thread waiting for FLUSH for %s", + wsrep_thd_query(request_thd)); + THD_STAGE_INFO(request_thd, stage_waiting_ddl); + ticket->wsrep_report(wsrep_debug); + if (granted_thd->current_backup_stage != BACKUP_FINISHED && + wsrep_check_mode(WSREP_MODE_BF_MARIABACKUP)) + { + wsrep_abort_thd(request_thd, granted_thd, 1); + } + } + else if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE) + { + WSREP_DEBUG("DROP caused BF abort, conf %s for %s", + wsrep_thd_transaction_state_str(granted_thd), + wsrep_thd_query(request_thd)); + THD_STAGE_INFO(request_thd, stage_waiting_isolation); + ticket->wsrep_report(wsrep_debug); + wsrep_abort_thd(request_thd, granted_thd, 1); + } + else + { + WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", schema, schema_len, + request_thd, granted_thd); + WSREP_DEBUG("wsrep_handle_mdl_conflict -> BF abort for %s", + wsrep_thd_query(request_thd)); + THD_STAGE_INFO(request_thd, stage_waiting_isolation); + ticket->wsrep_report(wsrep_debug); + + if (granted_thd->wsrep_trx().active()) + { + wsrep_abort_thd(request_thd, granted_thd, 1); + } + else + { + /* + Granted_thd is likely executing with wsrep_on=0. If the requesting + thd is BF, BF abort and wait. + */ + if (wsrep_thd_is_BF(request_thd, FALSE)) + { + granted_thd->awake_no_mutex(KILL_QUERY_HARD); + ha_abort_transaction(request_thd, granted_thd, TRUE); + } + else + { + WSREP_MDL_LOG(INFO, "MDL unknown BF-BF conflict", + schema, schema_len, + request_thd, granted_thd); + ticket->wsrep_report(true); + mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + mysql_mutex_unlock(&granted_thd->LOCK_thd_kill); + unireg_abort(1); + } + } + } + mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + mysql_mutex_unlock(&granted_thd->LOCK_thd_kill); + } + else + { + mysql_mutex_unlock(&request_thd->LOCK_thd_data); + } +} + +/**/ +static bool abort_replicated(THD *thd) +{ + bool ret_code= false; + mysql_mutex_lock(&thd->LOCK_thd_kill); + mysql_mutex_lock(&thd->LOCK_thd_data); + if (thd->wsrep_trx().state() == wsrep::transaction::s_committing) + { + WSREP_DEBUG("aborting replicated trx: %llu", (ulonglong)(thd->real_id)); + + wsrep_abort_thd(thd, thd, TRUE); + ret_code= true; + } + mysql_mutex_unlock(&thd->LOCK_thd_data); + mysql_mutex_unlock(&thd->LOCK_thd_kill); + return ret_code; +} + +/**/ +static inline bool is_client_connection(THD *thd) +{ + return (thd->wsrep_client_thread && thd->variables.wsrep_on); +} + +static inline bool is_committing_connection(THD *thd) +{ + bool ret; + + mysql_mutex_lock(&thd->LOCK_thd_data); + ret= (thd->wsrep_trx().state() == wsrep::transaction::s_committing) ? true : false; + mysql_mutex_unlock(&thd->LOCK_thd_data); + + return ret; +} + +static my_bool have_client_connections(THD *thd, void*) +{ + DBUG_PRINT("quit",("Informing thread %lld that it's time to die", + (longlong) thd->thread_id)); + if (is_client_connection(thd) && + (thd->killed == KILL_CONNECTION || + thd->killed == KILL_CONNECTION_HARD)) + { + (void)abort_replicated(thd); + return 1; + } + return 0; +} + +static void wsrep_close_thread(THD *thd) +{ + thd->set_killed(KILL_CONNECTION_HARD); + MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd)); + mysql_mutex_lock(&thd->LOCK_thd_kill); + thd->abort_current_cond_wait(true); + mysql_mutex_unlock(&thd->LOCK_thd_kill); +} + +static my_bool have_committing_connections(THD *thd, void *) +{ + return is_client_connection(thd) && is_committing_connection(thd) ? 1 : 0; +} + +int wsrep_wait_committing_connections_close(int wait_time) +{ + int sleep_time= 100; + + WSREP_DEBUG("wait for committing transaction to close: %d sleep: %d", wait_time, sleep_time); + while (server_threads.iterate(have_committing_connections) && wait_time > 0) + { + WSREP_DEBUG("wait for committing transaction to close: %d", wait_time); + my_sleep(sleep_time); + wait_time -= sleep_time; + } + return server_threads.iterate(have_committing_connections); +} + +static my_bool kill_all_threads(THD *thd, THD *caller_thd) +{ + DBUG_PRINT("quit", ("Informing thread %lld that it's time to die", + (longlong) thd->thread_id)); + /* We skip slave threads & scheduler on this first loop through. */ + if (is_client_connection(thd) && thd != caller_thd) + { + if (is_replaying_connection(thd)) + thd->set_killed(KILL_CONNECTION_HARD); + else if (!abort_replicated(thd)) + { + /* replicated transactions must be skipped */ + WSREP_DEBUG("closing connection %lld", (longlong) thd->thread_id); + /* instead of wsrep_close_thread() we do now soft kill by THD::awake */ + thd->awake(KILL_CONNECTION_HARD); + } + } + return 0; +} + +static my_bool kill_remaining_threads(THD *thd, THD *caller_thd) +{ +#ifndef __bsdi__ // Bug in BSDI kernel + if (is_client_connection(thd) && + !abort_replicated(thd) && + !is_replaying_connection(thd) && + thd_is_connection_alive(thd) && + thd != caller_thd) + { + + WSREP_INFO("killing local connection: %lld", (longlong) thd->thread_id); + close_connection(thd); + } +#endif + return 0; +} + +void wsrep_close_client_connections(my_bool wait_to_end, THD* except_caller_thd) +{ + /* Clear thread cache */ + thread_cache.final_flush(); + + /* + First signal all threads that it's time to die + */ + server_threads.iterate(kill_all_threads, except_caller_thd); + + /* + Force remaining threads to die by closing the connection to the client + */ + server_threads.iterate(kill_remaining_threads, except_caller_thd); + + DBUG_PRINT("quit", ("Waiting for threads to die (count=%u)", THD_count::value())); + WSREP_DEBUG("waiting for client connections to close: %u", THD_count::value()); + + while (wait_to_end && server_threads.iterate(have_client_connections)) + { + sleep(1); + DBUG_PRINT("quit",("One thread died (count=%u)", THD_count::value())); + } + + /* All client connection threads have now been aborted */ +} + + +void wsrep_close_applier(THD *thd) +{ + WSREP_DEBUG("closing applier %lld", (longlong) thd->thread_id); + wsrep_close_thread(thd); +} + +static my_bool wsrep_close_threads_callback(THD *thd, THD *caller_thd) +{ + DBUG_PRINT("quit",("Informing thread %lld that it's time to die", + (longlong) thd->thread_id)); + /* We skip slave threads & scheduler on this first loop through. */ + if (thd->wsrep_applier && thd != caller_thd) + { + WSREP_DEBUG("closing wsrep thread %lld", (longlong) thd->thread_id); + wsrep_close_thread(thd); + } + return 0; +} + +void wsrep_close_threads(THD *thd) +{ + server_threads.iterate(wsrep_close_threads_callback, thd); +} + +void wsrep_wait_appliers_close(THD *thd) +{ + /* Wait for wsrep appliers to gracefully exit */ + mysql_mutex_lock(&LOCK_wsrep_slave_threads); + while (wsrep_running_threads > 2) + /* + 2 is for rollbacker thread which needs to be killed explicitly. + This gotta be fixed in a more elegant manner if we gonna have arbitrary + number of non-applier wsrep threads. + */ + { + mysql_cond_wait(&COND_wsrep_slave_threads, &LOCK_wsrep_slave_threads); + } + mysql_mutex_unlock(&LOCK_wsrep_slave_threads); + DBUG_PRINT("quit",("applier threads have died (count=%u)", + uint32_t(wsrep_running_threads))); + + /* Now kill remaining wsrep threads: rollbacker */ + wsrep_close_threads (thd); + /* and wait for them to die */ + mysql_mutex_lock(&LOCK_wsrep_slave_threads); + while (wsrep_running_threads > 0) + { + mysql_cond_wait(&COND_wsrep_slave_threads, &LOCK_wsrep_slave_threads); + } + mysql_mutex_unlock(&LOCK_wsrep_slave_threads); + DBUG_PRINT("quit",("all wsrep system threads have died")); + + /* All wsrep applier threads have now been aborted. However, if this thread + is also applier, we are still running... + */ +} +int wsrep_must_ignore_error(THD* thd) +{ + const int error= thd->get_stmt_da()->sql_errno(); + const uint flags= sql_command_flags[thd->lex->sql_command]; + + DBUG_ASSERT(error); + DBUG_ASSERT(wsrep_thd_is_toi(thd)); + + if ((wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_DDL)) + goto ignore_error; + + if ((flags & CF_WSREP_MAY_IGNORE_ERRORS) && + (wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_RECONCILING_DDL)) + { + switch (error) + { + case ER_DB_DROP_EXISTS: + case ER_BAD_TABLE_ERROR: + case ER_CANT_DROP_FIELD_OR_KEY: + goto ignore_error; + } + } + + return 0; + +ignore_error: + WSREP_WARN("Ignoring error '%s' on query. " + "Default database: '%s'. Query: '%s', Error_code: %d", + thd->get_stmt_da()->message(), + print_slave_db_safe(thd->db.str), + thd->query(), + error); + return 1; +} + +int wsrep_ignored_error_code(Log_event* ev, int error) +{ + const THD* thd= ev->thd; + + DBUG_ASSERT(error); + /* Note that binlog events can be executed on master also with + BINLOG '....'; */ + DBUG_ASSERT(!wsrep_thd_is_local_toi(thd)); + + if ((wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_RECONCILING_DML)) + { + const int ev_type= ev->get_type_code(); + if ((ev_type == DELETE_ROWS_EVENT || ev_type == DELETE_ROWS_EVENT_V1) + && error == ER_KEY_NOT_FOUND) + goto ignore_error; + } + + return 0; + +ignore_error: + WSREP_WARN("Ignoring error '%s' on %s event. Error_code: %d", + thd->get_stmt_da()->message(), + ev->get_type_str(), + error); + return 1; +} + +bool wsrep_provider_is_SR_capable() +{ + return Wsrep_server_state::has_capability(wsrep::provider::capability::streaming); +} + +int wsrep_thd_retry_counter(const THD *thd) +{ + return thd->wsrep_retry_counter; +} + +extern bool wsrep_thd_ignore_table(THD *thd) +{ + return thd->wsrep_ignore_table; +} + +bool wsrep_create_like_table(THD* thd, TABLE_LIST* table, + TABLE_LIST* src_table, + HA_CREATE_INFO *create_info) +{ + if (create_info->tmp_table()) + { + /* CREATE TEMPORARY TABLE LIKE must be skipped from replication */ + WSREP_DEBUG("CREATE TEMPORARY TABLE LIKE... skipped replication\n %s", + thd->query()); + } + else if (!(thd->find_temporary_table(src_table))) + { + /* this is straight CREATE TABLE LIKE... with no tmp tables */ + WSREP_TO_ISOLATION_BEGIN_CREATE(table->db.str, table->table_name.str, table, create_info); + } + else + { + /* Non-MERGE tables ignore this call. */ + if (src_table->table->file->extra(HA_EXTRA_ADD_CHILDREN_LIST)) + return (true); + + char buf[2048]; + String query(buf, sizeof(buf), system_charset_info); + query.length(0); // Have to zero it since constructor doesn't + + int result __attribute__((unused))= + show_create_table(thd, src_table, &query, NULL, WITH_DB_NAME); + WSREP_DEBUG("TMP TABLE: %s ret_code %d", query.ptr(), result); + + thd->wsrep_TOI_pre_query= query.ptr(); + thd->wsrep_TOI_pre_query_len= query.length(); + + WSREP_TO_ISOLATION_BEGIN_CREATE(table->db.str, table->table_name.str, table, create_info); + + thd->wsrep_TOI_pre_query= NULL; + thd->wsrep_TOI_pre_query_len= 0; + + /* Non-MERGE tables ignore this call. */ + src_table->table->file->extra(HA_EXTRA_DETACH_CHILDREN); + } + + return(false); +#ifdef WITH_WSREP +wsrep_error_label: + thd->wsrep_TOI_pre_query= NULL; + return (true); +#endif +} + +int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len) +{ + LEX *lex= thd->lex; + String stmt_query; + + LEX_CSTRING definer_user; + LEX_CSTRING definer_host; + + if (!lex->definer) + { + if (!thd->slave_thread) + { + if (!(lex->definer= create_default_definer(thd, false))) + return 1; + } + } + + if (lex->definer) + { + /* SUID trigger. */ + LEX_USER *d= get_current_user(thd, lex->definer); + + if (!d) + return 1; + + definer_user= d->user; + definer_host= d->host; + } + else + { + /* non-SUID trigger. */ + + definer_user.str= 0; + definer_user.length= 0; + + definer_host.str= 0; + definer_host.length= 0; + } + + const LEX_CSTRING command[2]= + {{ C_STRING_WITH_LEN("CREATE ") }, + { C_STRING_WITH_LEN("CREATE OR REPLACE ") }}; + + if (thd->lex->create_info.or_replace()) + stmt_query.append(command[1]); + else + stmt_query.append(command[0]); + + append_definer(thd, &stmt_query, &definer_user, &definer_host); + + LEX_CSTRING stmt_definition; + stmt_definition.str= (char*) thd->lex->stmt_definition_begin; + stmt_definition.length= thd->lex->stmt_definition_end + - thd->lex->stmt_definition_begin; + trim_whitespace(thd->charset(), &stmt_definition); + + stmt_query.append(stmt_definition.str, stmt_definition.length); + + return wsrep_to_buf_helper(thd, stmt_query.c_ptr(), stmt_query.length(), + buf, buf_len); +} + +void* start_wsrep_THD(void *arg) +{ + THD *thd= NULL; + + Wsrep_thd_args* thd_args= (Wsrep_thd_args*) arg; + + if (my_thread_init() || (!(thd= new THD(next_thread_id(), true)))) + { + goto error; + } + + statistic_increment(thread_created, &LOCK_status); + + thd->real_id=pthread_self(); // Keep purify happy + + my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0)); + + DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id)); + thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer(); + + server_threads.insert(thd); + + /* from bootstrap()... */ + thd->bootstrap=1; + thd->max_client_packet_length= thd->net.max_packet; + thd->security_ctx->master_access= ALL_KNOWN_ACL; + + /* from handle_one_connection... */ + pthread_detach_this_thread(); + + mysql_thread_set_psi_id(thd->thread_id); + thd->thr_create_utime= microsecond_interval_timer(); + + DBUG_EXECUTE_IF("wsrep_simulate_failed_connection_1", goto error; ); +// + /* + handle_one_connection() is normally the only way a thread would + start and would always be on the very high end of the stack , + therefore, the thread stack always starts at the address of the + first local variable of handle_one_connection, which is thd. We + need to know the start of the stack so that we could check for + stack overruns. + */ + DBUG_PRINT("wsrep", ("handle_one_connection called by thread %lld", + (long long)thd->thread_id)); + /* now that we've called my_thread_init(), it is safe to call DBUG_* */ + + thd->thread_stack= (char*) &thd; + wsrep_assign_from_threadvars(thd); + wsrep_store_threadvars(thd); + + thd->system_thread= SYSTEM_THREAD_SLAVE_SQL; + thd->security_ctx->skip_grants(); + + /* handle_one_connection() again... */ + thd->proc_info= 0; + thd->set_command(COM_SLEEP); + thd->init_for_queries(); + mysql_mutex_lock(&LOCK_wsrep_slave_threads); + + wsrep_running_threads++; + + switch (thd_args->thread_type()) { + case WSREP_APPLIER_THREAD: + wsrep_running_applier_threads++; + break; + case WSREP_ROLLBACKER_THREAD: + wsrep_running_rollbacker_threads++; + break; + default: + WSREP_ERROR("Incorrect wsrep thread type: %d", thd_args->thread_type()); + break; + } + + mysql_cond_broadcast(&COND_wsrep_slave_threads); + mysql_mutex_unlock(&LOCK_wsrep_slave_threads); + + WSREP_DEBUG("wsrep system thread %llu, %p starting", + thd->thread_id, thd); + thd_args->fun()(thd, static_cast(thd_args)); + + WSREP_DEBUG("wsrep system thread: %llu, %p closing", + thd->thread_id, thd); + + /* Wsrep may reset globals during thread context switches, store globals + before cleanup. */ + wsrep_store_threadvars(thd); + + close_connection(thd, 0); + + mysql_mutex_lock(&LOCK_wsrep_slave_threads); + DBUG_ASSERT(wsrep_running_threads > 0); + wsrep_running_threads--; + + switch (thd_args->thread_type()) { + case WSREP_APPLIER_THREAD: + DBUG_ASSERT(wsrep_running_applier_threads > 0); + wsrep_running_applier_threads--; + break; + case WSREP_ROLLBACKER_THREAD: + DBUG_ASSERT(wsrep_running_rollbacker_threads > 0); + wsrep_running_rollbacker_threads--; + break; + default: + WSREP_ERROR("Incorrect wsrep thread type: %d", thd_args->thread_type()); + break; + } + + delete thd_args; + WSREP_DEBUG("wsrep running threads now: %lu", wsrep_running_threads); + mysql_cond_broadcast(&COND_wsrep_slave_threads); + mysql_mutex_unlock(&LOCK_wsrep_slave_threads); + /* + Note: We can't call THD destructor without crashing + if plugins have not been initialized. However, in most of the + cases this means that pre SE initialization SST failed and + we are going to exit anyway. + */ + if (plugins_are_initialized) + { + net_end(&thd->net); + unlink_thd(thd); + } + else + { + /* + TODO: lightweight cleanup to get rid of: + 'Error in my_thread_global_end(): 2 threads didn't exit' + at server shutdown + */ + server_threads.erase(thd); + } + + delete thd; + my_thread_end(); + return(NULL); + +error: + WSREP_ERROR("Failed to create/initialize system thread"); + + if (thd) + { + close_connection(thd, ER_OUT_OF_RESOURCES); + statistic_increment(aborted_connects, &LOCK_status); + server_threads.erase(thd); + delete thd; + my_thread_end(); + } + delete thd_args; + // This will signal error to wsrep_slave_threads_update + wsrep_thread_create_failed.store(true, std::memory_order_relaxed); + + /* Abort if its the first applier/rollbacker thread. */ + if (!mysqld_server_initialized) + unireg_abort(1); + else + return NULL; +} + +enum wsrep::streaming_context::fragment_unit wsrep_fragment_unit(ulong unit) +{ + switch (unit) + { + case WSREP_FRAG_BYTES: return wsrep::streaming_context::bytes; + case WSREP_FRAG_ROWS: return wsrep::streaming_context::row; + case WSREP_FRAG_STATEMENTS: return wsrep::streaming_context::statement; + default: + DBUG_ASSERT(0); + return wsrep::streaming_context::bytes; + } +} + +bool THD::wsrep_parallel_slave_wait_for_prior_commit() +{ + if (rgi_slave && rgi_slave->is_parallel_exec && wait_for_prior_commit()) + { + return 1; + } + return 0; +} + +/***** callbacks for wsrep service ************/ + +my_bool get_wsrep_recovery() +{ + return wsrep_recovery; +} + +bool wsrep_consistency_check(THD *thd) +{ + return thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING; +} + +// Wait until wsrep has reached ready state +void wsrep_wait_ready(THD *thd) +{ + mysql_mutex_lock(&LOCK_wsrep_ready); + while(!wsrep_ready) + { + WSREP_INFO("Waiting to reach ready state"); + mysql_cond_wait(&COND_wsrep_ready, &LOCK_wsrep_ready); + } + WSREP_INFO("ready state reached"); + mysql_mutex_unlock(&LOCK_wsrep_ready); +} + +void wsrep_ready_set(bool ready_value) +{ + WSREP_DEBUG("Setting wsrep_ready to %d", ready_value); + mysql_mutex_lock(&LOCK_wsrep_ready); + wsrep_ready= ready_value; + // Signal if we have reached ready state + if (wsrep_ready) + mysql_cond_signal(&COND_wsrep_ready); + mysql_mutex_unlock(&LOCK_wsrep_ready); +} + + +/* + Commit an empty transaction. + + If the transaction is real and the wsrep transaction is still active, + the transaction did not generate any rows or keys and is committed + as empty. Here the wsrep transaction is rolled back and after statement + step is performed to leave the wsrep transaction in the state as it + never existed. + + This should not be an inline functions as it requires a lot of stack space + because of WSREP_DBUG() usage. It's also not a function that is + frequently called. +*/ + +void wsrep_commit_empty(THD* thd, bool all) +{ + DBUG_ENTER("wsrep_commit_empty"); + WSREP_DEBUG("wsrep_commit_empty for %llu client_state %s client_mode" + " %s trans_state %s sql %s", + thd_get_thread_id(thd), + wsrep::to_c_string(thd->wsrep_cs().state()), + wsrep::to_c_string(thd->wsrep_cs().mode()), + wsrep::to_c_string(thd->wsrep_cs().transaction().state()), + wsrep_thd_query(thd)); + + if (wsrep_is_real(thd, all) && + wsrep_thd_is_local(thd) && + thd->wsrep_trx().active() && + !thd->internal_transaction() && + thd->wsrep_trx().state() != wsrep::transaction::s_committed) + { + /* Here transaction is either empty (i.e. no changes) or + it was CREATE TABLE with no row binlog format or + we have already aborted transaction e.g. because max writeset size + has been reached. */ + DBUG_ASSERT(!wsrep_has_changes(thd) || + (thd->lex->sql_command == SQLCOM_CREATE_TABLE && + !thd->is_current_stmt_binlog_format_row()) || + thd->wsrep_cs().transaction().state() == wsrep::transaction::s_aborted); + bool have_error= wsrep_current_error(thd); + int ret= wsrep_before_rollback(thd, all) || + wsrep_after_rollback(thd, all) || + wsrep_after_statement(thd); + /* The committing transaction was empty but it held some locks and + got BF aborted. As there were no certified changes in the + data, we ignore the deadlock error and rely on error reporting + by storage engine/server. */ + if (!ret && !have_error && wsrep_current_error(thd)) + { + DBUG_ASSERT(wsrep_current_error(thd) == wsrep::e_deadlock_error); + thd->wsrep_cs().reset_error(); + } + if (ret) + { + WSREP_DEBUG("wsrep_commit_empty failed: %d", wsrep_current_error(thd)); + } + } + DBUG_VOID_RETURN; +} -- cgit v1.2.3