diff options
Diffstat (limited to 'sql/semisync_master.cc')
-rw-r--r-- | sql/semisync_master.cc | 1419 |
1 files changed, 1419 insertions, 0 deletions
diff --git a/sql/semisync_master.cc b/sql/semisync_master.cc new file mode 100644 index 00000000..670a6d8d --- /dev/null +++ b/sql/semisync_master.cc @@ -0,0 +1,1419 @@ +/* Copyright (C) 2007 Google Inc. + Copyright (c) 2008, 2013, Oracle and/or its affiliates. + Copyright (c) 2011, 2022, MariaDB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#include <my_global.h> +#include "semisync_master.h" +#include <algorithm> +#include <mysql_com.h> + +#define TIME_THOUSAND 1000 +#define TIME_MILLION 1000000 +#define TIME_BILLION 1000000000 + +/* This indicates whether semi-synchronous replication is enabled. */ +my_bool rpl_semi_sync_master_enabled= 0; +unsigned long long rpl_semi_sync_master_request_ack = 0; +unsigned long long rpl_semi_sync_master_get_ack = 0; +my_bool rpl_semi_sync_master_wait_no_slave = 1; +my_bool rpl_semi_sync_master_status = 0; +ulong rpl_semi_sync_master_wait_point = + SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT; +ulong rpl_semi_sync_master_timeout; +ulong rpl_semi_sync_master_trace_level; +ulong rpl_semi_sync_master_yes_transactions = 0; +ulong rpl_semi_sync_master_no_transactions = 0; +ulong rpl_semi_sync_master_off_times = 0; +ulong rpl_semi_sync_master_timefunc_fails = 0; +ulong rpl_semi_sync_master_wait_timeouts = 0; +ulong rpl_semi_sync_master_wait_sessions = 0; +ulong rpl_semi_sync_master_wait_pos_backtraverse = 0; +ulong rpl_semi_sync_master_avg_trx_wait_time = 0; +ulonglong rpl_semi_sync_master_trx_wait_num = 0; +ulong rpl_semi_sync_master_avg_net_wait_time = 0; +ulonglong rpl_semi_sync_master_net_wait_num = 0; +ulong rpl_semi_sync_master_clients = 0; +ulonglong rpl_semi_sync_master_net_wait_time = 0; +ulonglong rpl_semi_sync_master_trx_wait_time = 0; + +Repl_semi_sync_master repl_semisync_master; +Ack_receiver ack_receiver; + +/* + structure to save transaction log filename and position +*/ +typedef struct Trans_binlog_info { + my_off_t log_pos; + char log_file[FN_REFLEN]; +} Trans_binlog_info; + +static int get_wait_time(const struct timespec& start_ts); + +static ulonglong timespec_to_usec(const struct timespec *ts) +{ + return (ulonglong) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND; +} + +/******************************************************************************* + * + * <Active_tranx> class : manage all active transaction nodes + * + ******************************************************************************/ + +Active_tranx::Active_tranx(mysql_mutex_t *lock, + ulong trace_level) + : Trace(trace_level), m_allocator(max_connections), + m_num_entries(max_connections << 1), /* Transaction hash table size + * is set to double the size + * of max_connections */ + m_lock(lock) +{ + /* No transactions are in the list initially. */ + m_trx_front = NULL; + m_trx_rear = NULL; + + /* Create the hash table to find a transaction's ending event. */ + m_trx_htb = new Tranx_node *[m_num_entries]; + for (int idx = 0; idx < m_num_entries; ++idx) + m_trx_htb[idx] = NULL; + + sql_print_information("Semi-sync replication initialized for transactions."); +} + +Active_tranx::~Active_tranx() +{ + delete [] m_trx_htb; + m_trx_htb = NULL; + m_num_entries = 0; +} + +unsigned int Active_tranx::calc_hash(const unsigned char *key, size_t length) +{ + unsigned int nr = 1, nr2 = 4; + + /* The hash implementation comes from calc_hashnr() in mysys/hash.c. */ + while (length--) + { + nr ^= (((nr & 63)+nr2)*((unsigned int) (unsigned char) *key++))+ (nr << 8); + nr2 += 3; + } + return((unsigned int) nr); +} + +unsigned int Active_tranx::get_hash_value(const char *log_file_name, + my_off_t log_file_pos) +{ + unsigned int hash1 = calc_hash((const unsigned char *)log_file_name, + strlen(log_file_name)); + unsigned int hash2 = calc_hash((const unsigned char *)(&log_file_pos), + sizeof(log_file_pos)); + + return (hash1 + hash2) % m_num_entries; +} + +int Active_tranx::compare(const char *log_file_name1, my_off_t log_file_pos1, + const char *log_file_name2, my_off_t log_file_pos2) +{ + int cmp = strcmp(log_file_name1, log_file_name2); + + if (cmp != 0) + return cmp; + + if (log_file_pos1 > log_file_pos2) + return 1; + else if (log_file_pos1 < log_file_pos2) + return -1; + return 0; +} + +int Active_tranx::insert_tranx_node(const char *log_file_name, + my_off_t log_file_pos) +{ + Tranx_node *ins_node; + int result = 0; + unsigned int hash_val; + + DBUG_ENTER("Active_tranx:insert_tranx_node"); + + ins_node = m_allocator.allocate_node(); + if (!ins_node) + { + sql_print_error("%s: transaction node allocation failed for: (%s, %lu)", + "Active_tranx:insert_tranx_node", + log_file_name, (ulong)log_file_pos); + result = -1; + goto l_end; + } + + /* insert the binlog position in the active transaction list. */ + strncpy(ins_node->log_name, log_file_name, FN_REFLEN-1); + ins_node->log_name[FN_REFLEN-1] = 0; /* make sure it ends properly */ + ins_node->log_pos = log_file_pos; + + if (!m_trx_front) + { + /* The list is empty. */ + m_trx_front = m_trx_rear = ins_node; + } + else + { + int cmp = compare(ins_node, m_trx_rear); + if (cmp > 0) + { + /* Compare with the tail first. If the transaction happens later in + * binlog, then make it the new tail. + */ + m_trx_rear->next = ins_node; + m_trx_rear = ins_node; + } + else + { + /* Otherwise, it is an error because the transaction should hold the + * mysql_bin_log.LOCK_log when appending events. + */ + sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), " + "new node (%s, %lu)", "Active_tranx:insert_tranx_node", + m_trx_rear->log_name, (ulong)m_trx_rear->log_pos, + ins_node->log_name, (ulong)ins_node->log_pos); + result = -1; + goto l_end; + } + } + + hash_val = get_hash_value(ins_node->log_name, ins_node->log_pos); + ins_node->hash_next = m_trx_htb[hash_val]; + m_trx_htb[hash_val] = ins_node; + + DBUG_PRINT("semisync", ("%s: insert (%s, %lu) in entry(%u)", + "Active_tranx:insert_tranx_node", + ins_node->log_name, (ulong)ins_node->log_pos, + hash_val)); + l_end: + + DBUG_RETURN(result); +} + +bool Active_tranx::is_tranx_end_pos(const char *log_file_name, + my_off_t log_file_pos) +{ + DBUG_ENTER("Active_tranx::is_tranx_end_pos"); + + unsigned int hash_val = get_hash_value(log_file_name, log_file_pos); + Tranx_node *entry = m_trx_htb[hash_val]; + + while (entry != NULL) + { + if (compare(entry, log_file_name, log_file_pos) == 0) + break; + + entry = entry->hash_next; + } + + DBUG_PRINT("semisync", ("%s: probe (%s, %lu) in entry(%u)", + "Active_tranx::is_tranx_end_pos", + log_file_name, (ulong)log_file_pos, hash_val)); + + DBUG_RETURN(entry != NULL); +} + +void Active_tranx::clear_active_tranx_nodes(const char *log_file_name, + my_off_t log_file_pos) +{ + Tranx_node *new_front; + + DBUG_ENTER("Active_tranx::::clear_active_tranx_nodes"); + + if (log_file_name != NULL) + { + new_front = m_trx_front; + + while (new_front) + { + if (compare(new_front, log_file_name, log_file_pos) > 0) + break; + new_front = new_front->next; + } + } + else + { + /* If log_file_name is NULL, clear everything. */ + new_front = NULL; + } + + if (new_front == NULL) + { + /* No active transaction nodes after the call. */ + + /* Clear the hash table. */ + memset(m_trx_htb, 0, m_num_entries * sizeof(Tranx_node *)); + m_allocator.free_all_nodes(); + + /* Clear the active transaction list. */ + if (m_trx_front != NULL) + { + m_trx_front = NULL; + m_trx_rear = NULL; + } + + DBUG_PRINT("semisync", ("%s: cleared all nodes", + "Active_tranx::::clear_active_tranx_nodes")); + } + else if (new_front != m_trx_front) + { + Tranx_node *curr_node, *next_node; + + /* Delete all transaction nodes before the confirmation point. */ +#ifdef DBUG_TRACE + int n_frees = 0; +#endif + curr_node = m_trx_front; + while (curr_node != new_front) + { + next_node = curr_node->next; +#ifdef DBUG_TRACE + n_frees++; +#endif + + /* Remove the node from the hash table. */ + unsigned int hash_val = get_hash_value(curr_node->log_name, curr_node->log_pos); + Tranx_node **hash_ptr = &(m_trx_htb[hash_val]); + while ((*hash_ptr) != NULL) + { + if ((*hash_ptr) == curr_node) + { + (*hash_ptr) = curr_node->hash_next; + break; + } + hash_ptr = &((*hash_ptr)->hash_next); + } + + curr_node = next_node; + } + + m_trx_front = new_front; + m_allocator.free_nodes_before(m_trx_front); + + DBUG_PRINT("semisync", ("%s: cleared %d nodes back until pos (%s, %lu)", + "Active_tranx::::clear_active_tranx_nodes", + n_frees, + m_trx_front->log_name, (ulong)m_trx_front->log_pos)); + } + + DBUG_VOID_RETURN; +} + + +/******************************************************************************* + * + * <Repl_semi_sync_master> class: the basic code layer for semisync master. + * <Repl_semi_sync_slave> class: the basic code layer for semisync slave. + * + * The most important functions during semi-syn replication listed: + * + * Master: + * . report_reply_binlog(): called by the binlog dump thread when it receives + * the slave's status information. + * . update_sync_header(): based on transaction waiting information, decide + * whether to request the slave to reply. + * . write_tranx_in_binlog(): called by the transaction thread when it finishes + * writing all transaction events in binlog. + * . commit_trx(): transaction thread wait for the slave reply. + * + * Slave: + * . slave_read_sync_header(): read the semi-sync header from the master, get + * the sync status and get the payload for events. + * . slave_reply(): reply to the master about the replication progress. + * + ******************************************************************************/ + +Repl_semi_sync_master::Repl_semi_sync_master() + : m_active_tranxs(NULL), + m_init_done(false), + m_reply_file_name_inited(false), + m_reply_file_pos(0L), + m_wait_file_name_inited(false), + m_wait_file_pos(0), + m_master_enabled(false), + m_wait_timeout(0L), + m_state(0), + m_wait_point(0) +{ + strcpy(m_reply_file_name, ""); + strcpy(m_wait_file_name, ""); +} + +int Repl_semi_sync_master::init_object() +{ + int result= 0; + + m_init_done = true; + + /* References to the parameter works after set_options(). */ + set_wait_timeout(rpl_semi_sync_master_timeout); + set_trace_level(rpl_semi_sync_master_trace_level); + set_wait_point(rpl_semi_sync_master_wait_point); + + /* Mutex initialization can only be done after MY_INIT(). */ + mysql_mutex_init(key_LOCK_rpl_semi_sync_master_enabled, + &LOCK_rpl_semi_sync_master_enabled, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_binlog, + &LOCK_binlog, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_binlog_send, + &COND_binlog_send, NULL); + + if (rpl_semi_sync_master_enabled) + { + result = enable_master(); + if (!result) + { + result= ack_receiver.start(); /* Start the ACK thread. */ + /* + If rpl_semi_sync_master_wait_no_slave is disabled, let's temporarily + switch off semisync to avoid hang if there's none active slave. + */ + if (!rpl_semi_sync_master_wait_no_slave) + switch_off(); + } + } + else + { + disable_master(); + } + + return result; +} + +int Repl_semi_sync_master::enable_master() +{ + int result = 0; + + /* Must have the lock when we do enable of disable. */ + lock(); + + if (!get_master_enabled()) + { + m_active_tranxs = new Active_tranx(&LOCK_binlog, m_trace_level); + if (m_active_tranxs != NULL) + { + m_commit_file_name_inited = false; + m_reply_file_name_inited = false; + m_wait_file_name_inited = false; + + set_master_enabled(true); + m_state = true; + sql_print_information("Semi-sync replication enabled on the master."); + } + else + { + sql_print_error("Cannot allocate memory to enable semi-sync on the master."); + result = -1; + } + } + + unlock(); + + return result; +} + +void Repl_semi_sync_master::disable_master() +{ + /* Must have the lock when we do enable of disable. */ + lock(); + + if (get_master_enabled()) + { + /* Switch off the semi-sync first so that waiting transaction will be + * waken up. + */ + switch_off(); + + assert(m_active_tranxs != NULL); + delete m_active_tranxs; + m_active_tranxs = NULL; + + m_reply_file_name_inited = false; + m_wait_file_name_inited = false; + m_commit_file_name_inited = false; + + set_master_enabled(false); + sql_print_information("Semi-sync replication disabled on the master."); + } + + unlock(); +} + +void Repl_semi_sync_master::cleanup() +{ + if (m_init_done) + { + mysql_mutex_destroy(&LOCK_rpl_semi_sync_master_enabled); + mysql_mutex_destroy(&LOCK_binlog); + mysql_cond_destroy(&COND_binlog_send); + m_init_done= 0; + } + + delete m_active_tranxs; +} + +int Repl_semi_sync_master::sync_get_master_wait_sessions() +{ + int wait_sessions; + lock(); + wait_sessions= rpl_semi_sync_master_wait_sessions; + unlock(); + return wait_sessions; +} + +void Repl_semi_sync_master::create_timeout(struct timespec *out, + struct timespec *start_arg) +{ + struct timespec *start_ts; + struct timespec now_ts; + if (!start_arg) + { + set_timespec(now_ts, 0); + start_ts= &now_ts; + } + else + { + start_ts= start_arg; + } + + long diff_secs= (long) (m_wait_timeout / TIME_THOUSAND); + long diff_nsecs= (long) ((m_wait_timeout % TIME_THOUSAND) * TIME_MILLION); + long nsecs= start_ts->tv_nsec + diff_nsecs; + out->tv_sec= start_ts->tv_sec + diff_secs + nsecs / TIME_BILLION; + out->tv_nsec= nsecs % TIME_BILLION; +} + +void Repl_semi_sync_master::lock() +{ + mysql_mutex_lock(&LOCK_binlog); +} + +void Repl_semi_sync_master::unlock() +{ + mysql_mutex_unlock(&LOCK_binlog); +} + +void Repl_semi_sync_master::cond_broadcast() +{ + mysql_cond_broadcast(&COND_binlog_send); +} + +int Repl_semi_sync_master::cond_timewait(struct timespec *wait_time) +{ + int wait_res; + + DBUG_ENTER("Repl_semi_sync_master::cond_timewait()"); + + wait_res= mysql_cond_timedwait(&COND_binlog_send, + &LOCK_binlog, wait_time); + + DBUG_RETURN(wait_res); +} + +void Repl_semi_sync_master::add_slave() +{ + lock(); + rpl_semi_sync_master_clients++; + unlock(); +} + +void Repl_semi_sync_master::remove_slave() +{ + lock(); + rpl_semi_sync_master_clients--; + + /* Only switch off if semi-sync is enabled and is on */ + if (get_master_enabled() && is_on()) + { + /* If user has chosen not to wait if no semi-sync slave available + and the last semi-sync slave exits, turn off semi-sync on master + immediately. + */ + if (!rpl_semi_sync_master_wait_no_slave && + rpl_semi_sync_master_clients == 0) + switch_off(); + } + unlock(); +} + +int Repl_semi_sync_master::report_reply_packet(uint32 server_id, + const uchar *packet, + ulong packet_len) +{ + int result= -1; + char log_file_name[FN_REFLEN+1]; + my_off_t log_file_pos; + ulong log_file_len = 0; + + DBUG_ENTER("Repl_semi_sync_master::report_reply_packet"); + + DBUG_EXECUTE_IF("semisync_corrupt_magic", + const_cast<uchar*>(packet)[REPLY_MAGIC_NUM_OFFSET]= 0;); + if (unlikely(packet[REPLY_MAGIC_NUM_OFFSET] != + Repl_semi_sync_master::k_packet_magic_num)) + { + sql_print_error("Read semi-sync reply magic number error"); + goto l_end; + } + + if (unlikely(packet_len < REPLY_BINLOG_NAME_OFFSET)) + { + sql_print_error("Read semi-sync reply length error: packet is too small"); + goto l_end; + } + + log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET); + log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET; + if (unlikely(log_file_len >= FN_REFLEN)) + { + sql_print_error("Read semi-sync reply binlog file length too large"); + goto l_end; + } + strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len); + log_file_name[log_file_len] = 0; + + DBUG_ASSERT(dirname_length(log_file_name) == 0); + + DBUG_PRINT("semisync", ("%s: Got reply(%s, %lu) from server %u", + "Repl_semi_sync_master::report_reply_packet", + log_file_name, (ulong)log_file_pos, server_id)); + + rpl_semi_sync_master_get_ack++; + report_reply_binlog(server_id, log_file_name, log_file_pos); + result= 0; + +l_end: + if (result == -1) + { + char buf[256]; + octet2hex(buf, (const char*) packet, std::min(static_cast<ulong>(sizeof(buf)-1), + packet_len)); + sql_print_information("First bytes of the packet from semisync slave " + "server-id %d: %s", server_id, buf); + + } + DBUG_RETURN(result); +} + +int Repl_semi_sync_master::report_reply_binlog(uint32 server_id, + const char *log_file_name, + my_off_t log_file_pos) +{ + int cmp; + bool can_release_threads = false; + bool need_copy_send_pos = true; + + DBUG_ENTER("Repl_semi_sync_master::report_reply_binlog"); + + if (!(get_master_enabled())) + DBUG_RETURN(0); + + lock(); + + /* This is the real check inside the mutex. */ + if (!get_master_enabled()) + goto l_end; + + if (!is_on()) + /* We check to see whether we can switch semi-sync ON. */ + try_switch_on(server_id, log_file_name, log_file_pos); + + /* The position should increase monotonically, if there is only one + * thread sending the binlog to the slave. + * In reality, to improve the transaction availability, we allow multiple + * sync replication slaves. So, if any one of them get the transaction, + * the transaction session in the primary can move forward. + */ + if (m_reply_file_name_inited) + { + cmp = Active_tranx::compare(log_file_name, log_file_pos, + m_reply_file_name, m_reply_file_pos); + + /* If the requested position is behind the sending binlog position, + * would not adjust sending binlog position. + * We based on the assumption that there are multiple semi-sync slave, + * and at least one of them shou/ld be up to date. + * If all semi-sync slaves are behind, at least initially, the primary + * can find the situation after the waiting timeout. After that, some + * slaves should catch up quickly. + */ + if (cmp < 0) + { + /* If the position is behind, do not copy it. */ + need_copy_send_pos = false; + } + } + + if (need_copy_send_pos) + { + strmake_buf(m_reply_file_name, log_file_name); + m_reply_file_pos = log_file_pos; + m_reply_file_name_inited = true; + + /* Remove all active transaction nodes before this point. */ + assert(m_active_tranxs != NULL); + m_active_tranxs->clear_active_tranx_nodes(log_file_name, log_file_pos); + + DBUG_PRINT("semisync", ("%s: Got reply at (%s, %lu)", + "Repl_semi_sync_master::report_reply_binlog", + log_file_name, (ulong)log_file_pos)); + } + + if (rpl_semi_sync_master_wait_sessions > 0) + { + /* Let us check if some of the waiting threads doing a trx + * commit can now proceed. + */ + cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos, + m_wait_file_name, m_wait_file_pos); + if (cmp >= 0) + { + /* Yes, at least one waiting thread can now proceed: + * let us release all waiting threads with a broadcast + */ + can_release_threads = true; + m_wait_file_name_inited = false; + } + } + + l_end: + unlock(); + + if (can_release_threads) + { + DBUG_PRINT("semisync", ("%s: signal all waiting threads.", + "Repl_semi_sync_master::report_reply_binlog")); + + cond_broadcast(); + } + + DBUG_RETURN(0); +} + +int Repl_semi_sync_master::wait_after_sync(const char *log_file, my_off_t log_pos) +{ + if (!get_master_enabled()) + return 0; + + int ret= 0; + if(log_pos && + wait_point() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC) + ret= commit_trx(log_file + dirname_length(log_file), log_pos); + + return ret; +} + +int Repl_semi_sync_master::wait_after_commit(THD* thd, bool all) +{ + if (!get_master_enabled()) + return 0; + + int ret= 0; + const char *log_file; + my_off_t log_pos; + + bool is_real_trans= + (all || thd->transaction->all.ha_list == 0); + /* + The coordinates are propagated to this point having been computed + in report_binlog_update + */ + Trans_binlog_info *log_info= thd->semisync_info; + log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0; + log_pos= log_info ? log_info->log_pos : 0; + + DBUG_ASSERT(!log_file || dirname_length(log_file) == 0); + + if (is_real_trans && + log_pos && + wait_point() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT) + ret= commit_trx(log_file, log_pos); + + if (is_real_trans && log_info) + { + log_info->log_file[0]= 0; + log_info->log_pos= 0; + } + + return ret; +} + +int Repl_semi_sync_master::wait_after_rollback(THD *thd, bool all) +{ + return wait_after_commit(thd, all); +} + +/** + The method runs after flush to binary log is done. +*/ +int Repl_semi_sync_master::report_binlog_update(THD* thd, const char *log_file, + my_off_t log_pos) +{ + if (get_master_enabled()) + { + Trans_binlog_info *log_info; + + if (!(log_info= thd->semisync_info)) + { + if(!(log_info= (Trans_binlog_info*)my_malloc(PSI_INSTRUMENT_ME, + sizeof(Trans_binlog_info), MYF(0)))) + return 1; + thd->semisync_info= log_info; + } + strcpy(log_info->log_file, log_file + dirname_length(log_file)); + log_info->log_pos = log_pos; + + return write_tranx_in_binlog(log_info->log_file, log_pos); + } + + return 0; +} + +int Repl_semi_sync_master::dump_start(THD* thd, + const char *log_file, + my_off_t log_pos) +{ + if (!thd->semi_sync_slave) + return 0; + + if (ack_receiver.add_slave(thd)) + { + sql_print_error("Failed to register slave to semi-sync ACK receiver " + "thread. Turning off semisync"); + thd->semi_sync_slave= 0; + return 1; + } + + add_slave(); + report_reply_binlog(thd->variables.server_id, + log_file + dirname_length(log_file), log_pos); + sql_print_information("Start semi-sync binlog_dump to slave " + "(server_id: %ld), pos(%s, %lu)", + (long) thd->variables.server_id, log_file, + (ulong) log_pos); + + return 0; +} + +void Repl_semi_sync_master::dump_end(THD* thd) +{ + if (!thd->semi_sync_slave) + return; + + sql_print_information("Stop semi-sync binlog_dump to slave (server_id: %ld)", + (long) thd->variables.server_id); + + remove_slave(); + ack_receiver.remove_slave(thd); +} + +int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, + my_off_t trx_wait_binlog_pos) +{ + DBUG_ENTER("Repl_semi_sync_master::commit_trx"); + + if (get_master_enabled() && trx_wait_binlog_name) + { + struct timespec start_ts; + struct timespec abstime; + int wait_result; + PSI_stage_info old_stage; + THD *thd= current_thd; + + set_timespec(start_ts, 0); + + DEBUG_SYNC(thd, "rpl_semisync_master_commit_trx_before_lock"); + /* Acquire the mutex. */ + lock(); + + /* This must be called after acquired the lock */ + THD_ENTER_COND(thd, &COND_binlog_send, &LOCK_binlog, + & stage_waiting_for_semi_sync_ack_from_slave, + & old_stage); + + /* This is the real check inside the mutex. */ + if (!get_master_enabled() || !is_on()) + goto l_end; + + DBUG_PRINT("semisync", ("%s: wait pos (%s, %lu), repl(%d)", + "Repl_semi_sync_master::commit_trx", + trx_wait_binlog_name, (ulong)trx_wait_binlog_pos, + (int)is_on())); + + while (is_on() && !thd_killed(thd)) + { + if (m_reply_file_name_inited) + { + int cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos, + trx_wait_binlog_name, + trx_wait_binlog_pos); + if (cmp >= 0) + { + /* We have already sent the relevant binlog to the slave: no need to + * wait here. + */ + DBUG_PRINT("semisync", ("%s: Binlog reply is ahead (%s, %lu),", + "Repl_semi_sync_master::commit_trx", + m_reply_file_name, + (ulong)m_reply_file_pos)); + break; + } + } + + /* Let us update the info about the minimum binlog position of waiting + * threads. + */ + if (m_wait_file_name_inited) + { + int cmp = Active_tranx::compare(trx_wait_binlog_name, + trx_wait_binlog_pos, + m_wait_file_name, m_wait_file_pos); + if (cmp <= 0) + { + /* This thd has a lower position, let's update the minimum info. */ + strmake_buf(m_wait_file_name, trx_wait_binlog_name); + m_wait_file_pos = trx_wait_binlog_pos; + + rpl_semi_sync_master_wait_pos_backtraverse++; + DBUG_PRINT("semisync", ("%s: move back wait position (%s, %lu),", + "Repl_semi_sync_master::commit_trx", + m_wait_file_name, (ulong)m_wait_file_pos)); + } + } + else + { + strmake_buf(m_wait_file_name, trx_wait_binlog_name); + m_wait_file_pos = trx_wait_binlog_pos; + m_wait_file_name_inited = true; + + DBUG_PRINT("semisync", ("%s: init wait position (%s, %lu),", + "Repl_semi_sync_master::commit_trx", + m_wait_file_name, (ulong)m_wait_file_pos)); + } + + /* In semi-synchronous replication, we wait until the binlog-dump + * thread has received the reply on the relevant binlog segment from the + * replication slave. + * + * Let us suspend this thread to wait on the condition; + * when replication has progressed far enough, we will release + * these waiting threads. + */ + rpl_semi_sync_master_wait_sessions++; + + /* We keep track of when this thread is awaiting an ack to ensure it is + * not killed while awaiting an ACK if a shutdown is issued. + */ + set_thd_awaiting_semisync_ack(thd, TRUE); + + DBUG_PRINT("semisync", ("%s: wait %lu ms for binlog sent (%s, %lu)", + "Repl_semi_sync_master::commit_trx", + m_wait_timeout, + m_wait_file_name, (ulong)m_wait_file_pos)); + + create_timeout(&abstime, &start_ts); + wait_result = cond_timewait(&abstime); + + set_thd_awaiting_semisync_ack(thd, FALSE); + rpl_semi_sync_master_wait_sessions--; + + if (wait_result != 0) + { + /* This is a real wait timeout. */ + sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), " + "semi-sync up to file %s, position %lu.", + trx_wait_binlog_name, (ulong)trx_wait_binlog_pos, + m_reply_file_name, (ulong)m_reply_file_pos); + rpl_semi_sync_master_wait_timeouts++; + + /* switch semi-sync off */ + switch_off(); + } + else + { + int wait_time; + + wait_time = get_wait_time(start_ts); + if (wait_time < 0) + { + DBUG_PRINT("semisync", ("Replication semi-sync getWaitTime fail at " + "wait position (%s, %lu)", + trx_wait_binlog_name, + (ulong)trx_wait_binlog_pos)); + rpl_semi_sync_master_timefunc_fails++; + } + else + { + rpl_semi_sync_master_trx_wait_num++; + rpl_semi_sync_master_trx_wait_time += wait_time; + } + } + } + + /* + At this point, the binlog file and position of this transaction + must have been removed from Active_tranx. + m_active_tranxs may be NULL if someone disabled semi sync during + cond_timewait() + */ + assert(thd_killed(thd) || !m_active_tranxs || + !m_active_tranxs->is_tranx_end_pos(trx_wait_binlog_name, + trx_wait_binlog_pos)); + + l_end: + /* Update the status counter. */ + if (is_on()) + rpl_semi_sync_master_yes_transactions++; + else + rpl_semi_sync_master_no_transactions++; + + /* The lock held will be released by thd_exit_cond, so no need to + call unlock() here */ + THD_EXIT_COND(thd, &old_stage); + } + + DBUG_RETURN(0); +} + +/* Indicate that semi-sync replication is OFF now. + * + * What should we do when it is disabled? The problem is that we want + * the semi-sync replication enabled again when the slave catches up + * later. But, it is not that easy to detect that the slave has caught + * up. This is caused by the fact that MySQL's replication protocol is + * asynchronous, meaning that if the master does not use the semi-sync + * protocol, the slave would not send anything to the master. + * Still, if the master is sending (N+1)-th event, we assume that it is + * an indicator that the slave has received N-th event and earlier ones. + * + * If semi-sync is disabled, all transactions still update the wait + * position with the last position in binlog. But no transactions will + * wait for confirmations and the active transaction list would not be + * maintained. In binlog dump thread, update_sync_header() checks whether + * the current sending event catches up with last wait position. If it + * does match, semi-sync will be switched on again. + */ +void Repl_semi_sync_master::switch_off() +{ + DBUG_ENTER("Repl_semi_sync_master::switch_off"); + + m_state = false; + + /* Clear the active transaction list. */ + assert(m_active_tranxs != NULL); + m_active_tranxs->clear_active_tranx_nodes(NULL, 0); + + rpl_semi_sync_master_off_times++; + m_wait_file_name_inited = false; + m_reply_file_name_inited = false; + sql_print_information("Semi-sync replication switched OFF."); + cond_broadcast(); /* wake up all waiting threads */ + + DBUG_VOID_RETURN; +} + +int Repl_semi_sync_master::try_switch_on(int server_id, + const char *log_file_name, + my_off_t log_file_pos) +{ + bool semi_sync_on = false; + + DBUG_ENTER("Repl_semi_sync_master::try_switch_on"); + + /* If the current sending event's position is larger than or equal to the + * 'largest' commit transaction binlog position, the slave is already + * catching up now and we can switch semi-sync on here. + * If m_commit_file_name_inited indicates there are no recent transactions, + * we can enable semi-sync immediately. + */ + if (m_commit_file_name_inited) + { + int cmp = Active_tranx::compare(log_file_name, log_file_pos, + m_commit_file_name, m_commit_file_pos); + semi_sync_on = (cmp >= 0); + } + else + { + semi_sync_on = true; + } + + if (semi_sync_on) + { + /* Switch semi-sync replication on. */ + m_state = true; + + sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) " + "at (%s, %lu)", + server_id, log_file_name, + (ulong)log_file_pos); + } + + DBUG_RETURN(0); +} + +int Repl_semi_sync_master::reserve_sync_header(String* packet) +{ + DBUG_ENTER("Repl_semi_sync_master::reserve_sync_header"); + + /* Set the magic number and the sync status. By default, no sync + * is required. + */ + packet->append(reinterpret_cast<const char*>(k_sync_header), + sizeof(k_sync_header)); + DBUG_RETURN(0); +} + +int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet, + const char *log_file_name, + my_off_t log_file_pos, + bool* need_sync) +{ + int cmp = 0; + bool sync = false; + + DBUG_ENTER("Repl_semi_sync_master::update_sync_header"); + + /* If the semi-sync master is not enabled, or the slave is not a semi-sync + * target, do not request replies from the slave. + */ + if (!get_master_enabled() || !thd->semi_sync_slave) + { + *need_sync = false; + DBUG_RETURN(0); + } + + lock(); + + /* This is the real check inside the mutex. */ + if (!get_master_enabled()) + { + assert(sync == false); + goto l_end; + } + + if (is_on()) + { + /* semi-sync is ON */ + sync = false; /* No sync unless a transaction is involved. */ + + if (m_reply_file_name_inited) + { + cmp = Active_tranx::compare(log_file_name, log_file_pos, + m_reply_file_name, m_reply_file_pos); + if (cmp <= 0) + { + /* If we have already got the reply for the event, then we do + * not need to sync the transaction again. + */ + goto l_end; + } + } + + if (m_wait_file_name_inited) + { + cmp = Active_tranx::compare(log_file_name, log_file_pos, + m_wait_file_name, m_wait_file_pos); + } + else + { + cmp = 1; + } + + /* If we are already waiting for some transaction replies which + * are later in binlog, do not wait for this one event. + */ + if (cmp >= 0) + { + /* + * We only wait if the event is a transaction's ending event. + */ + assert(m_active_tranxs != NULL); + sync = m_active_tranxs->is_tranx_end_pos(log_file_name, + log_file_pos); + } + } + else + { + if (m_commit_file_name_inited) + { + int cmp = Active_tranx::compare(log_file_name, log_file_pos, + m_commit_file_name, m_commit_file_pos); + sync = (cmp >= 0); + } + else + { + sync = true; + } + } + + DBUG_PRINT("semisync", ("%s: server(%lu), (%s, %lu) sync(%d), repl(%d)", + "Repl_semi_sync_master::update_sync_header", + thd->variables.server_id, log_file_name, + (ulong)log_file_pos, sync, (int)is_on())); + *need_sync= sync; + + l_end: + unlock(); + + /* We do not need to clear sync flag because we set it to 0 when we + * reserve the packet header. + */ + if (sync) + { + (packet)[2] = k_packet_flag_sync; + } + + DBUG_RETURN(0); +} + +int Repl_semi_sync_master::write_tranx_in_binlog(const char* log_file_name, + my_off_t log_file_pos) +{ + int result = 0; + + DBUG_ENTER("Repl_semi_sync_master::write_tranx_in_binlog"); + + lock(); + + /* This is the real check inside the mutex. */ + if (!get_master_enabled()) + goto l_end; + + /* Update the 'largest' transaction commit position seen so far even + * though semi-sync is switched off. + * It is much better that we update m_commit_file* here, instead of + * inside commit_trx(). This is mostly because update_sync_header() + * will watch for m_commit_file* to decide whether to switch semi-sync + * on. The detailed reason is explained in function update_sync_header(). + */ + if (m_commit_file_name_inited) + { + int cmp = Active_tranx::compare(log_file_name, log_file_pos, + m_commit_file_name, m_commit_file_pos); + if (cmp > 0) + { + /* This is a larger position, let's update the maximum info. */ + strncpy(m_commit_file_name, log_file_name, FN_REFLEN-1); + m_commit_file_name[FN_REFLEN-1] = 0; /* make sure it ends properly */ + m_commit_file_pos = log_file_pos; + } + } + else + { + strncpy(m_commit_file_name, log_file_name, FN_REFLEN-1); + m_commit_file_name[FN_REFLEN-1] = 0; /* make sure it ends properly */ + m_commit_file_pos = log_file_pos; + m_commit_file_name_inited = true; + } + + if (is_on()) + { + assert(m_active_tranxs != NULL); + if(m_active_tranxs->insert_tranx_node(log_file_name, log_file_pos)) + { + /* + if insert tranx_node failed, print a warning message + and turn off semi-sync + */ + sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu", + log_file_name, (ulong)log_file_pos); + switch_off(); + } + else + { + rpl_semi_sync_master_request_ack++; + } + } + + l_end: + unlock(); + + DBUG_RETURN(result); +} + +int Repl_semi_sync_master::flush_net(THD *thd, + const char *event_buf) +{ + int result = -1; + NET* net= &thd->net; + + DBUG_ENTER("Repl_semi_sync_master::flush_net"); + + assert((unsigned char)event_buf[1] == k_packet_magic_num); + if ((unsigned char)event_buf[2] != k_packet_flag_sync) + { + /* current event does not require reply */ + result = 0; + goto l_end; + } + + /* We flush to make sure that the current event is sent to the network, + * instead of being buffered in the TCP/IP stack. + */ + if (net_flush(net)) + { + sql_print_error("Semi-sync master failed on net_flush() " + "before waiting for slave reply"); + goto l_end; + } + + net_clear(net, 0); + net->pkt_nr++; + net->compress_pkt_nr++; + result = 0; + rpl_semi_sync_master_net_wait_num++; + + l_end: + thd->clear_error(); + + DBUG_RETURN(result); +} + +int Repl_semi_sync_master::after_reset_master() +{ + int result = 0; + + DBUG_ENTER("Repl_semi_sync_master::after_reset_master"); + + if (rpl_semi_sync_master_enabled) + { + sql_print_information("Enable Semi-sync Master after reset master"); + enable_master(); + } + + lock(); + + if (rpl_semi_sync_master_clients == 0 && + !rpl_semi_sync_master_wait_no_slave) + m_state = 0; + else + m_state = get_master_enabled()? 1 : 0; + + m_wait_file_name_inited = false; + m_reply_file_name_inited = false; + m_commit_file_name_inited = false; + + rpl_semi_sync_master_yes_transactions = 0; + rpl_semi_sync_master_no_transactions = 0; + rpl_semi_sync_master_off_times = 0; + rpl_semi_sync_master_timefunc_fails = 0; + rpl_semi_sync_master_wait_sessions = 0; + rpl_semi_sync_master_wait_pos_backtraverse = 0; + rpl_semi_sync_master_trx_wait_num = 0; + rpl_semi_sync_master_trx_wait_time = 0; + rpl_semi_sync_master_net_wait_num = 0; + rpl_semi_sync_master_net_wait_time = 0; + + unlock(); + + DBUG_RETURN(result); +} + +int Repl_semi_sync_master::before_reset_master() +{ + int result = 0; + + DBUG_ENTER("Repl_semi_sync_master::before_reset_master"); + + if (rpl_semi_sync_master_enabled) + disable_master(); + + DBUG_RETURN(result); +} + +void Repl_semi_sync_master::check_and_switch() +{ + lock(); + if (get_master_enabled() && is_on()) + { + if (!rpl_semi_sync_master_wait_no_slave + && rpl_semi_sync_master_clients == 0) + switch_off(); + } + unlock(); +} + +void Repl_semi_sync_master::set_export_stats() +{ + lock(); + + rpl_semi_sync_master_status = m_state; + rpl_semi_sync_master_avg_trx_wait_time= + ((rpl_semi_sync_master_trx_wait_num) ? + (ulong)((double)rpl_semi_sync_master_trx_wait_time / + ((double)rpl_semi_sync_master_trx_wait_num)) : 0); + rpl_semi_sync_master_avg_net_wait_time= + ((rpl_semi_sync_master_net_wait_num) ? + (ulong)((double)rpl_semi_sync_master_net_wait_time / + ((double)rpl_semi_sync_master_net_wait_num)) : 0); + + unlock(); +} + +void Repl_semi_sync_master::await_slave_reply() +{ + struct timespec abstime; + + DBUG_ENTER("Repl_semi_sync_master::::await_slave_reply"); + lock(); + + /* Just return if there is nothing to wait for */ + if (!rpl_semi_sync_master_wait_sessions) + goto end; + + create_timeout(&abstime, NULL); + cond_timewait(&abstime); + +end: + unlock(); + DBUG_VOID_RETURN; +} + +/* Get the waiting time given the wait's staring time. + * + * Return: + * >= 0: the waiting time in microsecons(us) + * < 0: error in get time or time back traverse + */ +static int get_wait_time(const struct timespec& start_ts) +{ + ulonglong start_usecs, end_usecs; + struct timespec end_ts; + + /* Starting time in microseconds(us). */ + start_usecs = timespec_to_usec(&start_ts); + + /* Get the wait time interval. */ + set_timespec(end_ts, 0); + + /* Ending time in microseconds(us). */ + end_usecs = timespec_to_usec(&end_ts); + + if (end_usecs < start_usecs) + return -1; + + return (int)(end_usecs - start_usecs); +} + +void semi_sync_master_deinit() +{ + repl_semisync_master.cleanup(); + ack_receiver.cleanup(); +} |