diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 13:39:13 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 13:39:13 +0000 |
commit | 86fbb58c3ac0865482819c10a3e81f2eea001c36 (patch) | |
tree | 28c9e526ea739c6f9b89e36115e1e2698bddf981 /sql/semisync_master_ack_receiver.cc | |
parent | Releasing progress-linux version 1:10.11.6-2~progress7.99u1. (diff) | |
download | mariadb-86fbb58c3ac0865482819c10a3e81f2eea001c36.tar.xz mariadb-86fbb58c3ac0865482819c10a3e81f2eea001c36.zip |
Merging upstream version 1:10.11.7.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'sql/semisync_master_ack_receiver.cc')
-rw-r--r-- | sql/semisync_master_ack_receiver.cc | 130 |
1 files changed, 104 insertions, 26 deletions
diff --git a/sql/semisync_master_ack_receiver.cc b/sql/semisync_master_ack_receiver.cc index 559f939c..a311599c 100644 --- a/sql/semisync_master_ack_receiver.cc +++ b/sql/semisync_master_ack_receiver.cc @@ -24,7 +24,8 @@ extern PSI_cond_key key_COND_ack_receiver; #ifdef HAVE_PSI_THREAD_INTERFACE extern PSI_thread_key key_thread_ack_receiver; #endif -extern Repl_semi_sync_master repl_semisync; + +my_socket global_ack_signal_fd= -1; /* Callback function of ack receive thread */ pthread_handler_t ack_receive_handler(void *arg) @@ -45,6 +46,7 @@ Ack_receiver::Ack_receiver() m_status= ST_DOWN; mysql_mutex_init(key_LOCK_ack_receiver, &m_mutex, NULL); mysql_cond_init(key_COND_ack_receiver, &m_cond, NULL); + mysql_cond_init(key_COND_ack_receiver, &m_cond_reply, NULL); m_pid= 0; DBUG_VOID_RETURN; @@ -57,6 +59,7 @@ void Ack_receiver::cleanup() stop(); mysql_mutex_destroy(&m_mutex); mysql_cond_destroy(&m_cond); + mysql_cond_destroy(&m_cond_reply); DBUG_VOID_RETURN; } @@ -104,6 +107,7 @@ void Ack_receiver::stop() if (m_status == ST_UP) { m_status= ST_STOPPING; + signal_listener(); // Signal listener thread to stop mysql_cond_broadcast(&m_cond); while (m_status == ST_STOPPING) @@ -118,6 +122,21 @@ void Ack_receiver::stop() DBUG_VOID_RETURN; } +#ifndef DBUG_OFF +void static dbug_verify_no_duplicate_slaves(Slave_ilist *m_slaves, THD *thd) +{ + I_List_iterator<Slave> it(*m_slaves); + Slave *slave; + while ((slave= it++)) + { + DBUG_ASSERT(slave->thd->variables.server_id != thd->variables.server_id); + } +} +#else +#define dbug_verify_no_duplicate_slaves(A,B) do {} while(0) +#endif + + bool Ack_receiver::add_slave(THD *thd) { Slave *slave; @@ -126,17 +145,23 @@ bool Ack_receiver::add_slave(THD *thd) if (!(slave= new Slave)) DBUG_RETURN(true); + slave->active= 0; slave->thd= thd; slave->vio= *thd->net.vio; slave->vio.mysql_socket.m_psi= NULL; slave->vio.read_timeout= 1; mysql_mutex_lock(&m_mutex); + + dbug_verify_no_duplicate_slaves(&m_slaves, thd); + m_slaves.push_back(slave); m_slaves_changed= true; mysql_cond_broadcast(&m_cond); mysql_mutex_unlock(&m_mutex); + signal_listener(); // Inform listener that there are new slaves + DBUG_RETURN(false); } @@ -144,6 +169,7 @@ void Ack_receiver::remove_slave(THD *thd) { I_List_iterator<Slave> it(m_slaves); Slave *slave; + bool slaves_changed= 0; DBUG_ENTER("Ack_receiver::remove_slave"); mysql_mutex_lock(&m_mutex); @@ -153,10 +179,23 @@ void Ack_receiver::remove_slave(THD *thd) if (slave->thd == thd) { delete slave; - m_slaves_changed= true; + slaves_changed= true; break; } } + if (slaves_changed) + { + m_slaves_changed= true; + mysql_cond_broadcast(&m_cond); + /* + Wait until Ack_receiver::run() acknowledges remove of slave + As this is only sent under the mutex and after listners has + been collected, we know that listener has ignored the found + slave. + */ + if (m_status != ST_DOWN) + mysql_cond_wait(&m_cond_reply, &m_mutex); + } mysql_mutex_unlock(&m_mutex); DBUG_VOID_RETURN; @@ -167,10 +206,15 @@ inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage) (void)MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__); } -inline void Ack_receiver::wait_for_slave_connection() +void Ack_receiver::wait_for_slave_connection(THD *thd) { - set_stage_info(stage_waiting_for_semi_sync_slave); - mysql_cond_wait(&m_cond, &m_mutex); + thd->enter_cond(&m_cond, &m_mutex, &stage_waiting_for_semi_sync_slave, + 0, __func__, __FILE__, __LINE__); + + while (m_status == ST_UP && m_slaves.is_empty()) + mysql_cond_wait(&m_cond, &m_mutex); + + thd->exit_cond(0, __func__, __FILE__, __LINE__); } /* Auxilary function to initialize a NET object with given net buffer. */ @@ -188,17 +232,23 @@ void Ack_receiver::run() THD *thd= new THD(next_thread_id()); NET net; unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH]; + DBUG_ENTER("Ack_receiver::run"); my_thread_init(); - DBUG_ENTER("Ack_receiver::run"); - #ifdef HAVE_POLL Poll_socket_listener listener(m_slaves); #else Select_socket_listener listener(m_slaves); #endif //HAVE_POLL + if (listener.got_error()) + { + sql_print_error("Got error %M starting ack receiver thread", + listener.got_error()); + return; + } + sql_print_information("Starting ack receiver thread"); thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND; thd->thread_stack= (char*) &thd; @@ -207,64 +257,79 @@ void Ack_receiver::run() thd->set_command(COM_DAEMON); init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH); - mysql_mutex_lock(&m_mutex); + /* + Mark that we have to setup the listener. Note that only this functions can + set m_slaves_changed to false + */ m_slaves_changed= true; - mysql_mutex_unlock(&m_mutex); while (1) { - int ret; - uint slave_count __attribute__((unused))= 0; + int ret, slave_count= 0; Slave *slave; mysql_mutex_lock(&m_mutex); - if (unlikely(m_status == ST_STOPPING)) + if (unlikely(m_status != ST_UP)) goto end; - set_stage_info(stage_waiting_for_semi_sync_ack_from_slave); if (unlikely(m_slaves_changed)) { if (unlikely(m_slaves.is_empty())) { - wait_for_slave_connection(); - mysql_mutex_unlock(&m_mutex); + m_slaves_changed= false; + mysql_cond_broadcast(&m_cond_reply); // Signal remove_slave + wait_for_slave_connection(thd); + /* Wait for slave unlocks m_mutex */ continue; } + set_stage_info(stage_waiting_for_semi_sync_ack_from_slave); if ((slave_count= listener.init_slave_sockets()) == 0) + { + mysql_mutex_unlock(&m_mutex); + m_slaves_changed= true; + continue; // Retry + } + if (slave_count < 0) goto end; m_slaves_changed= false; + mysql_cond_broadcast(&m_cond_reply); // Signal remove_slave + } + #ifdef HAVE_POLL DBUG_PRINT("info", ("fd count %u", slave_count)); #else DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count, (int) listener.get_max_fd())); #endif - } + mysql_mutex_unlock(&m_mutex); ret= listener.listen_on_sockets(); + if (ret <= 0) { - mysql_mutex_unlock(&m_mutex); ret= DBUG_IF("rpl_semisync_simulate_select_error") ? -1 : ret; if (ret == -1 && errno != EINTR) sql_print_information("Failed to wait on semi-sync sockets, " "error: errno=%d", socket_errno); - /* Sleep 1us, so other threads can catch the m_mutex easily. */ - my_sleep(1); continue; } + listener.clear_signal(); + mysql_mutex_lock(&m_mutex); set_stage_info(stage_reading_semi_sync_ack); Slave_ilist_iterator it(m_slaves); while ((slave= it++)) { - if (listener.is_socket_active(slave)) + if (slave->active && + ((slave->vio.read_pos < slave->vio.read_end) || + listener.is_socket_active(slave))) { ulong len; + /* Semi-sync packets will always be sent with pkt_nr == 1 */ net_clear(&net, 0); net.vio= &slave->vio; /* @@ -275,29 +340,42 @@ void Ack_receiver::run() len= my_net_read(&net); if (likely(len != packet_error)) - repl_semisync_master.report_reply_packet(slave->server_id(), - net.read_pos, len); - else { - if (net.last_errno == ER_NET_READ_ERROR) + int res; + res= repl_semisync_master.report_reply_packet(slave->server_id(), + net.read_pos, len); + if (unlikely(res < 0)) { - listener.clear_socket_info(slave); + /* + Slave has sent COM_QUIT or other failure. + Delete it from listener + */ + it.remove(); + m_slaves_changed= true; } + } + else if (net.last_errno == ER_NET_READ_ERROR) + { if (net.last_errno > 0 && global_system_variables.log_warnings > 2) sql_print_warning("Semisync ack receiver got error %d \"%s\" " "from slave server-id %d", net.last_errno, ER_DEFAULT(net.last_errno), slave->server_id()); + it.remove(); + m_slaves_changed= true; } } } mysql_mutex_unlock(&m_mutex); } + end: sql_print_information("Stopping ack receiver thread"); m_status= ST_DOWN; - delete thd; mysql_cond_broadcast(&m_cond); + mysql_cond_broadcast(&m_cond_reply); mysql_mutex_unlock(&m_mutex); + + delete thd; DBUG_VOID_RETURN; } |