summaryrefslogtreecommitdiffstats
path: root/sql/semisync_master_ack_receiver.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 13:39:13 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 13:39:13 +0000
commit86fbb58c3ac0865482819c10a3e81f2eea001c36 (patch)
tree28c9e526ea739c6f9b89e36115e1e2698bddf981 /sql/semisync_master_ack_receiver.cc
parentReleasing progress-linux version 1:10.11.6-2~progress7.99u1. (diff)
downloadmariadb-86fbb58c3ac0865482819c10a3e81f2eea001c36.tar.xz
mariadb-86fbb58c3ac0865482819c10a3e81f2eea001c36.zip
Merging upstream version 1:10.11.7.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'sql/semisync_master_ack_receiver.cc')
-rw-r--r--sql/semisync_master_ack_receiver.cc130
1 files changed, 104 insertions, 26 deletions
diff --git a/sql/semisync_master_ack_receiver.cc b/sql/semisync_master_ack_receiver.cc
index 559f939c..a311599c 100644
--- a/sql/semisync_master_ack_receiver.cc
+++ b/sql/semisync_master_ack_receiver.cc
@@ -24,7 +24,8 @@ extern PSI_cond_key key_COND_ack_receiver;
#ifdef HAVE_PSI_THREAD_INTERFACE
extern PSI_thread_key key_thread_ack_receiver;
#endif
-extern Repl_semi_sync_master repl_semisync;
+
+my_socket global_ack_signal_fd= -1;
/* Callback function of ack receive thread */
pthread_handler_t ack_receive_handler(void *arg)
@@ -45,6 +46,7 @@ Ack_receiver::Ack_receiver()
m_status= ST_DOWN;
mysql_mutex_init(key_LOCK_ack_receiver, &m_mutex, NULL);
mysql_cond_init(key_COND_ack_receiver, &m_cond, NULL);
+ mysql_cond_init(key_COND_ack_receiver, &m_cond_reply, NULL);
m_pid= 0;
DBUG_VOID_RETURN;
@@ -57,6 +59,7 @@ void Ack_receiver::cleanup()
stop();
mysql_mutex_destroy(&m_mutex);
mysql_cond_destroy(&m_cond);
+ mysql_cond_destroy(&m_cond_reply);
DBUG_VOID_RETURN;
}
@@ -104,6 +107,7 @@ void Ack_receiver::stop()
if (m_status == ST_UP)
{
m_status= ST_STOPPING;
+ signal_listener(); // Signal listener thread to stop
mysql_cond_broadcast(&m_cond);
while (m_status == ST_STOPPING)
@@ -118,6 +122,21 @@ void Ack_receiver::stop()
DBUG_VOID_RETURN;
}
+#ifndef DBUG_OFF
+void static dbug_verify_no_duplicate_slaves(Slave_ilist *m_slaves, THD *thd)
+{
+ I_List_iterator<Slave> it(*m_slaves);
+ Slave *slave;
+ while ((slave= it++))
+ {
+ DBUG_ASSERT(slave->thd->variables.server_id != thd->variables.server_id);
+ }
+}
+#else
+#define dbug_verify_no_duplicate_slaves(A,B) do {} while(0)
+#endif
+
+
bool Ack_receiver::add_slave(THD *thd)
{
Slave *slave;
@@ -126,17 +145,23 @@ bool Ack_receiver::add_slave(THD *thd)
if (!(slave= new Slave))
DBUG_RETURN(true);
+ slave->active= 0;
slave->thd= thd;
slave->vio= *thd->net.vio;
slave->vio.mysql_socket.m_psi= NULL;
slave->vio.read_timeout= 1;
mysql_mutex_lock(&m_mutex);
+
+ dbug_verify_no_duplicate_slaves(&m_slaves, thd);
+
m_slaves.push_back(slave);
m_slaves_changed= true;
mysql_cond_broadcast(&m_cond);
mysql_mutex_unlock(&m_mutex);
+ signal_listener(); // Inform listener that there are new slaves
+
DBUG_RETURN(false);
}
@@ -144,6 +169,7 @@ void Ack_receiver::remove_slave(THD *thd)
{
I_List_iterator<Slave> it(m_slaves);
Slave *slave;
+ bool slaves_changed= 0;
DBUG_ENTER("Ack_receiver::remove_slave");
mysql_mutex_lock(&m_mutex);
@@ -153,10 +179,23 @@ void Ack_receiver::remove_slave(THD *thd)
if (slave->thd == thd)
{
delete slave;
- m_slaves_changed= true;
+ slaves_changed= true;
break;
}
}
+ if (slaves_changed)
+ {
+ m_slaves_changed= true;
+ mysql_cond_broadcast(&m_cond);
+ /*
+ Wait until Ack_receiver::run() acknowledges remove of slave
+ As this is only sent under the mutex and after listners has
+ been collected, we know that listener has ignored the found
+ slave.
+ */
+ if (m_status != ST_DOWN)
+ mysql_cond_wait(&m_cond_reply, &m_mutex);
+ }
mysql_mutex_unlock(&m_mutex);
DBUG_VOID_RETURN;
@@ -167,10 +206,15 @@ inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage)
(void)MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__);
}
-inline void Ack_receiver::wait_for_slave_connection()
+void Ack_receiver::wait_for_slave_connection(THD *thd)
{
- set_stage_info(stage_waiting_for_semi_sync_slave);
- mysql_cond_wait(&m_cond, &m_mutex);
+ thd->enter_cond(&m_cond, &m_mutex, &stage_waiting_for_semi_sync_slave,
+ 0, __func__, __FILE__, __LINE__);
+
+ while (m_status == ST_UP && m_slaves.is_empty())
+ mysql_cond_wait(&m_cond, &m_mutex);
+
+ thd->exit_cond(0, __func__, __FILE__, __LINE__);
}
/* Auxilary function to initialize a NET object with given net buffer. */
@@ -188,17 +232,23 @@ void Ack_receiver::run()
THD *thd= new THD(next_thread_id());
NET net;
unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH];
+ DBUG_ENTER("Ack_receiver::run");
my_thread_init();
- DBUG_ENTER("Ack_receiver::run");
-
#ifdef HAVE_POLL
Poll_socket_listener listener(m_slaves);
#else
Select_socket_listener listener(m_slaves);
#endif //HAVE_POLL
+ if (listener.got_error())
+ {
+ sql_print_error("Got error %M starting ack receiver thread",
+ listener.got_error());
+ return;
+ }
+
sql_print_information("Starting ack receiver thread");
thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND;
thd->thread_stack= (char*) &thd;
@@ -207,64 +257,79 @@ void Ack_receiver::run()
thd->set_command(COM_DAEMON);
init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH);
- mysql_mutex_lock(&m_mutex);
+ /*
+ Mark that we have to setup the listener. Note that only this functions can
+ set m_slaves_changed to false
+ */
m_slaves_changed= true;
- mysql_mutex_unlock(&m_mutex);
while (1)
{
- int ret;
- uint slave_count __attribute__((unused))= 0;
+ int ret, slave_count= 0;
Slave *slave;
mysql_mutex_lock(&m_mutex);
- if (unlikely(m_status == ST_STOPPING))
+ if (unlikely(m_status != ST_UP))
goto end;
- set_stage_info(stage_waiting_for_semi_sync_ack_from_slave);
if (unlikely(m_slaves_changed))
{
if (unlikely(m_slaves.is_empty()))
{
- wait_for_slave_connection();
- mysql_mutex_unlock(&m_mutex);
+ m_slaves_changed= false;
+ mysql_cond_broadcast(&m_cond_reply); // Signal remove_slave
+ wait_for_slave_connection(thd);
+ /* Wait for slave unlocks m_mutex */
continue;
}
+ set_stage_info(stage_waiting_for_semi_sync_ack_from_slave);
if ((slave_count= listener.init_slave_sockets()) == 0)
+ {
+ mysql_mutex_unlock(&m_mutex);
+ m_slaves_changed= true;
+ continue; // Retry
+ }
+ if (slave_count < 0)
goto end;
m_slaves_changed= false;
+ mysql_cond_broadcast(&m_cond_reply); // Signal remove_slave
+ }
+
#ifdef HAVE_POLL
DBUG_PRINT("info", ("fd count %u", slave_count));
#else
DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count,
(int) listener.get_max_fd()));
#endif
- }
+ mysql_mutex_unlock(&m_mutex);
ret= listener.listen_on_sockets();
+
if (ret <= 0)
{
- mysql_mutex_unlock(&m_mutex);
ret= DBUG_IF("rpl_semisync_simulate_select_error") ? -1 : ret;
if (ret == -1 && errno != EINTR)
sql_print_information("Failed to wait on semi-sync sockets, "
"error: errno=%d", socket_errno);
- /* Sleep 1us, so other threads can catch the m_mutex easily. */
- my_sleep(1);
continue;
}
+ listener.clear_signal();
+ mysql_mutex_lock(&m_mutex);
set_stage_info(stage_reading_semi_sync_ack);
Slave_ilist_iterator it(m_slaves);
while ((slave= it++))
{
- if (listener.is_socket_active(slave))
+ if (slave->active &&
+ ((slave->vio.read_pos < slave->vio.read_end) ||
+ listener.is_socket_active(slave)))
{
ulong len;
+ /* Semi-sync packets will always be sent with pkt_nr == 1 */
net_clear(&net, 0);
net.vio= &slave->vio;
/*
@@ -275,29 +340,42 @@ void Ack_receiver::run()
len= my_net_read(&net);
if (likely(len != packet_error))
- repl_semisync_master.report_reply_packet(slave->server_id(),
- net.read_pos, len);
- else
{
- if (net.last_errno == ER_NET_READ_ERROR)
+ int res;
+ res= repl_semisync_master.report_reply_packet(slave->server_id(),
+ net.read_pos, len);
+ if (unlikely(res < 0))
{
- listener.clear_socket_info(slave);
+ /*
+ Slave has sent COM_QUIT or other failure.
+ Delete it from listener
+ */
+ it.remove();
+ m_slaves_changed= true;
}
+ }
+ else if (net.last_errno == ER_NET_READ_ERROR)
+ {
if (net.last_errno > 0 && global_system_variables.log_warnings > 2)
sql_print_warning("Semisync ack receiver got error %d \"%s\" "
"from slave server-id %d",
net.last_errno, ER_DEFAULT(net.last_errno),
slave->server_id());
+ it.remove();
+ m_slaves_changed= true;
}
}
}
mysql_mutex_unlock(&m_mutex);
}
+
end:
sql_print_information("Stopping ack receiver thread");
m_status= ST_DOWN;
- delete thd;
mysql_cond_broadcast(&m_cond);
+ mysql_cond_broadcast(&m_cond_reply);
mysql_mutex_unlock(&m_mutex);
+
+ delete thd;
DBUG_VOID_RETURN;
}