diff options
Diffstat (limited to '')
-rw-r--r-- | sql/slave.cc | 8136 |
1 files changed, 8136 insertions, 0 deletions
diff --git a/sql/slave.cc b/sql/slave.cc new file mode 100644 index 00000000..1da03008 --- /dev/null +++ b/sql/slave.cc @@ -0,0 +1,8136 @@ +/* Copyright (c) 2000, 2017, Oracle and/or its affiliates. + Copyright (c) 2009, 2020, MariaDB Corporation. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */ + + +/** + @addtogroup Replication + @{ + + @file + + @brief Code to run the io thread and the sql thread on the + replication slave. +*/ + +#include "mariadb.h" +#include "sql_priv.h" +#include "slave.h" +#include "sql_parse.h" // execute_init_command +#include "sql_table.h" // mysql_rm_table +#include "rpl_mi.h" +#include "rpl_rli.h" +#include "sql_repl.h" +#include "rpl_filter.h" +#include "repl_failsafe.h" +#include "transaction.h" +#include <thr_alarm.h> +#include <my_dir.h> +#include <sql_common.h> +#include <errmsg.h> +#include <ssl_compat.h> +#include "unireg.h" +#include <mysys_err.h> +#include <signal.h> +#include <mysql.h> +#include <myisam.h> + +#include "sql_base.h" // close_thread_tables +#include "tztime.h" // struct Time_zone +#include "log_event.h" // Rotate_log_event, + // Create_file_log_event, + // Format_description_log_event +#include "wsrep_mysqld.h" +#ifdef WITH_WSREP +#include "wsrep_trans_observer.h" +#endif + +class Master_info_index; +Master_info_index *master_info_index; + +#ifdef HAVE_REPLICATION + +#include "rpl_tblmap.h" +#include "debug_sync.h" +#include "rpl_parallel.h" +#include "sql_show.h" +#include "semisync_slave.h" +#include "sql_manager.h" + +#define FLAGSTR(V,F) ((V)&(F)?#F" ":"") + +#define MAX_SLAVE_RETRY_PAUSE 5 +/* + a parameter of sql_slave_killed() to defer the killed status +*/ +#define SLAVE_WAIT_GROUP_DONE 60 +bool use_slave_mask = 0; +MY_BITMAP slave_error_mask; +char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE]; +uint *slave_transaction_retry_errors; +uint slave_transaction_retry_error_length= 0; +char slave_transaction_retry_error_names[SHOW_VAR_FUNC_BUFF_SIZE]; + +char* slave_load_tmpdir = 0; +Master_info *active_mi= 0; +my_bool replicate_same_server_id; +ulonglong relay_log_space_limit = 0; +ulonglong opt_read_binlog_speed_limit = 0; + +const char *relay_log_index= 0; +const char *relay_log_basename= 0; + +LEX_CSTRING default_master_connection_name= { (char*) "", 0 }; + +/* + When slave thread exits, we need to remember the temporary tables so we + can re-use them on slave start. + + TODO: move the vars below under Master_info +*/ + +int disconnect_slave_event_count = 0, abort_slave_event_count = 0; + +static pthread_key(Master_info*, RPL_MASTER_INFO); + +enum enum_slave_reconnect_actions +{ + SLAVE_RECON_ACT_REG= 0, + SLAVE_RECON_ACT_DUMP= 1, + SLAVE_RECON_ACT_EVENT= 2, + SLAVE_RECON_ACT_MAX +}; + +enum enum_slave_reconnect_messages +{ + SLAVE_RECON_MSG_WAIT= 0, + SLAVE_RECON_MSG_KILLED_WAITING= 1, + SLAVE_RECON_MSG_AFTER= 2, + SLAVE_RECON_MSG_FAILED= 3, + SLAVE_RECON_MSG_COMMAND= 4, + SLAVE_RECON_MSG_KILLED_AFTER= 5, + SLAVE_RECON_MSG_MAX +}; + +static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]= +{ + { + "Waiting to reconnect after a failed registration on master", + "Slave I/O thread killed while waiting to reconnect after a failed \ +registration on master", + "Reconnecting after a failed registration on master", + "failed registering on master, reconnecting to try again, \ +log '%s' at position %llu%s", + "COM_REGISTER_SLAVE", + "Slave I/O thread killed during or after reconnect" + }, + { + "Waiting to reconnect after a failed binlog dump request", + "Slave I/O thread killed while retrying master dump", + "Reconnecting after a failed binlog dump request", + "failed dump request, reconnecting to try again, log '%s' at position %llu%s", + "COM_BINLOG_DUMP", + "Slave I/O thread killed during or after reconnect" + }, + { + "Waiting to reconnect after a failed master event read", + "Slave I/O thread killed while waiting to reconnect after a failed read", + "Reconnecting after a failed master event read", + "Slave I/O thread: Failed reading log event, reconnecting to retry, \ +log '%s' at position %llu%s", + "", + "Slave I/O thread killed during or after a reconnect done to recover from \ +failed read" + } +}; + + +typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE; + +static int process_io_rotate(Master_info* mi, Rotate_log_event* rev); +static int process_io_create_file(Master_info* mi, Create_file_log_event* cev); +static bool wait_for_relay_log_space(Relay_log_info* rli); +static bool io_slave_killed(Master_info* mi); +static bool sql_slave_killed(rpl_group_info *rgi); +static int init_slave_thread(THD*, Master_info *, SLAVE_THD_TYPE); +static void make_slave_skip_errors_printable(void); +static void make_slave_transaction_retry_errors_printable(void); +static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi); +static int safe_reconnect(THD*, MYSQL*, Master_info*, bool); +static int connect_to_master(THD*, MYSQL*, Master_info*, bool, bool); +static Log_event* next_event(rpl_group_info* rgi, ulonglong *event_size); +static int queue_event(Master_info* mi,const char* buf,ulong event_len); +static int terminate_slave_thread(THD *, mysql_mutex_t *, mysql_cond_t *, + volatile uint *, bool); +static bool check_io_slave_killed(Master_info *mi, const char *info); +static bool send_show_master_info_data(THD *, Master_info *, bool, String *); +/* + Function to set the slave's max_allowed_packet based on the value + of slave_max_allowed_packet. + + @in_param thd Thread handler for slave + @in_param mysql MySQL connection handle +*/ + +static void set_slave_max_allowed_packet(THD *thd, MYSQL *mysql) +{ + DBUG_ENTER("set_slave_max_allowed_packet"); + // thd and mysql must be valid + DBUG_ASSERT(thd && mysql); + + thd->variables.max_allowed_packet= slave_max_allowed_packet; + thd->net.max_packet_size= slave_max_allowed_packet; + /* + Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O + thread and the mysql->option max_allowed_packet, since a + replication event can become this much larger than + the corresponding packet (query) sent from client to master. + */ + thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER; + /* + Skipping the setting of mysql->net.max_packet size to slave + max_allowed_packet since this is done during mysql_real_connect. + */ + mysql->options.max_allowed_packet= + slave_max_allowed_packet+MAX_LOG_EVENT_HEADER; + DBUG_VOID_RETURN; +} + +/* + Find out which replications threads are running + + SYNOPSIS + init_thread_mask() + mask Return value here + mi master_info for slave + inverse If set, returns which threads are not running + + IMPLEMENTATION + Get a bit mask for which threads are running so that we can later restart + these threads. + + RETURN + mask If inverse == 0, running threads + If inverse == 1, stopped threads +*/ + +void init_thread_mask(int* mask,Master_info* mi,bool inverse) +{ + bool set_io = mi->slave_running, set_sql = mi->rli.slave_running; + int tmp_mask=0; + DBUG_ENTER("init_thread_mask"); + + if (set_io) + tmp_mask |= SLAVE_IO; + if (set_sql) + tmp_mask |= SLAVE_SQL; + if (inverse) + tmp_mask^= (SLAVE_IO | SLAVE_SQL); + *mask = tmp_mask; + DBUG_VOID_RETURN; +} + + +/* + lock_slave_threads() against other threads doing STOP, START or RESET SLAVE + +*/ + +void Master_info::lock_slave_threads() +{ + DBUG_ENTER("lock_slave_threads"); + mysql_mutex_lock(&start_stop_lock); + DBUG_VOID_RETURN; +} + + +/* + unlock_slave_threads() +*/ + +void Master_info::unlock_slave_threads() +{ + DBUG_ENTER("unlock_slave_threads"); + mysql_mutex_unlock(&start_stop_lock); + DBUG_VOID_RETURN; +} + +#ifdef HAVE_PSI_INTERFACE +static PSI_thread_key key_thread_slave_io, key_thread_slave_sql; + +static PSI_thread_info all_slave_threads[]= +{ + { &key_thread_slave_io, "slave_io", PSI_FLAG_GLOBAL}, + { &key_thread_slave_sql, "slave_sql", PSI_FLAG_GLOBAL} +}; + +static void init_slave_psi_keys(void) +{ + const char* category= "sql"; + int count; + + if (PSI_server == NULL) + return; + + count= array_elements(all_slave_threads); + PSI_server->register_thread(category, all_slave_threads, count); +} +#endif /* HAVE_PSI_INTERFACE */ + + +/* + Note: This definition needs to be kept in sync with the one in + mysql_system_tables.sql which is used by mysql_create_db. +*/ +static const char gtid_pos_table_definition1[]= + "CREATE TABLE "; +static const char gtid_pos_table_definition2[]= + " (domain_id INT UNSIGNED NOT NULL, " + "sub_id BIGINT UNSIGNED NOT NULL, " + "server_id INT UNSIGNED NOT NULL, " + "seq_no BIGINT UNSIGNED NOT NULL, " + "PRIMARY KEY (domain_id, sub_id)) CHARSET=latin1 " + "COMMENT='Replication slave GTID position' " + "ENGINE="; + +/* + Build a query string + CREATE TABLE mysql.gtid_slave_pos_<engine> ... ENGINE=<engine> +*/ +static bool +build_gtid_pos_create_query(THD *thd, String *query, + LEX_CSTRING *table_name, + LEX_CSTRING *engine_name) +{ + bool err= false; + err|= query->append(gtid_pos_table_definition1); + err|= append_identifier(thd, query, table_name); + err|= query->append(gtid_pos_table_definition2); + err|= append_identifier(thd, query, engine_name); + return err; +} + + +static int +gtid_pos_table_creation(THD *thd, plugin_ref engine, LEX_CSTRING *table_name) +{ + int err; + StringBuffer<sizeof(gtid_pos_table_definition1) + + sizeof(gtid_pos_table_definition1) + + 2*FN_REFLEN> query; + + if (build_gtid_pos_create_query(thd, &query, table_name, plugin_name(engine))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; + } + + thd->set_db(&MYSQL_SCHEMA_NAME); + thd->clear_error(); + ulonglong thd_saved_option= thd->variables.option_bits; + /* This query shuold not be binlogged. */ + thd->variables.option_bits&= ~(ulonglong)OPTION_BIN_LOG; + thd->set_query_and_id(query.c_ptr(), query.length(), thd->charset(), + next_query_id()); + Parser_state parser_state; + err= parser_state.init(thd, thd->query(), thd->query_length()); + if (err) + goto end; + mysql_parse(thd, thd->query(), thd->query_length(), &parser_state, + FALSE, FALSE); + if (unlikely(thd->is_error())) + err= 1; + /* The warning is relevant to 10.3 and earlier. */ + sql_print_warning("The automatically created table '%s' name may not be " + "entirely in lowercase. The table name will be converted " + "to lowercase to any future upgrade to 10.4.0 and later " + "version where it will be auto-created at once " + "in lowercase.", + table_name->str); +end: + thd->variables.option_bits= thd_saved_option; + thd->reset_query(); + return err; +} + +static THD *new_bg_THD() +{ + THD *thd= new THD(next_thread_id()); + thd->thread_stack= (char*) &thd; + thd->store_globals(); + thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND; + thd->security_ctx->skip_grants(); + thd->set_command(COM_DAEMON); + thd->variables.wsrep_on= 0; + return thd; +} + +static void bg_gtid_delete_pending(void *) +{ + THD *thd= new_bg_THD(); + + rpl_slave_state::list_element *list; + list= rpl_global_gtid_slave_state->gtid_grab_pending_delete_list(); + rpl_global_gtid_slave_state->gtid_delete_pending(thd, &list); + if (list) + rpl_global_gtid_slave_state->put_back_list(list); + delete thd; +} + +static void bg_gtid_pos_auto_create(void *hton) +{ + THD *thd= NULL; + int UNINIT_VAR(err); + plugin_ref engine= NULL, *auto_engines; + rpl_slave_state::gtid_pos_table *entry; + StringBuffer<FN_REFLEN> loc_table_name; + LEX_CSTRING table_name; + + /* + Check that the plugin is still in @@gtid_pos_auto_engines, and lock + it. + */ + mysql_mutex_lock(&LOCK_global_system_variables); + for (auto_engines= opt_gtid_pos_auto_plugins; + auto_engines && *auto_engines; + ++auto_engines) + { + if (plugin_hton(*auto_engines) == hton) + { + engine= my_plugin_lock(NULL, *auto_engines); + break; + } + } + mysql_mutex_unlock(&LOCK_global_system_variables); + if (!engine) + { + /* The engine is gone from @@gtid_pos_auto_engines, so no action. */ + goto end; + } + + /* Find the entry for the table to auto-create. */ + mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); + entry= rpl_global_gtid_slave_state-> + gtid_pos_tables.load(std::memory_order_relaxed); + while (entry) + { + if (entry->table_hton == hton && + entry->state == rpl_slave_state::GTID_POS_CREATE_REQUESTED) + break; + entry= entry->next; + } + if (entry) + { + entry->state = rpl_slave_state::GTID_POS_CREATE_IN_PROGRESS; + err= loc_table_name.append(entry->table_name.str, entry->table_name.length); + } + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + if (!entry) + goto end; + if (err) + { + sql_print_error("Out of memory while trying to auto-create GTID position table"); + goto end; + } + table_name.str= loc_table_name.c_ptr_safe(); + table_name.length= loc_table_name.length(); + + thd= new_bg_THD(); + err= gtid_pos_table_creation(thd, engine, &table_name); + if (err) + { + sql_print_error("Error auto-creating GTID position table `mysql.%s`: %s Error_code: %d", + table_name.str, thd->get_stmt_da()->message(), + thd->get_stmt_da()->sql_errno()); + thd->clear_error(); + goto end; + } + + /* Now enable the entry for the auto-created table. */ + mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); + entry= rpl_global_gtid_slave_state-> + gtid_pos_tables.load(std::memory_order_relaxed); + while (entry) + { + if (entry->table_hton == hton && + entry->state == rpl_slave_state::GTID_POS_CREATE_IN_PROGRESS) + { + entry->state= rpl_slave_state::GTID_POS_AVAILABLE; + break; + } + entry= entry->next; + } + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + +end: + delete thd; + if (engine) + plugin_unlock(NULL, engine); +} + +static bool slave_background_thread_gtid_loaded; + +static void bg_rpl_load_gtid_slave_state(void *) +{ + THD *thd= new_bg_THD(); + thd->set_psi(PSI_CALL_get_thread()); + thd_proc_info(thd, "Loading slave GTID position from table"); + if (rpl_load_gtid_slave_state(thd)) + sql_print_warning("Failed to load slave replication state from table " + "%s.%s: %u: %s", "mysql", + rpl_gtid_slave_state_table_name.str, + thd->get_stmt_da()->sql_errno(), + thd->get_stmt_da()->message()); + + // hijacking global_rpl_thread_pool cond here - it's only once on startup + mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); + slave_background_thread_gtid_loaded= true; + mysql_cond_signal(&global_rpl_thread_pool.COND_rpl_thread_pool); + mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); + delete thd; +} + +static void bg_slave_kill(void *victim) +{ + THD *to_kill= (THD *)victim; + to_kill->awake(KILL_CONNECTION); + mysql_mutex_lock(&to_kill->LOCK_wakeup_ready); + to_kill->rgi_slave->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED; + mysql_cond_broadcast(&to_kill->COND_wakeup_ready); + mysql_mutex_unlock(&to_kill->LOCK_wakeup_ready); +} + +void slave_background_kill_request(THD *to_kill) +{ + if (to_kill->rgi_slave->killed_for_retry) + return; // Already deadlock killed. + to_kill->rgi_slave->killed_for_retry= rpl_group_info::RETRY_KILL_PENDING; + mysql_manager_submit(bg_slave_kill, to_kill); +} + +/* + This function must only be called from a slave SQL thread (or worker thread), + to ensure that the table_entry will not go away before we can lock the + LOCK_slave_state. +*/ +void slave_background_gtid_pos_create_request( + rpl_slave_state::gtid_pos_table *table_entry) +{ + if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE) + return; + mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); + if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE) + { + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + return; + } + table_entry->state= rpl_slave_state::GTID_POS_CREATE_REQUESTED; + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + + mysql_manager_submit(bg_gtid_pos_auto_create, table_entry->table_hton); +} + + +/* + Request the manager thread to delete no longer used rows from the + mysql.gtid_slave_pos* tables. +*/ +void slave_background_gtid_pending_delete_request(void) +{ + mysql_manager_submit(bg_gtid_delete_pending, NULL); +} + + +/* Initialize slave structures */ + +int init_slave() +{ + DBUG_ENTER("init_slave"); + int error= 0; + +#ifdef HAVE_PSI_INTERFACE + init_slave_psi_keys(); +#endif + + if (global_rpl_thread_pool.init(opt_slave_parallel_threads)) + return 1; + + slave_background_thread_gtid_loaded= false; + mysql_manager_submit(bg_rpl_load_gtid_slave_state, NULL); + + // hijacking global_rpl_thread_pool cond here - it's only once on startup + mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); + while (!slave_background_thread_gtid_loaded) + mysql_cond_wait(&global_rpl_thread_pool.COND_rpl_thread_pool, + &global_rpl_thread_pool.LOCK_rpl_thread_pool); + mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); + + /* + This is called when mysqld starts. Before client connections are + accepted. However bootstrap may conflict with us if it does START SLAVE. + So it's safer to take the lock. + */ + + if (pthread_key_create(&RPL_MASTER_INFO, NULL)) + goto err; + + master_info_index= new Master_info_index; + if (!master_info_index || master_info_index->init_all_master_info()) + { + sql_print_error("Failed to initialize multi master structures"); + DBUG_RETURN(1); + } + if (!(active_mi= new Master_info(&default_master_connection_name, + relay_log_recovery)) || + active_mi->error()) + { + delete active_mi; + active_mi= 0; + sql_print_error("Failed to allocate memory for the Master Info structure"); + goto err; + } + + if (master_info_index->add_master_info(active_mi, FALSE)) + { + delete active_mi; + active_mi= 0; + goto err; + } + + /* + If master_host is not specified, try to read it from the master_info file. + If master_host is specified, create the master_info file if it doesn't + exists. + */ + + if (init_master_info(active_mi,master_info_file,relay_log_info_file, + 1, (SLAVE_IO | SLAVE_SQL))) + { + sql_print_error("Failed to initialize the master info structure"); + goto err; + } + + /* If server id is not set, start_slave_thread() will say it */ + + if (active_mi->host[0] && !opt_skip_slave_start) + { + int error; + THD *thd= new THD(next_thread_id()); + thd->thread_stack= (char*) &thd; + thd->store_globals(); + + error= start_slave_threads(0, /* No active thd */ + 1 /* need mutex */, + 1 /* wait for start*/, + active_mi, + master_info_file, + relay_log_info_file, + SLAVE_IO | SLAVE_SQL); + + thd->reset_globals(); + delete thd; + if (unlikely(error)) + { + sql_print_error("Failed to create slave threads"); + goto err; + } + } + +end: + DBUG_RETURN(error); + +err: + error= 1; + goto end; +} + +/* + Updates the master info based on the information stored in the + relay info and ignores relay logs previously retrieved by the IO + thread, which thus starts fetching again based on to the + group_master_log_pos and group_master_log_name. Eventually, the old + relay logs will be purged by the normal purge mechanism. + + In the feature, we should improve this routine in order to avoid throwing + away logs that are safely stored in the disk. Note also that this recovery + routine relies on the correctness of the relay-log.info and only tolerates + coordinate problems in master.info. + + In this function, there is no need for a mutex as the caller + (i.e. init_slave) already has one acquired. + + Specifically, the following structures are updated: + + 1 - mi->master_log_pos <-- rli->group_master_log_pos + 2 - mi->master_log_name <-- rli->group_master_log_name + 3 - It moves the relay log to the new relay log file, by + rli->group_relay_log_pos <-- BIN_LOG_HEADER_SIZE; + rli->event_relay_log_pos <-- BIN_LOG_HEADER_SIZE; + rli->group_relay_log_name <-- rli->relay_log.get_log_fname(); + rli->event_relay_log_name <-- rli->relay_log.get_log_fname(); + + If there is an error, it returns (1), otherwise returns (0). + */ +int init_recovery(Master_info* mi, const char** errmsg) +{ + DBUG_ENTER("init_recovery"); + + Relay_log_info *rli= &mi->rli; + if (rli->group_master_log_name[0]) + { + mi->master_log_pos= MY_MAX(BIN_LOG_HEADER_SIZE, + rli->group_master_log_pos); + strmake_buf(mi->master_log_name, rli->group_master_log_name); + + sql_print_warning("Recovery from master pos %ld and file %s.", + (ulong) mi->master_log_pos, mi->master_log_name); + + strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname()); + strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname()); + + rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE; + } + + DBUG_RETURN(0); +} + + +/** + Convert slave skip errors bitmap into a printable string. +*/ + +static void make_slave_skip_errors_printable(void) +{ + /* + To be safe, we want 10 characters of room in the buffer for a number + plus terminators. Also, we need some space for constant strings. + 10 characters must be sufficient for a number plus {',' | '...'} + plus a NUL terminator. That is a max 6 digit number. + */ + const size_t MIN_ROOM= 10; + DBUG_ENTER("make_slave_skip_errors_printable"); + DBUG_ASSERT(sizeof(slave_skip_error_names) > MIN_ROOM); + DBUG_ASSERT(MAX_SLAVE_ERROR <= 999999); // 6 digits + + /* Make @@slave_skip_errors show the nice human-readable value. */ + opt_slave_skip_errors= slave_skip_error_names; + + if (!use_slave_mask || bitmap_is_clear_all(&slave_error_mask)) + { + /* purecov: begin tested */ + memcpy(slave_skip_error_names, STRING_WITH_LEN("OFF")); + /* purecov: end */ + } + else if (bitmap_is_set_all(&slave_error_mask)) + { + /* purecov: begin tested */ + memcpy(slave_skip_error_names, STRING_WITH_LEN("ALL")); + /* purecov: end */ + } + else + { + char *buff= slave_skip_error_names; + char *bend= buff + sizeof(slave_skip_error_names) - MIN_ROOM; + int errnum; + + for (errnum= 0; errnum < MAX_SLAVE_ERROR; errnum++) + { + if (bitmap_is_set(&slave_error_mask, errnum)) + { + if (buff >= bend) + break; /* purecov: tested */ + buff= int10_to_str(errnum, buff, 10); + *buff++= ','; + } + } + if (buff != slave_skip_error_names) + buff--; // Remove last ',' + if (errnum < MAX_SLAVE_ERROR) + { + /* Couldn't show all errors */ + buff= strmov(buff, "..."); /* purecov: tested */ + } + *buff=0; + } + DBUG_PRINT("init", ("error_names: '%s'", slave_skip_error_names)); + DBUG_VOID_RETURN; +} + +/* + Init function to set up array for errors that should be skipped for slave + + SYNOPSIS + init_slave_skip_errors() + arg List of errors numbers to skip, separated with ',' + + NOTES + Called from get_options() in mysqld.cc on start-up +*/ + +bool init_slave_skip_errors(const char* arg) +{ + const char *p; + DBUG_ENTER("init_slave_skip_errors"); + + if (!arg || !*arg) // No errors defined + goto end; + + if (unlikely(my_bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))) + DBUG_RETURN(1); + + use_slave_mask= 1; + for (;my_isspace(system_charset_info,*arg);++arg) + /* empty */; + if (!system_charset_info->strnncoll((uchar*)arg,4,(const uchar*)"all",4)) + { + bitmap_set_all(&slave_error_mask); + goto end; + } + for (p= arg ; *p; ) + { + long err_code; + if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code))) + break; + if (err_code < MAX_SLAVE_ERROR) + bitmap_set_bit(&slave_error_mask,(uint)err_code); + while (!my_isdigit(system_charset_info,*p) && *p) + p++; + } + +end: + make_slave_skip_errors_printable(); + DBUG_RETURN(0); +} + +/** + Make printable version if slave_transaction_retry_errors + This is never empty as at least ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT + will be there +*/ + +static void make_slave_transaction_retry_errors_printable(void) +{ + /* + To be safe, we want 10 characters of room in the buffer for a number + plus terminators. Also, we need some space for constant strings. + 10 characters must be sufficient for a number plus {',' | '...'} + plus a NUL terminator. That is a max 6 digit number. + */ + const size_t MIN_ROOM= 10; + char *buff= slave_transaction_retry_error_names; + char *bend= buff + sizeof(slave_transaction_retry_error_names) - MIN_ROOM; + uint i; + DBUG_ENTER("make_slave_transaction_retry_errors_printable"); + DBUG_ASSERT(sizeof(slave_transaction_retry_error_names) > MIN_ROOM); + + /* Make @@slave_transaction_retry_errors show a human-readable value */ + opt_slave_transaction_retry_errors= slave_transaction_retry_error_names; + + for (i= 0; i < slave_transaction_retry_error_length && buff < bend; i++) + { + buff= int10_to_str(slave_transaction_retry_errors[i], buff, 10); + *buff++= ','; + } + if (buff != slave_transaction_retry_error_names) + buff--; // Remove last ',' + if (i < slave_transaction_retry_error_length) + { + /* Couldn't show all errors */ + buff= strmov(buff, "..."); /* purecov: tested */ + } + *buff=0; + DBUG_PRINT("exit", ("error_names: '%s'", + slave_transaction_retry_error_names)); + DBUG_VOID_RETURN; +} + + +#define DEFAULT_SLAVE_RETRY_ERRORS 9 + +bool init_slave_transaction_retry_errors(const char* arg) +{ + const char *p; + long err_code; + uint i; + DBUG_ENTER("init_slave_transaction_retry_errors"); + + /* Handle empty strings */ + if (!arg) + arg= ""; + + slave_transaction_retry_error_length= DEFAULT_SLAVE_RETRY_ERRORS; + for (;my_isspace(system_charset_info,*arg);++arg) + /* empty */; + for (p= arg; *p; ) + { + if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code))) + break; + slave_transaction_retry_error_length++; + while (!my_isdigit(system_charset_info,*p) && *p) + p++; + } + + if (unlikely(!(slave_transaction_retry_errors= + (uint *) my_once_alloc(sizeof(int) * + slave_transaction_retry_error_length, + MYF(MY_WME))))) + DBUG_RETURN(1); + + /* + Temporary error codes: + currently, InnoDB deadlock detected by InnoDB or lock + wait timeout (innodb_lock_wait_timeout exceeded + */ + slave_transaction_retry_errors[0]= ER_NET_READ_ERROR; + slave_transaction_retry_errors[1]= ER_NET_READ_INTERRUPTED; + slave_transaction_retry_errors[2]= ER_NET_ERROR_ON_WRITE; + slave_transaction_retry_errors[3]= ER_NET_WRITE_INTERRUPTED; + slave_transaction_retry_errors[4]= ER_LOCK_WAIT_TIMEOUT; + slave_transaction_retry_errors[5]= ER_LOCK_DEADLOCK; + slave_transaction_retry_errors[6]= ER_CONNECT_TO_FOREIGN_DATA_SOURCE; + slave_transaction_retry_errors[7]= 2013; /* CR_SERVER_LOST */ + slave_transaction_retry_errors[8]= 12701; /* ER_SPIDER_REMOTE_SERVER_GONE_AWAY_NUM */ + + /* Add user codes after this */ + for (p= arg, i= DEFAULT_SLAVE_RETRY_ERRORS; *p; ) + { + if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code))) + break; + if (err_code > 0) + slave_transaction_retry_errors[i++]= (uint) err_code; + while (!my_isdigit(system_charset_info,*p) && *p) + p++; + } + slave_transaction_retry_error_length= i; + + make_slave_transaction_retry_errors_printable(); + DBUG_RETURN(0); +} + + +int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) +{ + DBUG_ENTER("terminate_slave_threads"); + + if (!mi->inited) + DBUG_RETURN(0); /* successfully do nothing */ + int error,force_all = (thread_mask & SLAVE_FORCE_ALL); + int retval= 0; + mysql_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock; + mysql_mutex_t *log_lock= mi->rli.relay_log.get_log_lock(); + + if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) + { + DBUG_PRINT("info",("Terminating SQL thread")); + if (mi->using_parallel() && mi->rli.abort_slave && mi->rli.stop_for_until) + { + mi->rli.stop_for_until= false; + mi->rli.parallel.stop_during_until(); + } + else + mi->rli.abort_slave=1; + if (unlikely((error= terminate_slave_thread(mi->rli.sql_driver_thd, + sql_lock, + &mi->rli.stop_cond, + &mi->rli.slave_running, + skip_lock))) && + !force_all) + DBUG_RETURN(error); + retval= error; + + mysql_mutex_lock(log_lock); + + DBUG_PRINT("info",("Flushing relay-log info file.")); + if (current_thd) + THD_STAGE_INFO(current_thd, stage_flushing_relay_log_info_file); + if (mi->rli.flush() || my_sync(mi->rli.info_fd, MYF(MY_WME))) + retval= ER_ERROR_DURING_FLUSH_LOGS; + + mysql_mutex_unlock(log_lock); + } + if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) + { + DBUG_PRINT("info",("Terminating IO thread")); + mi->abort_slave=1; + if (unlikely((error= terminate_slave_thread(mi->io_thd, io_lock, + &mi->stop_cond, + &mi->slave_running, + skip_lock))) && + !force_all) + DBUG_RETURN(error); + if (!retval) + retval= error; + + mysql_mutex_lock(log_lock); + + DBUG_PRINT("info",("Flushing relay log and master info file.")); + if (current_thd) + THD_STAGE_INFO(current_thd, stage_flushing_relay_log_and_master_info_repository); + if (likely(mi->fd >= 0)) + { + if (flush_master_info(mi, TRUE, FALSE) || my_sync(mi->fd, MYF(MY_WME))) + retval= ER_ERROR_DURING_FLUSH_LOGS; + } + if (mi->rli.relay_log.is_open() && + my_sync(mi->rli.relay_log.get_log_file()->file, MYF(MY_WME))) + retval= ER_ERROR_DURING_FLUSH_LOGS; + + mysql_mutex_unlock(log_lock); + } + DBUG_RETURN(retval); +} + + +/** + Wait for a slave thread to terminate. + + This function is called after requesting the thread to terminate + (by setting @c abort_slave member of @c Relay_log_info or @c + Master_info structure to 1). Termination of the thread is + controlled with the the predicate <code>*slave_running</code>. + + Function will acquire @c term_lock before waiting on the condition + unless @c skip_lock is true in which case the mutex should be owned + by the caller of this function and will remain acquired after + return from the function. + + @param term_lock + Associated lock to use when waiting for @c term_cond + + @param term_cond + Condition that is signalled when the thread has terminated + + @param slave_running + Pointer to predicate to check for slave thread termination + + @param skip_lock + If @c true the lock will not be acquired before waiting on + the condition. In this case, it is assumed that the calling + function acquires the lock before calling this function. + + @retval 0 All OK ER_SLAVE_NOT_RUNNING otherwise. + + @note If the executing thread has to acquire term_lock (skip_lock + is false), the negative running status does not represent + any issue therefore no error is reported. + + */ +static int +terminate_slave_thread(THD *thd, + mysql_mutex_t *term_lock, + mysql_cond_t *term_cond, + volatile uint *slave_running, + bool skip_lock) +{ + DBUG_ENTER("terminate_slave_thread"); + if (!skip_lock) + { + mysql_mutex_lock(term_lock); + } + else + { + mysql_mutex_assert_owner(term_lock); + } + if (!*slave_running) + { + if (!skip_lock) + { + /* + if run_lock (term_lock) is acquired locally then either + slave_running status is fine + */ + mysql_mutex_unlock(term_lock); + DBUG_RETURN(0); + } + else + { + DBUG_RETURN(ER_SLAVE_NOT_RUNNING); + } + } + DBUG_ASSERT(thd != 0); + THD_CHECK_SENTRY(thd); + + /* + Is is critical to test if the slave is running. Otherwise, we might + be referening freed memory trying to kick it + */ + + while (*slave_running) // Should always be true + { + int error __attribute__((unused)); + DBUG_PRINT("loop", ("killing slave thread")); + + mysql_mutex_lock(&thd->LOCK_thd_kill); + mysql_mutex_lock(&thd->LOCK_thd_data); +#ifndef DONT_USE_THR_ALARM + /* + Error codes from pthread_kill are: + EINVAL: invalid signal number (can't happen) + ESRCH: thread already killed (can happen, should be ignored) + */ + int err __attribute__((unused))= pthread_kill(thd->real_id, thr_client_alarm); + DBUG_ASSERT(err != EINVAL); +#endif + thd->awake_no_mutex(NOT_KILLED); + + mysql_mutex_unlock(&thd->LOCK_thd_kill); + mysql_mutex_unlock(&thd->LOCK_thd_data); + + /* + There is a small chance that slave thread might miss the first + alarm. To protect againts it, resend the signal until it reacts + */ + struct timespec abstime; + set_timespec(abstime,2); + error= mysql_cond_timedwait(term_cond, term_lock, &abstime); + DBUG_ASSERT(error == ETIMEDOUT || error == 0); + } + + DBUG_ASSERT(*slave_running == 0); + + if (!skip_lock) + mysql_mutex_unlock(term_lock); + DBUG_RETURN(0); +} + + +int start_slave_thread( +#ifdef HAVE_PSI_INTERFACE + PSI_thread_key thread_key, +#endif + pthread_handler h_func, mysql_mutex_t *start_lock, + mysql_mutex_t *cond_lock, + mysql_cond_t *start_cond, + volatile uint *slave_running, + volatile ulong *slave_run_id, + Master_info* mi) +{ + pthread_t th; + ulong start_id; + int error; + DBUG_ENTER("start_slave_thread"); + + DBUG_ASSERT(mi->inited); + + if (start_lock) + mysql_mutex_lock(start_lock); + if (!global_system_variables.server_id) + { + if (start_cond) + mysql_cond_broadcast(start_cond); + if (start_lock) + mysql_mutex_unlock(start_lock); + sql_print_error("Server id not set, will not start slave"); + DBUG_RETURN(ER_BAD_SLAVE); + } + + if (*slave_running) + { + if (start_cond) + mysql_cond_broadcast(start_cond); + if (start_lock) + mysql_mutex_unlock(start_lock); + DBUG_RETURN(ER_SLAVE_MUST_STOP); + } + start_id= *slave_run_id; + DBUG_PRINT("info",("Creating new slave thread")); + if (unlikely((error= mysql_thread_create(thread_key, + &th, &connection_attrib, h_func, + (void*)mi)))) + { + sql_print_error("Can't create slave thread (errno= %d).", error); + if (start_lock) + mysql_mutex_unlock(start_lock); + DBUG_RETURN(ER_SLAVE_THREAD); + } + + /* + In the following loop we can't check for thd->killed as we have to + wait until THD structures for the slave thread are created + before we can return. + This should be ok as there is no major work done in the slave + threads before they signal that we can stop waiting. + */ + + if (start_cond && cond_lock) // caller has cond_lock + { + THD* thd = current_thd; + while (start_id == *slave_run_id) + { + DBUG_PRINT("sleep",("Waiting for slave thread to start")); + PSI_stage_info saved_stage= {0, "", 0}; + thd->ENTER_COND(start_cond, cond_lock, + & stage_waiting_for_slave_thread_to_start, + & saved_stage); + /* + It is not sufficient to test this at loop bottom. We must test + it after registering the mutex in enter_cond(). If the kill + happens after testing of thd->killed and before the mutex is + registered, we could otherwise go waiting though thd->killed is + set. + */ + mysql_cond_wait(start_cond, cond_lock); + thd->EXIT_COND(& saved_stage); + mysql_mutex_lock(cond_lock); // re-acquire it as exit_cond() released + } + } + if (start_lock) + mysql_mutex_unlock(start_lock); + DBUG_RETURN(0); +} + + +/* + start_slave_threads() + + NOTES + SLAVE_FORCE_ALL is not implemented here on purpose since it does not make + sense to do that for starting a slave--we always care if it actually + started the threads that were not previously running +*/ + +int start_slave_threads(THD *thd, + bool need_slave_mutex, bool wait_for_start, + Master_info* mi, const char* master_info_fname, + const char* slave_info_fname, int thread_mask) +{ + mysql_mutex_t *lock_io=0, *lock_sql=0, *lock_cond_io=0, *lock_cond_sql=0; + mysql_cond_t* cond_io=0, *cond_sql=0; + int error=0; + const char *errmsg; + DBUG_ENTER("start_slave_threads"); + + if (need_slave_mutex) + { + lock_io = &mi->run_lock; + lock_sql = &mi->rli.run_lock; + } + if (wait_for_start) + { + cond_io = &mi->start_cond; + cond_sql = &mi->rli.start_cond; + lock_cond_io = &mi->run_lock; + lock_cond_sql = &mi->rli.run_lock; + } + + /* + If we are using GTID and both SQL and IO threads are stopped, then get + rid of all relay logs. + + Relay logs are not very useful when using GTID, except as a buffer + between the fetch in the IO thread and the apply in SQL thread. However + while one of the threads is running, they are in use and cannot be + removed. + */ + if (mi->using_gtid != Master_info::USE_GTID_NO && + !mi->slave_running && !mi->rli.slave_running) + { + /* + purge_relay_logs() clears the mi->rli.group_master_log_pos. + So save and restore them, like we do in CHANGE MASTER. + (We are not going to use them for GTID, but it might be worth to + keep them in case connection with GTID fails and user wants to go + back and continue with previous old-style replication coordinates). + */ + mi->master_log_pos = MY_MAX(BIN_LOG_HEADER_SIZE, + mi->rli.group_master_log_pos); + strmake(mi->master_log_name, mi->rli.group_master_log_name, + sizeof(mi->master_log_name)-1); + purge_relay_logs(&mi->rli, thd, 0, &errmsg); + mi->rli.group_master_log_pos= mi->master_log_pos; + strmake(mi->rli.group_master_log_name, mi->master_log_name, + sizeof(mi->rli.group_master_log_name)-1); + + error= rpl_load_gtid_state(&mi->gtid_current_pos, mi->using_gtid == + Master_info::USE_GTID_CURRENT_POS); + mi->events_queued_since_last_gtid= 0; + mi->gtid_reconnect_event_skip_count= 0; + + mi->rli.restart_gtid_pos.reset(); + } + + if (likely(!error) && likely((thread_mask & SLAVE_IO))) + error= start_slave_thread( +#ifdef HAVE_PSI_INTERFACE + key_thread_slave_io, +#endif + handle_slave_io, lock_io, lock_cond_io, + cond_io, + &mi->slave_running, &mi->slave_run_id, + mi); + if (likely(!error) && likely(thread_mask & SLAVE_SQL)) + { + error= start_slave_thread( +#ifdef HAVE_PSI_INTERFACE + key_thread_slave_sql, +#endif + handle_slave_sql, lock_sql, lock_cond_sql, + cond_sql, + &mi->rli.slave_running, &mi->rli.slave_run_id, + mi); + if (unlikely(error)) + terminate_slave_threads(mi, thread_mask & SLAVE_IO, !need_slave_mutex); + } + DBUG_RETURN(error); +} + + +/* + Kill slaves preparing for shutdown +*/ + +void slave_prepare_for_shutdown() +{ + mysql_mutex_lock(&LOCK_active_mi); + master_info_index->free_connections(); + mysql_mutex_unlock(&LOCK_active_mi); + // It's safe to destruct worker pool now when + // all driver threads are gone. + global_rpl_thread_pool.deactivate(); +} + +/* + Release slave threads at time of executing shutdown. +*/ + +void end_slave() +{ + DBUG_ENTER("end_slave"); + + /* + This is called when the server terminates, in close_connections(). + It terminates slave threads. However, some CHANGE MASTER etc may still be + running presently. If a START SLAVE was in progress, the mutex lock below + will make us wait until slave threads have started, and START SLAVE + returns, then we terminate them here. + + We can also be called by cleanup(), which only happens if some + startup parameter to the server was wrong. + */ + mysql_mutex_lock(&LOCK_active_mi); + /* + master_info_index should not have any threads anymore as they where + killed as part of slave_prepare_for_shutdown() + */ + delete master_info_index; + master_info_index= 0; + active_mi= 0; + mysql_mutex_unlock(&LOCK_active_mi); + + global_rpl_thread_pool.destroy(); + free_all_rpl_filters(); + DBUG_VOID_RETURN; +} + +static bool io_slave_killed(Master_info* mi) +{ + DBUG_ENTER("io_slave_killed"); + + DBUG_ASSERT(mi->slave_running); // tracking buffer overrun + DBUG_RETURN(mi->abort_slave || mi->io_thd->killed); +} + +/** + The function analyzes a possible killed status and makes + a decision whether to accept it or not. + Normally upon accepting the sql thread goes to shutdown. + In the event of deffering decision @rli->last_event_start_time waiting + timer is set to force the killed status be accepted upon its expiration. + + @param thd pointer to a THD instance + @param rli pointer to Relay_log_info instance + + @return TRUE the killed status is recognized, FALSE a possible killed + status is deferred. +*/ +static bool sql_slave_killed(rpl_group_info *rgi) +{ + bool ret= FALSE; + Relay_log_info *rli= rgi->rli; + THD *thd= rgi->thd; + DBUG_ENTER("sql_slave_killed"); + + DBUG_ASSERT(rli->sql_driver_thd == thd); + DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun + if (rli->sql_driver_thd->killed || rli->abort_slave) + { + /* + The transaction should always be binlogged if OPTION_KEEP_LOG is + set (it implies that something can not be rolled back). And such + case should be regarded similarly as modifing a + non-transactional table because retrying of the transaction will + lead to an error or inconsistency as well. + + Example: OPTION_KEEP_LOG is set if a temporary table is created + or dropped. + + Note that transaction.all.modified_non_trans_table may be 1 + if last statement was a single row transaction without begin/end. + Testing this flag must always be done in connection with + rli->is_in_group(). + */ + + if ((thd->transaction->all.modified_non_trans_table || + (thd->variables.option_bits & OPTION_KEEP_LOG)) && + rli->is_in_group()) + { + char msg_stopped[]= + "... Slave SQL Thread stopped with incomplete event group " + "having non-transactional changes. " + "If the group consists solely of row-based events, you can try " + "to restart the slave with --slave-exec-mode=IDEMPOTENT, which " + "ignores duplicate key, key not found, and similar errors (see " + "documentation for details)."; + + DBUG_PRINT("info", ("modified_non_trans_table: %d OPTION_BEGIN: %d " + "OPTION_KEEP_LOG: %d is_in_group: %d", + thd->transaction->all.modified_non_trans_table, + MY_TEST(thd->variables.option_bits & OPTION_BEGIN), + MY_TEST(thd->variables.option_bits & OPTION_KEEP_LOG), + rli->is_in_group())); + + if (rli->abort_slave) + { + DBUG_PRINT("info", + ("Request to stop slave SQL Thread received while " + "applying a group that has non-transactional " + "changes; waiting for completion of the group ... ")); + + /* + Slave sql thread shutdown in face of unfinished group + modified Non-trans table is handled via a timer. The slave + may eventually give out to complete the current group and in + that case there might be issues at consequent slave restart, + see the error message. WL#2975 offers a robust solution + requiring to store the last exectuted event's coordinates + along with the group's coordianates instead of waiting with + @c last_event_start_time the timer. + */ + + if (rgi->last_event_start_time == 0) + rgi->last_event_start_time= my_time(0); + ret= difftime(my_time(0), rgi->last_event_start_time) <= + SLAVE_WAIT_GROUP_DONE ? FALSE : TRUE; + + DBUG_EXECUTE_IF("stop_slave_middle_group", + DBUG_EXECUTE_IF("incomplete_group_in_relay_log", + ret= TRUE;);); // time is over + + if (ret == 0) + { + rli->report(WARNING_LEVEL, 0, rgi->gtid_info(), + "Request to stop slave SQL Thread received while " + "applying a group that has non-transactional " + "changes; waiting for completion of the group ... "); + } + else + { + rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, rgi->gtid_info(), + ER_THD(thd, ER_SLAVE_FATAL_ERROR), msg_stopped); + } + } + else + { + ret= TRUE; + rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, rgi->gtid_info(), + ER_THD(thd, ER_SLAVE_FATAL_ERROR), + msg_stopped); + } + } + else + { + ret= TRUE; + } + } + if (ret) + rgi->last_event_start_time= 0; + + DBUG_RETURN(ret); +} + + +/* + skip_load_data_infile() + + NOTES + This is used to tell a 3.23 master to break send_file() +*/ + +void skip_load_data_infile(NET *net) +{ + DBUG_ENTER("skip_load_data_infile"); + + (void)net_request_file(net, "/dev/null"); + (void)my_net_read(net); // discard response + (void)net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0); // ok + DBUG_VOID_RETURN; +} + + +bool net_request_file(NET* net, const char* fname) +{ + DBUG_ENTER("net_request_file"); + DBUG_RETURN(net_write_command(net, 251, (uchar*) fname, strlen(fname), + (uchar*) "", 0)); +} + +/* + From other comments and tests in code, it looks like + sometimes Query_log_event and Load_log_event can have db == 0 + (see rewrite_db() above for example) + (cases where this happens are unclear; it may be when the master is 3.23). +*/ + +const char *print_slave_db_safe(const char* db) +{ + DBUG_ENTER("*print_slave_db_safe"); + + DBUG_RETURN((db ? db : "")); +} + +#endif /* HAVE_REPLICATION */ + +bool Sql_cmd_show_slave_status::execute(THD *thd) +{ +#ifndef HAVE_REPLICATION + my_ok(thd); + return false; +#else + DBUG_ENTER("Sql_cmd_show_slave_status::execute"); + bool res= true; + + /* Accept one of two privileges */ + if (check_global_access(thd, PRIV_STMT_SHOW_SLAVE_STATUS)) + goto error; + if (is_show_all_slaves_stat()) + { + mysql_mutex_lock(&LOCK_active_mi); + res= show_all_master_info(thd); + mysql_mutex_unlock(&LOCK_active_mi); + } + else + { + LEX_MASTER_INFO *lex_mi= &thd->lex->mi; + Master_info *mi; + if ((mi= get_master_info(&lex_mi->connection_name, + Sql_condition::WARN_LEVEL_ERROR))) + { + res= show_master_info(thd, mi, 0); + mi->release(); + } + } +error: + DBUG_RETURN(res); +#endif +} + +int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, + const char *default_val) +{ + size_t length; + DBUG_ENTER("init_strvar_from_file"); + + if ((length=my_b_gets(f,var, max_size))) + { + char* last_p = var + length -1; + if (*last_p == '\n') + *last_p = 0; // if we stopped on newline, kill it + else + { + /* + If we truncated a line or stopped on last char, remove all chars + up to and including newline. + */ + int c; + while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) ; + } + DBUG_RETURN(0); + } + else if (default_val) + { + strmake(var, default_val, max_size-1); + DBUG_RETURN(0); + } + DBUG_RETURN(1); +} + +/* + when moving these functions to mysys, don't forget to + remove slave.cc from libmysqld/CMakeLists.txt +*/ +int init_intvar_from_file(int* var, IO_CACHE* f, int default_val) +{ + char buf[32]; + DBUG_ENTER("init_intvar_from_file"); + + + if (my_b_gets(f, buf, sizeof(buf))) + { + *var = atoi(buf); + DBUG_RETURN(0); + } + else if (default_val) + { + *var = default_val; + DBUG_RETURN(0); + } + DBUG_RETURN(1); +} + +int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val) +{ + char buf[16]; + DBUG_ENTER("init_floatvar_from_file"); + + + if (my_b_gets(f, buf, sizeof(buf))) + { + if (sscanf(buf, "%f", var) != 1) + DBUG_RETURN(1); + else + DBUG_RETURN(0); + } + else if (default_val != 0.0) + { + *var = default_val; + DBUG_RETURN(0); + } + DBUG_RETURN(1); +} + + +/** + A master info read method + + This function is called from @c init_master_info() along with + relatives to restore some of @c active_mi members. + Particularly, this function is responsible for restoring + IGNORE_SERVER_IDS list of servers whose events the slave is + going to ignore (to not log them in the relay log). + Items being read are supposed to be decimal output of values of a + type shorter or equal of @c long and separated by the single space. + It also used to restore DO_DOMAIN_IDS & IGNORE_DOMAIN_IDS lists. + + @param arr @c DYNAMIC_ARRAY pointer to storage for servers id + @param f @c IO_CACHE pointer to the source file + + @retval 0 All OK + @retval non-zero An error +*/ + +int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f) +{ + int ret= 0; + char buf[16 * (sizeof(long)*4 + 1)]; // static buffer to use most of times + char *buf_act= buf; // actual buffer can be dynamic if static is short + char *token, *last; + uint num_items; // number of items of `arr' + size_t read_size; + DBUG_ENTER("init_dynarray_intvar_from_file"); + + if ((read_size= my_b_gets(f, buf_act, sizeof(buf))) == 0) + { + DBUG_RETURN(0); // no line in master.info + } + if (read_size + 1 == sizeof(buf) && buf[sizeof(buf) - 2] != '\n') + { + /* + short read happend; allocate sufficient memory and make the 2nd read + */ + char buf_work[(sizeof(long)*3 + 1)*16]; + memcpy(buf_work, buf, sizeof(buf_work)); + num_items= atoi(strtok_r(buf_work, " ", &last)); + size_t snd_size; + /* + max size lower bound approximate estimation bases on the formula: + (the items number + items themselves) * + (decimal size + space) - 1 + `\n' + '\0' + */ + size_t max_size= (1 + num_items) * (sizeof(long)*3 + 1) + 1; + buf_act= (char*) my_malloc(key_memory_Rpl_info_file_buffer, max_size, + MYF(MY_WME)); + memcpy(buf_act, buf, read_size); + snd_size= my_b_gets(f, buf_act + read_size, max_size - read_size); + if (snd_size == 0 || + ((snd_size + 1 == max_size - read_size) && buf_act[max_size - 2] != '\n')) + { + /* + failure to make the 2nd read or short read again + */ + ret= 1; + goto err; + } + } + token= strtok_r(buf_act, " ", &last); + if (token == NULL) + { + ret= 1; + goto err; + } + num_items= atoi(token); + for (uint i=0; i < num_items; i++) + { + token= strtok_r(NULL, " ", &last); + if (token == NULL) + { + ret= 1; + goto err; + } + else + { + ulong val= atol(token); + insert_dynamic(arr, (uchar *) &val); + } + } +err: + if (buf_act != buf) + my_free(buf_act); + DBUG_RETURN(ret); +} + +#ifdef HAVE_REPLICATION + +/* + Check if the error is caused by network. + @param[in] errorno Number of the error. + RETURNS: + TRUE network error + FALSE not network error +*/ + +bool is_network_error(uint errorno) +{ + if (errorno == CR_CONNECTION_ERROR || + errorno == CR_CONN_HOST_ERROR || + errorno == CR_SERVER_GONE_ERROR || + errorno == CR_SERVER_LOST || + errorno == ER_CON_COUNT_ERROR || + errorno == ER_CONNECTION_KILLED || + errorno == ER_NEW_ABORTING_CONNECTION || + errorno == ER_NET_READ_INTERRUPTED || + errorno == ER_SERVER_SHUTDOWN) + return TRUE; +#ifdef WITH_WSREP + if (errorno == ER_UNKNOWN_COM_ERROR) + return TRUE; +#endif + + return FALSE; +} + + +/* + Note that we rely on the master's version (3.23, 4.0.14 etc) instead of + relying on the binlog's version. This is not perfect: imagine an upgrade + of the master without waiting that all slaves are in sync with the master; + then a slave could be fooled about the binlog's format. This is what happens + when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0 + slaves are fooled. So we do this only to distinguish between 3.23 and more + recent masters (it's too late to change things for 3.23). + + RETURNS + 0 ok + 1 error + 2 transient network problem, the caller should try to reconnect +*/ + +static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) +{ + char err_buff[MAX_SLAVE_ERRMSG], err_buff2[MAX_SLAVE_ERRMSG]; + const char* errmsg= 0; + int err_code= 0; + MYSQL_RES *master_res= 0; + MYSQL_ROW master_row; + uint version= mysql_get_server_version(mysql) / 10000; + DBUG_ENTER("get_master_version_and_clock"); + + /* + Free old description_event_for_queue (that is needed if we are in + a reconnection). + */ + delete mi->rli.relay_log.description_event_for_queue; + mi->rli.relay_log.description_event_for_queue= 0; + + if (!my_isdigit(&my_charset_bin,*mysql->server_version)) + { + errmsg= err_buff2; + snprintf(err_buff2, sizeof(err_buff2), + "Master reported unrecognized MySQL version: %s", + mysql->server_version); + err_code= ER_SLAVE_FATAL_ERROR; + sprintf(err_buff, ER_DEFAULT(err_code), err_buff2); + } + else + { + /* + Note the following switch will bug when we have MySQL branch 30 ;) + */ + switch (version) { + case 0: + case 1: + case 2: + errmsg= err_buff2; + snprintf(err_buff2, sizeof(err_buff2), + "Master reported unrecognized MySQL version: %s", + mysql->server_version); + err_code= ER_SLAVE_FATAL_ERROR; + sprintf(err_buff, ER_DEFAULT(err_code), err_buff2); + break; + case 3: + mi->rli.relay_log.description_event_for_queue= new + Format_description_log_event(1, mysql->server_version); + break; + case 4: + mi->rli.relay_log.description_event_for_queue= new + Format_description_log_event(3, mysql->server_version); + break; + default: + /* + Master is MySQL >=5.0. Give a default Format_desc event, so that we can + take the early steps (like tests for "is this a 3.23 master") which we + have to take before we receive the real master's Format_desc which will + override this one. Note that the Format_desc we create below is garbage + (it has the format of the *slave*); it's only good to help know if the + master is 3.23, 4.0, etc. + */ + mi->rli.relay_log.description_event_for_queue= new + Format_description_log_event(4, mysql->server_version); + break; + } + } + + /* + This does not mean that a 5.0 slave will be able to read a 6.0 master; but + as we don't know yet, we don't want to forbid this for now. If a 5.0 slave + can't read a 6.0 master, this will show up when the slave can't read some + events sent by the master, and there will be error messages. + */ + + if (errmsg) + goto err; + + /* as we are here, we tried to allocate the event */ + if (!mi->rli.relay_log.description_event_for_queue) + { + errmsg= "default Format_description_log_event"; + err_code= ER_SLAVE_CREATE_EVENT_FAILURE; + sprintf(err_buff, ER_DEFAULT(err_code), errmsg); + goto err; + } + + /* + FD_q's (A) is set initially from RL's (A): FD_q.(A) := RL.(A). + It's necessary to adjust FD_q.(A) at this point because in the following + course FD_q is going to be dumped to RL. + Generally FD_q is derived from a received FD_m (roughly FD_q := FD_m) + in queue_event and the master's (A) is installed. + At one step with the assignment the Relay-Log's checksum alg is set to + a new value: RL.(A) := FD_q.(A). If the slave service is stopped + the last time assigned RL.(A) will be passed over to the restarting + service (to the current execution point). + RL.A is a "codec" to verify checksum in queue_event() almost all the time + the first fake Rotate event. + Starting from this point IO thread will executes the following checksum + warmup sequence of actions: + + FD_q.A := RL.A, + A_m^0 := master.@@global.binlog_checksum, + {queue_event(R_f): verifies(R_f, A_m^0)}, + {queue_event(FD_m): verifies(FD_m, FD_m.A), dump(FD_q), rotate(RL), + FD_q := FD_m, RL.A := FD_q.A)} + + See legends definition on MYSQL_BIN_LOG::relay_log_checksum_alg + docs lines (binlog.h). + In above A_m^0 - the value of master's + @@binlog_checksum determined in the upcoming handshake (stored in + mi->checksum_alg_before_fd). + + + After the warm-up sequence IO gets to "normal" checksum verification mode + to use RL.A in + + {queue_event(E_m): verifies(E_m, RL.A)} + + until it has received a new FD_m. + */ + mi->rli.relay_log.description_event_for_queue->checksum_alg= + mi->rli.relay_log.relay_log_checksum_alg; + + DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg != + BINLOG_CHECKSUM_ALG_UNDEF); + DBUG_ASSERT(mi->rli.relay_log.relay_log_checksum_alg != + BINLOG_CHECKSUM_ALG_UNDEF); + /* + Compare the master and slave's clock. Do not die if master's clock is + unavailable (very old master not supporting UNIX_TIMESTAMP()?). + */ + +#ifdef ENABLED_DEBUG_SYNC + DBUG_EXECUTE_IF("dbug.before_get_UNIX_TIMESTAMP", + { + const char act[]= + "now " + "wait_for signal.get_unix_timestamp"; + DBUG_ASSERT(debug_sync_service); + DBUG_ASSERT(!debug_sync_set_action(current_thd, + STRING_WITH_LEN(act))); + };); +#endif + + master_res= NULL; + if (!mysql_real_query(mysql, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) && + (master_res= mysql_store_result(mysql)) && + (master_row= mysql_fetch_row(master_res))) + { + mysql_mutex_lock(&mi->data_lock); + mi->clock_diff_with_master= + (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10)); + mysql_mutex_unlock(&mi->data_lock); + } + else if (check_io_slave_killed(mi, NULL)) + goto slave_killed_err; + else if (is_network_error(mysql_errno(mysql))) + { + mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL, + "Get master clock failed with error: %s", mysql_error(mysql)); + goto network_err; + } + else + { + mysql_mutex_lock(&mi->data_lock); + mi->clock_diff_with_master= 0; /* The "most sensible" value */ + mysql_mutex_unlock(&mi->data_lock); + sql_print_warning("\"SELECT UNIX_TIMESTAMP()\" failed on master, " + "do not trust column Seconds_Behind_Master of SHOW " + "SLAVE STATUS. Error: %s (%d)", + mysql_error(mysql), mysql_errno(mysql)); + } + if (master_res) + { + mysql_free_result(master_res); + master_res= NULL; + } + + /* + Check that the master's server id and ours are different. Because if they + are equal (which can result from a simple copy of master's datadir to slave, + thus copying some my.cnf), replication will work but all events will be + skipped. + Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old + master?). + Note: we could have put a @@SERVER_ID in the previous SELECT + UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters. + */ +#ifdef ENABLED_DEBUG_SYNC + DBUG_EXECUTE_IF("dbug.before_get_SERVER_ID", + { + const char act[]= + "now " + "wait_for signal.get_server_id"; + DBUG_ASSERT(debug_sync_service); + DBUG_ASSERT(!debug_sync_set_action(current_thd, + STRING_WITH_LEN(act))); + };); +#endif + master_res= NULL; + master_row= NULL; + if (!mysql_real_query(mysql, + STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) && + (master_res= mysql_store_result(mysql)) && + (master_row= mysql_fetch_row(master_res))) + { + if ((global_system_variables.server_id == + (mi->master_id= strtoul(master_row[1], 0, 10))) && + !mi->rli.replicate_same_server_id) + { + errmsg= "The slave I/O thread stops because master and slave have equal \ +MySQL server ids; these ids must be different for replication to work (or \ +the --replicate-same-server-id option must be used on slave but this does \ +not always make sense; please check the manual before using it)."; + err_code= ER_SLAVE_FATAL_ERROR; + sprintf(err_buff, ER_DEFAULT(err_code), errmsg); + goto err; + } + } + else if (mysql_errno(mysql)) + { + if (check_io_slave_killed(mi, NULL)) + goto slave_killed_err; + else if (is_network_error(mysql_errno(mysql))) + { + mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL, + "Get master SERVER_ID failed with error: %s", mysql_error(mysql)); + goto network_err; + } + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is encountered \ +when it try to get the value of SERVER_ID variable from master."; + err_code= mysql_errno(mysql); + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + else if (!master_row && master_res) + { + mi->report(WARNING_LEVEL, ER_UNKNOWN_SYSTEM_VARIABLE, NULL, + "Unknown system variable 'SERVER_ID' on master, \ +maybe it is a *VERY OLD MASTER*."); + } + if (master_res) + { + mysql_free_result(master_res); + master_res= NULL; + } + if (mi->master_id == 0 && mi->ignore_server_ids.elements > 0) + { + errmsg= "Slave configured with server id filtering could not detect the master server id."; + err_code= ER_SLAVE_FATAL_ERROR; + sprintf(err_buff, ER_DEFAULT(err_code), errmsg); + goto err; + } + + /* + Check that the master's global character_set_server and ours are the same. + Not fatal if query fails (old master?). + Note that we don't check for equality of global character_set_client and + collation_connection (neither do we prevent their setting in + set_var.cc). That's because from what I (Guilhem) have tested, the global + values of these 2 are never used (new connections don't use them). + We don't test equality of global collation_database either as it's is + going to be deprecated (made read-only) in 4.1 very soon. + The test is only relevant if master < 5.0.3 (we'll test only if it's older + than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores + charset info in each binlog event. + We don't do it for 3.23 because masters <3.23.50 hang on + SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we + test only if master is 4.x. + */ + + /* redundant with rest of code but safer against later additions */ + if (version == 3) + goto err; + + if (version == 4) + { + master_res= NULL; + if (!mysql_real_query(mysql, + STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) && + (master_res= mysql_store_result(mysql)) && + (master_row= mysql_fetch_row(master_res))) + { + if (strcmp(master_row[0], global_system_variables.collation_server->name)) + { + errmsg= "The slave I/O thread stops because master and slave have \ +different values for the COLLATION_SERVER global variable. The values must \ +be equal for the Statement-format replication to work"; + err_code= ER_SLAVE_FATAL_ERROR; + sprintf(err_buff, ER_DEFAULT(err_code), errmsg); + goto err; + } + } + else if (check_io_slave_killed(mi, NULL)) + goto slave_killed_err; + else if (is_network_error(mysql_errno(mysql))) + { + mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL, + "Get master COLLATION_SERVER failed with error: %s", mysql_error(mysql)); + goto network_err; + } + else if (mysql_errno(mysql) != ER_UNKNOWN_SYSTEM_VARIABLE) + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is encountered \ +when it try to get the value of COLLATION_SERVER global variable from master."; + err_code= mysql_errno(mysql); + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + else + mi->report(WARNING_LEVEL, ER_UNKNOWN_SYSTEM_VARIABLE, NULL, + "Unknown system variable 'COLLATION_SERVER' on master, \ +maybe it is a *VERY OLD MASTER*. *NOTE*: slave may experience \ +inconsistency if replicated data deals with collation."); + + if (master_res) + { + mysql_free_result(master_res); + master_res= NULL; + } + } + + /* + Perform analogous check for time zone. Theoretically we also should + perform check here to verify that SYSTEM time zones are the same on + slave and master, but we can't rely on value of @@system_time_zone + variable (it is time zone abbreviation) since it determined at start + time and so could differ for slave and master even if they are really + in the same system time zone. So we are omiting this check and just + relying on documentation. Also according to Monty there are many users + who are using replication between servers in various time zones. Hence + such check will broke everything for them. (And now everything will + work for them because by default both their master and slave will have + 'SYSTEM' time zone). + This check is only necessary for 4.x masters (and < 5.0.4 masters but + those were alpha). + */ + if (version == 4) + { + master_res= NULL; + if (!mysql_real_query(mysql, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) && + (master_res= mysql_store_result(mysql)) && + (master_row= mysql_fetch_row(master_res))) + { + if (strcmp(master_row[0], + global_system_variables.time_zone->get_name()->ptr())) + { + errmsg= "The slave I/O thread stops because master and slave have \ +different values for the TIME_ZONE global variable. The values must \ +be equal for the Statement-format replication to work"; + err_code= ER_SLAVE_FATAL_ERROR; + sprintf(err_buff, ER_DEFAULT(err_code), errmsg); + goto err; + } + } + else if (check_io_slave_killed(mi, NULL)) + goto slave_killed_err; + else if (is_network_error(err_code= mysql_errno(mysql))) + { + mi->report(ERROR_LEVEL, err_code, NULL, + "Get master TIME_ZONE failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else if (err_code == ER_UNKNOWN_SYSTEM_VARIABLE) + { + /* We use ERROR_LEVEL to get the error logged to file */ + mi->report(ERROR_LEVEL, err_code, NULL, + + "MySQL master doesn't have a TIME_ZONE variable. Note that" + "if your timezone is not same between master and slave, your " + "slave may get wrong data into timestamp columns"); + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is encountered \ +when it try to get the value of TIME_ZONE global variable from master."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + if (master_res) + { + mysql_free_result(master_res); + master_res= NULL; + } + } + + if (mi->heartbeat_period != 0.0) + { + const char query_format[]= "SET @master_heartbeat_period= %llu"; + char query[sizeof(query_format) + 32]; + /* + the period is an ulonglong of nano-secs. + */ + my_snprintf(query, sizeof(query), query_format, + (ulonglong) (mi->heartbeat_period*1000000000UL)); + + DBUG_EXECUTE_IF("simulate_slave_heartbeat_network_error", + { static ulong dbug_count= 0; + if (++dbug_count < 3) + goto heartbeat_network_error; + }); + if (mysql_real_query(mysql, query, (ulong)strlen(query))) + { + if (check_io_slave_killed(mi, NULL)) + goto slave_killed_err; + + if (is_network_error(mysql_errno(mysql))) + { + IF_DBUG(heartbeat_network_error: , ) + mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL, + "SET @master_heartbeat_period to master failed with error: %s", + mysql_error(mysql)); + mysql_free_result(mysql_store_result(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is encountered " + "when it tries to SET @master_heartbeat_period on master."; + err_code= ER_SLAVE_FATAL_ERROR; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + mysql_free_result(mysql_store_result(mysql)); + goto err; + } + } + mysql_free_result(mysql_store_result(mysql)); + } + + /* + Querying if master is capable to checksum and notifying it about own + CRC-awareness. The master's side instant value of @@global.binlog_checksum + is stored in the dump thread's uservar area as well as cached locally + to become known in consensus by master and slave. + */ + DBUG_EXECUTE_IF("simulate_slave_unaware_checksum", + mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_OFF; + goto past_checksum;); + { + int rc; + const char query[]= "SET @master_binlog_checksum= @@global.binlog_checksum"; + master_res= NULL; + mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_UNDEF; //initially undefined + /* + @c checksum_alg_before_fd is queried from master in this block. + If master is old checksum-unaware the value stays undefined. + Once the first FD will be received its alg descriptor will replace + the being queried one. + */ + rc= mysql_real_query(mysql, query,(ulong)strlen(query)); + if (rc != 0) + { + if (check_io_slave_killed(mi, NULL)) + goto slave_killed_err; + + if (mysql_errno(mysql) == ER_UNKNOWN_SYSTEM_VARIABLE) + { + /* Ignore this expected error if not a high error level */ + if (global_system_variables.log_warnings > 1) + { + // this is tolerable as OM -> NS is supported + mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL, + "Notifying master by %s failed with " + "error: %s", query, mysql_error(mysql)); + } + } + else + { + if (is_network_error(mysql_errno(mysql))) + { + mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL, + "Notifying master by %s failed with " + "error: %s", query, mysql_error(mysql)); + mysql_free_result(mysql_store_result(mysql)); + goto network_err; + } + else + { + errmsg= "The slave I/O thread stops because a fatal error is encountered " + "when it tried to SET @master_binlog_checksum on master."; + err_code= ER_SLAVE_FATAL_ERROR; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + mysql_free_result(mysql_store_result(mysql)); + goto err; + } + } + } + else + { + mysql_free_result(mysql_store_result(mysql)); + if (!mysql_real_query(mysql, + STRING_WITH_LEN("SELECT @master_binlog_checksum")) && + (master_res= mysql_store_result(mysql)) && + (master_row= mysql_fetch_row(master_res)) && + (master_row[0] != NULL)) + { + mi->checksum_alg_before_fd= (enum_binlog_checksum_alg) + (find_type(master_row[0], &binlog_checksum_typelib, 1) - 1); + // valid outcome is either of + DBUG_ASSERT(mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_OFF || + mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_CRC32); + } + else if (check_io_slave_killed(mi, NULL)) + goto slave_killed_err; + else if (is_network_error(mysql_errno(mysql))) + { + mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL, + "Get master BINLOG_CHECKSUM failed with error: %s", mysql_error(mysql)); + goto network_err; + } + else + { + errmsg= "The slave I/O thread stops because a fatal error is encountered " + "when it tried to SELECT @master_binlog_checksum."; + err_code= ER_SLAVE_FATAL_ERROR; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + mysql_free_result(mysql_store_result(mysql)); + goto err; + } + } + if (master_res) + { + mysql_free_result(master_res); + master_res= NULL; + } + } + +#ifndef DBUG_OFF +past_checksum: +#endif + + /* + Request the master to filter away events with the @@skip_replication flag + set, if we are running with + --replicate-events-marked-for-skip=FILTER_ON_MASTER. + */ + if (opt_replicate_events_marked_for_skip == RPL_SKIP_FILTER_ON_MASTER) + { + if (unlikely(mysql_real_query(mysql, + STRING_WITH_LEN("SET skip_replication=1")))) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, NULL, + "Setting master-side filtering of @@skip_replication failed " + "with error: %s", mysql_error(mysql)); + goto network_err; + } + else if (err_code == ER_UNKNOWN_SYSTEM_VARIABLE) + { + /* + The master is older than the slave and does not support the + @@skip_replication feature. + This is not a problem, as such master will not generate events with + the @@skip_replication flag set in the first place. We will still + do slave-side filtering of such events though, to handle the (rare) + case of downgrading a master and receiving old events generated from + before the downgrade with the @@skip_replication flag set. + */ + DBUG_PRINT("info", ("Old master does not support master-side filtering " + "of @@skip_replication events.")); + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to request filtering of events marked " + "with the @@skip_replication flag."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + } + + /* Announce MariaDB slave capabilities. */ + DBUG_EXECUTE_IF("simulate_slave_capability_none", goto after_set_capability;); + { + int rc= DBUG_EVALUATE_IF("simulate_slave_capability_old_53", + mysql_real_query(mysql, STRING_WITH_LEN("SET @mariadb_slave_capability=" + STRINGIFY_ARG(MARIA_SLAVE_CAPABILITY_ANNOTATE))), + mysql_real_query(mysql, STRING_WITH_LEN("SET @mariadb_slave_capability=" + STRINGIFY_ARG(MARIA_SLAVE_CAPABILITY_MINE)))); + if (unlikely(rc)) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, NULL, + "Setting @mariadb_slave_capability failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @mariadb_slave_capability."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + } +#ifndef DBUG_OFF +after_set_capability: +#endif + + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + /* Request dump to start from slave replication GTID state. */ + int rc; + char str_buf[256]; + String query_str(str_buf, sizeof(str_buf), system_charset_info); + query_str.length(0); + + /* + Read the master @@GLOBAL.gtid_domain_id variable. + This is mostly to check that master is GTID aware, but we could later + perhaps use it to check that different multi-source masters are correctly + configured with distinct domain_id. + */ + if (mysql_real_query(mysql, + STRING_WITH_LEN("SELECT @@GLOBAL.gtid_domain_id")) || + !(master_res= mysql_store_result(mysql)) || + !(master_row= mysql_fetch_row(master_res))) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, NULL, + "Get master @@GLOBAL.gtid_domain_id failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + errmsg= "The slave I/O thread stops because master does not support " + "MariaDB global transaction id. A fatal error is encountered when " + "it tries to SELECT @@GLOBAL.gtid_domain_id."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + mysql_free_result(master_res); + master_res= NULL; + + query_str.append(STRING_WITH_LEN("SET @slave_connect_state='"), + system_charset_info); + if (mi->gtid_current_pos.append_to_string(&query_str)) + { + err_code= ER_OUTOFMEMORY; + errmsg= "The slave I/O thread stops because a fatal out-of-memory " + "error is encountered when it tries to compute @slave_connect_state."; + sprintf(err_buff, "%s Error: Out of memory", errmsg); + goto err; + } + query_str.append(STRING_WITH_LEN("'"), system_charset_info); + + rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); + if (unlikely(rc)) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, NULL, + "Setting @slave_connect_state failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @slave_connect_state."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + + query_str.length(0); + if (query_str.append(STRING_WITH_LEN("SET @slave_gtid_strict_mode="), + system_charset_info) || + query_str.append_ulonglong(opt_gtid_strict_mode != false)) + { + err_code= ER_OUTOFMEMORY; + errmsg= "The slave I/O thread stops because a fatal out-of-memory " + "error is encountered when it tries to set @slave_gtid_strict_mode."; + sprintf(err_buff, "%s Error: Out of memory", errmsg); + goto err; + } + + rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); + if (unlikely(rc)) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, NULL, + "Setting @slave_gtid_strict_mode failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @slave_gtid_strict_mode."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + + query_str.length(0); + if (query_str.append(STRING_WITH_LEN("SET @slave_gtid_ignore_duplicates="), + system_charset_info) || + query_str.append_ulonglong(opt_gtid_ignore_duplicates != false)) + { + err_code= ER_OUTOFMEMORY; + errmsg= "The slave I/O thread stops because a fatal out-of-memory error " + "is encountered when it tries to set @slave_gtid_ignore_duplicates."; + sprintf(err_buff, "%s Error: Out of memory", errmsg); + goto err; + } + + rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); + if (unlikely(rc)) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, NULL, + "Setting @slave_gtid_ignore_duplicates failed with " + "error: %s", mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @slave_gtid_ignore_duplicates."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + + if (mi->rli.until_condition == Relay_log_info::UNTIL_GTID) + { + query_str.length(0); + query_str.append(STRING_WITH_LEN("SET @slave_until_gtid='"), + system_charset_info); + if (mi->rli.until_gtid_pos.append_to_string(&query_str)) + { + err_code= ER_OUTOFMEMORY; + errmsg= "The slave I/O thread stops because a fatal out-of-memory " + "error is encountered when it tries to compute @slave_until_gtid."; + sprintf(err_buff, "%s Error: Out of memory", errmsg); + goto err; + } + query_str.append(STRING_WITH_LEN("'"), system_charset_info); + + rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); + if (unlikely(rc)) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, NULL, + "Setting @slave_until_gtid failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @slave_until_gtid."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + } + } + else + { + /* + If we are not using GTID to connect this time, then instead request + the corresponding GTID position from the master, so that the user + can reconnect the next time using MASTER_GTID_POS=AUTO. + */ + char quote_buf[2*sizeof(mi->master_log_name)+1]; + char str_buf[28+2*sizeof(mi->master_log_name)+10]; + String query(str_buf, sizeof(str_buf), system_charset_info); + query.length(0); + + query.append("SELECT binlog_gtid_pos('"); + escape_quotes_for_mysql(&my_charset_bin, quote_buf, sizeof(quote_buf), + mi->master_log_name, strlen(mi->master_log_name)); + query.append(quote_buf); + query.append("',"); + query.append_ulonglong(mi->master_log_pos); + query.append(")"); + + if (!mysql_real_query(mysql, query.c_ptr_safe(), query.length()) && + (master_res= mysql_store_result(mysql)) && + (master_row= mysql_fetch_row(master_res)) && + (master_row[0] != NULL)) + { + rpl_global_gtid_slave_state->load(mi->io_thd, master_row[0], + strlen(master_row[0]), false, false); + } + else if (check_io_slave_killed(mi, NULL)) + goto slave_killed_err; + else if (is_network_error(mysql_errno(mysql))) + { + mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL, + "Get master GTID position failed with error: %s", mysql_error(mysql)); + goto network_err; + } + else + { + /* + ToDo: If the master does not have the binlog_gtid_pos() function, it + just means that it is an old master with no GTID support, so we should + do nothing. + + However, if binlog_gtid_pos() exists, but fails or returns NULL, then + it means that the requested position is not valid. We could use this + to catch attempts to replicate from within the middle of an event, + avoiding strange failures or possible corruption. + */ + } + if (master_res) + { + mysql_free_result(master_res); + master_res= NULL; + } + } + +err: + if (errmsg) + { + if (master_res) + mysql_free_result(master_res); + DBUG_ASSERT(err_code != 0); + mi->report(ERROR_LEVEL, err_code, NULL, "%s", err_buff); + DBUG_RETURN(1); + } + + DBUG_RETURN(0); + +network_err: + if (master_res) + mysql_free_result(master_res); + DBUG_RETURN(2); + +slave_killed_err: + if (master_res) + mysql_free_result(master_res); + DBUG_RETURN(2); +} + + +static bool wait_for_relay_log_space(Relay_log_info* rli) +{ + bool slave_killed=0; + bool ignore_log_space_limit; + Master_info* mi = rli->mi; + PSI_stage_info old_stage; + THD* thd = mi->io_thd; + DBUG_ENTER("wait_for_relay_log_space"); + + mysql_mutex_lock(&rli->log_space_lock); + thd->ENTER_COND(&rli->log_space_cond, + &rli->log_space_lock, + &stage_waiting_for_relay_log_space, + &old_stage); + while (rli->log_space_limit < rli->log_space_total && + !(slave_killed=io_slave_killed(mi)) && + !rli->ignore_log_space_limit) + mysql_cond_wait(&rli->log_space_cond, &rli->log_space_lock); + + ignore_log_space_limit= rli->ignore_log_space_limit; + rli->ignore_log_space_limit= 0; + + thd->EXIT_COND(&old_stage); + + /* + Makes the IO thread read only one event at a time + until the SQL thread is able to purge the relay + logs, freeing some space. + + Therefore, once the SQL thread processes this next + event, it goes to sleep (no more events in the queue), + sets ignore_log_space_limit=true and wakes the IO thread. + However, this event may have been enough already for + the SQL thread to purge some log files, freeing + rli->log_space_total . + + This guarantees that the SQL and IO thread move + forward only one event at a time (to avoid deadlocks), + when the relay space limit is reached. It also + guarantees that when the SQL thread is prepared to + rotate (to be able to purge some logs), the IO thread + will know about it and will rotate. + + NOTE: The ignore_log_space_limit is only set when the SQL + thread sleeps waiting for events. + + */ + + if (ignore_log_space_limit) + { +#ifndef DBUG_OFF + { + DBUG_PRINT("info", ("log_space_limit=%llu log_space_total=%llu " + "ignore_log_space_limit=%d " + "sql_force_rotate_relay=%d", + rli->log_space_limit, uint64(rli->log_space_total), + (int) rli->ignore_log_space_limit, + (int) rli->sql_force_rotate_relay)); + } +#endif + if (rli->sql_force_rotate_relay) + { + mysql_mutex_lock(&mi->data_lock); + rotate_relay_log(rli->mi); + mysql_mutex_unlock(&mi->data_lock); + rli->sql_force_rotate_relay= false; + } + } + + DBUG_RETURN(slave_killed); +} + + +/* + Builds a Rotate from the ignored events' info and writes it to relay log. + + SYNOPSIS + write_ignored_events_info_to_relay_log() + thd pointer to I/O thread's thd + mi + + DESCRIPTION + Slave I/O thread, going to die, must leave a durable trace of the + ignored events' end position for the use of the slave SQL thread, by + calling this function. Only that thread can call it (see assertion). + */ +static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi) +{ + Relay_log_info *rli= &mi->rli; + mysql_mutex_t *log_lock= rli->relay_log.get_log_lock(); + DBUG_ENTER("write_ignored_events_info_to_relay_log"); + + DBUG_ASSERT(thd == mi->io_thd); + mysql_mutex_lock(log_lock); + if (rli->ign_master_log_name_end[0] || rli->ign_gtids.count()) + { + Rotate_log_event *rev= NULL; + Gtid_list_log_event *glev= NULL; + if (rli->ign_master_log_name_end[0]) + { + rev= new Rotate_log_event(rli->ign_master_log_name_end, + 0, rli->ign_master_log_pos_end, + Rotate_log_event::DUP_NAME); + rli->ign_master_log_name_end[0]= 0; + if (unlikely(!(bool)rev)) + mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE, NULL, + ER_THD(thd, ER_SLAVE_CREATE_EVENT_FAILURE), + "Rotate_event (out of memory?)," + " SHOW SLAVE STATUS may be inaccurate"); + } + if (rli->ign_gtids.count()) + { + DBUG_ASSERT(!rli->is_in_group()); // Ensure no active transaction + glev= new Gtid_list_log_event(&rli->ign_gtids, + Gtid_list_log_event::FLAG_IGN_GTIDS); + rli->ign_gtids.reset(); + if (unlikely(!(bool)glev)) + mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE, NULL, + ER_THD(thd, ER_SLAVE_CREATE_EVENT_FAILURE), + "Gtid_list_event (out of memory?)," + " gtid_slave_pos may be inaccurate"); + } + + /* Can unlock before writing as slave SQL thd will soon see our event. */ + mysql_mutex_unlock(log_lock); + if (rev) + { + DBUG_PRINT("info",("writing a Rotate event to track down ignored events")); + rev->server_id= 0; // don't be ignored by slave SQL thread + if (unlikely(rli->relay_log.append(rev))) + mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, + ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE), + "failed to write a Rotate event" + " to the relay log, SHOW SLAVE STATUS may be" + " inaccurate"); + delete rev; + } + if (glev) + { + DBUG_PRINT("info",("writing a Gtid_list event to track down ignored events")); + glev->server_id= 0; // don't be ignored by slave SQL thread + glev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos + if (unlikely(rli->relay_log.append(glev))) + mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, + ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE), + "failed to write a Gtid_list event to the relay log, " + "gtid_slave_pos may be inaccurate"); + delete glev; + } + if (likely (rev || glev)) + { + rli->relay_log.harvest_bytes_written(&rli->log_space_total); + if (flush_master_info(mi, TRUE, TRUE)) + sql_print_error("Failed to flush master info file"); + } + } + else + mysql_mutex_unlock(log_lock); + DBUG_VOID_RETURN; +} + + +int register_slave_on_master(MYSQL* mysql, Master_info *mi, + bool *suppress_warnings) +{ + uchar buf[1024], *pos= buf; + size_t report_host_len=0, report_user_len=0, report_password_len=0; + DBUG_ENTER("register_slave_on_master"); + + *suppress_warnings= FALSE; + if (report_host) + report_host_len= strlen(report_host); + if (report_host_len > HOSTNAME_LENGTH) + { + sql_print_warning("The length of report_host is %zu. " + "It is larger than the max length(%d), so this " + "slave cannot be registered to the master.", + report_host_len, HOSTNAME_LENGTH); + DBUG_RETURN(0); + } + + if (report_user) + report_user_len= strlen(report_user); + if (report_user_len > USERNAME_LENGTH) + { + sql_print_warning("The length of report_user is %zu. " + "It is larger than the max length(%d), so this " + "slave cannot be registered to the master.", + report_user_len, USERNAME_LENGTH); + DBUG_RETURN(0); + } + + if (report_password) + report_password_len= strlen(report_password); + if (report_password_len > MAX_PASSWORD_LENGTH) + { + sql_print_warning("The length of report_password is %zu. " + "It is larger than the max length(%d), so this " + "slave cannot be registered to the master.", + report_password_len, MAX_PASSWORD_LENGTH); + DBUG_RETURN(0); + } + + int4store(pos, global_system_variables.server_id); pos+= 4; + pos= net_store_data(pos, (uchar*) report_host, report_host_len); + pos= net_store_data(pos, (uchar*) report_user, report_user_len); + pos= net_store_data(pos, (uchar*) report_password, report_password_len); + int2store(pos, (uint16) report_port); pos+= 2; + /* + Fake rpl_recovery_rank, which was removed in BUG#13963, + so that this server can register itself on old servers, + see BUG#49259. + */ + int4store(pos, /* rpl_recovery_rank */ 0); pos+= 4; + /* The master will fill in master_id */ + int4store(pos, 0); pos+= 4; + + if (simple_command(mysql, COM_REGISTER_SLAVE, buf, (ulong) (pos- buf), 0)) + { + if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED) + { + *suppress_warnings= TRUE; // Suppress reconnect warning + } + else if (!check_io_slave_killed(mi, NULL)) + { + char buf[256]; + my_snprintf(buf, sizeof(buf), "%s (Errno: %d)", mysql_error(mysql), + mysql_errno(mysql)); + mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE, NULL, + ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf); + } + DBUG_RETURN(1); + } + DBUG_RETURN(0); +} + + +/** + Execute a SHOW SLAVE STATUS statement. + + @param thd Pointer to THD object for the client thread executing the + statement. + + @param mi Pointer to Master_info object for the IO thread. + + @retval FALSE success + @retval TRUE failure +*/ + +bool show_master_info(THD *thd, Master_info *mi, bool full) +{ + DBUG_ENTER("show_master_info"); + String gtid_pos; + List<Item> field_list; + + if (full && rpl_global_gtid_slave_state->tostring(>id_pos, NULL, 0)) + DBUG_RETURN(TRUE); + show_master_info_get_fields(thd, &field_list, full, gtid_pos.length()); + if (thd->protocol->send_result_set_metadata(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) + DBUG_RETURN(TRUE); + if (send_show_master_info_data(thd, mi, full, >id_pos)) + DBUG_RETURN(TRUE); + my_eof(thd); + DBUG_RETURN(FALSE); +} + +void show_master_info_get_fields(THD *thd, List<Item> *field_list, + bool full, size_t gtid_pos_length) +{ + Master_info *mi; + MEM_ROOT *mem_root= thd->mem_root; + DBUG_ENTER("show_master_info_get_fields"); + + if (full) + { + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Connection_name", + MAX_CONNECTION_NAME), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Slave_SQL_State", 30), + mem_root); + } + + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Slave_IO_State", 30), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Master_Host", sizeof(mi->host)), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Master_User", sizeof(mi->user)), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Master_Port", 7, MYSQL_TYPE_LONG), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Connect_Retry", 10, + MYSQL_TYPE_LONG), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Master_Log_File", FN_REFLEN), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Read_Master_Log_Pos", 10, + MYSQL_TYPE_LONGLONG), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Relay_Log_File", FN_REFLEN), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Relay_Log_Pos", 10, + MYSQL_TYPE_LONGLONG), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Relay_Master_Log_File", + FN_REFLEN), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Slave_IO_Running", 3), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Slave_SQL_Running", 3), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Replicate_Do_DB", 20), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Replicate_Ignore_DB", 20), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Replicate_Do_Table", 20), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Replicate_Ignore_Table", 23), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Replicate_Wild_Do_Table", 24), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Replicate_Wild_Ignore_Table", + 28), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Last_Errno", 4, MYSQL_TYPE_LONG), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Last_Error", 20), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Skip_Counter", 10, + MYSQL_TYPE_LONG), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Exec_Master_Log_Pos", 10, + MYSQL_TYPE_LONGLONG), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Relay_Log_Space", 10, + MYSQL_TYPE_LONGLONG), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Until_Condition", 6), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Until_Log_File", FN_REFLEN), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Until_Log_Pos", 10, + MYSQL_TYPE_LONGLONG), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Master_SSL_Allowed", 7), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Master_SSL_CA_File", + sizeof(mi->ssl_ca)), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Master_SSL_CA_Path", + sizeof(mi->ssl_capath)), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Master_SSL_Cert", + sizeof(mi->ssl_cert)), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Master_SSL_Cipher", + sizeof(mi->ssl_cipher)), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Master_SSL_Key", + sizeof(mi->ssl_key)), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Seconds_Behind_Master", 10, + MYSQL_TYPE_LONGLONG), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Master_SSL_Verify_Server_Cert", + 3), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Last_IO_Errno", 4, + MYSQL_TYPE_LONG), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Last_IO_Error", 20), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Last_SQL_Errno", 4, + MYSQL_TYPE_LONG), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Last_SQL_Error", 20), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Replicate_Ignore_Server_Ids", + FN_REFLEN), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Master_Server_Id", sizeof(ulong), + MYSQL_TYPE_LONG), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Master_SSL_Crl", + sizeof(mi->ssl_crl)), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Master_SSL_Crlpath", + sizeof(mi->ssl_crlpath)), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Using_Gtid", + sizeof("Current_Pos")-1), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Gtid_IO_Pos", 30), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Replicate_Do_Domain_Ids", + FN_REFLEN), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Replicate_Ignore_Domain_Ids", + FN_REFLEN), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Parallel_Mode", + sizeof("conservative")-1), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "SQL_Delay", 10, + MYSQL_TYPE_LONG)); + field_list->push_back(new (mem_root) + Item_return_int(thd, "SQL_Remaining_Delay", 8, + MYSQL_TYPE_LONG)); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Slave_SQL_Running_State", + 20)); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Slave_DDL_Groups", 20, + MYSQL_TYPE_LONGLONG), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Slave_Non_Transactional_Groups", 20, + MYSQL_TYPE_LONGLONG), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Slave_Transactional_Groups", 20, + MYSQL_TYPE_LONGLONG), + mem_root); + + if (full) + { + field_list->push_back(new (mem_root) + Item_return_int(thd, "Retried_transactions", 10, + MYSQL_TYPE_LONG), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Max_relay_log_size", 10, + MYSQL_TYPE_LONGLONG), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Executed_log_entries", 10, + MYSQL_TYPE_LONG), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Slave_received_heartbeats", 10, + MYSQL_TYPE_LONG), + mem_root); + field_list->push_back(new (mem_root) + Item_float(thd, "Slave_heartbeat_period", 0.0, 3, 10), + mem_root); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Gtid_Slave_Pos", + (uint)gtid_pos_length), + mem_root); + } + DBUG_VOID_RETURN; +} + +/* Text for Slave_IO_Running */ +static const char *slave_running[]= { "No", "Connecting", "Preparing", "Yes" }; + +static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, + String *gtid_pos) +{ + DBUG_ENTER("send_show_master_info_data"); + + if (mi->host[0]) + { + DBUG_PRINT("info",("host is set: '%s'", mi->host)); + String *packet= &thd->packet; + Protocol *protocol= thd->protocol; + Rpl_filter *rpl_filter= mi->rpl_filter; + StringBuffer<256> tmp; + + protocol->prepare_for_resend(); + + /* + slave_running can be accessed without run_lock but not other + non-volotile members like mi->io_thd, which is guarded by the mutex. + */ + if (full) + protocol->store(mi->connection_name.str, mi->connection_name.length, + &my_charset_bin); + mysql_mutex_lock(&mi->run_lock); + if (full) + { + /* + Show what the sql driver replication thread is doing + This is only meaningful if there is only one slave thread. + */ + protocol->store(mi->rli.sql_driver_thd ? + mi->rli.sql_driver_thd->get_proc_info() : "", + &my_charset_bin); + } + protocol->store(mi->io_thd ? mi->io_thd->get_proc_info() : "", &my_charset_bin); + mysql_mutex_unlock(&mi->run_lock); + + mysql_mutex_lock(&mi->data_lock); + mysql_mutex_lock(&mi->rli.data_lock); + /* err_lock is to protect mi->last_error() */ + mysql_mutex_lock(&mi->err_lock); + /* err_lock is to protect mi->rli.last_error() */ + mysql_mutex_lock(&mi->rli.err_lock); + protocol->store(mi->host, &my_charset_bin); + protocol->store(mi->user, &my_charset_bin); + protocol->store((uint32) mi->port); + protocol->store((uint32) mi->connect_retry); + protocol->store(mi->master_log_name, &my_charset_bin); + protocol->store((ulonglong) mi->master_log_pos); + protocol->store(mi->rli.group_relay_log_name + + dirname_length(mi->rli.group_relay_log_name), + &my_charset_bin); + protocol->store((ulonglong) mi->rli.group_relay_log_pos); + protocol->store(mi->rli.group_master_log_name, &my_charset_bin); + protocol->store(slave_running[mi->slave_running], &my_charset_bin); + protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin); + protocol->store(rpl_filter->get_do_db()); + protocol->store(rpl_filter->get_ignore_db()); + + rpl_filter->get_do_table(&tmp); + protocol->store(&tmp); + rpl_filter->get_ignore_table(&tmp); + protocol->store(&tmp); + rpl_filter->get_wild_do_table(&tmp); + protocol->store(&tmp); + rpl_filter->get_wild_ignore_table(&tmp); + protocol->store(&tmp); + + protocol->store(mi->rli.last_error().number); + protocol->store(mi->rli.last_error().message, &my_charset_bin); + protocol->store((uint32) mi->rli.slave_skip_counter); + protocol->store((ulonglong) mi->rli.group_master_log_pos); + protocol->store((ulonglong) mi->rli.log_space_total); + + protocol->store( + mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None": + ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master": + ( mi->rli.until_condition==Relay_log_info::UNTIL_RELAY_POS? "Relay": + "Gtid")), &my_charset_bin); + protocol->store(mi->rli.until_log_name, &my_charset_bin); + protocol->store((ulonglong) mi->rli.until_log_pos); + +#ifdef HAVE_OPENSSL + protocol->store(mi->ssl? "Yes":"No", &my_charset_bin); +#else + protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin); +#endif + protocol->store(mi->ssl_ca, &my_charset_bin); + protocol->store(mi->ssl_capath, &my_charset_bin); + protocol->store(mi->ssl_cert, &my_charset_bin); + protocol->store(mi->ssl_cipher, &my_charset_bin); + protocol->store(mi->ssl_key, &my_charset_bin); + + /* + Seconds_Behind_Master: if SQL thread is running and I/O thread is + connected, we can compute it otherwise show NULL (i.e. unknown). + */ + if ((mi->slave_running == MYSQL_SLAVE_RUN_READING) && + mi->rli.slave_running) + { + long time_diff; + bool idle; + time_t stamp= mi->rli.last_master_timestamp; + + if (!stamp) + idle= true; + else + { + idle= mi->rli.sql_thread_caught_up; + if (mi->using_parallel() && idle && !mi->rli.parallel.workers_idle()) + idle= false; + } + if (idle) + time_diff= 0; + else + { + time_diff= ((long)(time(0) - stamp) - mi->clock_diff_with_master); + /* + Apparently on some systems time_diff can be <0. Here are possible + reasons related to MySQL: + - the master is itself a slave of another master whose time is ahead. + - somebody used an explicit SET TIMESTAMP on the master. + Possible reason related to granularity-to-second of time functions + (nothing to do with MySQL), which can explain a value of -1: + assume the master's and slave's time are perfectly synchronized, and + that at slave's connection time, when the master's timestamp is read, + it is at the very end of second 1, and (a very short time later) when + the slave's timestamp is read it is at the very beginning of second + 2. Then the recorded value for master is 1 and the recorded value for + slave is 2. At SHOW SLAVE STATUS time, assume that the difference + between timestamp of slave and rli->last_master_timestamp is 0 + (i.e. they are in the same second), then we get 0-(2-1)=-1 as a result. + This confuses users, so we don't go below 0. + + last_master_timestamp == 0 (an "impossible" timestamp 1970) is a + special marker to say "consider we have caught up". + */ + if (time_diff < 0) + time_diff= 0; + } + protocol->store((longlong)time_diff); + } + else + { + protocol->store_null(); + } + protocol->store(mi->ssl_verify_server_cert? "Yes":"No", &my_charset_bin); + + // Last_IO_Errno + protocol->store(mi->last_error().number); + // Last_IO_Error + protocol->store(mi->last_error().message, &my_charset_bin); + // Last_SQL_Errno + protocol->store(mi->rli.last_error().number); + // Last_SQL_Error + protocol->store(mi->rli.last_error().message, &my_charset_bin); + // Replicate_Ignore_Server_Ids + prot_store_ids(thd, &mi->ignore_server_ids); + // Master_Server_id + protocol->store((uint32) mi->master_id); + // SQL_Delay + // Master_Ssl_Crl + protocol->store(mi->ssl_ca, &my_charset_bin); + // Master_Ssl_Crlpath + protocol->store(mi->ssl_capath, &my_charset_bin); + // Using_Gtid + protocol->store(mi->using_gtid_astext(mi->using_gtid), &my_charset_bin); + // Gtid_IO_Pos + { + mi->gtid_current_pos.to_string(&tmp); + protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin); + } + + // Replicate_Do_Domain_Ids & Replicate_Ignore_Domain_Ids + mi->domain_id_filter.store_ids(thd); + + // Parallel_Mode + { + const char *mode_name= get_type(&slave_parallel_mode_typelib, + mi->parallel_mode); + protocol->store(mode_name, strlen(mode_name), &my_charset_bin); + } + + protocol->store((uint32) mi->rli.get_sql_delay()); + // SQL_Remaining_Delay + // THD::proc_info is not protected by any lock, so we read it once + // to ensure that we use the same value throughout this function. + const char *slave_sql_running_state= + mi->rli.sql_driver_thd ? mi->rli.sql_driver_thd->proc_info : ""; + if (slave_sql_running_state == Relay_log_info::state_delaying_string) + { + time_t t= my_time(0), sql_delay_end= mi->rli.get_sql_delay_end(); + protocol->store((uint32)(t < sql_delay_end ? sql_delay_end - t : 0)); + } + else + protocol->store_null(); + // Slave_SQL_Running_State + protocol->store(slave_sql_running_state, &my_charset_bin); + + protocol->store(mi->total_ddl_groups); + protocol->store(mi->total_non_trans_groups); + protocol->store(mi->total_trans_groups); + + if (full) + { + protocol->store((uint32) mi->rli.retried_trans); + protocol->store((ulonglong) mi->rli.max_relay_log_size); + protocol->store(mi->rli.executed_entries); + protocol->store((uint32) mi->received_heartbeats); + protocol->store_double(mi->heartbeat_period, 3); + protocol->store(gtid_pos->ptr(), gtid_pos->length(), &my_charset_bin); + } + + mysql_mutex_unlock(&mi->rli.err_lock); + mysql_mutex_unlock(&mi->err_lock); + mysql_mutex_unlock(&mi->rli.data_lock); + mysql_mutex_unlock(&mi->data_lock); + + if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length())) + DBUG_RETURN(TRUE); + } + DBUG_RETURN(FALSE); +} + + +/* Used to sort connections by name */ + +static int cmp_mi_by_name(const Master_info **arg1, + const Master_info **arg2) +{ + return my_strcasecmp(system_charset_info, (*arg1)->connection_name.str, + (*arg2)->connection_name.str); +} + + +/** + Execute a SHOW FULL SLAVE STATUS statement. + + @param thd Pointer to THD object for the client thread executing the + statement. + + Elements are sorted according to the original connection_name. + + @retval FALSE success + @retval TRUE failure + + @note + master_info_index is protected by LOCK_active_mi. +*/ + +bool show_all_master_info(THD* thd) +{ + uint i, elements; + String gtid_pos; + Master_info **tmp; + List<Item> field_list; + DBUG_ENTER("show_master_info"); + mysql_mutex_assert_owner(&LOCK_active_mi); + + gtid_pos.length(0); + if (rpl_append_gtid_state(>id_pos, true)) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + DBUG_RETURN(TRUE); + } + + show_master_info_get_fields(thd, &field_list, 1, gtid_pos.length()); + if (thd->protocol->send_result_set_metadata(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) + DBUG_RETURN(TRUE); + + if (!master_info_index || + !(elements= master_info_index->master_info_hash.records)) + goto end; + + /* + Sort lines to get them into a predicted order + (needed for test cases and to not confuse users) + */ + if (!(tmp= (Master_info**) thd->alloc(sizeof(Master_info*) * elements))) + DBUG_RETURN(TRUE); + + for (i= 0; i < elements; i++) + { + tmp[i]= (Master_info *) my_hash_element(&master_info_index-> + master_info_hash, i); + } + my_qsort(tmp, elements, sizeof(Master_info*), (qsort_cmp) cmp_mi_by_name); + + for (i= 0; i < elements; i++) + { + if (send_show_master_info_data(thd, tmp[i], 1, >id_pos)) + DBUG_RETURN(TRUE); + } + +end: + my_eof(thd); + DBUG_RETURN(FALSE); +} + + +void set_slave_thread_options(THD* thd) +{ + DBUG_ENTER("set_slave_thread_options"); + /* + It's nonsense to constrain the slave threads with max_join_size; if a + query succeeded on master, we HAVE to execute it. So set + OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough + (and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT + SELECT examining more than 4 billion rows would still fail (yes, because + when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but + only for client threads. + */ + ulonglong options= (thd->variables.option_bits | + OPTION_BIG_SELECTS | OPTION_BIN_LOG); + if (!opt_log_slave_updates) + options&= ~OPTION_BIN_LOG; + /* For easier test in LOGGER::log_command */ + if (thd->variables.log_disabled_statements & LOG_DISABLE_SLAVE) + options|= OPTION_LOG_OFF; + thd->variables.option_bits= options; + + thd->variables.completion_type= 0; + thd->variables.sql_log_slow= + !MY_TEST(thd->variables.log_slow_disabled_statements & + LOG_SLOW_DISABLE_SLAVE); + DBUG_VOID_RETURN; +} + +void set_slave_thread_default_charset(THD* thd, rpl_group_info *rgi) +{ + DBUG_ENTER("set_slave_thread_default_charset"); + + thd->variables.collation_server= + global_system_variables.collation_server; + thd->update_charset(global_system_variables.character_set_client, + global_system_variables.collation_connection); + + thd->system_thread_info.rpl_sql_info->cached_charset_invalidate(); + DBUG_VOID_RETURN; +} + +/* + init_slave_thread() +*/ + +static int init_slave_thread(THD* thd, Master_info *mi, + SLAVE_THD_TYPE thd_type) +{ + DBUG_ENTER("init_slave_thread"); + int simulate_error __attribute__((unused))= 0; + DBUG_EXECUTE_IF("simulate_io_slave_error_on_init", + simulate_error|= (1 << SLAVE_THD_IO);); + DBUG_EXECUTE_IF("simulate_sql_slave_error_on_init", + simulate_error|= (1 << SLAVE_THD_SQL);); + + thd->system_thread = (thd_type == SLAVE_THD_SQL) ? + SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO; + + if (init_thr_lock()) + { + thd->cleanup(); + DBUG_RETURN(-1); + } + + /* We must call store_globals() before doing my_net_init() */ + thd->store_globals(); + + if (my_net_init(&thd->net, 0, thd, MYF(MY_THREAD_SPECIFIC)) || + IF_DBUG(simulate_error & (1<< thd_type), 0)) + { + thd->cleanup(); + DBUG_RETURN(-1); + } + + thd->security_ctx->skip_grants(); + thd->slave_thread= 1; + thd->connection_name= mi->connection_name; + thd->variables.sql_log_slow= !MY_TEST(thd->variables.log_slow_disabled_statements & LOG_SLOW_DISABLE_SLAVE); + set_slave_thread_options(thd); + + if (thd_type == SLAVE_THD_SQL) + THD_STAGE_INFO(thd, stage_waiting_for_the_next_event_in_relay_log); + else + THD_STAGE_INFO(thd, stage_waiting_for_master_update); + thd->set_time(); + /* Do not use user-supplied timeout value for system threads. */ + thd->variables.lock_wait_timeout= LONG_TIMEOUT; + DBUG_RETURN(0); +} + +/* + Sleep for a given amount of time or until killed. + + @param thd Thread context of the current thread. + @param seconds The number of seconds to sleep. + @param func Function object to check if the thread has been killed. + @param info The Rpl_info object associated with this sleep. + + @retval True if the thread has been killed, false otherwise. +*/ +template <typename killed_func, typename rpl_info> +static bool slave_sleep(THD *thd, time_t seconds, + killed_func func, rpl_info info) +{ + + bool ret; + struct timespec abstime; + + mysql_mutex_t *lock= &info->sleep_lock; + mysql_cond_t *cond= &info->sleep_cond; + + /* Absolute system time at which the sleep time expires. */ + set_timespec(abstime, seconds); + mysql_mutex_lock(lock); + thd->ENTER_COND(cond, lock, NULL, NULL); + + while (! (ret= func(info))) + { + int error= mysql_cond_timedwait(cond, lock, &abstime); + if (error == ETIMEDOUT || error == ETIME) + break; + } + /* Implicitly unlocks the mutex. */ + thd->EXIT_COND(NULL); + return ret; +} + + +static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, + bool *suppress_warnings) +{ + uchar buf[FN_REFLEN + 10]; + int len; + ushort binlog_flags = 0; // for now + char* logname = mi->master_log_name; + DBUG_ENTER("request_dump"); + + *suppress_warnings= FALSE; + + if (opt_log_slave_updates && opt_replicate_annotate_row_events) + binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT; + + if (repl_semisync_slave.request_transmit(mi)) + DBUG_RETURN(1); + + // TODO if big log files: Change next to int8store() + int4store(buf, (ulong) mi->master_log_pos); + int2store(buf + 4, binlog_flags); + int4store(buf + 6, global_system_variables.server_id); + len = (uint) strlen(logname); + memcpy(buf + 10, logname,len); + if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1)) + { + /* + Something went wrong, so we will just reconnect and retry later + in the future, we should do a better error analysis, but for + now we just fill up the error log :-) + */ + if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED || + mysql_errno(mysql) == ER_NET_ERROR_ON_WRITE) + *suppress_warnings= TRUE; // Suppress reconnect warning + else + sql_print_error("Error on COM_BINLOG_DUMP: %d %s, will retry in %d secs", + mysql_errno(mysql), mysql_error(mysql), + mi->connect_retry); + DBUG_RETURN(1); + } + + DBUG_RETURN(0); +} + + +/* + Read one event from the master + + SYNOPSIS + read_event() + mysql MySQL connection + mi Master connection information + suppress_warnings TRUE when a normal net read timeout has caused us to + try a reconnect. We do not want to print anything to + the error log in this case because this a anormal + event in an idle server. + network_read_len get the real network read length in VIO, especially using compressed protocol + + RETURN VALUES + 'packet_error' Error + number Length of packet +*/ + +static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, + ulong* network_read_len) +{ + ulong len; + DBUG_ENTER("read_event"); + + *suppress_warnings= FALSE; + /* + my_real_read() will time us out + We check if we were told to die, and if not, try reading again + */ +#ifndef DBUG_OFF + if (disconnect_slave_event_count && !(mi->events_till_disconnect--)) + DBUG_RETURN(packet_error); +#endif + + len = cli_safe_read_reallen(mysql, network_read_len); + if (unlikely(len == packet_error || (long) len < 1)) + { + if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED) + { + /* + We are trying a normal reconnect after a read timeout; + we suppress prints to .err file as long as the reconnect + happens without problems + */ + *suppress_warnings= + global_system_variables.log_warnings < 2 ? TRUE : FALSE; + } + else + { + if (!mi->rli.abort_slave) + { + sql_print_error("Error reading packet from server: %s (server_errno=%d)", + mysql_error(mysql), mysql_errno(mysql)); + } + } + DBUG_RETURN(packet_error); + } + + /* Check if eof packet */ + if (len < 8 && mysql->net.read_pos[0] == 254) + { + sql_print_information("Slave: received end packet from server, apparent " + "master shutdown: %s", + mysql_error(mysql)); + DBUG_RETURN(packet_error); + } + + DBUG_PRINT("exit", ("len: %lu net->read_pos[4]: %d", + len, mysql->net.read_pos[4])); + DBUG_RETURN(len - 1); +} + + +/** + Check if the current error is of temporary nature of not. + Some errors are temporary in nature, such as + ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT. + + @retval 0 if fatal error + @retval 1 temporary error, do retry +*/ + +int +has_temporary_error(THD *thd) +{ + uint current_errno; + DBUG_ENTER("has_temporary_error"); + + DBUG_EXECUTE_IF("all_errors_are_temporary_errors", + if (thd->get_stmt_da()->is_error()) + { + thd->clear_error(); + my_error(ER_LOCK_DEADLOCK, MYF(0)); + }); + + /* + If there is no message in THD, we can't say if it's a temporary + error or not. This is currently the case for Incident_log_event, + which sets no message. Return FALSE. + */ + if (!likely(thd->is_error())) + DBUG_RETURN(0); + + current_errno= thd->get_stmt_da()->sql_errno(); + for (uint i= 0; i < slave_transaction_retry_error_length; i++) + { + if (current_errno == slave_transaction_retry_errors[i]) + DBUG_RETURN(1); + } + + DBUG_RETURN(0); +} + + +/** + If this is a lagging slave (specified with CHANGE MASTER TO MASTER_DELAY = X), delays accordingly. Also unlocks rli->data_lock. + + Design note: this is the place to unlock rli->data_lock. The lock + must be held when reading delay info from rli, but it should not be + held while sleeping. + + @param ev Event that is about to be executed. + + @param thd The sql thread's THD object. + + @param rli The sql thread's Relay_log_info structure. + + @retval 0 If the delay timed out and the event shall be executed. + + @retval nonzero If the delay was interrupted and the event shall be skipped. +*/ +int +sql_delay_event(Log_event *ev, THD *thd, rpl_group_info *rgi) +{ + Relay_log_info* rli= rgi->rli; + long sql_delay= rli->get_sql_delay(); + + DBUG_ENTER("sql_delay_event"); + mysql_mutex_assert_owner(&rli->data_lock); + DBUG_ASSERT(!rli->belongs_to_client()); + + int type= ev->get_type_code(); + if (sql_delay && type != ROTATE_EVENT && + type != FORMAT_DESCRIPTION_EVENT && type != START_EVENT_V3) + { + // The time when we should execute the event. + time_t sql_delay_end= + ev->when + rli->mi->clock_diff_with_master + sql_delay; + // The current time. + time_t now= my_time(0); + // The time we will have to sleep before executing the event. + unsigned long nap_time= 0; + if (sql_delay_end > now) + nap_time= (ulong)(sql_delay_end - now); + + DBUG_PRINT("info", ("sql_delay= %lu " + "ev->when= %lu " + "rli->mi->clock_diff_with_master= %lu " + "now= %ld " + "sql_delay_end= %llu " + "nap_time= %ld", + sql_delay, (long)ev->when, + rli->mi->clock_diff_with_master, + (long)now, (ulonglong)sql_delay_end, (long)nap_time)); + + if (sql_delay_end > now) + { + DBUG_PRINT("info", ("delaying replication event %lu secs", + nap_time)); + rli->start_sql_delay(sql_delay_end); + mysql_mutex_unlock(&rli->data_lock); + DBUG_RETURN(slave_sleep(thd, nap_time, sql_slave_killed, rgi)); + } + } + + mysql_mutex_unlock(&rli->data_lock); + + DBUG_RETURN(0); +} + + +/* + First half of apply_event_and_update_pos(), see below. + Setup some THD variables for applying the event. + + Split out so that it can run with rli->data_lock held in non-parallel + replication, but without the mutex held in the parallel case. +*/ +static int +apply_event_and_update_pos_setup(Log_event* ev, THD* thd, rpl_group_info *rgi) +{ + DBUG_ENTER("apply_event_and_update_pos_setup"); + + DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)", + ev->get_type_str(), ev->get_type_code(), + ev->server_id)); + DBUG_PRINT("info", ("thd->options: '%s%s%s' rgi->last_event_start_time: %lu", + FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT), + FLAGSTR(thd->variables.option_bits, OPTION_BEGIN), + FLAGSTR(thd->variables.option_bits, OPTION_GTID_BEGIN), + (ulong) rgi->last_event_start_time)); + + /* + Execute the event to change the database and update the binary + log coordinates, but first we set some data that is needed for + the thread. + + The event will be executed unless it is supposed to be skipped. + + Queries originating from this server must be skipped. Low-level + events (Format_description_log_event, Rotate_log_event, + Stop_log_event) from this server must also be skipped. But for + those we don't want to modify 'group_master_log_pos', because + these events did not exist on the master. + Format_description_log_event is not completely skipped. + + Skip queries specified by the user in 'slave_skip_counter'. We + can't however skip events that has something to do with the log + files themselves. + + Filtering on own server id is extremely important, to ignore + execution of events created by the creation/rotation of the relay + log (remember that now the relay log starts with its Format_desc, + has a Rotate etc). + */ + + /* Use the original server id for logging. */ + thd->variables.server_id = ev->server_id; + thd->set_time(); // time the query + thd->lex->current_select= 0; + thd->variables.option_bits= + (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) | + (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0); + ev->thd = thd; // because up to this point, ev->thd == 0 + + DBUG_RETURN(ev->shall_skip(rgi)); +} + + +/* + Second half of apply_event_and_update_pos(), see below. + + Do the actual event apply (or skip), and position update. + */ +static int +apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi, + int reason) +{ + int exec_res= 0; + Relay_log_info* rli= rgi->rli; + + DBUG_ENTER("apply_event_and_update_pos_apply"); + DBUG_EXECUTE_IF("inject_slave_sql_before_apply_event", + { + DBUG_ASSERT(!debug_sync_set_action + (thd, STRING_WITH_LEN("now WAIT_FOR continue"))); + DBUG_SET_INITIAL("-d,inject_slave_sql_before_apply_event"); + };); + if (reason == Log_event::EVENT_SKIP_NOT) + exec_res= ev->apply_event(rgi); + +#ifdef WITH_WSREP + if (WSREP(thd)) { + + if (exec_res) { + mysql_mutex_lock(&thd->LOCK_thd_data); + switch(thd->wsrep_trx().state()) { + case wsrep::transaction::s_must_replay: + /* this transaction will be replayed, + so not raising slave error here */ + WSREP_DEBUG("SQL apply failed for MUST_REPLAY, res %d", exec_res); + exec_res = 0; + break; + default: + WSREP_DEBUG("SQL apply failed, res %d conflict state: %s", + exec_res, wsrep_thd_transaction_state_str(thd)); + rli->abort_slave= 1; + rli->report(ERROR_LEVEL, ER_UNKNOWN_COM_ERROR, rgi->gtid_info(), + "Node has dropped from cluster"); + break; + } + mysql_mutex_unlock(&thd->LOCK_thd_data); + } + } +#endif + +#ifndef DBUG_OFF + /* + This only prints information to the debug trace. + + TODO: Print an informational message to the error log? + */ + static const char *const explain[] = { + // EVENT_SKIP_NOT, + "not skipped", + // EVENT_SKIP_IGNORE, + "skipped because event should be ignored", + // EVENT_SKIP_COUNT + "skipped because event skip counter was non-zero" + }; + DBUG_PRINT("info", ("OPTION_BEGIN: %d IN_STMT: %d IN_TRANSACTION: %d", + MY_TEST(thd->variables.option_bits & OPTION_BEGIN), + rli->get_flag(Relay_log_info::IN_STMT), + rli->get_flag(Relay_log_info::IN_TRANSACTION))); + DBUG_PRINT("skip_event", ("%s event was %s", + ev->get_type_str(), explain[reason])); +#endif + + DBUG_PRINT("info", ("apply_event error = %d", exec_res)); + if (exec_res == 0) + { + int error= ev->update_pos(rgi); + #ifndef DBUG_OFF + DBUG_PRINT("info", ("update_pos error = %d", error)); + if (!rli->belongs_to_client()) + { + DBUG_PRINT("info", ("group %llu %s", rli->group_relay_log_pos, + rli->group_relay_log_name)); + DBUG_PRINT("info", ("event %llu %s", rli->event_relay_log_pos, + rli->event_relay_log_name)); + } +#endif + /* + The update should not fail, so print an error message and + return an error code. + + TODO: Replace this with a decent error message when merged + with BUG#24954 (which adds several new error message). + */ + if (unlikely(error)) + { + rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR, rgi->gtid_info(), + "It was not possible to update the positions" + " of the relay log information: the slave may" + " be in an inconsistent state." + " Stopped in %s position %llu", + rli->group_relay_log_name, rli->group_relay_log_pos); + DBUG_RETURN(2); + } + } + else + { + /* + Make sure we do not erroneously update gtid_slave_pos with a lingering + GTID from this failed event group (MDEV-4906). + */ + rgi->gtid_pending= false; + } + + DBUG_RETURN(exec_res ? 1 : 0); +} + + +/** + Applies the given event and advances the relay log position. + + This is needed by the sql thread to execute events from the binlog, + and by clients executing BINLOG statements. Conceptually, this + function does: + + @code + ev->apply_event(rli); + ev->update_pos(rli); + @endcode + + It also does the following maintainance: + + - Initializes the thread's server_id and time; and the event's + thread. + + - If !rli->belongs_to_client() (i.e., if it belongs to the slave + sql thread instead of being used for executing BINLOG + statements), it does the following things: (1) skips events if it + is needed according to the server id or slave_skip_counter; (2) + unlocks rli->data_lock; (3) sleeps if required by 'CHANGE MASTER + TO MASTER_DELAY=X'; (4) maintains the running state of the sql + thread (rli->thread_state). + + - Reports errors as needed. + + @param ev The event to apply. + + @param thd The client thread that executes the event (i.e., the + slave sql thread if called from a replication slave, or the client + thread if called to execute a BINLOG statement). + + @param rli The relay log info (i.e., the slave's rli if called from + a replication slave, or the client's thd->rli_fake if called to + execute a BINLOG statement). + + @retval 0 OK. + + @retval 1 Error calling ev->apply_event(). + + @retval 2 No error calling ev->apply_event(), but error calling + ev->update_pos(). + + This function is only used in non-parallel replication, where it is called + with rli->data_lock held; this lock is released during this function. +*/ +int +apply_event_and_update_pos(Log_event* ev, THD* thd, rpl_group_info *rgi) +{ + Relay_log_info* rli= rgi->rli; + mysql_mutex_assert_owner(&rli->data_lock); + int reason= apply_event_and_update_pos_setup(ev, thd, rgi); + if (reason == Log_event::EVENT_SKIP_COUNT) + { + DBUG_ASSERT(rli->slave_skip_counter > 0); + rli->slave_skip_counter--; + } + + if (reason == Log_event::EVENT_SKIP_NOT) + { + // Sleeps if needed, and unlocks rli->data_lock. + if (sql_delay_event(ev, thd, rgi)) + return 0; + } + else + mysql_mutex_unlock(&rli->data_lock); + + return apply_event_and_update_pos_apply(ev, thd, rgi, reason); +} + + +/* + The version of above apply_event_and_update_pos() used in parallel + replication. Unlike the non-parallel case, this function is called without + rli->data_lock held. +*/ +int +apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd, + rpl_group_info *rgi) +{ + mysql_mutex_assert_not_owner(&rgi->rli->data_lock); + int reason= apply_event_and_update_pos_setup(ev, thd, rgi); + /* + In parallel replication, sql_slave_skip_counter is handled in the SQL + driver thread, so 23 should never see EVENT_SKIP_COUNT here. + */ + DBUG_ASSERT(reason != Log_event::EVENT_SKIP_COUNT); + /* + Calling sql_delay_event() was handled in the SQL driver thread when + doing parallel replication. + */ + return apply_event_and_update_pos_apply(ev, thd, rgi, reason); +} + + +/** + Keep the relay log transaction state up to date. + + The state reflects how things are after the given event, that has just been + read from the relay log, is executed. + + This is only needed to ensure we: + - Don't abort the sql driver thread in the middle of an event group. + - Don't rotate the io thread in the middle of a statement or transaction. + The mechanism is that the io thread, when it needs to rotate the relay + log, will wait until the sql driver has read all the cached events + and then continue reading events one by one from the master until + the sql threads signals that log doesn't have an active group anymore. + + There are two possible cases. We keep them as 2 separate flags mainly + to make debugging easier. + + - IN_STMT is set when we have read an event that should be used + together with the next event. This is for example setting a + variable that is used when executing the next statement. + - IN_TRANSACTION is set when we are inside a BEGIN...COMMIT group + + To test the state one should use the is_in_group() function. +*/ + +inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev) +{ + Log_event_type typ= ev->get_type_code(); + + /* check if we are in a multi part event */ + if (ev->is_part_of_group()) + rli->set_flag(Relay_log_info::IN_STMT); + else if (Log_event::is_group_event(typ)) + { + /* + If it was not a is_part_of_group() and not a group event (like + rotate) then we can reset the IN_STMT flag. We have the above + if only to allow us to have a rotate element anywhere. + */ + rli->clear_flag(Relay_log_info::IN_STMT); + } + + /* Check for an event that starts or stops a transaction */ + if (LOG_EVENT_IS_QUERY(typ)) + { + Query_log_event *qev= (Query_log_event*) ev; + /* + Trivial optimization to avoid the following somewhat expensive + checks. + */ + if (qev->q_len <= sizeof("ROLLBACK")) + { + if (qev->is_begin()) + rli->set_flag(Relay_log_info::IN_TRANSACTION); + if (qev->is_commit() || qev->is_rollback()) + rli->clear_flag(Relay_log_info::IN_TRANSACTION); + } + } + if (typ == XID_EVENT || typ == XA_PREPARE_LOG_EVENT) + rli->clear_flag(Relay_log_info::IN_TRANSACTION); + if (typ == GTID_EVENT && + !(((Gtid_log_event*) ev)->flags2 & Gtid_log_event::FL_STANDALONE)) + { + /* This GTID_EVENT will generate a BEGIN event */ + rli->set_flag(Relay_log_info::IN_TRANSACTION); + } + + DBUG_PRINT("info", ("event: %u IN_STMT: %d IN_TRANSACTION: %d", + (uint) typ, + rli->get_flag(Relay_log_info::IN_STMT), + rli->get_flag(Relay_log_info::IN_TRANSACTION))); +} + + +/** + Top-level function for executing the next event in the relay log. + This is called from the SQL thread. + + This function reads the event from the relay log, executes it, and + advances the relay log position. It also handles errors, etc. + + This function may fail to apply the event for the following reasons: + + - The position specfied by the UNTIL condition of the START SLAVE + command is reached. + + - It was not possible to read the event from the log. + + - The slave is killed. + + - An error occurred when applying the event, and the event has been + tried slave_trans_retries times. If the event has been retried + fewer times, 0 is returned. + + - init_master_info or init_relay_log_pos failed. (These are called + if a failure occurs when applying the event.) + + - An error occurred when updating the binlog position. + + @retval 0 The event was applied. + + @retval 1 The event was not applied. +*/ + +static int exec_relay_log_event(THD* thd, Relay_log_info* rli, + rpl_group_info *serial_rgi) +{ + ulonglong event_size; + DBUG_ENTER("exec_relay_log_event"); + + /* + We acquire this mutex since we need it for all operations except + event execution. But we will release it in places where we will + wait for something for example inside of next_event(). + */ + mysql_mutex_lock(&rli->data_lock); + + Log_event *ev= next_event(serial_rgi, &event_size); + + if (sql_slave_killed(serial_rgi)) + { + mysql_mutex_unlock(&rli->data_lock); + delete ev; + DBUG_RETURN(1); + } + if (ev) + { +#ifdef WITH_WSREP + if (wsrep_before_statement(thd)) + { + WSREP_INFO("Wsrep before statement error"); + DBUG_RETURN(1); + } +#endif /* WITH_WSREP */ + int exec_res; + Log_event_type typ= ev->get_type_code(); + + /* + Even if we don't execute this event, we keep the master timestamp, + so that seconds behind master shows correct delta (there are events + that are not replayed, so we keep falling behind). + + If it is an artificial event, or a relay log event (IO thread generated + event) or ev->when is set to 0, we don't update the + last_master_timestamp. + + In parallel replication, we might queue a large number of events, and + the user might be surprised to see a claim that the slave is up to date + long before those queued events are actually executed. + */ + if (!rli->mi->using_parallel() && + !(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0))) + { + rli->last_master_timestamp= ev->when + (time_t) ev->exec_time; + DBUG_ASSERT(rli->last_master_timestamp >= 0); + } + + /* + This tests if the position of the beginning of the current event + hits the UNTIL barrier. + */ + if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS || + rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) && + (ev->server_id != global_system_variables.server_id || + rli->replicate_same_server_id) && + rli->is_until_satisfied(ev)) + { + /* + Setting abort_slave flag because we do not want additional + message about error in query execution to be printed. + */ + rli->abort_slave= 1; + rli->stop_for_until= true; + mysql_mutex_unlock(&rli->data_lock); +#ifdef WITH_WSREP + wsrep_after_statement(thd); +#endif /* WITH_WSREP */ + delete ev; + DBUG_RETURN(1); + } + + { /** + The following failure injecion works in cooperation with tests + setting @@global.debug= 'd,incomplete_group_in_relay_log'. + Xid or Commit events are not executed to force the slave sql + read hanging if the realy log does not have any more events. + */ + DBUG_EXECUTE_IF("incomplete_group_in_relay_log", + if ((typ == XID_EVENT) || + (LOG_EVENT_IS_QUERY(typ) && + strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0)) + { + DBUG_ASSERT(thd->transaction->all.modified_non_trans_table); + rli->abort_slave= 1; + mysql_mutex_unlock(&rli->data_lock); + delete ev; + serial_rgi->inc_event_relay_log_pos(); + DBUG_RETURN(0); + };); + } + + update_state_of_relay_log(rli, ev); + + if (rli->mi->using_parallel()) + { + int res= rli->parallel.do_event(serial_rgi, ev, event_size); + /* + In parallel replication, we need to update the relay log position + immediately so that it will be the correct position from which to + read the next event. + */ + if (res == 0) + rli->event_relay_log_pos= rli->future_event_relay_log_pos; + if (res >= 0) + { +#ifdef WITH_WSREP + wsrep_after_statement(thd); +#endif /* WITH_WSREP */ + DBUG_RETURN(res); + } + /* + Else we proceed to execute the event non-parallel. + This is the case for pre-10.0 events without GTID, and for handling + slave_skip_counter. + */ + if (!(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0))) + { + /* + Ignore FD's timestamp as it does not reflect the slave execution + state but likely to reflect a deep past. Consequently when the first + data modification event execution last long all this time + Seconds_Behind_Master is zero. + */ + if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT) + rli->last_master_timestamp= ev->when + (time_t) ev->exec_time; + + DBUG_ASSERT(rli->last_master_timestamp >= 0); + } + } + + if (typ == GTID_EVENT) + { + Gtid_log_event *gev= static_cast<Gtid_log_event *>(ev); + + /* + For GTID, allocate a new sub_id for the given domain_id. + The sub_id must be allocated in increasing order of binlog order. + */ + if (event_group_new_gtid(serial_rgi, gev)) + { + sql_print_error("Error reading relay log event: %s", "slave SQL thread " + "aborted because of out-of-memory error"); + mysql_mutex_unlock(&rli->data_lock); + delete ev; +#ifdef WITH_WSREP + wsrep_after_statement(thd); +#endif /* WITH_WSREP */ + DBUG_RETURN(1); + } + + if (opt_gtid_ignore_duplicates && + rli->mi->using_gtid != Master_info::USE_GTID_NO) + { + int res= rpl_global_gtid_slave_state->check_duplicate_gtid + (&serial_rgi->current_gtid, serial_rgi); + if (res < 0) + { + sql_print_error("Error processing GTID event: %s", "slave SQL " + "thread aborted because of out-of-memory error"); + mysql_mutex_unlock(&rli->data_lock); + delete ev; +#ifdef WITH_WSREP + wsrep_after_statement(thd); +#endif /* WITH_WSREP */ + DBUG_RETURN(1); + } + /* + If we need to skip this event group (because the GTID was already + applied), then do it using the code for slave_skip_counter, which + is able to handle skipping until the end of the event group. + */ + if (!res) + rli->slave_skip_counter= 1; + } + } + + serial_rgi->future_event_relay_log_pos= rli->future_event_relay_log_pos; + serial_rgi->event_relay_log_name= rli->event_relay_log_name; + serial_rgi->event_relay_log_pos= rli->event_relay_log_pos; + exec_res= apply_event_and_update_pos(ev, thd, serial_rgi); + +#ifdef WITH_WSREP + WSREP_DEBUG("apply_event_and_update_pos() result: %d", exec_res); +#endif /* WITH_WSREP */ + + delete_or_keep_event_post_apply(serial_rgi, typ, ev); + + /* + update_log_pos failed: this should not happen, so we don't + retry. + */ + if (unlikely(exec_res == 2)) + { +#ifdef WITH_WSREP + wsrep_after_statement(thd); +#endif /* WITH_WSREP */ + DBUG_RETURN(1); + } +#ifdef WITH_WSREP + mysql_mutex_lock(&thd->LOCK_thd_data); + enum wsrep::client_error wsrep_error= thd->wsrep_cs().current_error(); + mysql_mutex_unlock(&thd->LOCK_thd_data); + if (wsrep_error == wsrep::e_success) +#endif /* WITH_WSREP */ + if (slave_trans_retries) + { + int UNINIT_VAR(temp_err); + if (unlikely(exec_res) && (temp_err= has_temporary_error(thd))) + { + const char *errmsg; + rli->clear_error(); + /* + We were in a transaction which has been rolled back because of a + temporary error; + let's seek back to BEGIN log event and retry it all again. + Note, if lock wait timeout (innodb_lock_wait_timeout exceeded) + there is no rollback since 5.0.13 (ref: manual). + We have to not only seek but also + + a) init_master_info(), to seek back to hot relay log's start + for later (for when we will come back to this hot log after + re-processing the possibly existing old logs where BEGIN is: + check_binlog_magic() will then need the cache to be at + position 0 (see comments at beginning of + init_master_info()). + b) init_relay_log_pos(), because the BEGIN may be an older relay log. + */ + if (serial_rgi->trans_retries < slave_trans_retries) + { + if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL)) + sql_print_error("Failed to initialize the master info structure"); + else if (init_relay_log_pos(rli, + rli->group_relay_log_name, + rli->group_relay_log_pos, + 1, &errmsg, 1)) + sql_print_error("Error initializing relay log position: %s", + errmsg); + else + { + exec_res= 0; + serial_rgi->cleanup_context(thd, 1); + /* chance for concurrent connection to get more locks */ + slave_sleep(thd, MY_MAX(MY_MIN(serial_rgi->trans_retries, + MAX_SLAVE_RETRY_PAUSE), + slave_trans_retry_interval), + sql_slave_killed, serial_rgi); + serial_rgi->trans_retries++; + mysql_mutex_lock(&rli->data_lock); // because of SHOW STATUS + rli->retried_trans++; + statistic_increment(slave_retried_transactions, LOCK_status); + mysql_mutex_unlock(&rli->data_lock); + DBUG_PRINT("info", ("Slave retries transaction " + "rgi->trans_retries: %lu", + serial_rgi->trans_retries)); + } + } + else + sql_print_error("Slave SQL thread retried transaction %lu time(s) " + "in vain, giving up. Consider raising the value of " + "the slave_transaction_retries variable.", + slave_trans_retries); + } + else if ((exec_res && !temp_err) || + (opt_using_transactions && + rli->group_relay_log_pos == rli->event_relay_log_pos)) + { + /* + Only reset the retry counter if the entire group succeeded + or failed with a non-transient error. On a successful + event, the execution will proceed as usual; in the case of a + non-transient error, the slave will stop with an error. + */ + serial_rgi->trans_retries= 0; // restart from fresh + DBUG_PRINT("info", ("Resetting retry counter, rgi->trans_retries: %lu", + serial_rgi->trans_retries)); + } + } + + rli->executed_entries++; +#ifdef WITH_WSREP + wsrep_after_statement(thd); +#endif /* WITH_WSREP */ + DBUG_RETURN(exec_res); + } + mysql_mutex_unlock(&rli->data_lock); + rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE, NULL, + ER_THD(thd, ER_SLAVE_RELAY_LOG_READ_FAILURE), "\ +Could not parse relay log event entry. The possible reasons are: the master's \ +binary log is corrupted (you can check this by running 'mysqlbinlog' on the \ +binary log), the slave's relay log is corrupted (you can check this by running \ +'mysqlbinlog' on the relay log), a network problem, or a bug in the master's \ +or slave's MySQL code. If you want to check the master's binary log or slave's \ +relay log, you will be able to know their names by issuing 'SHOW SLAVE STATUS' \ +on this slave.\ +"); + DBUG_RETURN(1); +} + + +static bool check_io_slave_killed(Master_info *mi, const char *info) +{ + if (io_slave_killed(mi)) + { + if (info && global_system_variables.log_warnings) + sql_print_information("%s", info); + return TRUE; + } + return FALSE; +} + +/** + @brief Try to reconnect slave IO thread. + + @details Terminates current connection to master, sleeps for + @c mi->connect_retry msecs and initiates new connection with + @c safe_reconnect(). Variable pointed by @c retry_count is increased - + if it exceeds @c master_retry_count then connection is not re-established + and function signals error. + Unless @c suppres_warnings is TRUE, a warning is put in the server error log + when reconnecting. The warning message and messages used to report errors + are taken from @c messages array. In case @c master_retry_count is exceeded, + no messages are added to the log. + + @param[in] thd Thread context. + @param[in] mysql MySQL connection. + @param[in] mi Master connection information. + @param[in,out] retry_count Number of attempts to reconnect. + @param[in] suppress_warnings TRUE when a normal net read timeout + has caused to reconnecting. + @param[in] messages Messages to print/log, see + reconnect_messages[] array. + + @retval 0 OK. + @retval 1 There was an error. +*/ + +static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi, + uint *retry_count, bool suppress_warnings, + const char *messages[SLAVE_RECON_MSG_MAX]) +{ + mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT; + thd->proc_info= messages[SLAVE_RECON_MSG_WAIT]; +#ifdef SIGNAL_WITH_VIO_CLOSE + thd->clear_active_vio(); +#endif + end_server(mysql); + if ((*retry_count)++) + { + if (*retry_count > master_retry_count) + return 1; // Don't retry forever + slave_sleep(thd, mi->connect_retry, io_slave_killed, mi); + } + if (check_io_slave_killed(mi, messages[SLAVE_RECON_MSG_KILLED_WAITING])) + return 1; + thd->proc_info = messages[SLAVE_RECON_MSG_AFTER]; + if (!suppress_warnings) + { + char buf[256]; + StringBuffer<100> tmp; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + mi->gtid_current_pos.append_to_string(&tmp); + if (mi->events_queued_since_last_gtid == 0) + tmp.append(STRING_WITH_LEN("'")); + else + { + tmp.append(STRING_WITH_LEN("', GTID event skip ")); + tmp.append_ulonglong((ulonglong)mi->events_queued_since_last_gtid); + } + } + my_snprintf(buf, sizeof(buf), messages[SLAVE_RECON_MSG_FAILED], + IO_RPL_LOG_NAME, mi->master_log_pos, + tmp.c_ptr_safe()); + /* + Raise a warining during registering on master/requesting dump. + Log a message reading event. + */ + if (messages[SLAVE_RECON_MSG_COMMAND][0]) + { + mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE, NULL, + ER_THD(thd, ER_SLAVE_MASTER_COM_FAILURE), + messages[SLAVE_RECON_MSG_COMMAND], buf); + } + else + { + sql_print_information("%s", buf); + } + } + if (safe_reconnect(thd, mysql, mi, 1) || io_slave_killed(mi)) + { + if (global_system_variables.log_warnings) + sql_print_information("%s", messages[SLAVE_RECON_MSG_KILLED_AFTER]); + return 1; + } + return 0; +} + + +/** + Slave IO thread entry point. + + @param arg Pointer to Master_info struct that holds information for + the IO thread. + + @return Always 0. +*/ +pthread_handler_t handle_slave_io(void *arg) +{ + THD *thd; // needs to be first for thread_stack + MYSQL *mysql; + Master_info *mi = (Master_info*)arg; + Relay_log_info *rli= &mi->rli; + uint retry_count; + bool suppress_warnings; + int ret; + rpl_io_thread_info io_info; +#ifndef DBUG_OFF + mi->dbug_do_disconnect= false; +#endif + // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff + my_thread_init(); + DBUG_ENTER("handle_slave_io"); + + DBUG_ASSERT(mi->inited); + mysql= NULL ; + retry_count= 0; + + thd= new THD(next_thread_id()); // note that contructor of THD uses DBUG_ ! + + mysql_mutex_lock(&mi->run_lock); + /* Inform waiting threads that slave has started */ + mi->slave_run_id++; + +#ifndef DBUG_OFF + mi->events_till_disconnect = disconnect_slave_event_count; +#endif + + THD_CHECK_SENTRY(thd); + mi->io_thd = thd; + + thd->set_psi(PSI_CALL_get_thread()); + + pthread_detach_this_thread(); + thd->thread_stack= (char*) &thd; // remember where our stack is + mi->clear_error(); + if (init_slave_thread(thd, mi, SLAVE_THD_IO)) + { + mysql_cond_broadcast(&mi->start_cond); + sql_print_error("Failed during slave I/O thread initialization"); + goto err_during_init; + } + thd->system_thread_info.rpl_io_info= &io_info; + server_threads.insert(thd); + mi->slave_running = MYSQL_SLAVE_RUN_NOT_CONNECT; + mi->abort_slave = 0; + mysql_mutex_unlock(&mi->run_lock); + mysql_cond_broadcast(&mi->start_cond); + mi->rows_event_tracker.reset(); + + DBUG_PRINT("master_info",("log_file_name: '%s' position: %llu", + mi->master_log_name, mi->master_log_pos)); + + /* This must be called before run any binlog_relay_io hooks */ + my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi); + + /* Load the set of seen GTIDs, if we did not already. */ + if (rpl_load_gtid_slave_state(thd)) + { + mi->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL, + "Unable to load replication GTID slave state from mysql.%s: %s", + rpl_gtid_slave_state_table_name.str, + thd->get_stmt_da()->message()); + /* + If we are using old-style replication, we can continue, even though we + then will not be able to record the GTIDs we receive. But if using GTID, + we must give up. + */ + if (mi->using_gtid != Master_info::USE_GTID_NO || opt_gtid_strict_mode) + goto err; + } + + thd->variables.wsrep_on= 0; + if (DBUG_EVALUATE_IF("failed_slave_start", 1, 0) + || repl_semisync_slave.slave_start(mi)) + { + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, + ER_THD(thd, ER_SLAVE_FATAL_ERROR), + "Failed to run 'thread_start' hook"); + goto err; + } + + if (!(mi->mysql = mysql = mysql_init(NULL))) + { + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, + ER_THD(thd, ER_SLAVE_FATAL_ERROR), "error in mysql_init()"); + goto err; + } + + THD_STAGE_INFO(thd, stage_connecting_to_master); + // we can get killed during safe_connect + if (!safe_connect(thd, mysql, mi)) + { + if (mi->using_gtid == Master_info::USE_GTID_NO) + sql_print_information("Slave I/O thread: connected to master '%s@%s:%d'," + "replication started in log '%s' at position %llu", + mi->user, mi->host, mi->port, + IO_RPL_LOG_NAME, mi->master_log_pos); + else + { + StringBuffer<100> tmp; + mi->gtid_current_pos.to_string(&tmp); + sql_print_information("Slave I/O thread: connected to master '%s@%s:%d'," + "replication starts at GTID position '%s'", + mi->user, mi->host, mi->port, tmp.c_ptr_safe()); + } + } + else + { + sql_print_information("Slave I/O thread killed while connecting to master"); + goto err; + } + +connected: + + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + /* + When the IO thread (re)connects to the master using GTID, it will + connect at the start of an event group. But the IO thread may have + previously logged part of the following event group to the relay + log. + + When the IO and SQL thread are started together, we erase any previous + relay logs, but this is not possible/desirable while the SQL thread is + running. To avoid duplicating partial event groups in the relay logs in + this case, we remember the count of events in any partially logged event + group before the reconnect, and then here at connect we set up a counter + to skip the already-logged part of the group. + */ + mi->gtid_reconnect_event_skip_count= mi->events_queued_since_last_gtid; + mi->gtid_event_seen= false; + /* + Reset stale state of the rows-event group tracker at reconnect. + */ + mi->rows_event_tracker.reset(); + } + +#ifdef ENABLED_DEBUG_SYNC + DBUG_EXECUTE_IF("dbug.before_get_running_status_yes", + { + const char act[]= + "now " + "wait_for signal.io_thread_let_running"; + DBUG_ASSERT(debug_sync_service); + DBUG_ASSERT(!debug_sync_set_action(thd, + STRING_WITH_LEN(act))); + };); +#endif + + mysql_mutex_lock(&mi->run_lock); + mi->slave_running= MYSQL_SLAVE_RUN_CONNECT; + mysql_mutex_unlock(&mi->run_lock); + + thd->slave_net = &mysql->net; + THD_STAGE_INFO(thd, stage_checking_master_version); + ret= get_master_version_and_clock(mysql, mi); + if (ret == 1) + /* Fatal error */ + goto err; + + if (ret == 2) + { + if (check_io_slave_killed(mi, "Slave I/O thread killed " + "while calling get_master_version_and_clock(...)")) + goto err; + suppress_warnings= FALSE; + /* + Try to reconnect because the error was caused by a transient network + problem + */ + if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings, + reconnect_messages[SLAVE_RECON_ACT_REG])) + goto err; + goto connected; + } + + if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1) + { + /* + Register ourselves with the master. + */ + THD_STAGE_INFO(thd, stage_registering_slave_on_master); + if (register_slave_on_master(mysql, mi, &suppress_warnings)) + { + if (!check_io_slave_killed(mi, "Slave I/O thread killed " + "while registering slave on master")) + { + sql_print_error("Slave I/O thread couldn't register on master"); + if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings, + reconnect_messages[SLAVE_RECON_ACT_REG])) + goto err; + } + else + goto err; + goto connected; + } + DBUG_EXECUTE_IF("fail_com_register_slave", goto err;); + } + + DBUG_PRINT("info",("Starting reading binary log from master")); + thd->set_command(COM_SLAVE_IO); + while (!io_slave_killed(mi)) + { + THD_STAGE_INFO(thd, stage_requesting_binlog_dump); + if (request_dump(thd, mysql, mi, &suppress_warnings)) + { + sql_print_error("Failed on request_dump()"); + if (check_io_slave_killed(mi, NullS) || + try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings, + reconnect_messages[SLAVE_RECON_ACT_DUMP])) + goto err; + goto connected; + } + + const char *event_buf; + + mi->slave_running= MYSQL_SLAVE_RUN_READING; + DBUG_ASSERT(mi->last_error().number == 0); + ulonglong lastchecktime = my_hrtime().val; + ulonglong tokenamount = opt_read_binlog_speed_limit*1024; + while (!io_slave_killed(mi)) + { + ulong event_len, network_read_len = 0; + /* + We say "waiting" because read_event() will wait if there's nothing to + read. But if there's something to read, it will not wait. The + important thing is to not confuse users by saying "reading" whereas + we're in fact receiving nothing. + */ + THD_STAGE_INFO(thd, stage_waiting_for_master_to_send_event); + event_len= read_event(mysql, mi, &suppress_warnings, &network_read_len); + if (check_io_slave_killed(mi, NullS)) + goto err; + + if (unlikely(event_len == packet_error)) + { + uint mysql_error_number= mysql_errno(mysql); + switch (mysql_error_number) { + case CR_NET_PACKET_TOO_LARGE: + sql_print_error("\ +Log entry on master is longer than slave_max_allowed_packet (%lu) on \ +slave. If the entry is correct, restart the server with a higher value of \ +slave_max_allowed_packet", + slave_max_allowed_packet); + mi->report(ERROR_LEVEL, ER_NET_PACKET_TOO_LARGE, NULL, + "%s", "Got a packet bigger than 'slave_max_allowed_packet' bytes"); + goto err; + case ER_MASTER_FATAL_ERROR_READING_BINLOG: + mi->report(ERROR_LEVEL, ER_MASTER_FATAL_ERROR_READING_BINLOG, NULL, + ER_THD(thd, ER_MASTER_FATAL_ERROR_READING_BINLOG), + mysql_error_number, mysql_error(mysql)); + goto err; + case ER_OUT_OF_RESOURCES: + sql_print_error("\ +Stopping slave I/O thread due to out-of-memory error from master"); + mi->report(ERROR_LEVEL, ER_OUT_OF_RESOURCES, NULL, + "%s", ER_THD(thd, ER_OUT_OF_RESOURCES)); + goto err; + } + if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings, + reconnect_messages[SLAVE_RECON_ACT_EVENT])) + goto err; + goto connected; + } // if (event_len == packet_error) + + retry_count=0; // ok event, reset retry counter + THD_STAGE_INFO(thd, stage_queueing_master_event_to_the_relay_log); + event_buf= (const char*)mysql->net.read_pos + 1; + mi->semi_ack= 0; + if (repl_semisync_slave. + slave_read_sync_header((const char*)mysql->net.read_pos + 1, event_len, + &(mi->semi_ack), &event_buf, &event_len)) + { + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, + ER_THD(thd, ER_SLAVE_FATAL_ERROR), + "Failed to run 'after_read_event' hook"); + goto err; + } + + /* Control the binlog read speed of master + when read_binlog_speed_limit is non-zero + */ + ulonglong speed_limit_in_bytes = opt_read_binlog_speed_limit * 1024; + if (speed_limit_in_bytes) + { + /* Prevent the tokenamount become a large value, + for example, the IO thread doesn't work for a long time + */ + if (tokenamount > speed_limit_in_bytes * 2) + { + lastchecktime = my_hrtime().val; + tokenamount = speed_limit_in_bytes * 2; + } + + do + { + ulonglong currenttime = my_hrtime().val; + tokenamount += (currenttime - lastchecktime) * speed_limit_in_bytes / (1000*1000); + lastchecktime = currenttime; + if(tokenamount < network_read_len) + { + ulonglong duration =1000ULL*1000 * (network_read_len - tokenamount) / speed_limit_in_bytes; + time_t second_time = (time_t)(duration / (1000 * 1000)); + uint micro_time = duration % (1000 * 1000); + + // at least sleep 1000 micro second + my_sleep(MY_MAX(micro_time,1000)); + + /* + If it sleep more than one second, + it should use slave_sleep() to avoid the STOP SLAVE hang. + */ + if (second_time) + slave_sleep(thd, second_time, io_slave_killed, mi); + + } + }while(tokenamount < network_read_len); + tokenamount -= network_read_len; + } + + if (queue_event(mi, event_buf, event_len)) + { + mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, + ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE), + "could not queue event from master"); + goto err; + } + + if (rpl_semi_sync_slave_status && (mi->semi_ack & SEMI_SYNC_NEED_ACK)) + { + /* + We deliberately ignore the error in slave_reply, such error should + not cause the slave IO thread to stop, and the error messages are + already reported. + */ + (void)repl_semisync_slave.slave_reply(mi); + } + + if (mi->using_gtid == Master_info::USE_GTID_NO && + /* + If rpl_semi_sync_slave_delay_master is enabled, we will flush + master info only when ack is needed. This may lead to at least one + group transaction delay but affords better performance improvement. + */ + (!repl_semisync_slave.get_slave_enabled() || + (!(mi->semi_ack & SEMI_SYNC_SLAVE_DELAY_SYNC) || + (mi->semi_ack & (SEMI_SYNC_NEED_ACK)))) && + (DBUG_EVALUATE_IF("failed_flush_master_info", 1, 0) || + flush_master_info(mi, TRUE, TRUE))) + { + sql_print_error("Failed to flush master info file"); + goto err; + } + /* + See if the relay logs take too much space. + We don't lock mi->rli.log_space_lock here; this dirty read saves time + and does not introduce any problem: + - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so + the clean value is 0), then we are reading only one more event as we + should, and we'll block only at the next event. No big deal. + - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just + after (so the clean value is 1), then we are going into + wait_for_relay_log_space() for no reason, but this function + will do a clean read, notice the clean value and exit + immediately. + */ +#ifndef DBUG_OFF + { + DBUG_PRINT("info", ("log_space_limit=%llu log_space_total=%llu " + "ignore_log_space_limit=%d", + rli->log_space_limit, uint64(rli->log_space_total), + (int) rli->ignore_log_space_limit)); + } +#endif + + if (rli->log_space_limit && rli->log_space_limit < + rli->log_space_total && + !rli->ignore_log_space_limit) + if (wait_for_relay_log_space(rli)) + { + sql_print_error("Slave I/O thread aborted while waiting for relay \ +log space"); + goto err; + } + } + } + + // error = 0; +err: + // print the current replication position + if (mi->using_gtid == Master_info::USE_GTID_NO) + { + sql_print_information("Slave I/O thread exiting, read up to log '%s', " + "position %llu", IO_RPL_LOG_NAME, mi->master_log_pos); + sql_print_information("master was %s:%d", mi->host, mi->port); + } + else + { + StringBuffer<100> tmp; + mi->gtid_current_pos.to_string(&tmp); + sql_print_information("Slave I/O thread exiting, read up to log '%s', " + "position %llu; GTID position %s", + IO_RPL_LOG_NAME, mi->master_log_pos, + tmp.c_ptr_safe()); + sql_print_information("master was %s:%d", mi->host, mi->port); + } + repl_semisync_slave.slave_stop(mi); + thd->reset_query(); + thd->reset_db(&null_clex_str); + if (mysql) + { + /* + Here we need to clear the active VIO before closing the + connection with the master. The reason is that THD::awake() + might be called from terminate_slave_thread() because somebody + issued a STOP SLAVE. If that happends, the close_active_vio() + can be called in the middle of closing the VIO associated with + the 'mysql' object, causing a crash. + */ +#ifdef SIGNAL_WITH_VIO_CLOSE + thd->clear_active_vio(); +#endif + mysql_close(mysql); + mi->mysql=0; + } + write_ignored_events_info_to_relay_log(thd, mi); + if (mi->using_gtid != Master_info::USE_GTID_NO) + flush_master_info(mi, TRUE, TRUE); + THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit); + thd->add_status_to_global(); + server_threads.erase(thd); + mysql_mutex_lock(&mi->run_lock); + +err_during_init: + /* Forget the relay log's format */ + delete mi->rli.relay_log.description_event_for_queue; + mi->rli.relay_log.description_event_for_queue= 0; + // TODO: make rpl_status part of Master_info + change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE); + + thd->assert_not_linked(); + delete thd; + + mi->abort_slave= 0; + mi->slave_running= MYSQL_SLAVE_NOT_RUN; + mi->io_thd= 0; + /* + Note: the order of the two following calls (first broadcast, then unlock) + is important. Otherwise a killer_thread can execute between the calls and + delete the mi structure leading to a crash! (see BUG#25306 for details) + */ + mysql_cond_broadcast(&mi->stop_cond); // tell the world we are done + DBUG_EXECUTE_IF("simulate_slave_delay_at_terminate_bug38694", sleep(5);); + mysql_mutex_unlock(&mi->run_lock); + + DBUG_LEAVE; // Must match DBUG_ENTER() + my_thread_end(); + ERR_remove_state(0); + pthread_exit(0); + return 0; // Avoid compiler warnings +} + +/* + Check the temporary directory used by commands like + LOAD DATA INFILE. + + As the directory never changes during a mysqld run, we only + test this once and cache the result. This also resolve a race condition + when this can be run by multiple threads at the same time. + */ + +static bool check_temp_dir_run= 0; +static int check_temp_dir_result= 0; + +static +int check_temp_dir(char* tmp_file) +{ + File fd; + int result= 1; // Assume failure + MY_DIR *dirp; + char tmp_dir[FN_REFLEN]; + size_t tmp_dir_size; + DBUG_ENTER("check_temp_dir"); + + /* This look is safe to use as this function is only called once */ + mysql_mutex_lock(&LOCK_start_thread); + if (check_temp_dir_run) + { + if ((result= check_temp_dir_result)) + my_message(result, tmp_file, MYF(0)); + goto end; + } + check_temp_dir_run= 1; + + /* + Get the directory from the temporary file. + */ + dirname_part(tmp_dir, tmp_file, &tmp_dir_size); + + /* + Check if the directory exists. + */ + if (!(dirp=my_dir(tmp_dir,MYF(MY_WME)))) + goto end; + my_dirend(dirp); + + /* + Check permissions to create a file. We use O_TRUNC to ensure that + things works even if we happen to have and old file laying around. + */ + if ((fd= mysql_file_create(key_file_misc, + tmp_file, CREATE_MODE, + O_WRONLY | O_BINARY | O_TRUNC | O_NOFOLLOW, + MYF(MY_WME))) < 0) + goto end; + + result= 0; // Directory name ok + /* + Clean up. + */ + mysql_file_close(fd, MYF(0)); + mysql_file_delete(key_file_misc, tmp_file, MYF(0)); + +end: + mysql_mutex_unlock(&LOCK_start_thread); + DBUG_RETURN(result); +} + + +void +slave_output_error_info(rpl_group_info *rgi, THD *thd) +{ + /* + retrieve as much info as possible from the thd and, error + codes and warnings and print this to the error log as to + allow the user to locate the error + */ + Relay_log_info *rli= rgi->rli; + uint32 const last_errno= rli->last_error().number; + + if (unlikely(thd->is_error())) + { + char const *const errmsg= thd->get_stmt_da()->message(); + + DBUG_PRINT("info", + ("thd->get_stmt_da()->sql_errno()=%d; rli->last_error.number=%d", + thd->get_stmt_da()->sql_errno(), last_errno)); + if (last_errno == 0) + { + /* + This function is reporting an error which was not reported + while executing exec_relay_log_event(). + */ + rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), + rgi->gtid_info(), "%s", errmsg); + } + else if (last_errno != thd->get_stmt_da()->sql_errno()) + { + /* + * An error was reported while executing exec_relay_log_event() + * however the error code differs from what is in the thread. + * This function prints out more information to help finding + * what caused the problem. + */ + sql_print_error("Slave (additional info): %s Error_code: %d", + errmsg, thd->get_stmt_da()->sql_errno()); + } + } + + /* Print any warnings issued */ + Diagnostics_area::Sql_condition_iterator it= + thd->get_stmt_da()->sql_conditions(); + const Sql_condition *err; + /* + Added controlled slave thread cancel for replication + of user-defined variables. + */ + bool udf_error = false; + while ((err= it++)) + { + if (err->get_sql_errno() == ER_CANT_OPEN_LIBRARY) + udf_error = true; + sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno()); + } + if (unlikely(udf_error)) + { + StringBuffer<100> tmp; + if (rli->mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, false); + tmp.append(STRING_WITH_LEN("'")); + } + sql_print_error("Error loading user-defined library, slave SQL " + "thread aborted. Install the missing library, and restart the " + "slave SQL thread with \"SLAVE START\". We stopped at log '%s' " + "position %llu%s", RPL_LOG_NAME, rli->group_master_log_pos, + tmp.c_ptr_safe()); + } + else + { + StringBuffer<100> tmp; + if (rli->mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, false); + tmp.append(STRING_WITH_LEN("'")); + } + sql_print_error("Error running query, slave SQL thread aborted. " + "Fix the problem, and restart the slave SQL thread " + "with \"SLAVE START\". We stopped at log '%s' position " + "%llu%s", RPL_LOG_NAME, rli->group_master_log_pos, + tmp.c_ptr_safe()); + } +} + + +/** + Slave SQL thread entry point. + + @param arg Pointer to Relay_log_info object that holds information + for the SQL thread. + + @return Always 0. +*/ +pthread_handler_t handle_slave_sql(void *arg) +{ + THD *thd; /* needs to be first for thread_stack */ + char saved_log_name[FN_REFLEN]; + char saved_master_log_name[FN_REFLEN]; + my_off_t UNINIT_VAR(saved_log_pos); + my_off_t UNINIT_VAR(saved_master_log_pos); + String saved_skip_gtid_pos; + my_off_t saved_skip= 0; + Master_info *mi= ((Master_info*)arg); + Relay_log_info* rli = &mi->rli; + my_bool wsrep_node_dropped __attribute__((unused)) = FALSE; + const char *errmsg; + rpl_group_info *serial_rgi; + rpl_sql_thread_info sql_info(mi->rpl_filter); + + // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff + my_thread_init(); + DBUG_ENTER("handle_slave_sql"); + +#ifdef WITH_WSREP + wsrep_restart_point: +#endif + + serial_rgi= new rpl_group_info(rli); + thd = new THD(next_thread_id()); // note that contructor of THD uses DBUG_ ! + thd->thread_stack = (char*)&thd; // remember where our stack is + thd->system_thread_info.rpl_sql_info= &sql_info; + + DBUG_ASSERT(rli->inited); + DBUG_ASSERT(rli->mi == mi); + mysql_mutex_lock(&rli->run_lock); + DBUG_ASSERT(!rli->slave_running); + errmsg= 0; +#ifndef DBUG_OFF + rli->events_till_abort = abort_slave_event_count; +#endif + + /* + THD for the sql driver thd. In parallel replication this is the thread + that reads things from the relay log and calls rpl_parallel::do_event() + to execute queries. + + In single thread replication this is the THD for the thread that is + executing SQL queries too. + */ + serial_rgi->thd= rli->sql_driver_thd= thd; + + thd->set_psi(PSI_CALL_get_thread()); + + /* Inform waiting threads that slave has started */ + rli->slave_run_id++; + rli->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT; + + pthread_detach_this_thread(); + + if (opt_slave_parallel_threads > 0 && + rpl_parallel_activate_pool(&global_rpl_thread_pool)) + { + mysql_cond_broadcast(&rli->start_cond); + rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, + "Failed during parallel slave pool activation"); + goto err_during_init; + } + + if (init_slave_thread(thd, mi, SLAVE_THD_SQL)) + { + /* + TODO: this is currently broken - slave start and change master + will be stuck if we fail here + */ + mysql_cond_broadcast(&rli->start_cond); + rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, + "Failed during slave thread initialization"); + goto err_during_init; + } + thd->init_for_queries(); + thd->rgi_slave= serial_rgi; + if ((serial_rgi->deferred_events_collecting= mi->rpl_filter->is_on())) + { + serial_rgi->deferred_events= new Deferred_log_events(rli); + } + + /* + binlog_annotate_row_events must be TRUE only after an Annotate_rows event + has been received and only till the last corresponding rbr event has been + applied. In all other cases it must be FALSE. + */ + thd->variables.binlog_annotate_row_events= 0; + + /* Ensure that slave can exeute any alter table it gets from master */ + thd->variables.alter_algorithm= (ulong) Alter_info::ALTER_TABLE_ALGORITHM_DEFAULT; + + server_threads.insert(thd); + /* + We are going to set slave_running to 1. Assuming slave I/O thread is + alive and connected, this is going to make Seconds_Behind_Master be 0 + i.e. "caught up". Even if we're just at start of thread. Well it's ok, at + the moment we start we can think we are caught up, and the next second we + start receiving data so we realize we are not caught up and + Seconds_Behind_Master grows. No big deal. + */ + rli->abort_slave = 0; + rli->stop_for_until= false; + mysql_mutex_unlock(&rli->run_lock); + mysql_cond_broadcast(&rli->start_cond); + + /* + Reset errors for a clean start (otherwise, if the master is idle, the SQL + thread may execute no Query_log_event, so the error will remain even + though there's no problem anymore). Do not reset the master timestamp + (imagine the slave has caught everything, the STOP SLAVE and START SLAVE: + as we are not sure that we are going to receive a query, we want to + remember the last master timestamp (to say how many seconds behind we are + now. + But the master timestamp is reset by RESET SLAVE & CHANGE MASTER. + */ + rli->clear_error(); + rli->parallel.reset(); + + //tell the I/O thread to take relay_log_space_limit into account from now on + rli->ignore_log_space_limit= 0; + + serial_rgi->gtid_sub_id= 0; + serial_rgi->gtid_pending= false; + if (mi->using_gtid != Master_info::USE_GTID_NO && mi->using_parallel() && + rli->restart_gtid_pos.count() > 0) + { + /* + With parallel replication in GTID mode, if we have a multi-domain GTID + position, we need to start some way back in the relay log and skip any + GTID that was already applied before. Since event groups can be split + across multiple relay logs, this earlier starting point may be in the + middle of an already applied event group, so we also need to skip any + remaining part of such group. + */ + rli->gtid_skip_flag = GTID_SKIP_TRANSACTION; + } + else + rli->gtid_skip_flag = GTID_SKIP_NOT; + if (init_relay_log_pos(rli, + rli->group_relay_log_name, + rli->group_relay_log_pos, + 1 /*need data lock*/, &errmsg, + 1 /*look for a description_event*/)) + { + rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, + "Error initializing relay log position: %s", errmsg); + goto err_before_start; + } + rli->reset_inuse_relaylog(); + if (rli->alloc_inuse_relaylog(rli->group_relay_log_name)) + goto err_before_start; + + strcpy(rli->future_event_master_log_name, rli->group_master_log_name); + THD_CHECK_SENTRY(thd); +#ifndef DBUG_OFF + { + DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%llu " + "rli->event_relay_log_pos=%llu", + my_b_tell(rli->cur_log), rli->event_relay_log_pos)); + DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE); + /* + Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the + correct position when it's called just after my_b_seek() (the questionable + stuff is those "seek is done on next read" comments in the my_b_seek() + source code). + The crude reality is that this assertion randomly fails whereas + replication seems to work fine. And there is no easy explanation why it + fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of + init_relay_log_pos() called above). Maybe the assertion would be + meaningful if we held rli->data_lock between the my_b_seek() and the + DBUG_ASSERT(). + */ +#ifdef SHOULD_BE_CHECKED + DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos); +#endif + } +#endif + + DBUG_PRINT("master_info",("log_file_name: %s position: %llu", + rli->group_master_log_name, + rli->group_master_log_pos)); + if (global_system_variables.log_warnings) + { + StringBuffer<100> tmp; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, + mi->using_gtid==Master_info::USE_GTID_CURRENT_POS); + tmp.append(STRING_WITH_LEN("'")); + } + sql_print_information("Slave SQL thread initialized, starting replication " + "in log '%s' at position %llu, relay log '%s' " + "position: %llu%s", RPL_LOG_NAME, + rli->group_master_log_pos, rli->group_relay_log_name, + rli->group_relay_log_pos, tmp.c_ptr_safe()); + } + + if (check_temp_dir(rli->slave_patternload_file)) + { + check_temp_dir_result= thd->get_stmt_da()->sql_errno(); + rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL, + "Unable to use slave's temporary directory %s - %s", + slave_load_tmpdir, thd->get_stmt_da()->message()); + goto err; + } + else + check_temp_dir_result= 0; + + /* Load the set of seen GTIDs, if we did not already. */ + if (rpl_load_gtid_slave_state(thd)) + { + rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL, + "Unable to load replication GTID slave state from mysql.%s: %s", + rpl_gtid_slave_state_table_name.str, + thd->get_stmt_da()->message()); + /* + If we are using old-style replication, we can continue, even though we + then will not be able to record the GTIDs we receive. But if using GTID, + we must give up. + */ + if (mi->using_gtid != Master_info::USE_GTID_NO || opt_gtid_strict_mode) + goto err; + } + /* Re-load the set of mysql.gtid_slave_posXXX tables available. */ + if (find_gtid_slave_pos_tables(thd)) + { + rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL, + "Error processing replication GTID position tables: %s", + thd->get_stmt_da()->message()); + goto err; + } + + /* execute init_slave variable */ + if (opt_init_slave.length) + { + execute_init_command(thd, &opt_init_slave, &LOCK_sys_init_slave); + if (unlikely(thd->is_slave_error)) + { + rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL, + "Slave SQL thread aborted. Can't execute init_slave query"); + goto err; + } + } + + /* + First check until condition - probably there is nothing to execute. We + do not want to wait for next event in this case. + */ + mysql_mutex_lock(&rli->data_lock); + if (rli->slave_skip_counter) + { + strmake_buf(saved_log_name, rli->group_relay_log_name); + strmake_buf(saved_master_log_name, rli->group_master_log_name); + saved_log_pos= rli->group_relay_log_pos; + saved_master_log_pos= rli->group_master_log_pos; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + saved_skip_gtid_pos.append(STRING_WITH_LEN(", GTID '")); + rpl_append_gtid_state(&saved_skip_gtid_pos, false); + saved_skip_gtid_pos.append(STRING_WITH_LEN("'; ")); + } + saved_skip= rli->slave_skip_counter; + } + if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS || + rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) && + rli->is_until_satisfied(NULL)) + { + sql_print_information("Slave SQL thread stopped because it reached its" + " UNTIL position %llu in %s %s file", + rli->until_pos(), rli->until_name(), + rli->until_condition == + Relay_log_info::UNTIL_MASTER_POS ? + "binlog" : "relaylog"); + mysql_mutex_unlock(&rli->data_lock); + goto err; + } + mysql_mutex_unlock(&rli->data_lock); +#ifdef WITH_WSREP + wsrep_open(thd); + if (wsrep_before_command(thd)) + { + WSREP_WARN("Slave SQL wsrep_before_command() failed"); + goto err; + } +#endif /* WITH_WSREP */ + /* Read queries from the IO/THREAD until this thread is killed */ + + thd->set_command(COM_SLAVE_SQL); + while (!sql_slave_killed(serial_rgi)) + { + THD_STAGE_INFO(thd, stage_reading_event_from_the_relay_log); + THD_CHECK_SENTRY(thd); + + if (saved_skip && rli->slave_skip_counter == 0) + { + StringBuffer<100> tmp; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN(", GTID '")); + rpl_append_gtid_state(&tmp, false); + tmp.append(STRING_WITH_LEN("'; ")); + } + + sql_print_information("'SQL_SLAVE_SKIP_COUNTER=%ld' executed at " + "relay_log_file='%s', relay_log_pos='%ld', master_log_name='%s', " + "master_log_pos='%ld'%s and new position at " + "relay_log_file='%s', relay_log_pos='%ld', master_log_name='%s', " + "master_log_pos='%ld'%s ", + (ulong) saved_skip, saved_log_name, (ulong) saved_log_pos, + saved_master_log_name, (ulong) saved_master_log_pos, + saved_skip_gtid_pos.c_ptr_safe(), + rli->group_relay_log_name, (ulong) rli->group_relay_log_pos, + rli->group_master_log_name, (ulong) rli->group_master_log_pos, + tmp.c_ptr_safe()); + saved_skip= 0; + saved_skip_gtid_pos.free(); + } + + if (exec_relay_log_event(thd, rli, serial_rgi)) + { +#ifdef WITH_WSREP + if (WSREP(thd)) + { + mysql_mutex_lock(&thd->LOCK_thd_data); + + if (thd->wsrep_cs().current_error()) + { + wsrep_node_dropped = TRUE; + rli->abort_slave = TRUE; + } + mysql_mutex_unlock(&thd->LOCK_thd_data); + } +#endif /* WITH_WSREP */ + + DBUG_PRINT("info", ("exec_relay_log_event() failed")); + // do not scare the user if SQL thread was simply killed or stopped + if (!sql_slave_killed(serial_rgi)) + { + slave_output_error_info(serial_rgi, thd); + if (WSREP(thd) && rli->last_error().number == ER_UNKNOWN_COM_ERROR) + { + wsrep_node_dropped= TRUE; + } + } + goto err; + } + } + + err: + if (mi->using_parallel()) + rli->parallel.wait_for_done(thd, rli); + /* Gtid_list_log_event::do_apply_event has already reported the GTID until */ + if (rli->stop_for_until && rli->until_condition != Relay_log_info::UNTIL_GTID) + { + if (global_system_variables.log_warnings > 2) + sql_print_information("Slave SQL thread UNTIL stop was requested at position " + "%llu in %s %s file", + rli->until_log_pos, rli->until_log_name, + rli->until_condition == + Relay_log_info::UNTIL_MASTER_POS ? + "binlog" : "relaylog"); + sql_print_information("Slave SQL thread stopped because it reached its" + " UNTIL position %llu in %s %s file", + rli->until_pos(), rli->until_name(), + rli->until_condition == + Relay_log_info::UNTIL_MASTER_POS ? + "binlog" : "relaylog"); + + }; + /* Thread stopped. Print the current replication position to the log */ + { + StringBuffer<100> tmp; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, false); + tmp.append(STRING_WITH_LEN("'")); + } + sql_print_information("Slave SQL thread exiting, replication stopped in " + "log '%s' at position %llu%s", RPL_LOG_NAME, + rli->group_master_log_pos, tmp.c_ptr_safe()); + sql_print_information("master was %s:%d", mi->host, mi->port); + } +#ifdef WITH_WSREP + wsrep_after_command_before_result(thd); + wsrep_after_command_after_result(thd); +#endif /* WITH_WSREP */ + + err_before_start: + + /* + Some events set some playgrounds, which won't be cleared because thread + stops. Stopping of this thread may not be known to these events ("stop" + request is detected only by the present function, not by events), so we + must "proactively" clear playgrounds: + */ + thd->clear_error(); + serial_rgi->cleanup_context(thd, 1); + /* + Some extra safety, which should not been needed (normally, event deletion + should already have done these assignments (each event which sets these + variables is supposed to set them to 0 before terminating)). + */ + thd->catalog= 0; + thd->reset_query(); + thd->reset_db(&null_clex_str); + if (rli->mi->using_gtid != Master_info::USE_GTID_NO) + { + ulong domain_count; + my_bool save_log_all_errors= thd->log_all_errors; + + /* + We don't need to check return value for rli->flush() + as any errors should be logged to stderr + */ + thd->log_all_errors= 1; + rli->flush(); + thd->log_all_errors= save_log_all_errors; + if (mi->using_parallel()) + { + /* + In parallel replication GTID mode, we may stop with different domains + at different positions in the relay log. + + To handle this when we restart the SQL thread, mark the current + per-domain position in the Relay_log_info. + */ + mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); + domain_count= rpl_global_gtid_slave_state->count(); + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + if (domain_count > 1) + { + inuse_relaylog *ir; + + /* + Load the starting GTID position, so that we can skip already applied + GTIDs when we restart the SQL thread. And set the start position in + the relay log back to a known safe place to start (prior to any not + yet applied transaction in any domain). + */ + rli->restart_gtid_pos.load(rpl_global_gtid_slave_state, NULL, 0); + if ((ir= rli->inuse_relaylog_list)) + { + rpl_gtid *gtid= ir->relay_log_state; + uint32 count= ir->relay_log_state_count; + while (count > 0) + { + process_gtid_for_restart_pos(rli, gtid); + ++gtid; + --count; + } + strmake_buf(rli->group_relay_log_name, ir->name); + rli->group_relay_log_pos= BIN_LOG_HEADER_SIZE; + rli->relay_log_state.load(ir->relay_log_state, ir->relay_log_state_count); + } + } + } + } + THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit); + thd->add_status_to_global(); + server_threads.erase(thd); + mysql_mutex_lock(&rli->run_lock); + +err_during_init: + /* We need data_lock, at least to wake up any waiting master_pos_wait() */ + mysql_mutex_lock(&rli->data_lock); + DBUG_ASSERT(rli->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT); // tracking buffer overrun + /* When master_pos_wait() wakes up it will check this and terminate */ + rli->slave_running= MYSQL_SLAVE_NOT_RUN; + /* Forget the relay log's format */ + delete rli->relay_log.description_event_for_exec; + rli->relay_log.description_event_for_exec= 0; + rli->reset_inuse_relaylog(); + /* Wake up master_pos_wait() */ + mysql_mutex_unlock(&rli->data_lock); + DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions")); + mysql_cond_broadcast(&rli->data_cond); + rli->ignore_log_space_limit= 0; /* don't need any lock */ + /* we die so won't remember charset - re-update them on next thread start */ + thd->system_thread_info.rpl_sql_info->cached_charset_invalidate(); + + /* + TODO: see if we can do this conditionally in next_event() instead + to avoid unneeded position re-init + + We only reset THD::temporary_tables to 0 here and not free it, as this + could be used by slave through Relay_log_info::save_temporary_tables. + */ + thd->temporary_tables= 0; + rli->sql_driver_thd= 0; + thd->rgi_fake= thd->rgi_slave= NULL; + +#ifdef WITH_WSREP + /* + If slave stopped due to node going non primary, we set global flag to + trigger automatic restart of slave when node joins back to cluster. + */ + if (WSREP(thd) && wsrep_node_dropped && wsrep_restart_slave) + { + if (wsrep_ready_get()) + { + WSREP_INFO("Slave error due to node temporarily non-primary" + "SQL slave will continue"); + wsrep_node_dropped= FALSE; + mysql_mutex_unlock(&rli->run_lock); + goto wsrep_restart_point; + } + else + { + WSREP_INFO("Slave error due to node going non-primary"); + WSREP_INFO("wsrep_restart_slave was set and therefore slave will be " + "automatically restarted when node joins back to cluster"); + wsrep_restart_slave_activated= TRUE; + } + } + wsrep_close(thd); +#endif /* WITH_WSREP */ + + /* + Note: the order of the broadcast and unlock calls below (first + broadcast, then unlock) is important. Otherwise a killer_thread can + execute between the calls and delete the mi structure leading to a + crash! (see BUG#25306 for details) + */ + mysql_cond_broadcast(&rli->stop_cond); + DBUG_EXECUTE_IF("simulate_slave_delay_at_terminate_bug38694", sleep(5);); + mysql_mutex_unlock(&rli->run_lock); // tell the world we are done + + rpl_parallel_resize_pool_if_no_slaves(); + + delete serial_rgi; + delete thd; + + DBUG_LEAVE; // Must match DBUG_ENTER() + my_thread_end(); + ERR_remove_state(0); + pthread_exit(0); + return 0; // Avoid compiler warnings +} + + +/* + process_io_create_file() +*/ + +static int process_io_create_file(Master_info* mi, Create_file_log_event* cev) +{ + int error = 1; + ulong num_bytes; + bool cev_not_written; + THD *thd = mi->io_thd; + NET *net = &mi->mysql->net; + DBUG_ENTER("process_io_create_file"); + + if (unlikely(!cev->is_valid())) + DBUG_RETURN(1); + + if (!mi->rpl_filter->db_ok(cev->db)) + { + skip_load_data_infile(net); + DBUG_RETURN(0); + } + DBUG_ASSERT(cev->inited_from_old); + thd->file_id = cev->file_id = mi->file_id++; + thd->variables.server_id = cev->server_id; + cev_not_written = 1; + + if (unlikely(net_request_file(net,cev->fname))) + { + sql_print_error("Slave I/O: failed requesting download of '%s'", + cev->fname); + goto err; + } + + /* + This dummy block is so we could instantiate Append_block_log_event + once and then modify it slightly instead of doing it multiple times + in the loop + */ + { + Append_block_log_event aev(thd,0,0,0,0); + + for (;;) + { + if (unlikely((num_bytes=my_net_read(net)) == packet_error)) + { + sql_print_error("Network read error downloading '%s' from master", + cev->fname); + goto err; + } + if (unlikely(!num_bytes)) /* eof */ + { + /* 3.23 master wants it */ + net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0); + /* + If we wrote Create_file_log_event, then we need to write + Execute_load_log_event. If we did not write Create_file_log_event, + then this is an empty file and we can just do as if the LOAD DATA + INFILE had not existed, i.e. write nothing. + */ + if (unlikely(cev_not_written)) + break; + Execute_load_log_event xev(thd,0,0); + xev.log_pos = cev->log_pos; + if (unlikely(mi->rli.relay_log.append(&xev))) + { + mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, + ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE), + "error writing Exec_load event to relay log"); + goto err; + } + mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total); + break; + } + if (unlikely(cev_not_written)) + { + cev->block = net->read_pos; + cev->block_len = num_bytes; + if (unlikely(mi->rli.relay_log.append(cev))) + { + mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, + ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE), + "error writing Create_file event to relay log"); + goto err; + } + cev_not_written=0; + mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total); + } + else + { + aev.block = net->read_pos; + aev.block_len = num_bytes; + aev.log_pos = cev->log_pos; + if (unlikely(mi->rli.relay_log.append(&aev))) + { + mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, + ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE), + "error writing Append_block event to relay log"); + goto err; + } + mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ; + } + } + } + error=0; +err: + DBUG_RETURN(error); +} + + +/* + Start using a new binary log on the master + + SYNOPSIS + process_io_rotate() + mi master_info for the slave + rev The rotate log event read from the binary log + + DESCRIPTION + Updates the master info with the place in the next binary + log where we should start reading. + Rotate the relay log to avoid mixed-format relay logs. + + NOTES + We assume we already locked mi->data_lock + + RETURN VALUES + 0 ok + 1 Log event is illegal + +*/ + +static int process_io_rotate(Master_info *mi, Rotate_log_event *rev) +{ + DBUG_ENTER("process_io_rotate"); + mysql_mutex_assert_owner(&mi->data_lock); + + if (unlikely(!rev->is_valid())) + DBUG_RETURN(1); + + /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */ + memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1); + mi->master_log_pos= rev->pos; + DBUG_PRINT("info", ("master_log_pos: '%s' %lu", + mi->master_log_name, (ulong) mi->master_log_pos)); +#ifndef DBUG_OFF + /* + If we do not do this, we will be getting the first + rotate event forever, so we need to not disconnect after one. + */ + if (disconnect_slave_event_count) + mi->events_till_disconnect++; +#endif + + /* + If description_event_for_queue is format <4, there is conversion in the + relay log to the slave's format (4). And Rotate can mean upgrade or + nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so + no need to reset description_event_for_queue now. And if it's nothing (same + master version as before), no need (still using the slave's format). + */ + if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4) + { + DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg == + mi->rli.relay_log.relay_log_checksum_alg); + + delete mi->rli.relay_log.description_event_for_queue; + /* start from format 3 (MySQL 4.0) again */ + mi->rli.relay_log.description_event_for_queue= new + Format_description_log_event(3); + mi->rli.relay_log.description_event_for_queue->checksum_alg= + mi->rli.relay_log.relay_log_checksum_alg; + } + /* + Rotate the relay log makes binlog format detection easier (at next slave + start or mysqlbinlog) + */ + DBUG_RETURN(rotate_relay_log(mi) /* will take the right mutexes */); +} + +/* + Reads a 3.23 event and converts it to the slave's format. This code was + copied from MySQL 4.0. +*/ +static int queue_binlog_ver_1_event(Master_info *mi, const char *buf, + ulong event_len) +{ + const char *errmsg = 0; + ulong inc_pos; + bool ignore_event= 0; + char *tmp_buf = 0; + Relay_log_info *rli= &mi->rli; + DBUG_ENTER("queue_binlog_ver_1_event"); + + /* + If we get Load event, we need to pass a non-reusable buffer + to read_log_event, so we do a trick + */ + if ((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) + { + if (unlikely(!(tmp_buf=(char*)my_malloc(key_memory_binlog_ver_1_event, + event_len+1,MYF(MY_WME))))) + { + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, + ER(ER_SLAVE_FATAL_ERROR), "Memory allocation failed"); + DBUG_RETURN(1); + } + memcpy(tmp_buf,buf,event_len); + /* + Create_file constructor wants a 0 as last char of buffer, this 0 will + serve as the string-termination char for the file's name (which is at the + end of the buffer) + We must increment event_len, otherwise the event constructor will not see + this end 0, which leads to segfault. + */ + tmp_buf[event_len++]=0; + int4store(tmp_buf+EVENT_LEN_OFFSET, event_len); + buf = (const char*)tmp_buf; + } + /* + This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to + send the loaded file, and write it to the relay log in the form of + Append_block/Exec_load (the SQL thread needs the data, as that thread is not + connected to the master). + */ + Log_event *ev= + Log_event::read_log_event(buf, event_len, &errmsg, + mi->rli.relay_log.description_event_for_queue, 0); + if (unlikely(!ev)) + { + sql_print_error("Read invalid event from master: '%s',\ + master could be corrupt but a more likely cause of this is a bug", + errmsg); + my_free(tmp_buf); + DBUG_RETURN(1); + } + + mysql_mutex_lock(&mi->data_lock); + ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */ + switch (ev->get_type_code()) { + case STOP_EVENT: + ignore_event= 1; + inc_pos= event_len; + break; + case ROTATE_EVENT: + if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev))) + { + delete ev; + mysql_mutex_unlock(&mi->data_lock); + DBUG_RETURN(1); + } + inc_pos= 0; + break; + case CREATE_FILE_EVENT: + /* + Yes it's possible to have CREATE_FILE_EVENT here, even if we're in + queue_old_event() which is for 3.23 events which don't comprise + CREATE_FILE_EVENT. This is because read_log_event() above has just + transformed LOAD_EVENT into CREATE_FILE_EVENT. + */ + { + /* We come here when and only when tmp_buf != 0 */ + DBUG_ASSERT(tmp_buf != 0); + inc_pos=event_len; + ev->log_pos+= inc_pos; + int error = process_io_create_file(mi,(Create_file_log_event*)ev); + delete ev; + mi->master_log_pos += inc_pos; + DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos)); + mysql_mutex_unlock(&mi->data_lock); + my_free(tmp_buf); + DBUG_RETURN(error); + } + default: + inc_pos= event_len; + break; + } + if (likely(!ignore_event)) + { + if (ev->log_pos) + /* + Don't do it for fake Rotate events (see comment in + Log_event::Log_event(const char* buf...) in log_event.cc). + */ + ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */ + if (unlikely(rli->relay_log.append(ev))) + { + delete ev; + mysql_mutex_unlock(&mi->data_lock); + DBUG_RETURN(1); + } + rli->relay_log.harvest_bytes_written(&rli->log_space_total); + } + delete ev; + mi->master_log_pos+= inc_pos; + DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos)); + mysql_mutex_unlock(&mi->data_lock); + DBUG_RETURN(0); +} + +/* + Reads a 4.0 event and converts it to the slave's format. This code was copied + from queue_binlog_ver_1_event(), with some affordable simplifications. +*/ +static int queue_binlog_ver_3_event(Master_info *mi, const char *buf, + ulong event_len) +{ + const char *errmsg = 0; + ulong inc_pos; + char *tmp_buf = 0; + Relay_log_info *rli= &mi->rli; + DBUG_ENTER("queue_binlog_ver_3_event"); + + /* read_log_event() will adjust log_pos to be end_log_pos */ + Log_event *ev= + Log_event::read_log_event(buf,event_len, &errmsg, + mi->rli.relay_log.description_event_for_queue, 0); + if (unlikely(!ev)) + { + sql_print_error("Read invalid event from master: '%s',\ + master could be corrupt but a more likely cause of this is a bug", + errmsg); + my_free(tmp_buf); + DBUG_RETURN(1); + } + mysql_mutex_lock(&mi->data_lock); + switch (ev->get_type_code()) { + case STOP_EVENT: + goto err; + case ROTATE_EVENT: + if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev))) + { + delete ev; + mysql_mutex_unlock(&mi->data_lock); + DBUG_RETURN(1); + } + inc_pos= 0; + break; + default: + inc_pos= event_len; + break; + } + + if (unlikely(rli->relay_log.append(ev))) + { + delete ev; + mysql_mutex_unlock(&mi->data_lock); + DBUG_RETURN(1); + } + rli->relay_log.harvest_bytes_written(&rli->log_space_total); + delete ev; + mi->master_log_pos+= inc_pos; +err: + DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos)); + mysql_mutex_unlock(&mi->data_lock); + DBUG_RETURN(0); +} + +/* + queue_old_event() + + Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0 + (exactly, slave's) format. To do the conversion, we create a 5.0 event from + the 3.23/4.0 bytes, then write this event to the relay log. + + TODO: + Test this code before release - it has to be tested on a separate + setup with 3.23 master or 4.0 master +*/ + +static int queue_old_event(Master_info *mi, const char *buf, + ulong event_len) +{ + DBUG_ENTER("queue_old_event"); + + switch (mi->rli.relay_log.description_event_for_queue->binlog_version) + { + case 1: + DBUG_RETURN(queue_binlog_ver_1_event(mi,buf,event_len)); + case 3: + DBUG_RETURN(queue_binlog_ver_3_event(mi,buf,event_len)); + default: /* unsupported format; eg version 2 */ + DBUG_PRINT("info",("unsupported binlog format %d in queue_old_event()", + mi->rli.relay_log.description_event_for_queue->binlog_version)); + DBUG_RETURN(1); + } +} + +/* + queue_event() + + If the event is 3.23/4.0, passes it to queue_old_event() which will convert + it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is + no format conversion, it's pure read/write of bytes. + So a 5.0.0 slave's relay log can contain events in the slave's format or in + any >=5.0.0 format. +*/ + +static int queue_event(Master_info* mi,const char* buf, ulong event_len) +{ + int error= 0; + StringBuffer<1024> error_msg; + ulonglong inc_pos= 0; + ulonglong event_pos; + Relay_log_info *rli= &mi->rli; + mysql_mutex_t *log_lock= rli->relay_log.get_log_lock(); + ulong s_id; + bool unlock_data_lock= TRUE; + bool gtid_skip_enqueue= false; + bool got_gtid_event= false; + rpl_gtid event_gtid; + static uint dbug_rows_event_count __attribute__((unused))= 0; + bool is_compress_event = false; + char* new_buf = NULL; + char new_buf_arr[4096]; + bool is_malloc = false; + bool is_rows_event= false; + /* + FD_q must have been prepared for the first R_a event + inside get_master_version_and_clock() + Show-up of FD:s affects checksum_alg at once because + that changes FD_queue. + */ + enum enum_binlog_checksum_alg checksum_alg= + mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF ? + mi->checksum_alg_before_fd : mi->rli.relay_log.relay_log_checksum_alg; + + char *save_buf= NULL; // needed for checksumming the fake Rotate event + char rot_buf[LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN + FN_REFLEN]; + + DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_OFF || + checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || + checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); + + DBUG_ENTER("queue_event"); + /* + FD_queue checksum alg description does not apply in a case of + FD itself. The one carries both parts of the checksum data. + */ + if (buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) + { + checksum_alg= get_checksum_alg(buf, event_len); + } + else if (buf[EVENT_TYPE_OFFSET] == START_EVENT_V3) + { + // checksum behaviour is similar to the pre-checksum FD handling + mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_UNDEF; + mi->rli.relay_log.description_event_for_queue->checksum_alg= + mi->rli.relay_log.relay_log_checksum_alg= checksum_alg= + BINLOG_CHECKSUM_ALG_OFF; + } + + // does not hold always because of old binlog can work with NM + // DBUG_ASSERT(checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); + + // should hold unless manipulations with RL. Tests that do that + // will have to refine the clause. + DBUG_ASSERT(mi->rli.relay_log.relay_log_checksum_alg != + BINLOG_CHECKSUM_ALG_UNDEF); + + // Emulate the network corruption + DBUG_EXECUTE_IF("corrupt_queue_event", + if ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT) + { + char *debug_event_buf_c = (char*) buf; + int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN); + debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos]; + DBUG_PRINT("info", ("Corrupt the event at queue_event: byte on position %d", debug_cor_pos)); + DBUG_SET("-d,corrupt_queue_event"); + } + ); + + if (event_checksum_test((uchar *) buf, event_len, checksum_alg)) + { + error= ER_NETWORK_READ_EVENT_CHECKSUM_FAILURE; + unlock_data_lock= FALSE; + goto err; + } + + if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 && + (uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */) + DBUG_RETURN(queue_old_event(mi,buf,event_len)); + +#ifdef ENABLED_DEBUG_SYNC + /* + A (+d,dbug.rows_events_to_delay_relay_logging)-test is supposed to + create a few Write_log_events and after receiving the 1st of them + the IO thread signals to launch the SQL thread, and sets itself to + wait for a release signal. + */ + DBUG_EXECUTE_IF("dbug.rows_events_to_delay_relay_logging", + if ((buf[EVENT_TYPE_OFFSET] == WRITE_ROWS_EVENT_V1 || + buf[EVENT_TYPE_OFFSET] == WRITE_ROWS_EVENT) && + ++dbug_rows_event_count == 2) + { + const char act[]= + "now SIGNAL start_sql_thread " + "WAIT_FOR go_on_relay_logging"; + DBUG_ASSERT(debug_sync_service); + DBUG_ASSERT(!debug_sync_set_action(current_thd, + STRING_WITH_LEN(act))); + dbug_rows_event_count = 0; + };); +#endif + mysql_mutex_lock(&mi->data_lock); + + switch ((uchar)buf[EVENT_TYPE_OFFSET]) { + case STOP_EVENT: + /* + We needn't write this event to the relay log. Indeed, it just indicates a + master server shutdown. The only thing this does is cleaning. But + cleaning is already done on a per-master-thread basis (as the master + server is shutting down cleanly, it has written all DROP TEMPORARY TABLE + prepared statements' deletion are TODO only when we binlog prep stmts). + + We don't even increment mi->master_log_pos, because we may be just after + a Rotate event. Btw, in a few milliseconds we are going to have a Start + event from the next binlog (unless the master is presently running + without --log-bin). + */ + goto err; + case ROTATE_EVENT: + { + Rotate_log_event rev(buf, checksum_alg != BINLOG_CHECKSUM_ALG_OFF ? + event_len - BINLOG_CHECKSUM_LEN : event_len, + mi->rli.relay_log.description_event_for_queue); + + if (unlikely(mi->gtid_reconnect_event_skip_count) && + unlikely(!mi->gtid_event_seen) && + rev.is_artificial_event() && + (mi->prev_master_id != mi->master_id || + strcmp(rev.new_log_ident, mi->master_log_name) != 0)) + { + /* + Artificial Rotate_log_event is the first event we receive at the start + of each master binlog file. It gives the name of the new binlog file. + + Normally, we already have this name from the real rotate event at the + end of the previous binlog file (unless we are making a new connection + using GTID). But if the master server restarted/crashed, there is no + rotate event at the end of the prior binlog file, so the name is new. + + We use this fact to handle a special case of master crashing. If the + master crashed while writing the binlog, it might end with a partial + event group lacking the COMMIT/XID event, which must be rolled + back. If the slave IO thread happens to get a disconnect in the middle + of exactly this event group, it will try to reconnect at the same GTID + and skip already fetched events. However, that GTID did not commit on + the master before the crash, so it does not really exist, and the + master will connect the slave at the next following GTID starting in + the next binlog. This could confuse the slave and make it mix the + start of one event group with the end of another. + + But we detect this case here, by noticing the change of binlog name + which detects the missing rotate event at the end of the previous + binlog file. In this case, we reset the counters to make us not skip + the next event group, and queue an artificial Format Description + event. The previously fetched incomplete event group will then be + rolled back when the Format Description event is executed by the SQL + thread. + + A similar case is if the reconnect somehow connects to a different + master server (like due to a network proxy or IP address takeover). + We detect this case by noticing a change of server_id and in this + case likewise rollback the partially received event group. + */ + Format_description_log_event fdle(4); + + if (mi->prev_master_id != mi->master_id) + sql_print_warning("The server_id of master server changed in the " + "middle of GTID %u-%u-%llu. Assuming a change of " + "master server, so rolling back the previously " + "received partial transaction. Expected: %lu, " + "received: %lu", mi->last_queued_gtid.domain_id, + mi->last_queued_gtid.server_id, + mi->last_queued_gtid.seq_no, + mi->prev_master_id, mi->master_id); + else if (strcmp(rev.new_log_ident, mi->master_log_name) != 0) + sql_print_warning("Unexpected change of master binlog file name in the " + "middle of GTID %u-%u-%llu, assuming that master has " + "crashed and rolling back the transaction. Expected: " + "'%s', received: '%s'", + mi->last_queued_gtid.domain_id, + mi->last_queued_gtid.server_id, + mi->last_queued_gtid.seq_no, + mi->master_log_name, rev.new_log_ident); + + mysql_mutex_lock(log_lock); + if (likely(!rli->relay_log.write_event(&fdle) && + !rli->relay_log.flush_and_sync(NULL))) + { + rli->relay_log.harvest_bytes_written(&rli->log_space_total); + } + else + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + mysql_mutex_unlock(log_lock); + goto err; + } + rli->relay_log.signal_relay_log_update(); + mysql_mutex_unlock(log_lock); + + mi->gtid_reconnect_event_skip_count= 0; + mi->events_queued_since_last_gtid= 0; + } + mi->prev_master_id= mi->master_id; + + if (unlikely(process_io_rotate(mi, &rev))) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + /* + Checksum special cases for the fake Rotate (R_f) event caused by the protocol + of events generation and serialization in RL where Rotate of master is + queued right next to FD of slave. + Since it's only FD that carries the alg desc of FD_s has to apply to R_m. + Two special rules apply only to the first R_f which comes in before any FD_m. + The 2nd R_f should be compatible with the FD_s that must have taken over + the last seen FD_m's (A). + + RSC_1: If OM \and fake Rotate \and slave is configured to + to compute checksum for its first FD event for RL + the fake Rotate gets checksummed here. + */ + if (uint4korr(&buf[0]) == 0 && checksum_alg == BINLOG_CHECKSUM_ALG_OFF && + mi->rli.relay_log.relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_OFF) + { + ha_checksum rot_crc= 0; + event_len += BINLOG_CHECKSUM_LEN; + memcpy(rot_buf, buf, event_len - BINLOG_CHECKSUM_LEN); + int4store(&rot_buf[EVENT_LEN_OFFSET], + uint4korr(&rot_buf[EVENT_LEN_OFFSET]) + BINLOG_CHECKSUM_LEN); + rot_crc= my_checksum(rot_crc, (const uchar *) rot_buf, + event_len - BINLOG_CHECKSUM_LEN); + int4store(&rot_buf[event_len - BINLOG_CHECKSUM_LEN], rot_crc); + DBUG_ASSERT(event_len == uint4korr(&rot_buf[EVENT_LEN_OFFSET])); + DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg == + mi->rli.relay_log.relay_log_checksum_alg); + /* the first one */ + DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF); + save_buf= (char *) buf; + buf= rot_buf; + } + else + /* + RSC_2: If NM \and fake Rotate \and slave does not compute checksum + the fake Rotate's checksum is stripped off before relay-logging. + */ + if (uint4korr(&buf[0]) == 0 && checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + mi->rli.relay_log.relay_log_checksum_alg == BINLOG_CHECKSUM_ALG_OFF) + { + event_len -= BINLOG_CHECKSUM_LEN; + memcpy(rot_buf, buf, event_len); + int4store(&rot_buf[EVENT_LEN_OFFSET], + uint4korr(&rot_buf[EVENT_LEN_OFFSET]) - BINLOG_CHECKSUM_LEN); + DBUG_ASSERT(event_len == uint4korr(&rot_buf[EVENT_LEN_OFFSET])); + DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg == + mi->rli.relay_log.relay_log_checksum_alg); + /* the first one */ + DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF); + save_buf= (char *) buf; + buf= rot_buf; + } + /* + Now the I/O thread has just changed its mi->master_log_name, so + incrementing mi->master_log_pos is nonsense. + */ + inc_pos= 0; + break; + } + case FORMAT_DESCRIPTION_EVENT: + { + /* + Create an event, and save it (when we rotate the relay log, we will have + to write this event again). + */ + /* + We are the only thread which reads/writes description_event_for_queue. + The relay_log struct does not move (though some members of it can + change), so we needn't any lock (no rli->data_lock, no log lock). + */ + Format_description_log_event* tmp; + const char* errmsg; + // mark it as undefined that is irrelevant anymore + mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_UNDEF; + if (!(tmp= (Format_description_log_event*) + Log_event::read_log_event(buf, event_len, &errmsg, + mi->rli.relay_log.description_event_for_queue, + 1))) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + tmp->copy_crypto_data(mi->rli.relay_log.description_event_for_queue); + delete mi->rli.relay_log.description_event_for_queue; + mi->rli.relay_log.description_event_for_queue= tmp; + if (tmp->checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF) + tmp->checksum_alg= BINLOG_CHECKSUM_ALG_OFF; + + /* installing new value of checksum Alg for relay log */ + mi->rli.relay_log.relay_log_checksum_alg= tmp->checksum_alg; + + /* + Do not queue any format description event that we receive after a + reconnect where we are skipping over a partial event group received + before the reconnect. + + (If we queued such an event, and it was the first format_description + event after master restart, the slave SQL thread would think that + the partial event group before it in the relay log was from a + previous master crash and should be rolled back). + */ + if (unlikely(mi->gtid_reconnect_event_skip_count && !mi->gtid_event_seen)) + gtid_skip_enqueue= true; + + /* + Though this does some conversion to the slave's format, this will + preserve the master's binlog format version, and number of event types. + */ + /* + If the event was not requested by the slave (the slave did not ask for + it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos + */ + inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0; + DBUG_PRINT("info",("binlog format is now %d", + mi->rli.relay_log.description_event_for_queue->binlog_version)); + + } + break; + + case HEARTBEAT_LOG_EVENT: + { + /* + HB (heartbeat) cannot come before RL (Relay) + */ + Heartbeat_log_event hb(buf, + mi->rli.relay_log.relay_log_checksum_alg + != BINLOG_CHECKSUM_ALG_OFF ? + event_len - BINLOG_CHECKSUM_LEN : event_len, + mi->rli.relay_log.description_event_for_queue); + if (!hb.is_valid()) + { + error= ER_SLAVE_HEARTBEAT_FAILURE; + error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;")); + error_msg.append(STRING_WITH_LEN("the event's data: log_file_name ")); + error_msg.append(hb.get_log_ident(), (uint) hb.get_ident_len()); + error_msg.append(STRING_WITH_LEN(" log_pos ")); + error_msg.append_ulonglong(hb.log_pos); + goto err; + } + mi->received_heartbeats++; + /* + compare local and event's versions of log_file, log_pos. + + Heartbeat is sent only after an event corresponding to the corrdinates + the heartbeat carries. + Slave can not have a higher coordinate except in the only + special case when mi->master_log_name, master_log_pos have never + been updated by Rotate event i.e when slave does not have any history + with the master (and thereafter mi->master_log_pos is NULL). + + Slave can have lower coordinates, if some event from master was omitted. + + TODO: handling `when' for SHOW SLAVE STATUS' snds behind + */ + if (memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len()) || + mi->master_log_pos > hb.log_pos) { + /* missed events of heartbeat from the past */ + error= ER_SLAVE_HEARTBEAT_FAILURE; + error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;")); + error_msg.append(STRING_WITH_LEN("the event's data: log_file_name ")); + error_msg.append(hb.get_log_ident(), (uint) hb.get_ident_len()); + error_msg.append(STRING_WITH_LEN(" log_pos ")); + error_msg.append_ulonglong(hb.log_pos); + goto err; + } + + /* + Heartbeat events doesn't count in the binlog size, so we don't have to + increment mi->master_log_pos + */ + goto skip_relay_logging; + } + break; + + case GTID_LIST_EVENT: + { + const char *errmsg; + Gtid_list_log_event *glev; + Log_event *tmp; + uint32 flags; + + if (!(tmp= Log_event::read_log_event(buf, event_len, &errmsg, + mi->rli.relay_log.description_event_for_queue, + opt_slave_sql_verify_checksum))) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + glev= static_cast<Gtid_list_log_event *>(tmp); + event_pos= glev->log_pos; + flags= glev->gl_flags; + delete glev; + + /* + We use fake Gtid_list events to update the old-style position (among + other things). + + Early code created fake Gtid_list events with zero log_pos, those should + not modify old-style position. + */ + if (event_pos == 0 || event_pos <= mi->master_log_pos) + inc_pos= 0; + else + inc_pos= event_pos - mi->master_log_pos; + + if (mi->rli.until_condition == Relay_log_info::UNTIL_GTID && + flags & Gtid_list_log_event::FLAG_UNTIL_REACHED) + { + char str_buf[128]; + String str(str_buf, sizeof(str_buf), system_charset_info); + mi->rli.until_gtid_pos.to_string(&str); + sql_print_information("Slave I/O thread stops because it reached its" + " UNTIL master_gtid_pos %s", str.c_ptr_safe()); + mi->abort_slave= true; + } + } + break; + + case GTID_EVENT: + { + DBUG_EXECUTE_IF("kill_slave_io_after_2_events", + { + mi->dbug_do_disconnect= true; + mi->dbug_event_counter= 2; + };); + + uchar gtid_flag; + + if (Gtid_log_event::peek(buf, event_len, checksum_alg, + &event_gtid.domain_id, &event_gtid.server_id, + &event_gtid.seq_no, >id_flag, + rli->relay_log.description_event_for_queue)) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + got_gtid_event= true; + if (mi->using_gtid == Master_info::USE_GTID_NO) + goto default_action; + if (unlikely(mi->gtid_reconnect_event_skip_count)) + { + if (likely(!mi->gtid_event_seen)) + { + mi->gtid_event_seen= true; + /* + If we are reconnecting, and we need to skip a partial event group + already queued to the relay log before the reconnect, then we check + that we actually get the same event group (same GTID) as before, so + we do not end up with half of one group and half another. + + The only way we should be able to receive a different GTID than what + we expect is if the binlog on the master (or more likely the whole + master server) was replaced with a different one, on the same IP + address, _and_ the new master happens to have domains in a different + order so we get the GTID from a different domain first. Still, it is + best to protect against this case. + */ + if (event_gtid.domain_id != mi->last_queued_gtid.domain_id || + event_gtid.server_id != mi->last_queued_gtid.server_id || + event_gtid.seq_no != mi->last_queued_gtid.seq_no) + { + bool first; + error= ER_SLAVE_UNEXPECTED_MASTER_SWITCH; + error_msg.append(STRING_WITH_LEN("Expected: ")); + first= true; + rpl_slave_state_tostring_helper(&error_msg, &mi->last_queued_gtid, + &first); + error_msg.append(STRING_WITH_LEN(", received: ")); + first= true; + rpl_slave_state_tostring_helper(&error_msg, &event_gtid, &first); + goto err; + } + if (global_system_variables.log_warnings > 1) + { + bool first= true; + StringBuffer<1024> gtid_text; + rpl_slave_state_tostring_helper(>id_text, &mi->last_queued_gtid, + &first); + sql_print_information("Slave IO thread is reconnected to " + "receive Gtid_log_event %s. It is to skip %llu " + "already received events including the gtid one", + gtid_text.ptr(), + mi->events_queued_since_last_gtid); + } + goto default_action; + } + else + { + bool first; + StringBuffer<1024> gtid_text; + + gtid_text.append(STRING_WITH_LEN("Last received gtid: ")); + first= true; + rpl_slave_state_tostring_helper(>id_text, &mi->last_queued_gtid, + &first); + gtid_text.append(STRING_WITH_LEN(", currently received: ")); + first= true; + rpl_slave_state_tostring_helper(>id_text, &event_gtid, &first); + + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + sql_print_error("Slave IO thread has received a new Gtid_log_event " + "while skipping already logged events " + "after reconnect. %s. %llu remains to be skipped. " + "The number of originally read events was %llu", + gtid_text.ptr(), + mi->gtid_reconnect_event_skip_count, + mi->events_queued_since_last_gtid); + goto err; + } + } + mi->gtid_event_seen= true; + + /* + We have successfully queued to relay log everything before this GTID, so + in case of reconnect we can start from after any previous GTID. + (Normally we would have updated gtid_current_pos earlier at the end of + the previous event group, but better leave an extra check here for + safety). + */ + if (mi->events_queued_since_last_gtid) + { + mi->gtid_current_pos.update(&mi->last_queued_gtid); + mi->events_queued_since_last_gtid= 0; + } + mi->last_queued_gtid= event_gtid; + mi->last_queued_gtid_standalone= + (gtid_flag & Gtid_log_event::FL_STANDALONE) != 0; + + /* Should filter all the subsequent events in the current GTID group? */ + mi->domain_id_filter.do_filter(event_gtid.domain_id); + + ++mi->events_queued_since_last_gtid; + inc_pos= event_len; + } + break; + /* + Binlog compressed event should uncompress in IO thread + */ + case QUERY_COMPRESSED_EVENT: + inc_pos= event_len; + if (query_event_uncompress(rli->relay_log.description_event_for_queue, + checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + buf, event_len, new_buf_arr, sizeof(new_buf_arr), + &is_malloc, (char **)&new_buf, &event_len)) + { + char llbuf[22]; + error = ER_BINLOG_UNCOMPRESS_ERROR; + error_msg.append(STRING_WITH_LEN("binlog uncompress error, master log_pos: ")); + llstr(mi->master_log_pos, llbuf); + error_msg.append(llbuf, strlen(llbuf)); + goto err; + } + buf = new_buf; + is_compress_event = true; + goto default_action; + + case WRITE_ROWS_COMPRESSED_EVENT: + case UPDATE_ROWS_COMPRESSED_EVENT: + case DELETE_ROWS_COMPRESSED_EVENT: + case WRITE_ROWS_COMPRESSED_EVENT_V1: + case UPDATE_ROWS_COMPRESSED_EVENT_V1: + case DELETE_ROWS_COMPRESSED_EVENT_V1: + inc_pos = event_len; + { + if (row_log_event_uncompress(rli->relay_log.description_event_for_queue, + checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + buf, event_len, new_buf_arr, sizeof(new_buf_arr), + &is_malloc, (char **)&new_buf, &event_len)) + { + char llbuf[22]; + error = ER_BINLOG_UNCOMPRESS_ERROR; + error_msg.append(STRING_WITH_LEN("binlog uncompress error, master log_pos: ")); + llstr(mi->master_log_pos, llbuf); + error_msg.append(llbuf, strlen(llbuf)); + goto err; + } + } + is_compress_event = true; + buf = new_buf; + /* + As we are uncertain about compressed V2 rows events, we don't track + them + */ + if (LOG_EVENT_IS_ROW_V2((Log_event_type) buf[EVENT_TYPE_OFFSET])) + goto default_action; + /* fall through */ + case WRITE_ROWS_EVENT_V1: + case UPDATE_ROWS_EVENT_V1: + case DELETE_ROWS_EVENT_V1: + case WRITE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case DELETE_ROWS_EVENT: + { + is_rows_event= true; + mi->rows_event_tracker.update(mi->master_log_name, + mi->master_log_pos, + buf, + mi->rli.relay_log. + description_event_for_queue); + + DBUG_EXECUTE_IF("simulate_stmt_end_rows_event_loss", + { + mi->rows_event_tracker.stmt_end_seen= false; + }); + } + goto default_action; + +#ifndef DBUG_OFF + case XID_EVENT: + DBUG_EXECUTE_IF("slave_discard_xid_for_gtid_0_x_1000", + { + /* Inject an event group that is missing its XID commit event. */ + if (mi->last_queued_gtid.domain_id == 0 && + mi->last_queued_gtid.seq_no == 1000) + goto skip_relay_logging; + }); + goto default_action; +#endif + case START_ENCRYPTION_EVENT: + if (uint2korr(buf + FLAGS_OFFSET) & LOG_EVENT_IGNORABLE_F) + { + /* + If the event was not requested by the slave (the slave did not ask for + it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos + */ + inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0; + break; + } + /* fall through */ + default: + default_action: + DBUG_EXECUTE_IF("kill_slave_io_after_2_events", + { + if (mi->dbug_do_disconnect && + (LOG_EVENT_IS_QUERY((Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET]) || + ((uchar)buf[EVENT_TYPE_OFFSET] == TABLE_MAP_EVENT)) + && (--mi->dbug_event_counter == 0)) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + mi->dbug_do_disconnect= false; /* Safety */ + goto err; + } + };); + + DBUG_EXECUTE_IF("kill_slave_io_before_commit", + { + if ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT || + ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ + Query_log_event::peek_is_commit_rollback(buf, event_len, + checksum_alg))) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + };); + + if (mi->using_gtid != Master_info::USE_GTID_NO && mi->gtid_event_seen) + { + if (unlikely(mi->gtid_reconnect_event_skip_count)) + { + --mi->gtid_reconnect_event_skip_count; + gtid_skip_enqueue= true; + } + else if (mi->events_queued_since_last_gtid) + ++mi->events_queued_since_last_gtid; + } + + if (!is_compress_event) + inc_pos= event_len; + + break; + } + + /* + Integrity of Rows- event group check. + A sequence of Rows- events must end with STMT_END_F flagged one. + Even when Heartbeat event interrupts Rows- events flow this must indicate a + malfunction e.g logging on the master. + */ + if (((uchar) buf[EVENT_TYPE_OFFSET] != HEARTBEAT_LOG_EVENT) && + !is_rows_event && + mi->rows_event_tracker.check_and_report(mi->master_log_name, + mi->master_log_pos)) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + + /* + If we filter events master-side (eg. @@skip_replication), we will see holes + in the event positions from the master. If we see such a hole, adjust + mi->master_log_pos accordingly so we maintain the correct position (for + reconnect, MASTER_POS_WAIT(), etc.) + */ + if (inc_pos > 0 && + event_len >= LOG_POS_OFFSET+4 && + (event_pos= uint4korr(buf+LOG_POS_OFFSET)) > mi->master_log_pos + inc_pos) + { + inc_pos= event_pos - mi->master_log_pos; + DBUG_PRINT("info", ("Adjust master_log_pos %llu->%llu to account for " + "master-side filtering", + mi->master_log_pos + inc_pos, event_pos)); + } + + /* + If this event is originating from this server, don't queue it. + We don't check this for 3.23 events because it's simpler like this; 3.23 + will be filtered anyway by the SQL slave thread which also tests the + server id (we must also keep this test in the SQL thread, in case somebody + upgrades a 4.0 slave which has a not-filtered relay log). + + ANY event coming from ourselves can be ignored: it is obvious for queries; + for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves + (--log-slave-updates would not log that) unless this slave is also its + direct master (an unsupported, useless setup!). + */ + + mysql_mutex_lock(log_lock); + s_id= uint4korr(buf + SERVER_ID_OFFSET); + /* + Write the event to the relay log, unless we reconnected in the middle + of an event group and now need to skip the initial part of the group that + we already wrote before reconnecting. + */ + if (unlikely(gtid_skip_enqueue)) + { + mi->master_log_pos+= inc_pos; + if ((uchar)buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT && + s_id == mi->master_id) + { + /* + If we write this master's description event in the middle of an event + group due to GTID reconnect, SQL thread will think that master crashed + in the middle of the group and roll back the first half, so we must not. + + But we still have to write an artificial copy of the masters description + event, to override the initial slave-version description event so that + SQL thread has the right information for parsing the events it reads. + */ + rli->relay_log.description_event_for_queue->created= 0; + rli->relay_log.description_event_for_queue->set_artificial_event(); + if (rli->relay_log.append_no_lock + (rli->relay_log.description_event_for_queue)) + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + else + rli->relay_log.harvest_bytes_written(&rli->log_space_total); + } + else if (mi->gtid_reconnect_event_skip_count == 0) + { + /* + Add a fake rotate event so that SQL thread can see the old-style + position where we re-connected in the middle of a GTID event group. + */ + Rotate_log_event fake_rev(mi->master_log_name, 0, mi->master_log_pos, 0); + fake_rev.server_id= mi->master_id; + if (rli->relay_log.append_no_lock(&fake_rev)) + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + else + rli->relay_log.harvest_bytes_written(&rli->log_space_total); + } + } + else + if ((s_id == global_system_variables.server_id && + !mi->rli.replicate_same_server_id) || + event_that_should_be_ignored(buf) || + /* + the following conjunction deals with IGNORE_SERVER_IDS, if set + If the master is on the ignore list, execution of + format description log events and rotate events is necessary. + */ + (mi->ignore_server_ids.elements > 0 && + mi->shall_ignore_server_id(s_id) && + /* everything is filtered out from non-master */ + (s_id != mi->master_id || + /* for the master meta information is necessary */ + ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) || + + /* + Check whether it needs to be filtered based on domain_id + (DO_DOMAIN_IDS/IGNORE_DOMAIN_IDS). + */ + (mi->domain_id_filter.is_group_filtered() && + Log_event::is_group_event((Log_event_type)(uchar) + buf[EVENT_TYPE_OFFSET]))) + { + /* + Do not write it to the relay log. + a) We still want to increment mi->master_log_pos, so that we won't + re-read this event from the master if the slave IO thread is now + stopped/restarted (more efficient if the events we are ignoring are big + LOAD DATA INFILE). + b) We want to record that we are skipping events, for the information of + the slave SQL thread, otherwise that thread may let + rli->group_relay_log_pos stay too small if the last binlog's event is + ignored. + But events which were generated by this slave and which do not exist in + the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment + mi->master_log_pos. + If the event is originated remotely and is being filtered out by + IGNORE_SERVER_IDS it increments mi->master_log_pos + as well as rli->group_relay_log_pos. + */ + if (!(s_id == global_system_variables.server_id && + !mi->rli.replicate_same_server_id) || + ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET] != STOP_EVENT)) + { + mi->master_log_pos+= inc_pos; + memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN); + DBUG_ASSERT(rli->ign_master_log_name_end[0]); + rli->ign_master_log_pos_end= mi->master_log_pos; + if (got_gtid_event) + rli->ign_gtids.update(&event_gtid); + } + // the slave SQL thread needs to re-check + rli->relay_log.signal_relay_log_update(); + DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored", + (ulong) mi->master_log_pos, uint4korr(buf + SERVER_ID_OFFSET))); + } + else + { + if (likely(!rli->relay_log.write_event_buffer((uchar*)buf, event_len))) + { + mi->master_log_pos+= inc_pos; + DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos)); + rli->relay_log.harvest_bytes_written(&rli->log_space_total); + } + else + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + } + rli->ign_master_log_name_end[0]= 0; // last event is not ignored + if (got_gtid_event) + rli->ign_gtids.remove_if_present(&event_gtid); + if (save_buf != NULL) + buf= save_buf; + } + mysql_mutex_unlock(log_lock); + + if (likely(!error) && + mi->using_gtid != Master_info::USE_GTID_NO && + mi->events_queued_since_last_gtid > 0 && + ( (mi->last_queued_gtid_standalone && + !Log_event::is_part_of_group((Log_event_type)(uchar) + buf[EVENT_TYPE_OFFSET])) || + (!mi->last_queued_gtid_standalone && + ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT || + (uchar)buf[EVENT_TYPE_OFFSET] == XA_PREPARE_LOG_EVENT || + ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ + Query_log_event::peek_is_commit_rollback(buf, event_len, + checksum_alg)))))) + { + /* + The whole of the current event group is queued. So in case of + reconnect we can start from after the current GTID. + */ + if (mi->gtid_reconnect_event_skip_count) + { + bool first= true; + StringBuffer<1024> gtid_text; + + rpl_slave_state_tostring_helper(>id_text, &mi->last_queued_gtid, + &first); + sql_print_error("Slave IO thread received a terminal event from " + "group %s whose retrieval was interrupted " + "with reconnect. We still had %llu events to read. " + "The number of originally read events was %llu", + gtid_text.ptr(), + mi->gtid_reconnect_event_skip_count, + mi->events_queued_since_last_gtid); + } + mi->gtid_current_pos.update(&mi->last_queued_gtid); + mi->events_queued_since_last_gtid= 0; + + /* Reset the domain_id_filter flag. */ + mi->domain_id_filter.reset_filter(); + } + +skip_relay_logging: + +err: + if (unlock_data_lock) + mysql_mutex_unlock(&mi->data_lock); + DBUG_PRINT("info", ("error: %d", error)); + + /* + Do not print ER_SLAVE_RELAY_LOG_WRITE_FAILURE error here, as the caller + handle_slave_io() prints it on return. + */ + if (unlikely(error) && error != ER_SLAVE_RELAY_LOG_WRITE_FAILURE) + mi->report(ERROR_LEVEL, error, NULL, ER_DEFAULT(error), + error_msg.ptr()); + + if (unlikely(is_malloc)) + my_free((void *)new_buf); + + DBUG_RETURN(error); +} + + +void end_relay_log_info(Relay_log_info* rli) +{ + mysql_mutex_t *log_lock; + DBUG_ENTER("end_relay_log_info"); + + rli->error_on_rli_init_info= false; + if (!rli->inited) + DBUG_VOID_RETURN; + if (rli->info_fd >= 0) + { + end_io_cache(&rli->info_file); + mysql_file_close(rli->info_fd, MYF(MY_WME)); + rli->info_fd = -1; + } + if (rli->cur_log_fd >= 0) + { + end_io_cache(&rli->cache_buf); + mysql_file_close(rli->cur_log_fd, MYF(MY_WME)); + rli->cur_log_fd = -1; + } + rli->inited = 0; + log_lock= rli->relay_log.get_log_lock(); + mysql_mutex_lock(log_lock); + rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT); + rli->relay_log.harvest_bytes_written(&rli->log_space_total); + mysql_mutex_unlock(log_lock); + /* + Delete the slave's temporary tables from memory. + In the future there will be other actions than this, to ensure persistance + of slave's temp tables after shutdown. + */ + rli->close_temporary_tables(); + DBUG_VOID_RETURN; +} + + +/** + Hook to detach the active VIO before closing a connection handle. + + The client API might close the connection (and associated data) + in case it encounters a unrecoverable (network) error. This hook + is called from the client code before the VIO handle is deleted + allows the thread to detach the active vio so it does not point + to freed memory. + + Other calls to THD::clear_active_vio throughout this module are + redundant due to the hook but are left in place for illustrative + purposes. +*/ + +extern "C" void slave_io_thread_detach_vio() +{ +#ifdef SIGNAL_WITH_VIO_CLOSE + THD *thd= current_thd; + if (thd && thd->slave_thread) + thd->clear_active_vio(); +#endif +} + + +/* + Try to connect until successful or slave killed + + SYNPOSIS + safe_connect() + thd Thread handler for slave + mysql MySQL connection handle + mi Replication handle + + RETURN + 0 ok + # Error +*/ + +static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi) +{ + DBUG_ENTER("safe_connect"); + + DBUG_RETURN(connect_to_master(thd, mysql, mi, 0, 0)); +} + + +/* + SYNPOSIS + connect_to_master() + + IMPLEMENTATION + Try to connect until successful or slave killed or we have retried + master_retry_count times +*/ + +static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, + bool reconnect, bool suppress_warnings) +{ + int slave_was_killed; + int last_errno= -2; // impossible error + ulong err_count=0; + my_bool my_true= 1; + DBUG_ENTER("connect_to_master"); + set_slave_max_allowed_packet(thd, mysql); +#ifndef DBUG_OFF + mi->events_till_disconnect = disconnect_slave_event_count; +#endif + ulong client_flag= CLIENT_REMEMBER_OPTIONS; + if (opt_slave_compressed_protocol) + client_flag|= CLIENT_COMPRESS; /* We will use compression */ + + mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout); + mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout); + mysql_options(mysql, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, + (char*) &my_true); + +#ifdef HAVE_OPENSSL + if (mi->ssl) + { + mysql_ssl_set(mysql, + mi->ssl_key[0]?mi->ssl_key:0, + mi->ssl_cert[0]?mi->ssl_cert:0, + mi->ssl_ca[0]?mi->ssl_ca:0, + mi->ssl_capath[0]?mi->ssl_capath:0, + mi->ssl_cipher[0]?mi->ssl_cipher:0); + mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, + &mi->ssl_verify_server_cert); + mysql_options(mysql, MYSQL_OPT_SSL_CRLPATH, + mi->ssl_crlpath[0] ? mi->ssl_crlpath : 0); + mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, + &mi->ssl_verify_server_cert); + } +#endif + + /* + If server's default charset is not supported (like utf16, utf32) as client + charset, then set client charset to 'latin1' (default client charset). + */ + if (is_supported_parser_charset(default_charset_info)) + mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname); + else + { + sql_print_information("'%s' can not be used as client character set. " + "'%s' will be used as default client character set " + "while connecting to master.", + default_charset_info->csname, + default_client_charset_info->csname); + mysql_options(mysql, MYSQL_SET_CHARSET_NAME, + default_client_charset_info->csname); + } + + /* This one is not strictly needed but we have it here for completeness */ + mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir); + + /* Set MYSQL_PLUGIN_DIR in case master asks for an external authentication plugin */ + if (opt_plugin_dir_ptr && *opt_plugin_dir_ptr) + mysql_options(mysql, MYSQL_PLUGIN_DIR, opt_plugin_dir_ptr); + + /* we disallow empty users */ + if (mi->user[0] == 0) + { + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, + ER_THD(thd, ER_SLAVE_FATAL_ERROR), + "Invalid (empty) username when attempting to " + "connect to the master server. Connection attempt " + "terminated."); + DBUG_RETURN(1); + } + while (!(slave_was_killed = io_slave_killed(mi)) && + (reconnect ? mysql_reconnect(mysql) != 0 : + mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0, + mi->port, 0, client_flag) == 0)) + { + /* Don't repeat last error */ + if ((int)mysql_errno(mysql) != last_errno) + { + last_errno=mysql_errno(mysql); + suppress_warnings= 0; + mi->report(ERROR_LEVEL, last_errno, NULL, + "error %s to master '%s@%s:%d'" + " - retry-time: %d maximum-retries: %lu message: %s", + (reconnect ? "reconnecting" : "connecting"), + mi->user, mi->host, mi->port, + mi->connect_retry, master_retry_count, + mysql_error(mysql)); + } + /* + By default we try forever. The reason is that failure will trigger + master election, so if the user did not set master_retry_count we + do not want to have election triggered on the first failure to + connect + */ + if (++err_count == master_retry_count) + { + slave_was_killed=1; + if (reconnect) + change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER); + break; + } + slave_sleep(thd,mi->connect_retry,io_slave_killed, mi); + } + + if (!slave_was_killed) + { + mi->clear_error(); // clear possible left over reconnect error + if (reconnect) + { + if (!suppress_warnings && global_system_variables.log_warnings) + sql_print_information("Slave: connected to master '%s@%s:%d'," + "replication resumed in log '%s' at " + "position %llu", mi->user, mi->host, mi->port, + IO_RPL_LOG_NAME, mi->master_log_pos); + } + else + { + change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE); + general_log_print(thd, COM_CONNECT_OUT, "%s@%s:%d", + mi->user, mi->host, mi->port); + } +#ifdef SIGNAL_WITH_VIO_CLOSE + thd->set_active_vio(mysql->net.vio); +#endif + } + mysql->reconnect= 1; + DBUG_PRINT("exit",("slave_was_killed: %d", slave_was_killed)); + DBUG_RETURN(slave_was_killed); +} + + +/* + safe_reconnect() + + IMPLEMENTATION + Try to connect until successful or slave killed or we have retried + master_retry_count times +*/ + +static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi, + bool suppress_warnings) +{ + DBUG_ENTER("safe_reconnect"); + DBUG_RETURN(connect_to_master(thd, mysql, mi, 1, suppress_warnings)); +} + + +#ifdef NOT_USED +MYSQL *rpl_connect_master(MYSQL *mysql) +{ + Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO); + bool allocated= false; + my_bool my_true= 1; + THD *thd; + + if (!mi) + { + sql_print_error("'rpl_connect_master' must be called in slave I/O thread context."); + return NULL; + } + thd= mi->io_thd; + if (!mysql) + { + if(!(mysql= mysql_init(NULL))) + { + sql_print_error("rpl_connect_master: failed in mysql_init()"); + return NULL; + } + allocated= true; + } + + /* + XXX: copied from connect_to_master, this function should not + change the slave status, so we cannot use connect_to_master + directly + + TODO: make this part a seperate function to eliminate duplication + */ + mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout); + mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout); + mysql_options(mysql, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, + (char*) &my_true); + +#ifdef HAVE_OPENSSL + if (mi->ssl) + { + mysql_ssl_set(mysql, + mi->ssl_key[0]?mi->ssl_key:0, + mi->ssl_cert[0]?mi->ssl_cert:0, + mi->ssl_ca[0]?mi->ssl_ca:0, + mi->ssl_capath[0]?mi->ssl_capath:0, + mi->ssl_cipher[0]?mi->ssl_cipher:0); + mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, + &mi->ssl_verify_server_cert); + } +#endif + + mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname); + /* This one is not strictly needed but we have it here for completeness */ + mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir); + + if (mi->user == NULL + || mi->user[0] == 0 + || io_slave_killed( mi) + || !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0, + mi->port, 0, 0)) + { + if (!io_slave_killed( mi)) + sql_print_error("rpl_connect_master: error connecting to master: %s (server_error: %d)", + mysql_error(mysql), mysql_errno(mysql)); + + if (allocated) + mysql_close(mysql); // this will free the object + return NULL; + } + return mysql; +} +#endif + + +/* + Called when we notice that the current "hot" log got rotated under our feet. +*/ + +static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg) +{ + DBUG_ENTER("reopen_relay_log"); + DBUG_ASSERT(rli->cur_log != &rli->cache_buf); + DBUG_ASSERT(rli->cur_log_fd == -1); + + IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf; + if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name, + errmsg)) <0) + DBUG_RETURN(0); + /* + We want to start exactly where we was before: + relay_log_pos Current log pos + pending Number of bytes already processed from the event + */ + rli->event_relay_log_pos= MY_MAX(rli->event_relay_log_pos, BIN_LOG_HEADER_SIZE); + my_b_seek(cur_log,rli->event_relay_log_pos); + DBUG_RETURN(cur_log); +} + + +/** + Reads next event from the relay log. Should be called from the + slave IO thread. + + @param rli Relay_log_info structure for the slave IO thread. + + @return The event read, or NULL on error. If an error occurs, the + error is reported through the sql_print_information() or + sql_print_error() functions. + + The size of the read event (in bytes) is returned in *event_size. +*/ +static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) +{ + Log_event* ev; + Relay_log_info *rli= rgi->rli; + IO_CACHE* cur_log = rli->cur_log; + mysql_mutex_t *log_lock = rli->relay_log.get_log_lock(); + const char* errmsg=0; + DBUG_ENTER("next_event"); + + DBUG_ASSERT(rgi->thd != 0 && rgi->thd == rli->sql_driver_thd); + *event_size= 0; + +#ifndef DBUG_OFF + if (abort_slave_event_count && !rli->events_till_abort--) + DBUG_RETURN(0); +#endif + + /* + For most operations we need to protect rli members with data_lock, + so we assume calling function acquired this mutex for us and we will + hold it for the most of the loop below However, we will release it + whenever it is worth the hassle, and in the cases when we go into a + mysql_cond_wait() with the non-data_lock mutex + */ + mysql_mutex_assert_owner(&rli->data_lock); + + while (!sql_slave_killed(rgi)) + { + /* + We can have two kinds of log reading: + hot_log: + rli->cur_log points at the IO_CACHE of relay_log, which + is actively being updated by the I/O thread. We need to be careful + in this case and make sure that we are not looking at a stale log that + has already been rotated. If it has been, we reopen the log. + + The other case is much simpler: + We just have a read only log that nobody else will be updating. + */ + ulonglong old_pos; + bool hot_log; + if ((hot_log = (cur_log != &rli->cache_buf))) + { + DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor + mysql_mutex_lock(log_lock); + + /* + Reading xxx_file_id is safe because the log will only + be rotated when we hold relay_log.LOCK_log + */ + if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count) + { + // The master has switched to a new log file; Reopen the old log file + cur_log=reopen_relay_log(rli, &errmsg); + mysql_mutex_unlock(log_lock); + if (!cur_log) // No more log files + goto err; + hot_log=0; // Using old binary log + } + } + /* + As there is no guarantee that the relay is open (for example, an I/O + error during a write by the slave I/O thread may have closed it), we + have to test it. + */ + if (!my_b_inited(cur_log)) + goto err; +#ifndef DBUG_OFF + { + /* This is an assertion which sometimes fails, let's try to track it */ + DBUG_PRINT("info", ("my_b_tell(cur_log)=%llu rli->event_relay_log_pos=%llu", + my_b_tell(cur_log), rli->event_relay_log_pos)); + DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE); + DBUG_ASSERT(rli->mi->using_parallel() || + my_b_tell(cur_log) == rli->event_relay_log_pos); + } +#endif + /* + Relay log is always in new format - if the master is 3.23, the + I/O thread will convert the format for us. + A problem: the description event may be in a previous relay log. So if + the slave has been shutdown meanwhile, we would have to look in old relay + logs, which may even have been deleted. So we need to write this + description event at the beginning of the relay log. + When the relay log is created when the I/O thread starts, easy: the + master will send the description event and we will queue it. + But if the relay log is created by new_file(): then the solution is: + MYSQL_BIN_LOG::open() will write the buffered description event. + */ + old_pos= rli->event_relay_log_pos; + if ((ev= Log_event::read_log_event(cur_log, + rli->relay_log.description_event_for_exec, + opt_slave_sql_verify_checksum))) + + { + /* + read it while we have a lock, to avoid a mutex lock in + inc_event_relay_log_pos() + */ + rli->future_event_relay_log_pos= my_b_tell(cur_log); + *event_size= rli->future_event_relay_log_pos - old_pos; + + if (hot_log) + mysql_mutex_unlock(log_lock); + rli->sql_thread_caught_up= false; + DBUG_RETURN(ev); + } + if (opt_reckless_slave) // For mysql-test + cur_log->error = 0; + if (unlikely(cur_log->error < 0)) + { + errmsg = "slave SQL thread aborted because of I/O error"; + if (hot_log) + mysql_mutex_unlock(log_lock); + goto err; + } + if (!cur_log->error) /* EOF */ + { + /* + On a hot log, EOF means that there are no more updates to + process and we must block until I/O thread adds some and + signals us to continue + */ + if (hot_log) + { + /* + We say in Seconds_Behind_Master that we have "caught up". Note that + for example if network link is broken but I/O slave thread hasn't + noticed it (slave_net_timeout not elapsed), then we'll say "caught + up" whereas we're not really caught up. Fixing that would require + internally cutting timeout in smaller pieces in network read, no + thanks. Another example: SQL has caught up on I/O, now I/O has read + a new event and is queuing it; the false "0" will exist until SQL + finishes executing the new event; it will be look abnormal only if + the events have old timestamps (then you get "many", 0, "many"). + + Transient phases like this can be fixed with implemeting + Heartbeat event which provides the slave the status of the + master at time the master does not have any new update to send. + Seconds_Behind_Master would be zero only when master has no + more updates in binlog for slave. The heartbeat can be sent + in a (small) fraction of slave_net_timeout. Until it's done + rli->sql_thread_caught_up is temporarely (for time of waiting for + the following event) set whenever EOF is reached. + */ + rli->sql_thread_caught_up= true; + + DBUG_ASSERT(rli->relay_log.get_open_count() == + rli->cur_log_old_open_count); + + if (rli->ign_master_log_name_end[0]) + { + /* We generate and return a Rotate, to make our positions advance */ + DBUG_PRINT("info",("seeing an ignored end segment")); + ev= new Rotate_log_event(rli->ign_master_log_name_end, + 0, rli->ign_master_log_pos_end, + Rotate_log_event::DUP_NAME); + rli->ign_master_log_name_end[0]= 0; + mysql_mutex_unlock(log_lock); + if (unlikely(!ev)) + { + errmsg= "Slave SQL thread failed to create a Rotate event " + "(out of memory?), SHOW SLAVE STATUS may be inaccurate"; + goto err; + } + ev->server_id= 0; // don't be ignored by slave SQL thread + DBUG_RETURN(ev); + } + + if (rli->ign_gtids.count() && !rli->is_in_group()) + { + /* + We generate and return a Gtid_list, to update gtid_slave_pos, + unless being in the middle of a group. + */ + DBUG_PRINT("info",("seeing ignored end gtids")); + ev= new Gtid_list_log_event(&rli->ign_gtids, + Gtid_list_log_event::FLAG_IGN_GTIDS); + rli->ign_gtids.reset(); + mysql_mutex_unlock(log_lock); + if (unlikely(!ev)) + { + errmsg= "Slave SQL thread failed to create a Gtid_list event " + "(out of memory?), gtid_slave_pos may be inaccurate"; + goto err; + } + ev->server_id= 0; // don't be ignored by slave SQL thread + ev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos + DBUG_RETURN(ev); + } + + /* + We have to check sql_slave_killed() here an extra time. + Otherwise we may miss a wakeup, since last check was done + without holding LOCK_log. + */ + if (sql_slave_killed(rgi)) + { + mysql_mutex_unlock(log_lock); + break; + } + + /* + We can, and should release data_lock while we are waiting for + update. If we do not, show slave status will block + */ + mysql_mutex_unlock(&rli->data_lock); + + /* + Possible deadlock : + - the I/O thread has reached log_space_limit + - the SQL thread has read all relay logs, but cannot purge for some + reason: + * it has already purged all logs except the current one + * there are other logs than the current one but they're involved in + a transaction that finishes in the current one (or is not finished) + Solution : + Wake up the possibly waiting I/O thread, and set a boolean asking + the I/O thread to temporarily ignore the log_space_limit + constraint, because we do not want the I/O thread to block because of + space (it's ok if it blocks for any other reason (e.g. because the + master does not send anything). Then the I/O thread stops waiting + and reads one more event and starts honoring log_space_limit again. + + If the SQL thread needs more events to be able to rotate the log (it + might need to finish the current group first), then it can ask for + one more at a time. Thus we don't outgrow the relay log indefinitely, + but rather in a controlled manner, until the next rotate. + + When the SQL thread starts it sets ignore_log_space_limit to false. + We should also reset ignore_log_space_limit to 0 when the user does + RESET SLAVE, but in fact, no need as RESET SLAVE requires that the + slave be stopped, and the SQL thread sets ignore_log_space_limit + to 0 when + it stops. + */ + mysql_mutex_lock(&rli->log_space_lock); + + /* + If we have reached the limit of the relay space and we + are going to sleep, waiting for more events: + + 1. If outside a group, SQL thread asks the IO thread + to force a rotation so that the SQL thread purges + logs next time it processes an event (thus space is + freed). + + 2. If in a group, SQL thread asks the IO thread to + ignore the limit and queues yet one more event + so that the SQL thread finishes the group and + is are able to rotate and purge sometime soon. + */ + if (rli->log_space_limit && + rli->log_space_limit < rli->log_space_total) + { + /* force rotation if not in an unfinished group */ + rli->sql_force_rotate_relay= !rli->is_in_group(); + + /* ask for one more event */ + rli->ignore_log_space_limit= true; + } + + mysql_cond_broadcast(&rli->log_space_cond); + mysql_mutex_unlock(&rli->log_space_lock); + // Note that wait_for_update_relay_log unlocks lock_log ! + rli->relay_log.wait_for_update_relay_log(rli->sql_driver_thd); + // re-acquire data lock since we released it earlier + mysql_mutex_lock(&rli->data_lock); + rli->sql_thread_caught_up= false; + continue; + } + /* + If the log was not hot, we need to move to the next log in + sequence. The next log could be hot or cold, we deal with both + cases separately after doing some common initialization + */ + end_io_cache(cur_log); + DBUG_ASSERT(rli->cur_log_fd >= 0); + mysql_file_close(rli->cur_log_fd, MYF(MY_WME)); + rli->cur_log_fd = -1; + rli->last_inuse_relaylog->completed= true; + rli->relay_log.description_event_for_exec->reset_crypto(); + + if (relay_log_purge) + { + /* + purge_first_log will properly set up relay log coordinates in rli. + If the group's coordinates are equal to the event's coordinates + (i.e. the relay log was not rotated in the middle of a group), + we can purge this relay log too. + We do ulonglong and string comparisons, this may be slow but + - purging the last relay log is nice (it can save 1GB of disk), so we + like to detect the case where we can do it, and given this, + - I see no better detection method + - purge_first_log is not called that often + */ + if (rli->relay_log.purge_first_log + (rli, + rli->group_relay_log_pos == rli->event_relay_log_pos + && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name))) + { + errmsg = "Error purging processed logs"; + goto err; + } + } + else + { + /* + If hot_log is set, then we already have a lock on + LOCK_log. If not, we have to get the lock. + + According to Sasha, the only time this code will ever be executed + is if we are recovering from a bug. + */ + if (rli->relay_log.find_next_log(&rli->linfo, !hot_log)) + { + errmsg = "error switching to the next log"; + goto err; + } + rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE; + strmake_buf(rli->event_relay_log_name,rli->linfo.log_file_name); + if (rli->flush()) + { + errmsg= "error flushing relay log"; + goto err; + } + } + /* + Now we want to open this next log. To know if it's a hot log (the one + being written by the I/O thread now) or a cold log, we can use + is_active(); if it is hot, we use the I/O cache; if it's cold we open + the file normally. But if is_active() reports that the log is hot, this + may change between the test and the consequence of the test. So we may + open the I/O cache whereas the log is now cold, which is nonsense. + To guard against this, we need to have LOCK_log. + */ + + DBUG_PRINT("info",("hot_log: %d",hot_log)); + if (!hot_log) /* if hot_log, we already have this mutex */ + mysql_mutex_lock(log_lock); + if (rli->relay_log.is_active(rli->linfo.log_file_name)) + { + rli->cur_log= cur_log= rli->relay_log.get_log_file(); + rli->cur_log_old_open_count= rli->relay_log.get_open_count(); + DBUG_ASSERT(rli->cur_log_fd == -1); + + /* + When the SQL thread is [stopped and] (re)started the + following may happen: + + 1. Log was hot at stop time and remains hot at restart + + SQL thread reads again from hot_log (SQL thread was + reading from the active log when it was stopped and the + very same log is still active on SQL thread restart). + + In this case, my_b_seek is performed on cur_log, while + cur_log points to relay_log.get_log_file(); + + 2. Log was hot at stop time but got cold before restart + + The log was hot when SQL thread stopped, but it is not + anymore when the SQL thread restarts. + + In this case, the SQL thread reopens the log, using + cache_buf, ie, cur_log points to &cache_buf, and thence + its coordinates are reset. + + 3. Log was already cold at stop time + + The log was not hot when the SQL thread stopped, and, of + course, it will not be hot when it restarts. + + In this case, the SQL thread opens the cold log again, + using cache_buf, ie, cur_log points to &cache_buf, and + thence its coordinates are reset. + + 4. Log was hot at stop time, DBA changes to previous cold + log and restarts SQL thread + + The log was hot when the SQL thread was stopped, but the + user changed the coordinates of the SQL thread to + restart from a previous cold log. + + In this case, at start time, cur_log points to a cold + log, opened using &cache_buf as cache, and coordinates + are reset. However, as it moves on to the next logs, it + will eventually reach the hot log. If the hot log is the + same at the time the SQL thread was stopped, then + coordinates were not reset - the cur_log will point to + relay_log.get_log_file(), and not a freshly opened + IO_CACHE through cache_buf. For this reason we need to + deploy a my_b_seek before calling check_binlog_magic at + this point of the code (see: BUG#55263 for more + details). + + NOTES: + - We must keep the LOCK_log to read the 4 first bytes, as + this is a hot log (same as when we call read_log_event() + above: for a hot log we take the mutex). + + - Because of scenario #4 above, we need to have a + my_b_seek here. Otherwise, we might hit the assertion + inside check_binlog_magic. + */ + + my_b_seek(cur_log, (my_off_t) 0); + if (check_binlog_magic(cur_log,&errmsg)) + { + if (!hot_log) + mysql_mutex_unlock(log_lock); + goto err; + } + if (rli->alloc_inuse_relaylog(rli->linfo.log_file_name)) + { + if (!hot_log) + mysql_mutex_unlock(log_lock); + goto err; + } + if (!hot_log) + mysql_mutex_unlock(log_lock); + continue; + } + if (!hot_log) + mysql_mutex_unlock(log_lock); + /* + if we get here, the log was not hot, so we will have to open it + ourselves. We are sure that the log is still not hot now (a log can get + from hot to cold, but not from cold to hot). No need for LOCK_log. + */ + // open_binlog() will check the magic header + if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name, + &errmsg)) <0) + goto err; + if (rli->alloc_inuse_relaylog(rli->linfo.log_file_name)) + goto err; + } + else + { + /* + Read failed with a non-EOF error. + TODO: come up with something better to handle this error + */ + if (hot_log) + mysql_mutex_unlock(log_lock); + sql_print_error("Slave SQL thread: I/O error reading \ +event(errno: %d cur_log->error: %d)", + my_errno,cur_log->error); + // set read position to the beginning of the event + my_b_seek(cur_log,rli->event_relay_log_pos); + /* otherwise, we have had a partial read */ + errmsg = "Aborting slave SQL thread because of partial event read"; + break; // To end of function + } + } + if (!errmsg && global_system_variables.log_warnings) + { + sql_print_information("Error reading relay log event: %s", + "slave SQL thread was killed"); + DBUG_RETURN(0); + } + +err: + if (errmsg) + sql_print_error("Error reading relay log event: %s", errmsg); + DBUG_RETURN(0); +} +#ifdef WITH_WSREP +enum Log_event_type wsrep_peak_event(rpl_group_info *rgi, ulonglong* event_size) +{ + enum Log_event_type ev_type; + + mysql_mutex_lock(&rgi->rli->data_lock); + + unsigned long long event_pos= rgi->event_relay_log_pos; + unsigned long long orig_future_pos= rgi->future_event_relay_log_pos; + unsigned long long future_pos= rgi->future_event_relay_log_pos; + + /* scan the log to read next event and we skip + annotate events. */ + do { + my_b_seek(rgi->rli->cur_log, future_pos); + rgi->rli->event_relay_log_pos= future_pos; + rgi->event_relay_log_pos= future_pos; + Log_event* ev= next_event(rgi, event_size); + ev_type= (ev) ? ev->get_type_code() : UNKNOWN_EVENT; + delete ev; + future_pos+= *event_size; + } while (ev_type == ANNOTATE_ROWS_EVENT || ev_type == XID_EVENT); + + /* scan the log back and re-set the positions to original values */ + rgi->rli->event_relay_log_pos= event_pos; + rgi->event_relay_log_pos= event_pos; + my_b_seek(rgi->rli->cur_log, orig_future_pos); + + mysql_mutex_unlock(&rgi->rli->data_lock); + + return ev_type; +} +#endif /* WITH_WSREP */ +/* + Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation + because of size is simpler because when we do it we already have all relevant + locks; here we don't, so this function is mainly taking locks). + Returns nothing as we cannot catch any error (MYSQL_BIN_LOG::new_file() + is void). +*/ + +int rotate_relay_log(Master_info* mi) +{ + DBUG_ENTER("rotate_relay_log"); + Relay_log_info* rli= &mi->rli; + int error= 0; + + DBUG_EXECUTE_IF("crash_before_rotate_relaylog", DBUG_SUICIDE();); + + /* + We need to test inited because otherwise, new_file() will attempt to lock + LOCK_log, which may not be inited (if we're not a slave). + */ + if (!rli->inited) + { + DBUG_PRINT("info", ("rli->inited == 0")); + goto end; + } + + /* If the relay log is closed, new_file() will do nothing. */ + if ((error= rli->relay_log.new_file())) + goto end; + + /* + We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately + be counted, so imagine a succession of FLUSH LOGS and assume the slave + threads are started: + relay_log_space decreases by the size of the deleted relay log, but does + not increase, so flush-after-flush we may become negative, which is wrong. + Even if this will be corrected as soon as a query is replicated on the + slave (because the I/O thread will then call harvest_bytes_written() which + will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange + output in SHOW SLAVE STATUS meanwhile. So we harvest now. + If the log is closed, then this will just harvest the last writes, probably + 0 as they probably have been harvested. + + Note that it needs to be protected by mi->data_lock. + */ + mysql_mutex_assert_owner(&mi->data_lock); + rli->relay_log.harvest_bytes_written(&rli->log_space_total); +end: + DBUG_RETURN(error); +} + + +/** + Detects, based on master's version (as found in the relay log), if master + has a certain bug. + @param rli Relay_log_info which tells the master's version + @param bug_id Number of the bug as found in bugs.mysql.com + @param report bool report error message, default TRUE + + @param pred Predicate function that will be called with @c param to + check for the bug. If the function return @c true, the bug is present, + otherwise, it is not. + + @param param State passed to @c pred function. + + @return TRUE if master has the bug, FALSE if it does not. +*/ +bool rpl_master_has_bug(const Relay_log_info *rli, uint bug_id, bool report, + bool (*pred)(const void *), const void *param) +{ + struct st_version_range_for_one_bug { + uint bug_id; + Version introduced_in; // first version with bug + Version fixed_in; // first version with fix + }; + static struct st_version_range_for_one_bug versions_for_all_bugs[]= + { + {24432, { 5, 0, 24 }, { 5, 0, 38 } }, + {24432, { 5, 1, 12 }, { 5, 1, 17 } }, + {33029, { 5, 0, 0 }, { 5, 0, 58 } }, + {33029, { 5, 1, 0 }, { 5, 1, 12 } }, + {37426, { 5, 1, 0 }, { 5, 1, 26 } }, + }; + const Version &master_ver= + rli->relay_log.description_event_for_exec->server_version_split; + + for (uint i= 0; + i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++) + { + const Version &introduced_in= versions_for_all_bugs[i].introduced_in; + const Version &fixed_in= versions_for_all_bugs[i].fixed_in; + if ((versions_for_all_bugs[i].bug_id == bug_id) && + introduced_in <= master_ver && + fixed_in > master_ver && + (pred == NULL || (*pred)(param))) + { + if (!report) + return TRUE; + // a short message for SHOW SLAVE STATUS (message length constraints) + my_printf_error(ER_UNKNOWN_ERROR, "master may suffer from" + " http://bugs.mysql.com/bug.php?id=%u" + " so slave stops; check error log on slave" + " for more info", MYF(0), bug_id); + // a verbose message for the error log + rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR, NULL, + "According to the master's version ('%s')," + " it is probable that master suffers from this bug:" + " http://bugs.mysql.com/bug.php?id=%u" + " and thus replicating the current binary log event" + " may make the slave's data become different from the" + " master's data." + " To take no risk, slave refuses to replicate" + " this event and stops." + " We recommend that all updates be stopped on the" + " master and slave, that the data of both be" + " manually synchronized," + " that master's binary logs be deleted," + " that master be upgraded to a version at least" + " equal to '%d.%d.%d'. Then replication can be" + " restarted.", + rli->relay_log.description_event_for_exec->server_version, + bug_id, + fixed_in[0], fixed_in[1], fixed_in[2]); + return TRUE; + } + } + return FALSE; +} + +/** + BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12 + exclusive, if one statement in a SP generated AUTO_INCREMENT value + by the top statement, all statements after it would be considered + generated AUTO_INCREMENT value by the top statement, and a + erroneous INSERT_ID value might be associated with these statement, + which could cause duplicate entry error and stop the slave. + + Detect buggy master to work around. + */ +bool rpl_master_erroneous_autoinc(THD *thd) +{ + if (thd->rgi_slave) + { + DBUG_EXECUTE_IF("simulate_bug33029", return TRUE;); + return rpl_master_has_bug(thd->rgi_slave->rli, 33029, FALSE, NULL, NULL); + } + return FALSE; +} + + +static bool get_row_event_stmt_end(const char* buf, + const Format_description_log_event *fdle) +{ + uint8 const common_header_len= fdle->common_header_len; + Log_event_type event_type= (Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET]; + + uint8 const post_header_len= fdle->post_header_len[event_type-1]; + const char *flag_start= buf + common_header_len; + /* + The term 4 below signifies that master is of 'an intermediate source', see + Rows_log_event::Rows_log_event. + */ + flag_start += RW_MAPID_OFFSET + ((post_header_len == 6) ? 4 : RW_FLAGS_OFFSET); + + return (uint2korr(flag_start) & Rows_log_event::STMT_END_F) != 0; +} + + +/* + Reset log event tracking data. +*/ + +void Rows_event_tracker::reset() +{ + binlog_file_name[0]= 0; + first_seen= last_seen= 0; + stmt_end_seen= false; +} + + +/* + Update log event tracking data. + + The first- and last- seen event binlog position get memorized, as + well as the end-of-statement status of the last one. +*/ + +void Rows_event_tracker::update(const char* file_name, my_off_t pos, + const char* buf, + const Format_description_log_event *fdle) +{ + DBUG_ENTER("Rows_event_tracker::update"); + if (!first_seen) + { + first_seen= pos; + strmake(binlog_file_name, file_name, sizeof(binlog_file_name) - 1); + } + last_seen= pos; + DBUG_ASSERT(stmt_end_seen == 0); // We can only have one + stmt_end_seen= get_row_event_stmt_end(buf, fdle); + DBUG_VOID_RETURN; +}; + + +/** + The function is called at next event reading + after a sequence of Rows- log-events. It checks the end-of-statement status + of the past sequence to report on any isssue. + In the positive case the tracker gets reset. + + @return true when the Rows- event group integrity found compromised, + false otherwise. +*/ +bool Rows_event_tracker::check_and_report(const char* file_name, + my_off_t pos) +{ + if (last_seen) + { + // there was at least one "block" event previously + if (!stmt_end_seen) + { + sql_print_error("Slave IO thread did not receive an expected " + "Rows-log end-of-statement for event starting " + "at log '%s' position %llu " + "whose last block was seen at log '%s' position %llu. " + "The end-of-statement should have been delivered " + "before the current one at log '%s' position %llu", + binlog_file_name, first_seen, + binlog_file_name, last_seen, file_name, pos); + return true; + } + reset(); + } + + return false; +} + +/** + @} (end of group Replication) +*/ + +#endif /* HAVE_REPLICATION */ |