summaryrefslogtreecommitdiffstats
path: root/sql/rpl_injector.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:24:36 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:24:36 +0000
commit06eaf7232e9a920468c0f8d74dcf2fe8b555501c (patch)
treee2c7b5777f728320e5b5542b6213fd3591ba51e2 /sql/rpl_injector.cc
parentInitial commit. (diff)
downloadmariadb-06eaf7232e9a920468c0f8d74dcf2fe8b555501c.tar.xz
mariadb-06eaf7232e9a920468c0f8d74dcf2fe8b555501c.zip
Adding upstream version 1:10.11.6.upstream/1%10.11.6
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'sql/rpl_injector.cc')
-rw-r--r--sql/rpl_injector.cc197
1 files changed, 197 insertions, 0 deletions
diff --git a/sql/rpl_injector.cc b/sql/rpl_injector.cc
new file mode 100644
index 00000000..3080d92b
--- /dev/null
+++ b/sql/rpl_injector.cc
@@ -0,0 +1,197 @@
+/* Copyright (c) 2006, 2011, Oracle and/or its affiliates.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */
+
+#include "mariadb.h"
+#include "sql_priv.h"
+#include "rpl_injector.h"
+#include "transaction.h"
+#include "sql_parse.h" // begin_trans, end_trans, COMMIT
+#include "sql_base.h" // close_thread_tables
+#include "log_event.h" // Incident_log_event
+
+/*
+ injector::transaction - member definitions
+*/
+
+/* inline since it's called below */
+inline
+injector::transaction::transaction(MYSQL_BIN_LOG *log, THD *thd)
+ : m_state(START_STATE), m_thd(thd)
+{
+ /*
+ Default initialization of m_start_pos (which initializes it to garbage).
+ We need to fill it in using the code below.
+ */
+ LOG_INFO log_info;
+ log->get_current_log(&log_info);
+ /* !!! binlog_pos does not follow RAII !!! */
+ m_start_pos.m_file_name= my_strdup(key_memory_binlog_pos,
+ log_info.log_file_name, MYF(0));
+ m_start_pos.m_file_pos= log_info.pos;
+
+ m_thd->lex->start_transaction_opt= 0; /* for begin_trans() */
+ trans_begin(m_thd);
+}
+
+injector::transaction::~transaction()
+{
+ if (!good())
+ return;
+
+ /* Needed since my_free expects a 'char*' (instead of 'void*'). */
+ char* const the_memory= const_cast<char*>(m_start_pos.m_file_name);
+
+ /*
+ We set the first character to null just to give all the copies of the
+ start position a (minimal) chance of seening that the memory is lost.
+ All assuming the my_free does not step over the memory, of course.
+ */
+ *the_memory= '\0';
+
+ my_free(the_memory);
+}
+
+/**
+ @retval 0 transaction committed
+ @retval 1 transaction rolled back
+ */
+int injector::transaction::commit()
+{
+ DBUG_ENTER("injector::transaction::commit()");
+ int error= m_thd->binlog_flush_pending_rows_event(true);
+ /*
+ Cluster replication does not preserve statement or
+ transaction boundaries of the master. Instead, a new
+ transaction on replication slave is started when a new GCI
+ (global checkpoint identifier) is issued, and is committed
+ when the last event of the check point has been received and
+ processed. This ensures consistency of each cluster in
+ cluster replication, and there is no requirement for stronger
+ consistency: MySQL replication is asynchronous with other
+ engines as well.
+
+ A practical consequence of that is that row level replication
+ stream passed through the injector thread never contains
+ COMMIT events.
+ Here we should preserve the server invariant that there is no
+ outstanding statement transaction when the normal transaction
+ is committed by committing the statement transaction
+ explicitly.
+ */
+ trans_commit_stmt(m_thd);
+ if (!trans_commit(m_thd))
+ {
+ close_thread_tables(m_thd);
+ m_thd->release_transactional_locks();
+ }
+ DBUG_RETURN(error);
+}
+
+
+#ifdef TO_BE_DELETED
+int injector::transaction::use_table(server_id_type sid, table tbl)
+{
+ DBUG_ENTER("injector::transaction::use_table");
+
+ int error;
+
+ if (unlikely((error= check_state(TABLE_STATE))))
+ DBUG_RETURN(error);
+
+ server_id_type save_id= m_thd->variables.server_id;
+ m_thd->set_server_id(sid);
+ DBUG_ASSERT(tbl.is_transactional() == tbl.get_table()->file->row_logging_has_trans);
+ error= m_thd->binlog_write_table_map(tbl.get_table(), 0);
+ m_thd->set_server_id(save_id);
+ DBUG_RETURN(error);
+}
+#endif
+
+
+injector::transaction::binlog_pos injector::transaction::start_pos() const
+{
+ return m_start_pos;
+}
+
+
+/*
+ injector - member definitions
+*/
+
+/* This constructor is called below */
+inline injector::injector() = default;
+
+static injector *s_injector= 0;
+injector *injector::instance()
+{
+ if (s_injector == 0)
+ s_injector= new injector;
+ /* "There can be only one [instance]" */
+ return s_injector;
+}
+
+void injector::free_instance()
+{
+ injector *inj = s_injector;
+
+ if (inj != 0)
+ {
+ s_injector= 0;
+ delete inj;
+ }
+}
+
+
+injector::transaction injector::new_trans(THD *thd)
+{
+ DBUG_ENTER("injector::new_trans(THD*)");
+ /*
+ Currently, there is no alternative to using 'mysql_bin_log' since that
+ is hardcoded into the way the handler is using the binary log.
+ */
+ DBUG_RETURN(transaction(&mysql_bin_log, thd));
+}
+
+void injector::new_trans(THD *thd, injector::transaction *ptr)
+{
+ DBUG_ENTER("injector::new_trans(THD *, transaction *)");
+ /*
+ Currently, there is no alternative to using 'mysql_bin_log' since that
+ is hardcoded into the way the handler is using the binary log.
+ */
+ transaction trans(&mysql_bin_log, thd);
+ ptr->swap(trans);
+
+ DBUG_VOID_RETURN;
+}
+
+int injector::record_incident(THD *thd, Incident incident)
+{
+ Incident_log_event ev(thd, incident);
+ int error;
+ if (unlikely((error= mysql_bin_log.write(&ev))))
+ return error;
+ return mysql_bin_log.rotate_and_purge(true);
+}
+
+int injector::record_incident(THD *thd, Incident incident,
+ const LEX_CSTRING *message)
+{
+ Incident_log_event ev(thd, incident, message);
+ int error;
+ if (unlikely((error= mysql_bin_log.write(&ev))))
+ return error;
+ return mysql_bin_log.rotate_and_purge(true);
+}