From a175314c3e5827eb193872241446f2f8f5c9d33c Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 4 May 2024 20:07:14 +0200 Subject: Adding upstream version 1:10.5.12. Signed-off-by: Daniel Baumann --- sql/wsrep_applier.cc | 233 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 233 insertions(+) create mode 100644 sql/wsrep_applier.cc (limited to 'sql/wsrep_applier.cc') diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc new file mode 100644 index 00000000..4005de22 --- /dev/null +++ b/sql/wsrep_applier.cc @@ -0,0 +1,233 @@ +/* Copyright (C) 2013-2019 Codership Oy + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA. */ + +#include "mariadb.h" +#include "mysql/service_wsrep.h" +#include "wsrep_applier.h" + +#include "wsrep_priv.h" +#include "wsrep_binlog.h" // wsrep_dump_rbr_buf() +#include "wsrep_xid.h" +#include "wsrep_thd.h" +#include "wsrep_trans_observer.h" + +#include "slave.h" // opt_log_slave_updates +#include "debug_sync.h" + +/* + read the first event from (*buf). The size of the (*buf) is (*buf_len). + At the end (*buf) is shitfed to point to the following event or NULL and + (*buf_len) will be changed to account just being read bytes of the 1st event. +*/ +static Log_event* wsrep_read_log_event( + char **arg_buf, size_t *arg_buf_len, + const Format_description_log_event *description_event) +{ + DBUG_ENTER("wsrep_read_log_event"); + char *head= (*arg_buf); + + uint data_len= uint4korr(head + EVENT_LEN_OFFSET); + char *buf= (*arg_buf); + const char *error= 0; + Log_event *res= 0; + + res= Log_event::read_log_event(buf, data_len, &error, description_event, + true); + + if (!res) + { + DBUG_ASSERT(error != 0); + sql_print_error("Error in Log_event::read_log_event(): " + "'%s', data_len: %d, event_type: %d", + error,data_len,head[EVENT_TYPE_OFFSET]); + } + (*arg_buf)+= data_len; + (*arg_buf_len)-= data_len; + DBUG_RETURN(res); +} + +#include "transaction.h" // trans_commit(), trans_rollback() + +void wsrep_set_apply_format(THD* thd, Format_description_log_event* ev) +{ + if (thd->wsrep_apply_format) + { + delete (Format_description_log_event*)thd->wsrep_apply_format; + } + thd->wsrep_apply_format= ev; +} + +Format_description_log_event* +wsrep_get_apply_format(THD* thd) +{ + if (thd->wsrep_apply_format) + { + return (Format_description_log_event*) thd->wsrep_apply_format; + } + + DBUG_ASSERT(thd->wsrep_rgi); + + return thd->wsrep_rgi->rli->relay_log.description_event_for_exec; +} + +void wsrep_store_error(const THD* const thd, wsrep::mutable_buffer& dst) +{ + Diagnostics_area::Sql_condition_iterator it= + thd->get_stmt_da()->sql_conditions(); + const Sql_condition* cond; + + static size_t const max_len= 2*MAX_SLAVE_ERRMSG; // 2x so that we have enough + + dst.resize(max_len); + + char* slider= dst.data(); + const char* const buf_end= slider + max_len - 1; // -1: leave space for \0 + + for (cond= it++; cond && slider < buf_end; cond= it++) + { + uint const err_code= cond->get_sql_errno(); + const char* const err_str= cond->get_message_text(); + + slider+= my_snprintf(slider, buf_end - slider, " %s, Error_code: %d;", + err_str, err_code); + } + + if (slider != dst.data()) + { + *slider= '\0'; + slider++; + } + + dst.resize(slider - dst.data()); + + WSREP_DEBUG("Error buffer for thd %llu seqno %lld, %zu bytes: '%s'", + thd->thread_id, (long long)wsrep_thd_trx_seqno(thd), + dst.size(), dst.size() ? dst.data() : "(null)"); +} + +int wsrep_apply_events(THD* thd, + Relay_log_info* rli, + const void* events_buf, + size_t buf_len) +{ + char *buf= (char *)events_buf; + int rcode= 0; + int event= 1; + Log_event_type typ; + + DBUG_ENTER("wsrep_apply_events"); + if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld", + (long long) wsrep_thd_trx_seqno(thd)); + + thd->variables.gtid_seq_no= 0; + if (wsrep_gtid_mode) + thd->variables.gtid_domain_id= wsrep_gtid_server.domain_id; + else + thd->variables.gtid_domain_id= global_system_variables.gtid_domain_id; + + while (buf_len) + { + int exec_res; + Log_event* ev= wsrep_read_log_event(&buf, &buf_len, + wsrep_get_apply_format(thd)); + if (!ev) + { + WSREP_ERROR("applier could not read binlog event, seqno: %lld, len: %zu", + (long long)wsrep_thd_trx_seqno(thd), buf_len); + rcode= WSREP_ERR_BAD_EVENT; + goto error; + } + + typ= ev->get_type_code(); + + switch (typ) { + case FORMAT_DESCRIPTION_EVENT: + wsrep_set_apply_format(thd, (Format_description_log_event*)ev); + continue; + case GTID_EVENT: + { + Gtid_log_event *gtid_ev= (Gtid_log_event*)ev; + thd->variables.server_id= gtid_ev->server_id; + thd->variables.gtid_domain_id= gtid_ev->domain_id; + if ((gtid_ev->server_id == wsrep_gtid_server.server_id) && + (gtid_ev->domain_id == wsrep_gtid_server.domain_id)) + { + thd->variables.wsrep_gtid_seq_no= gtid_ev->seq_no; + } + else + { + thd->variables.gtid_seq_no= gtid_ev->seq_no; + } + delete ev; + } + continue; + default: + break; + } + + + if (!thd->variables.gtid_seq_no && wsrep_thd_is_toi(thd) && + (ev->get_type_code() == QUERY_EVENT)) + { + uint64 seqno= wsrep_gtid_server.seqno_inc(); + thd->wsrep_current_gtid_seqno= seqno; + if (mysql_bin_log.is_open() && wsrep_gtid_mode) + { + thd->variables.gtid_seq_no= seqno; + } + } + /* Use the original server id for logging. */ + thd->set_server_id(ev->server_id); + thd->set_time(); // time the query + thd->transaction->start_time.reset(thd); + thd->lex->current_select= 0; + if (!ev->when) + { + my_hrtime_t hrtime= my_hrtime(); + ev->when= hrtime_to_my_time(hrtime); + ev->when_sec_part= hrtime_sec_part(hrtime); + } + + thd->variables.option_bits= + (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) | + (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0); + + ev->thd= thd; + exec_res= ev->apply_event(thd->wsrep_rgi); + DBUG_PRINT("info", ("exec_event result: %d", exec_res)); + + if (exec_res) + { + WSREP_WARN("Event %d %s apply failed: %d, seqno %lld", + event, ev->get_type_str(), exec_res, + (long long) wsrep_thd_trx_seqno(thd)); + rcode= exec_res; + /* stop processing for the first error */ + delete ev; + goto error; + } + event++; + + delete_or_keep_event_post_apply(thd->wsrep_rgi, typ, ev); + } + +error: + if (thd->killed == KILL_CONNECTION) + WSREP_INFO("applier aborted: %lld", (long long)wsrep_thd_trx_seqno(thd)); + + wsrep_set_apply_format(thd, NULL); + + DBUG_RETURN(rcode); +} -- cgit v1.2.3