summaryrefslogtreecommitdiffstats
path: root/sql/semisync_master.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-18 13:22:53 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-18 13:22:53 +0000
commit347c164c35eddab388009470e6848cb361ac93f8 (patch)
tree2c0c44eac690f510bb0a35b2a13b36d606b77b6b /sql/semisync_master.cc
parentReleasing progress-linux version 1:10.11.7-4~progress7.99u1. (diff)
downloadmariadb-347c164c35eddab388009470e6848cb361ac93f8.tar.xz
mariadb-347c164c35eddab388009470e6848cb361ac93f8.zip
Merging upstream version 1:10.11.8.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'sql/semisync_master.cc')
-rw-r--r--sql/semisync_master.cc291
1 files changed, 181 insertions, 110 deletions
diff --git a/sql/semisync_master.cc b/sql/semisync_master.cc
index 9f30a820..aa1056c8 100644
--- a/sql/semisync_master.cc
+++ b/sql/semisync_master.cc
@@ -68,6 +68,19 @@ static ulonglong timespec_to_usec(const struct timespec *ts)
return (ulonglong) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND;
}
+int signal_waiting_transaction(THD *waiting_thd, const char *binlog_file,
+ my_off_t binlog_pos)
+{
+ /*
+ It is possible that the connection thd waiting for an ACK was killed. In
+ such circumstance, the connection thread will nullify the thd member of its
+ Active_tranx node. So before we try to signal, ensure the THD exists.
+ */
+ if (waiting_thd)
+ mysql_cond_signal(&waiting_thd->COND_wakeup_ready);
+ return 0;
+}
+
/*******************************************************************************
*
* <Active_tranx> class : manage all active transaction nodes
@@ -75,12 +88,14 @@ static ulonglong timespec_to_usec(const struct timespec *ts)
******************************************************************************/
Active_tranx::Active_tranx(mysql_mutex_t *lock,
+ mysql_cond_t *cond,
ulong trace_level)
: Trace(trace_level), m_allocator(max_connections),
m_num_entries(max_connections << 1), /* Transaction hash table size
* is set to double the size
* of max_connections */
- m_lock(lock)
+ m_lock(lock),
+ m_cond_empty(cond)
{
/* No transactions are in the list initially. */
m_trx_front = NULL;
@@ -142,7 +157,8 @@ int Active_tranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
return 0;
}
-int Active_tranx::insert_tranx_node(const char *log_file_name,
+int Active_tranx::insert_tranx_node(THD *thd_to_wait,
+ const char *log_file_name,
my_off_t log_file_pos)
{
Tranx_node *ins_node;
@@ -165,6 +181,7 @@ int Active_tranx::insert_tranx_node(const char *log_file_name,
strncpy(ins_node->log_name, log_file_name, FN_REFLEN-1);
ins_node->log_name[FN_REFLEN-1] = 0; /* make sure it ends properly */
ins_node->log_pos = log_file_pos;
+ ins_node->thd= thd_to_wait;
if (!m_trx_front)
{
@@ -232,28 +249,22 @@ bool Active_tranx::is_tranx_end_pos(const char *log_file_name,
DBUG_RETURN(entry != NULL);
}
-void Active_tranx::clear_active_tranx_nodes(const char *log_file_name,
- my_off_t log_file_pos)
+void Active_tranx::clear_active_tranx_nodes(
+ const char *log_file_name, my_off_t log_file_pos,
+ active_tranx_action pre_delete_hook)
{
Tranx_node *new_front;
DBUG_ENTER("Active_tranx::::clear_active_tranx_nodes");
- if (log_file_name != NULL)
- {
- new_front = m_trx_front;
-
- while (new_front)
- {
- if (compare(new_front, log_file_name, log_file_pos) > 0)
- break;
- new_front = new_front->next;
- }
- }
- else
+ new_front= m_trx_front;
+ while (new_front)
{
- /* If log_file_name is NULL, clear everything. */
- new_front = NULL;
+ if ((log_file_name != NULL) &&
+ compare(new_front, log_file_name, log_file_pos) > 0)
+ break;
+ pre_delete_hook(new_front->thd, new_front->log_name, new_front->log_pos);
+ new_front = new_front->next;
}
if (new_front == NULL)
@@ -315,9 +326,66 @@ void Active_tranx::clear_active_tranx_nodes(const char *log_file_name,
m_trx_front->log_name, (ulong)m_trx_front->log_pos));
}
+ /*
+ m_cond_empty aliases Repl_semi_sync_master::COND_binlog, which holds the
+ condition variable to notify that we have cleared all nodes, e.g. used by
+ SHUTDOWN WAIT FOR ALL SLAVES.
+ */
+ if (is_empty())
+ mysql_cond_signal(m_cond_empty);
+
DBUG_VOID_RETURN;
}
+void Active_tranx::unlink_thd_as_waiter(const char *log_file_name,
+ my_off_t log_file_pos)
+{
+ DBUG_ENTER("Active_tranx::unlink_thd_as_waiter");
+ mysql_mutex_assert_owner(m_lock);
+
+ unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
+ Tranx_node *entry = m_trx_htb[hash_val];
+
+ while (entry != NULL)
+ {
+ if (compare(entry, log_file_name, log_file_pos) == 0)
+ break;
+
+ entry = entry->hash_next;
+ }
+
+ if (entry)
+ entry->thd= NULL;
+
+ DBUG_VOID_RETURN;
+}
+
+#ifndef DBUG_OFF
+void Active_tranx::assert_thd_is_waiter(THD *thd_to_check,
+ const char *log_file_name,
+ my_off_t log_file_pos)
+{
+ DBUG_ENTER("Active_tranx::assert_thd_is_waiter");
+ mysql_mutex_assert_owner(m_lock);
+
+ unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
+ Tranx_node *entry = m_trx_htb[hash_val];
+
+ while (entry != NULL)
+ {
+ if (compare(entry, log_file_name, log_file_pos) == 0)
+ break;
+
+ entry = entry->hash_next;
+ }
+
+ DBUG_ASSERT(entry);
+ DBUG_ASSERT(entry->thd);
+ DBUG_ASSERT(entry->thd->thread_id == thd_to_check->thread_id);
+
+ DBUG_VOID_RETURN;
+}
+#endif
/*******************************************************************************
*
@@ -397,7 +465,8 @@ int Repl_semi_sync_master::enable_master()
if (!get_master_enabled())
{
- m_active_tranxs = new Active_tranx(&LOCK_binlog, m_trace_level);
+ m_active_tranxs=
+ new Active_tranx(&LOCK_binlog, &COND_binlog_send, m_trace_level);
if (m_active_tranxs != NULL)
{
m_commit_file_name_inited = false;
@@ -459,15 +528,6 @@ void Repl_semi_sync_master::cleanup()
delete m_active_tranxs;
}
-int Repl_semi_sync_master::sync_get_master_wait_sessions()
-{
- int wait_sessions;
- lock();
- wait_sessions= rpl_semi_sync_master_wait_sessions;
- unlock();
- return wait_sessions;
-}
-
void Repl_semi_sync_master::create_timeout(struct timespec *out,
struct timespec *start_arg)
{
@@ -500,23 +560,6 @@ void Repl_semi_sync_master::unlock()
mysql_mutex_unlock(&LOCK_binlog);
}
-void Repl_semi_sync_master::cond_broadcast()
-{
- mysql_cond_broadcast(&COND_binlog_send);
-}
-
-int Repl_semi_sync_master::cond_timewait(struct timespec *wait_time)
-{
- int wait_res;
-
- DBUG_ENTER("Repl_semi_sync_master::cond_timewait()");
-
- wait_res= mysql_cond_timedwait(&COND_binlog_send,
- &LOCK_binlog, wait_time);
-
- DBUG_RETURN(wait_res);
-}
-
void Repl_semi_sync_master::add_slave()
{
lock();
@@ -533,7 +576,8 @@ void Repl_semi_sync_master::remove_slave()
Signal transactions waiting in commit_trx() that they do not have to
wait anymore.
*/
- cond_broadcast();
+ m_active_tranxs->clear_active_tranx_nodes(NULL, 0,
+ signal_waiting_transaction);
}
unlock();
}
@@ -616,7 +660,6 @@ int Repl_semi_sync_master::report_reply_binlog(uint32 server_id,
my_off_t log_file_pos)
{
int cmp;
- bool can_release_threads = false;
bool need_copy_send_pos = true;
DBUG_ENTER("Repl_semi_sync_master::report_reply_binlog");
@@ -668,45 +711,26 @@ int Repl_semi_sync_master::report_reply_binlog(uint32 server_id,
/* Remove all active transaction nodes before this point. */
DBUG_ASSERT(m_active_tranxs != NULL);
- m_active_tranxs->clear_active_tranx_nodes(log_file_name, log_file_pos);
+ m_active_tranxs->clear_active_tranx_nodes(log_file_name, log_file_pos,
+ signal_waiting_transaction);
+ if (m_active_tranxs->is_empty())
+ m_wait_file_name_inited= false;
DBUG_PRINT("semisync", ("%s: Got reply at (%s, %lu)",
"Repl_semi_sync_master::report_reply_binlog",
log_file_name, (ulong)log_file_pos));
}
- if (rpl_semi_sync_master_wait_sessions > 0)
- {
- /* Let us check if some of the waiting threads doing a trx
- * commit can now proceed.
- */
- cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos,
- m_wait_file_name, m_wait_file_pos);
- if (cmp >= 0)
- {
- /* Yes, at least one waiting thread can now proceed:
- * let us release all waiting threads with a broadcast
- */
- can_release_threads = true;
- m_wait_file_name_inited = false;
- }
- }
l_end:
unlock();
- if (can_release_threads)
- {
- DBUG_PRINT("semisync", ("%s: signal all waiting threads.",
- "Repl_semi_sync_master::report_reply_binlog"));
-
- cond_broadcast();
- }
DBUG_RETURN(0);
}
-int Repl_semi_sync_master::wait_after_sync(const char *log_file, my_off_t log_pos)
+int Repl_semi_sync_master::wait_after_sync(const char *log_file,
+ my_off_t log_pos)
{
if (!get_master_enabled())
return 0;
@@ -762,24 +786,27 @@ int Repl_semi_sync_master::wait_after_rollback(THD *thd, bool all)
/**
The method runs after flush to binary log is done.
*/
-int Repl_semi_sync_master::report_binlog_update(THD* thd, const char *log_file,
+int Repl_semi_sync_master::report_binlog_update(THD *trans_thd,
+ THD *waiter_thd,
+ const char *log_file,
my_off_t log_pos)
{
if (get_master_enabled())
{
Trans_binlog_info *log_info;
- if (!(log_info= thd->semisync_info))
+ if (!(log_info= trans_thd->semisync_info))
{
if(!(log_info= (Trans_binlog_info*)my_malloc(PSI_INSTRUMENT_ME,
sizeof(Trans_binlog_info), MYF(0))))
return 1;
- thd->semisync_info= log_info;
+ trans_thd->semisync_info= log_info;
}
strcpy(log_info->log_file, log_file + dirname_length(log_file));
log_info->log_pos = log_pos;
- return write_tranx_in_binlog(log_info->log_file, log_pos);
+ return write_tranx_in_binlog(waiter_thd, log_info->log_file,
+ log_pos);
}
return 0;
@@ -825,7 +852,7 @@ void Repl_semi_sync_master::dump_end(THD* thd)
ack_receiver.remove_slave(thd);
}
-int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
+int Repl_semi_sync_master::commit_trx(const char *trx_wait_binlog_name,
my_off_t trx_wait_binlog_pos)
{
bool success= 0;
@@ -844,7 +871,7 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
int wait_result;
PSI_stage_info old_stage;
THD *thd= current_thd;
- bool aborted= 0;
+ bool aborted __attribute__((unused)) = 0;
set_timespec(start_ts, 0);
DEBUG_SYNC(thd, "rpl_semisync_master_commit_trx_before_lock");
@@ -852,9 +879,8 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
lock();
/* This must be called after acquired the lock */
- THD_ENTER_COND(thd, &COND_binlog_send, &LOCK_binlog,
- & stage_waiting_for_semi_sync_ack_from_slave,
- & old_stage);
+ THD_ENTER_COND(thd, &thd->COND_wakeup_ready, &LOCK_binlog,
+ &stage_waiting_for_semi_sync_ack_from_slave, &old_stage);
/* This is the real check inside the mutex. */
if (!get_master_enabled() || !is_on())
@@ -865,7 +891,7 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
trx_wait_binlog_name, (ulong)trx_wait_binlog_pos,
(int)is_on()));
- while (is_on() && !thd_killed(thd))
+ while (is_on() && !(aborted= thd_killed(thd)))
{
/* We have to check these again as things may have changed */
if (!rpl_semi_sync_master_clients && !rpl_semi_sync_master_wait_no_slave)
@@ -902,7 +928,7 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
trx_wait_binlog_pos,
m_wait_file_name, m_wait_file_pos);
if (cmp <= 0)
- {
+ {
/* This thd has a lower position, let's update the minimum info. */
strmake_buf(m_wait_file_name, trx_wait_binlog_name);
m_wait_file_pos = trx_wait_binlog_pos;
@@ -934,20 +960,18 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
*/
rpl_semi_sync_master_wait_sessions++;
- /* We keep track of when this thread is awaiting an ack to ensure it is
- * not killed while awaiting an ACK if a shutdown is issued.
- */
- set_thd_awaiting_semisync_ack(thd, TRUE);
-
DBUG_PRINT("semisync", ("%s: wait %lu ms for binlog sent (%s, %lu)",
"Repl_semi_sync_master::commit_trx",
m_wait_timeout,
m_wait_file_name, (ulong)m_wait_file_pos));
+#ifndef DBUG_OFF
+ m_active_tranxs->assert_thd_is_waiter(thd, trx_wait_binlog_name,
+ trx_wait_binlog_pos);
+#endif
create_timeout(&abstime, &start_ts);
- wait_result = cond_timewait(&abstime);
-
- set_thd_awaiting_semisync_ack(thd, FALSE);
+ wait_result= mysql_cond_timedwait(&thd->COND_wakeup_ready, &LOCK_binlog,
+ &abstime);
rpl_semi_sync_master_wait_sessions--;
if (wait_result != 0)
@@ -979,17 +1003,49 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
{
rpl_semi_sync_master_trx_wait_num++;
rpl_semi_sync_master_trx_wait_time += wait_time;
+
+ DBUG_EXECUTE_IF("testing_cond_var_per_thd", {
+ /*
+ DBUG log warning to ensure we have either recieved our ACK; or
+ have timed out and are awoken in an off state. Test
+ rpl.rpl_semi_sync_cond_var_per_thd scans the logs to ensure this
+ warning is not present.
+ */
+ bool valid_wakeup=
+ (!get_master_enabled() || !is_on() || thd->is_killed() ||
+ 0 <= Active_tranx::compare(
+ m_reply_file_name, m_reply_file_pos,
+ trx_wait_binlog_name, trx_wait_binlog_pos));
+ if (!valid_wakeup)
+ {
+ sql_print_warning(
+ "Thread awaiting semi-sync ACK was awoken before its "
+ "ACK. THD (%llu), Wait coord: (%s, %llu), ACK coord: (%s, "
+ "%llu)",
+ thd->thread_id, trx_wait_binlog_name, trx_wait_binlog_pos,
+ m_reply_file_name, m_reply_file_pos);
+ }
+ });
}
}
}
/*
+ If our THD was killed (rather than awoken from an ACK) notify the
+ Active_tranx cache that we are no longer waiting for the ACK, so nobody
+ signals our COND var invalidly.
+ */
+ if (aborted)
+ m_active_tranxs->unlink_thd_as_waiter(trx_wait_binlog_name,
+ trx_wait_binlog_pos);
+
+ /*
At this point, the binlog file and position of this transaction
must have been removed from Active_tranx.
m_active_tranxs may be NULL if someone disabled semi sync during
- cond_timewait()
+ mysql_cond_timedwait
*/
- DBUG_ASSERT(thd_killed(thd) || !m_active_tranxs || aborted ||
+ DBUG_ASSERT(aborted || !m_active_tranxs || m_active_tranxs->is_empty() ||
!m_active_tranxs->is_tranx_end_pos(trx_wait_binlog_name,
trx_wait_binlog_pos));
@@ -1030,20 +1086,21 @@ void Repl_semi_sync_master::switch_off()
{
DBUG_ENTER("Repl_semi_sync_master::switch_off");
+ /* Clear the active transaction list. */
+ if (m_active_tranxs)
+ m_active_tranxs->clear_active_tranx_nodes(NULL, 0,
+ signal_waiting_transaction);
+
if (m_state)
{
m_state = false;
- /* Clear the active transaction list. */
- DBUG_ASSERT(m_active_tranxs != NULL);
- m_active_tranxs->clear_active_tranx_nodes(NULL, 0);
rpl_semi_sync_master_off_times++;
m_wait_file_name_inited = false;
m_reply_file_name_inited = false;
sql_print_information("Semi-sync replication switched OFF.");
}
- cond_broadcast(); /* wake up all waiting threads */
DBUG_VOID_RETURN;
}
@@ -1190,7 +1247,8 @@ int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet,
DBUG_RETURN(0);
}
-int Repl_semi_sync_master::write_tranx_in_binlog(const char* log_file_name,
+int Repl_semi_sync_master::write_tranx_in_binlog(THD *thd,
+ const char *log_file_name,
my_off_t log_file_pos)
{
int result = 0;
@@ -1233,7 +1291,7 @@ int Repl_semi_sync_master::write_tranx_in_binlog(const char* log_file_name,
if (is_on())
{
DBUG_ASSERT(m_active_tranxs != NULL);
- if(m_active_tranxs->insert_tranx_node(log_file_name, log_file_pos))
+ if(m_active_tranxs->insert_tranx_node(thd, log_file_name, log_file_pos))
{
/*
if insert tranx_node failed, print a warning message
@@ -1362,21 +1420,34 @@ void Repl_semi_sync_master::set_export_stats()
unlock();
}
-void Repl_semi_sync_master::await_slave_reply()
+void Repl_semi_sync_master::await_all_slave_replies(const char *msg)
{
- struct timespec abstime;
+ struct timespec timeout;
+ int wait_result= 0;
+ bool first= true;
+ DBUG_ENTER("Repl_semi_sync_master::::await_all_slave_replies");
- DBUG_ENTER("Repl_semi_sync_master::::await_slave_reply");
- lock();
-
- /* Just return if there is nothing to wait for */
- if (!rpl_semi_sync_master_wait_sessions)
- goto end;
+ /*
+ Wait for all transactions that need ACKS to have received them; or timeout.
+ If it is a timeout, the connection thread should attempt to turn off
+ semi-sync and broadcast to all other waiting threads to move on.
- create_timeout(&abstime, NULL);
- cond_timewait(&abstime);
+ COND_binlog_send is only signalled after the Active_tranx cache has been
+ emptied.
+ */
+ create_timeout(&timeout, NULL);
+ lock();
+ while (get_master_enabled() && is_on() && !m_active_tranxs->is_empty() && !wait_result)
+ {
+ if (msg && first)
+ {
+ first= false;
+ sql_print_information(msg);
+ }
-end:
+ wait_result=
+ mysql_cond_timedwait(&COND_binlog_send, &LOCK_binlog, &timeout);
+ }
unlock();
DBUG_VOID_RETURN;
}