diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:07:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:07:14 +0000 |
commit | a175314c3e5827eb193872241446f2f8f5c9d33c (patch) | |
tree | cd3d60ca99ae00829c52a6ca79150a5b6e62528b /sql/semisync_master_ack_receiver.cc | |
parent | Initial commit. (diff) | |
download | mariadb-10.5-upstream/1%10.5.12.tar.xz mariadb-10.5-upstream/1%10.5.12.zip |
Adding upstream version 1:10.5.12.upstream/1%10.5.12upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'sql/semisync_master_ack_receiver.cc')
-rw-r--r-- | sql/semisync_master_ack_receiver.cc | 293 |
1 files changed, 293 insertions, 0 deletions
diff --git a/sql/semisync_master_ack_receiver.cc b/sql/semisync_master_ack_receiver.cc new file mode 100644 index 00000000..b65b7824 --- /dev/null +++ b/sql/semisync_master_ack_receiver.cc @@ -0,0 +1,293 @@ +/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include <my_global.h> +#include "semisync_master.h" +#include "semisync_master_ack_receiver.h" + +#ifdef HAVE_PSI_MUTEX_INTERFACE +extern PSI_mutex_key key_LOCK_ack_receiver; +extern PSI_cond_key key_COND_ack_receiver; +#endif +#ifdef HAVE_PSI_THREAD_INTERFACE +extern PSI_thread_key key_thread_ack_receiver; +#endif +extern Repl_semi_sync_master repl_semisync; + +/* Callback function of ack receive thread */ +pthread_handler_t ack_receive_handler(void *arg) +{ + Ack_receiver *recv= reinterpret_cast<Ack_receiver *>(arg); + + my_thread_init(); + recv->run(); + my_thread_end(); + + return NULL; +} + +Ack_receiver::Ack_receiver() +{ + DBUG_ENTER("Ack_receiver::Ack_receiver"); + + m_status= ST_DOWN; + mysql_mutex_init(key_LOCK_ack_receiver, &m_mutex, NULL); + mysql_cond_init(key_COND_ack_receiver, &m_cond, NULL); + m_pid= 0; + + DBUG_VOID_RETURN; +} + +void Ack_receiver::cleanup() +{ + DBUG_ENTER("Ack_receiver::~Ack_receiver"); + + stop(); + mysql_mutex_destroy(&m_mutex); + mysql_cond_destroy(&m_cond); + + DBUG_VOID_RETURN; +} + +bool Ack_receiver::start() +{ + DBUG_ENTER("Ack_receiver::start"); + + mysql_mutex_lock(&m_mutex); + if(m_status == ST_DOWN) + { + pthread_attr_t attr; + + m_status= ST_UP; + + if (DBUG_EVALUATE_IF("rpl_semisync_simulate_create_thread_failure", 1, 0) || + pthread_attr_init(&attr) != 0 || + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) != 0 || +#ifndef _WIN32 + pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 || +#endif + mysql_thread_create(key_thread_ack_receiver, &m_pid, + &attr, ack_receive_handler, this)) + { + sql_print_error("Failed to start semi-sync ACK receiver thread, " + " could not create thread(errno:%d)", errno); + + m_status= ST_DOWN; + mysql_mutex_unlock(&m_mutex); + + DBUG_RETURN(true); + } + (void) pthread_attr_destroy(&attr); + } + mysql_mutex_unlock(&m_mutex); + + DBUG_RETURN(false); +} + +void Ack_receiver::stop() +{ + DBUG_ENTER("Ack_receiver::stop"); + + mysql_mutex_lock(&m_mutex); + if (m_status == ST_UP) + { + m_status= ST_STOPPING; + mysql_cond_broadcast(&m_cond); + + while (m_status == ST_STOPPING) + mysql_cond_wait(&m_cond, &m_mutex); + + DBUG_ASSERT(m_status == ST_DOWN); + + m_pid= 0; + } + mysql_mutex_unlock(&m_mutex); + + DBUG_VOID_RETURN; +} + +bool Ack_receiver::add_slave(THD *thd) +{ + Slave *slave; + DBUG_ENTER("Ack_receiver::add_slave"); + + if (!(slave= new Slave)) + DBUG_RETURN(true); + + slave->thd= thd; + slave->vio= *thd->net.vio; + slave->vio.mysql_socket.m_psi= NULL; + slave->vio.read_timeout= 1; + + mysql_mutex_lock(&m_mutex); + m_slaves.push_back(slave); + m_slaves_changed= true; + mysql_cond_broadcast(&m_cond); + mysql_mutex_unlock(&m_mutex); + + DBUG_RETURN(false); +} + +void Ack_receiver::remove_slave(THD *thd) +{ + I_List_iterator<Slave> it(m_slaves); + Slave *slave; + DBUG_ENTER("Ack_receiver::remove_slave"); + + mysql_mutex_lock(&m_mutex); + + while ((slave= it++)) + { + if (slave->thd == thd) + { + delete slave; + m_slaves_changed= true; + break; + } + } + mysql_mutex_unlock(&m_mutex); + + DBUG_VOID_RETURN; +} + +inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage) +{ + (void)MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__); +} + +inline void Ack_receiver::wait_for_slave_connection() +{ + set_stage_info(stage_waiting_for_semi_sync_slave); + mysql_cond_wait(&m_cond, &m_mutex); +} + +/* Auxilary function to initialize a NET object with given net buffer. */ +static void init_net(NET *net, unsigned char *buff, unsigned int buff_len) +{ + memset(net, 0, sizeof(NET)); + net->max_packet= buff_len; + net->buff= buff; + net->buff_end= buff + buff_len; + net->read_pos= net->buff; +} + +void Ack_receiver::run() +{ + THD *thd= new THD(next_thread_id()); + NET net; + unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH]; + + my_thread_init(); + + DBUG_ENTER("Ack_receiver::run"); + +#ifdef HAVE_POLL + Poll_socket_listener listener(m_slaves); +#else + Select_socket_listener listener(m_slaves); +#endif //HAVE_POLL + + sql_print_information("Starting ack receiver thread"); + thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND; + thd->thread_stack= (char*) &thd; + thd->store_globals(); + thd->security_ctx->skip_grants(); + thd->set_command(COM_DAEMON); + init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH); + + mysql_mutex_lock(&m_mutex); + m_slaves_changed= true; + mysql_mutex_unlock(&m_mutex); + + while (1) + { + int ret; + uint slave_count __attribute__((unused))= 0; + Slave *slave; + + mysql_mutex_lock(&m_mutex); + if (unlikely(m_status == ST_STOPPING)) + goto end; + + set_stage_info(stage_waiting_for_semi_sync_ack_from_slave); + if (unlikely(m_slaves_changed)) + { + if (unlikely(m_slaves.is_empty())) + { + wait_for_slave_connection(); + mysql_mutex_unlock(&m_mutex); + continue; + } + + if ((slave_count= listener.init_slave_sockets()) == 0) + goto end; + m_slaves_changed= false; +#ifdef HAVE_POLL + DBUG_PRINT("info", ("fd count %u", slave_count)); +#else + DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count, + (int) listener.get_max_fd())); +#endif + } + + ret= listener.listen_on_sockets(); + if (ret <= 0) + { + mysql_mutex_unlock(&m_mutex); + + ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret); + + if (ret == -1 && errno != EINTR) + sql_print_information("Failed to wait on semi-sync sockets, " + "error: errno=%d", socket_errno); + /* Sleep 1us, so other threads can catch the m_mutex easily. */ + my_sleep(1); + continue; + } + + set_stage_info(stage_reading_semi_sync_ack); + Slave_ilist_iterator it(m_slaves); + while ((slave= it++)) + { + if (listener.is_socket_active(slave)) + { + ulong len; + + net_clear(&net, 0); + net.vio= &slave->vio; + /* + Set compress flag. This is needed to support + Slave_compress_protocol flag enabled Slaves + */ + net.compress= slave->thd->net.compress; + + len= my_net_read(&net); + if (likely(len != packet_error)) + repl_semisync_master.report_reply_packet(slave->server_id(), + net.read_pos, len); + else if (net.last_errno == ER_NET_READ_ERROR) + listener.clear_socket_info(slave); + } + } + mysql_mutex_unlock(&m_mutex); + } +end: + sql_print_information("Stopping ack receiver thread"); + m_status= ST_DOWN; + delete thd; + mysql_cond_broadcast(&m_cond); + mysql_mutex_unlock(&m_mutex); + DBUG_VOID_RETURN; +} |