summaryrefslogtreecommitdiffstats
path: root/sql/semisync_master_ack_receiver.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:07:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:07:14 +0000
commita175314c3e5827eb193872241446f2f8f5c9d33c (patch)
treecd3d60ca99ae00829c52a6ca79150a5b6e62528b /sql/semisync_master_ack_receiver.cc
parentInitial commit. (diff)
downloadmariadb-10.5-upstream/1%10.5.12.tar.xz
mariadb-10.5-upstream/1%10.5.12.zip
Adding upstream version 1:10.5.12.upstream/1%10.5.12upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'sql/semisync_master_ack_receiver.cc')
-rw-r--r--sql/semisync_master_ack_receiver.cc293
1 files changed, 293 insertions, 0 deletions
diff --git a/sql/semisync_master_ack_receiver.cc b/sql/semisync_master_ack_receiver.cc
new file mode 100644
index 00000000..b65b7824
--- /dev/null
+++ b/sql/semisync_master_ack_receiver.cc
@@ -0,0 +1,293 @@
+/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#include <my_global.h>
+#include "semisync_master.h"
+#include "semisync_master_ack_receiver.h"
+
+#ifdef HAVE_PSI_MUTEX_INTERFACE
+extern PSI_mutex_key key_LOCK_ack_receiver;
+extern PSI_cond_key key_COND_ack_receiver;
+#endif
+#ifdef HAVE_PSI_THREAD_INTERFACE
+extern PSI_thread_key key_thread_ack_receiver;
+#endif
+extern Repl_semi_sync_master repl_semisync;
+
+/* Callback function of ack receive thread */
+pthread_handler_t ack_receive_handler(void *arg)
+{
+ Ack_receiver *recv= reinterpret_cast<Ack_receiver *>(arg);
+
+ my_thread_init();
+ recv->run();
+ my_thread_end();
+
+ return NULL;
+}
+
+Ack_receiver::Ack_receiver()
+{
+ DBUG_ENTER("Ack_receiver::Ack_receiver");
+
+ m_status= ST_DOWN;
+ mysql_mutex_init(key_LOCK_ack_receiver, &m_mutex, NULL);
+ mysql_cond_init(key_COND_ack_receiver, &m_cond, NULL);
+ m_pid= 0;
+
+ DBUG_VOID_RETURN;
+}
+
+void Ack_receiver::cleanup()
+{
+ DBUG_ENTER("Ack_receiver::~Ack_receiver");
+
+ stop();
+ mysql_mutex_destroy(&m_mutex);
+ mysql_cond_destroy(&m_cond);
+
+ DBUG_VOID_RETURN;
+}
+
+bool Ack_receiver::start()
+{
+ DBUG_ENTER("Ack_receiver::start");
+
+ mysql_mutex_lock(&m_mutex);
+ if(m_status == ST_DOWN)
+ {
+ pthread_attr_t attr;
+
+ m_status= ST_UP;
+
+ if (DBUG_EVALUATE_IF("rpl_semisync_simulate_create_thread_failure", 1, 0) ||
+ pthread_attr_init(&attr) != 0 ||
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) != 0 ||
+#ifndef _WIN32
+ pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 ||
+#endif
+ mysql_thread_create(key_thread_ack_receiver, &m_pid,
+ &attr, ack_receive_handler, this))
+ {
+ sql_print_error("Failed to start semi-sync ACK receiver thread, "
+ " could not create thread(errno:%d)", errno);
+
+ m_status= ST_DOWN;
+ mysql_mutex_unlock(&m_mutex);
+
+ DBUG_RETURN(true);
+ }
+ (void) pthread_attr_destroy(&attr);
+ }
+ mysql_mutex_unlock(&m_mutex);
+
+ DBUG_RETURN(false);
+}
+
+void Ack_receiver::stop()
+{
+ DBUG_ENTER("Ack_receiver::stop");
+
+ mysql_mutex_lock(&m_mutex);
+ if (m_status == ST_UP)
+ {
+ m_status= ST_STOPPING;
+ mysql_cond_broadcast(&m_cond);
+
+ while (m_status == ST_STOPPING)
+ mysql_cond_wait(&m_cond, &m_mutex);
+
+ DBUG_ASSERT(m_status == ST_DOWN);
+
+ m_pid= 0;
+ }
+ mysql_mutex_unlock(&m_mutex);
+
+ DBUG_VOID_RETURN;
+}
+
+bool Ack_receiver::add_slave(THD *thd)
+{
+ Slave *slave;
+ DBUG_ENTER("Ack_receiver::add_slave");
+
+ if (!(slave= new Slave))
+ DBUG_RETURN(true);
+
+ slave->thd= thd;
+ slave->vio= *thd->net.vio;
+ slave->vio.mysql_socket.m_psi= NULL;
+ slave->vio.read_timeout= 1;
+
+ mysql_mutex_lock(&m_mutex);
+ m_slaves.push_back(slave);
+ m_slaves_changed= true;
+ mysql_cond_broadcast(&m_cond);
+ mysql_mutex_unlock(&m_mutex);
+
+ DBUG_RETURN(false);
+}
+
+void Ack_receiver::remove_slave(THD *thd)
+{
+ I_List_iterator<Slave> it(m_slaves);
+ Slave *slave;
+ DBUG_ENTER("Ack_receiver::remove_slave");
+
+ mysql_mutex_lock(&m_mutex);
+
+ while ((slave= it++))
+ {
+ if (slave->thd == thd)
+ {
+ delete slave;
+ m_slaves_changed= true;
+ break;
+ }
+ }
+ mysql_mutex_unlock(&m_mutex);
+
+ DBUG_VOID_RETURN;
+}
+
+inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage)
+{
+ (void)MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__);
+}
+
+inline void Ack_receiver::wait_for_slave_connection()
+{
+ set_stage_info(stage_waiting_for_semi_sync_slave);
+ mysql_cond_wait(&m_cond, &m_mutex);
+}
+
+/* Auxilary function to initialize a NET object with given net buffer. */
+static void init_net(NET *net, unsigned char *buff, unsigned int buff_len)
+{
+ memset(net, 0, sizeof(NET));
+ net->max_packet= buff_len;
+ net->buff= buff;
+ net->buff_end= buff + buff_len;
+ net->read_pos= net->buff;
+}
+
+void Ack_receiver::run()
+{
+ THD *thd= new THD(next_thread_id());
+ NET net;
+ unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH];
+
+ my_thread_init();
+
+ DBUG_ENTER("Ack_receiver::run");
+
+#ifdef HAVE_POLL
+ Poll_socket_listener listener(m_slaves);
+#else
+ Select_socket_listener listener(m_slaves);
+#endif //HAVE_POLL
+
+ sql_print_information("Starting ack receiver thread");
+ thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND;
+ thd->thread_stack= (char*) &thd;
+ thd->store_globals();
+ thd->security_ctx->skip_grants();
+ thd->set_command(COM_DAEMON);
+ init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH);
+
+ mysql_mutex_lock(&m_mutex);
+ m_slaves_changed= true;
+ mysql_mutex_unlock(&m_mutex);
+
+ while (1)
+ {
+ int ret;
+ uint slave_count __attribute__((unused))= 0;
+ Slave *slave;
+
+ mysql_mutex_lock(&m_mutex);
+ if (unlikely(m_status == ST_STOPPING))
+ goto end;
+
+ set_stage_info(stage_waiting_for_semi_sync_ack_from_slave);
+ if (unlikely(m_slaves_changed))
+ {
+ if (unlikely(m_slaves.is_empty()))
+ {
+ wait_for_slave_connection();
+ mysql_mutex_unlock(&m_mutex);
+ continue;
+ }
+
+ if ((slave_count= listener.init_slave_sockets()) == 0)
+ goto end;
+ m_slaves_changed= false;
+#ifdef HAVE_POLL
+ DBUG_PRINT("info", ("fd count %u", slave_count));
+#else
+ DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count,
+ (int) listener.get_max_fd()));
+#endif
+ }
+
+ ret= listener.listen_on_sockets();
+ if (ret <= 0)
+ {
+ mysql_mutex_unlock(&m_mutex);
+
+ ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret);
+
+ if (ret == -1 && errno != EINTR)
+ sql_print_information("Failed to wait on semi-sync sockets, "
+ "error: errno=%d", socket_errno);
+ /* Sleep 1us, so other threads can catch the m_mutex easily. */
+ my_sleep(1);
+ continue;
+ }
+
+ set_stage_info(stage_reading_semi_sync_ack);
+ Slave_ilist_iterator it(m_slaves);
+ while ((slave= it++))
+ {
+ if (listener.is_socket_active(slave))
+ {
+ ulong len;
+
+ net_clear(&net, 0);
+ net.vio= &slave->vio;
+ /*
+ Set compress flag. This is needed to support
+ Slave_compress_protocol flag enabled Slaves
+ */
+ net.compress= slave->thd->net.compress;
+
+ len= my_net_read(&net);
+ if (likely(len != packet_error))
+ repl_semisync_master.report_reply_packet(slave->server_id(),
+ net.read_pos, len);
+ else if (net.last_errno == ER_NET_READ_ERROR)
+ listener.clear_socket_info(slave);
+ }
+ }
+ mysql_mutex_unlock(&m_mutex);
+ }
+end:
+ sql_print_information("Stopping ack receiver thread");
+ m_status= ST_DOWN;
+ delete thd;
+ mysql_cond_broadcast(&m_cond);
+ mysql_mutex_unlock(&m_mutex);
+ DBUG_VOID_RETURN;
+}