summaryrefslogtreecommitdiffstats
path: root/sql/semisync_master.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--sql/semisync_master.cc175
1 files changed, 85 insertions, 90 deletions
diff --git a/sql/semisync_master.cc b/sql/semisync_master.cc
index 670a6d8d..9f30a820 100644
--- a/sql/semisync_master.cc
+++ b/sql/semisync_master.cc
@@ -91,7 +91,9 @@ Active_tranx::Active_tranx(mysql_mutex_t *lock,
for (int idx = 0; idx < m_num_entries; ++idx)
m_trx_htb[idx] = NULL;
+#ifdef EXTRA_DEBUG
sql_print_information("Semi-sync replication initialized for transactions.");
+#endif
}
Active_tranx::~Active_tranx()
@@ -352,8 +354,7 @@ Repl_semi_sync_master::Repl_semi_sync_master()
m_state(0),
m_wait_point(0)
{
- strcpy(m_reply_file_name, "");
- strcpy(m_wait_file_name, "");
+ m_reply_file_name[0]= m_wait_file_name[0]= 0;
}
int Repl_semi_sync_master::init_object()
@@ -379,20 +380,10 @@ int Repl_semi_sync_master::init_object()
{
result = enable_master();
if (!result)
- {
result= ack_receiver.start(); /* Start the ACK thread. */
- /*
- If rpl_semi_sync_master_wait_no_slave is disabled, let's temporarily
- switch off semisync to avoid hang if there's none active slave.
- */
- if (!rpl_semi_sync_master_wait_no_slave)
- switch_off();
- }
}
else
- {
disable_master();
- }
return result;
}
@@ -441,7 +432,7 @@ void Repl_semi_sync_master::disable_master()
*/
switch_off();
- assert(m_active_tranxs != NULL);
+ DBUG_ASSERT(m_active_tranxs != NULL);
delete m_active_tranxs;
m_active_tranxs = NULL;
@@ -450,7 +441,6 @@ void Repl_semi_sync_master::disable_master()
m_commit_file_name_inited = false;
set_master_enabled(false);
- sql_print_information("Semi-sync replication disabled on the master.");
}
unlock();
@@ -537,31 +527,34 @@ void Repl_semi_sync_master::add_slave()
void Repl_semi_sync_master::remove_slave()
{
lock();
- rpl_semi_sync_master_clients--;
-
- /* Only switch off if semi-sync is enabled and is on */
- if (get_master_enabled() && is_on())
+ if (!(--rpl_semi_sync_master_clients) && !rpl_semi_sync_master_wait_no_slave)
{
- /* If user has chosen not to wait if no semi-sync slave available
- and the last semi-sync slave exits, turn off semi-sync on master
- immediately.
- */
- if (!rpl_semi_sync_master_wait_no_slave &&
- rpl_semi_sync_master_clients == 0)
- switch_off();
+ /*
+ Signal transactions waiting in commit_trx() that they do not have to
+ wait anymore.
+ */
+ cond_broadcast();
}
unlock();
}
+
+/*
+ Check the reply packet received from a semi-sync slave
+
+ @retval 0 ok
+ @retval 1 Error
+ @retval -1 Slave is going down (ok)
+*/
+
int Repl_semi_sync_master::report_reply_packet(uint32 server_id,
const uchar *packet,
ulong packet_len)
{
- int result= -1;
+ int result= 1; // Assume error
char log_file_name[FN_REFLEN+1];
my_off_t log_file_pos;
ulong log_file_len = 0;
-
DBUG_ENTER("Repl_semi_sync_master::report_reply_packet");
DBUG_EXECUTE_IF("semisync_corrupt_magic",
@@ -569,7 +562,14 @@ int Repl_semi_sync_master::report_reply_packet(uint32 server_id,
if (unlikely(packet[REPLY_MAGIC_NUM_OFFSET] !=
Repl_semi_sync_master::k_packet_magic_num))
{
- sql_print_error("Read semi-sync reply magic number error");
+ if (packet[0] == COM_QUIT && packet_len == 1)
+ {
+ /* Slave sent COM_QUIT as part of IO thread going down */
+ sql_print_information("slave IO thread has stopped");
+ DBUG_RETURN(-1);
+ }
+ else
+ sql_print_error("Read semi-sync reply magic number error");
goto l_end;
}
@@ -597,14 +597,13 @@ int Repl_semi_sync_master::report_reply_packet(uint32 server_id,
rpl_semi_sync_master_get_ack++;
report_reply_binlog(server_id, log_file_name, log_file_pos);
- result= 0;
+ DBUG_RETURN(0);
l_end:
- if (result == -1)
{
char buf[256];
- octet2hex(buf, (const char*) packet, std::min(static_cast<ulong>(sizeof(buf)-1),
- packet_len));
+ octet2hex(buf, (const char*) packet,
+ MY_MIN(sizeof(buf)-1, (size_t) packet_len));
sql_print_information("First bytes of the packet from semisync slave "
"server-id %d: %s", server_id, buf);
@@ -668,7 +667,7 @@ int Repl_semi_sync_master::report_reply_binlog(uint32 server_id,
m_reply_file_name_inited = true;
/* Remove all active transaction nodes before this point. */
- assert(m_active_tranxs != NULL);
+ DBUG_ASSERT(m_active_tranxs != NULL);
m_active_tranxs->clear_active_tranx_nodes(log_file_name, log_file_pos);
DBUG_PRINT("semisync", ("%s: Got reply at (%s, %lu)",
@@ -809,6 +808,8 @@ int Repl_semi_sync_master::dump_start(THD* thd,
(long) thd->variables.server_id, log_file,
(ulong) log_pos);
+ /* Mark that semi-sync net->pkt_nr is not reliable */
+ thd->net.pkt_nr_can_be_reset= 1;
return 0;
}
@@ -827,8 +828,15 @@ void Repl_semi_sync_master::dump_end(THD* thd)
int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
my_off_t trx_wait_binlog_pos)
{
+ bool success= 0;
DBUG_ENTER("Repl_semi_sync_master::commit_trx");
+ if (!rpl_semi_sync_master_clients && !rpl_semi_sync_master_wait_no_slave)
+ {
+ rpl_semi_sync_master_no_transactions++;
+ DBUG_RETURN(0);
+ }
+
if (get_master_enabled() && trx_wait_binlog_name)
{
struct timespec start_ts;
@@ -836,7 +844,7 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
int wait_result;
PSI_stage_info old_stage;
THD *thd= current_thd;
-
+ bool aborted= 0;
set_timespec(start_ts, 0);
DEBUG_SYNC(thd, "rpl_semisync_master_commit_trx_before_lock");
@@ -859,6 +867,13 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
while (is_on() && !thd_killed(thd))
{
+ /* We have to check these again as things may have changed */
+ if (!rpl_semi_sync_master_clients && !rpl_semi_sync_master_wait_no_slave)
+ {
+ aborted= 1;
+ break;
+ }
+
if (m_reply_file_name_inited)
{
int cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos,
@@ -873,6 +888,7 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
"Repl_semi_sync_master::commit_trx",
m_reply_file_name,
(ulong)m_reply_file_pos));
+ success= 1;
break;
}
}
@@ -973,13 +989,13 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
m_active_tranxs may be NULL if someone disabled semi sync during
cond_timewait()
*/
- assert(thd_killed(thd) || !m_active_tranxs ||
- !m_active_tranxs->is_tranx_end_pos(trx_wait_binlog_name,
- trx_wait_binlog_pos));
+ DBUG_ASSERT(thd_killed(thd) || !m_active_tranxs || aborted ||
+ !m_active_tranxs->is_tranx_end_pos(trx_wait_binlog_name,
+ trx_wait_binlog_pos));
l_end:
/* Update the status counter. */
- if (is_on())
+ if (success)
rpl_semi_sync_master_yes_transactions++;
else
rpl_semi_sync_master_no_transactions++;
@@ -1014,18 +1030,20 @@ void Repl_semi_sync_master::switch_off()
{
DBUG_ENTER("Repl_semi_sync_master::switch_off");
- m_state = false;
-
- /* Clear the active transaction list. */
- assert(m_active_tranxs != NULL);
- m_active_tranxs->clear_active_tranx_nodes(NULL, 0);
+ if (m_state)
+ {
+ m_state = false;
- rpl_semi_sync_master_off_times++;
- m_wait_file_name_inited = false;
- m_reply_file_name_inited = false;
- sql_print_information("Semi-sync replication switched OFF.");
- cond_broadcast(); /* wake up all waiting threads */
+ /* Clear the active transaction list. */
+ DBUG_ASSERT(m_active_tranxs != NULL);
+ m_active_tranxs->clear_active_tranx_nodes(NULL, 0);
+ rpl_semi_sync_master_off_times++;
+ m_wait_file_name_inited = false;
+ m_reply_file_name_inited = false;
+ sql_print_information("Semi-sync replication switched OFF.");
+ }
+ cond_broadcast(); /* wake up all waiting threads */
DBUG_VOID_RETURN;
}
@@ -1072,9 +1090,10 @@ int Repl_semi_sync_master::reserve_sync_header(String* packet)
{
DBUG_ENTER("Repl_semi_sync_master::reserve_sync_header");
- /* Set the magic number and the sync status. By default, no sync
- * is required.
- */
+ /*
+ Set the magic number and the sync status. By default, no sync
+ is required.
+ */
packet->append(reinterpret_cast<const char*>(k_sync_header),
sizeof(k_sync_header));
DBUG_RETURN(0);
@@ -1087,7 +1106,6 @@ int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet,
{
int cmp = 0;
bool sync = false;
-
DBUG_ENTER("Repl_semi_sync_master::update_sync_header");
/* If the semi-sync master is not enabled, or the slave is not a semi-sync
@@ -1103,16 +1121,11 @@ int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet,
/* This is the real check inside the mutex. */
if (!get_master_enabled())
- {
- assert(sync == false);
goto l_end;
- }
if (is_on())
{
/* semi-sync is ON */
- sync = false; /* No sync unless a transaction is involved. */
-
if (m_reply_file_name_inited)
{
cmp = Active_tranx::compare(log_file_name, log_file_pos,
@@ -1126,15 +1139,10 @@ int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet,
}
}
+ cmp = 1;
if (m_wait_file_name_inited)
- {
cmp = Active_tranx::compare(log_file_name, log_file_pos,
m_wait_file_name, m_wait_file_pos);
- }
- else
- {
- cmp = 1;
- }
/* If we are already waiting for some transaction replies which
* are later in binlog, do not wait for this one event.
@@ -1144,7 +1152,7 @@ int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet,
/*
* We only wait if the event is a transaction's ending event.
*/
- assert(m_active_tranxs != NULL);
+ DBUG_ASSERT(m_active_tranxs != NULL);
sync = m_active_tranxs->is_tranx_end_pos(log_file_name,
log_file_pos);
}
@@ -1172,13 +1180,12 @@ int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet,
l_end:
unlock();
- /* We do not need to clear sync flag because we set it to 0 when we
- * reserve the packet header.
- */
+ /*
+ We do not need to clear sync flag in packet because we set it to 0 when we
+ reserve the packet header.
+ */
if (sync)
- {
- (packet)[2] = k_packet_flag_sync;
- }
+ packet[2]= k_packet_flag_sync;
DBUG_RETURN(0);
}
@@ -1225,7 +1232,7 @@ int Repl_semi_sync_master::write_tranx_in_binlog(const char* log_file_name,
if (is_on())
{
- assert(m_active_tranxs != NULL);
+ DBUG_ASSERT(m_active_tranxs != NULL);
if(m_active_tranxs->insert_tranx_node(log_file_name, log_file_pos))
{
/*
@@ -1256,7 +1263,7 @@ int Repl_semi_sync_master::flush_net(THD *thd,
DBUG_ENTER("Repl_semi_sync_master::flush_net");
- assert((unsigned char)event_buf[1] == k_packet_magic_num);
+ DBUG_ASSERT((unsigned char)event_buf[1] == k_packet_magic_num);
if ((unsigned char)event_buf[2] != k_packet_flag_sync)
{
/* current event does not require reply */
@@ -1274,6 +1281,11 @@ int Repl_semi_sync_master::flush_net(THD *thd,
goto l_end;
}
+ /*
+ We have to do a net_clear() as with semi-sync the slave replies are
+ interleaved with data from the master and then the net->pkt_nr
+ cannot be kept in sync. Better to start pkt_nr from 0 again.
+ */
net_clear(net, 0);
net->pkt_nr++;
net->compress_pkt_nr++;
@@ -1300,11 +1312,7 @@ int Repl_semi_sync_master::after_reset_master()
lock();
- if (rpl_semi_sync_master_clients == 0 &&
- !rpl_semi_sync_master_wait_no_slave)
- m_state = 0;
- else
- m_state = get_master_enabled()? 1 : 0;
+ m_state = get_master_enabled() ? 1 : 0;
m_wait_file_name_inited = false;
m_reply_file_name_inited = false;
@@ -1338,18 +1346,6 @@ int Repl_semi_sync_master::before_reset_master()
DBUG_RETURN(result);
}
-void Repl_semi_sync_master::check_and_switch()
-{
- lock();
- if (get_master_enabled() && is_on())
- {
- if (!rpl_semi_sync_master_wait_no_slave
- && rpl_semi_sync_master_clients == 0)
- switch_off();
- }
- unlock();
-}
-
void Repl_semi_sync_master::set_export_stats()
{
lock();
@@ -1363,7 +1359,6 @@ void Repl_semi_sync_master::set_export_stats()
((rpl_semi_sync_master_net_wait_num) ?
(ulong)((double)rpl_semi_sync_master_net_wait_time /
((double)rpl_semi_sync_master_net_wait_num)) : 0);
-
unlock();
}