summaryrefslogtreecommitdiffstats
path: root/sql/wsrep_applier.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_applier.cc')
-rw-r--r--sql/wsrep_applier.cc233
1 files changed, 233 insertions, 0 deletions
diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc
new file mode 100644
index 00000000..90ede81a
--- /dev/null
+++ b/sql/wsrep_applier.cc
@@ -0,0 +1,233 @@
+/* Copyright (C) 2013-2019 Codership Oy <info@codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA. */
+
+#include "mariadb.h"
+#include "mysql/service_wsrep.h"
+#include "wsrep_applier.h"
+
+#include "wsrep_priv.h"
+#include "wsrep_binlog.h" // wsrep_dump_rbr_buf()
+#include "wsrep_xid.h"
+#include "wsrep_thd.h"
+#include "wsrep_trans_observer.h"
+
+#include "slave.h" // opt_log_slave_updates
+#include "debug_sync.h"
+
+/*
+ read the first event from (*buf). The size of the (*buf) is (*buf_len).
+ At the end (*buf) is shitfed to point to the following event or NULL and
+ (*buf_len) will be changed to account just being read bytes of the 1st event.
+*/
+static Log_event* wsrep_read_log_event(
+ char **arg_buf, size_t *arg_buf_len,
+ const Format_description_log_event *description_event)
+{
+ DBUG_ENTER("wsrep_read_log_event");
+ char *head= (*arg_buf);
+
+ uint data_len= uint4korr(head + EVENT_LEN_OFFSET);
+ uchar *buf= (uchar*) (*arg_buf);
+ const char *error= 0;
+ Log_event *res= 0;
+
+ res= Log_event::read_log_event(buf, data_len, &error, description_event,
+ true);
+
+ if (!res)
+ {
+ DBUG_ASSERT(error != 0);
+ sql_print_error("Error in Log_event::read_log_event(): "
+ "'%s', data_len: %d, event_type: %d",
+ error,data_len,head[EVENT_TYPE_OFFSET]);
+ }
+ (*arg_buf)+= data_len;
+ (*arg_buf_len)-= data_len;
+ DBUG_RETURN(res);
+}
+
+#include "transaction.h" // trans_commit(), trans_rollback()
+
+void wsrep_set_apply_format(THD* thd, Format_description_log_event* ev)
+{
+ if (thd->wsrep_apply_format)
+ {
+ delete (Format_description_log_event*)thd->wsrep_apply_format;
+ }
+ thd->wsrep_apply_format= ev;
+}
+
+Format_description_log_event*
+wsrep_get_apply_format(THD* thd)
+{
+ if (thd->wsrep_apply_format)
+ {
+ return (Format_description_log_event*) thd->wsrep_apply_format;
+ }
+
+ DBUG_ASSERT(thd->wsrep_rgi);
+
+ return thd->wsrep_rgi->rli->relay_log.description_event_for_exec;
+}
+
+void wsrep_store_error(const THD* const thd, wsrep::mutable_buffer& dst)
+{
+ Diagnostics_area::Sql_condition_iterator it=
+ thd->get_stmt_da()->sql_conditions();
+ const Sql_condition* cond;
+
+ static size_t const max_len= 2*MAX_SLAVE_ERRMSG; // 2x so that we have enough
+
+ dst.resize(max_len);
+
+ char* slider= dst.data();
+ const char* const buf_end= slider + max_len - 1; // -1: leave space for \0
+
+ for (cond= it++; cond && slider < buf_end; cond= it++)
+ {
+ uint const err_code= cond->get_sql_errno();
+ const char* const err_str= cond->get_message_text();
+
+ slider+= my_snprintf(slider, buf_end - slider, " %s, Error_code: %d;",
+ err_str, err_code);
+ }
+
+ if (slider != dst.data())
+ {
+ *slider= '\0';
+ slider++;
+ }
+
+ dst.resize(slider - dst.data());
+
+ WSREP_DEBUG("Error buffer for thd %llu seqno %lld, %zu bytes: '%s'",
+ thd->thread_id, (long long)wsrep_thd_trx_seqno(thd),
+ dst.size(), dst.size() ? dst.data() : "(null)");
+}
+
+int wsrep_apply_events(THD* thd,
+ Relay_log_info* rli,
+ const void* events_buf,
+ size_t buf_len)
+{
+ char *buf= (char *)events_buf;
+ int rcode= 0;
+ int event= 1;
+ Log_event_type typ;
+
+ DBUG_ENTER("wsrep_apply_events");
+ if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld",
+ (long long) wsrep_thd_trx_seqno(thd));
+
+ thd->variables.gtid_seq_no= 0;
+ if (wsrep_gtid_mode)
+ thd->variables.gtid_domain_id= wsrep_gtid_server.domain_id;
+ else
+ thd->variables.gtid_domain_id= global_system_variables.gtid_domain_id;
+
+ while (buf_len)
+ {
+ int exec_res;
+ Log_event* ev= wsrep_read_log_event(&buf, &buf_len,
+ wsrep_get_apply_format(thd));
+ if (!ev)
+ {
+ WSREP_ERROR("applier could not read binlog event, seqno: %lld, len: %zu",
+ (long long)wsrep_thd_trx_seqno(thd), buf_len);
+ rcode= WSREP_ERR_BAD_EVENT;
+ goto error;
+ }
+
+ typ= ev->get_type_code();
+
+ switch (typ) {
+ case FORMAT_DESCRIPTION_EVENT:
+ wsrep_set_apply_format(thd, (Format_description_log_event*)ev);
+ continue;
+ case GTID_EVENT:
+ {
+ Gtid_log_event *gtid_ev= (Gtid_log_event*)ev;
+ thd->variables.server_id= gtid_ev->server_id;
+ thd->variables.gtid_domain_id= gtid_ev->domain_id;
+ if ((gtid_ev->server_id == wsrep_gtid_server.server_id) &&
+ (gtid_ev->domain_id == wsrep_gtid_server.domain_id))
+ {
+ thd->variables.wsrep_gtid_seq_no= gtid_ev->seq_no;
+ }
+ else
+ {
+ thd->variables.gtid_seq_no= gtid_ev->seq_no;
+ }
+ delete ev;
+ }
+ continue;
+ default:
+ break;
+ }
+
+
+ if (!thd->variables.gtid_seq_no && wsrep_thd_is_toi(thd) &&
+ (ev->get_type_code() == QUERY_EVENT))
+ {
+ uint64 seqno= wsrep_gtid_server.seqno_inc();
+ thd->wsrep_current_gtid_seqno= seqno;
+ if (mysql_bin_log.is_open() && wsrep_gtid_mode)
+ {
+ thd->variables.gtid_seq_no= seqno;
+ }
+ }
+ /* Use the original server id for logging. */
+ thd->set_server_id(ev->server_id);
+ thd->set_time(); // time the query
+ thd->transaction->start_time.reset(thd);
+ thd->lex->current_select= 0;
+ if (!ev->when)
+ {
+ my_hrtime_t hrtime= my_hrtime();
+ ev->when= hrtime_to_my_time(hrtime);
+ ev->when_sec_part= hrtime_sec_part(hrtime);
+ }
+
+ thd->variables.option_bits=
+ (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
+ (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0);
+
+ ev->thd= thd;
+ exec_res= ev->apply_event(thd->wsrep_rgi);
+ DBUG_PRINT("info", ("exec_event result: %d", exec_res));
+
+ if (exec_res)
+ {
+ WSREP_WARN("Event %d %s apply failed: %d, seqno %lld",
+ event, ev->get_type_str(), exec_res,
+ (long long) wsrep_thd_trx_seqno(thd));
+ rcode= exec_res;
+ /* stop processing for the first error */
+ delete ev;
+ goto error;
+ }
+ event++;
+
+ delete_or_keep_event_post_apply(thd->wsrep_rgi, typ, ev);
+ }
+
+error:
+ if (thd->killed == KILL_CONNECTION)
+ WSREP_INFO("applier aborted: %lld", (long long)wsrep_thd_trx_seqno(thd));
+
+ wsrep_set_apply_format(thd, NULL);
+
+ DBUG_RETURN(rcode);
+}