diff options
Diffstat (limited to 'sql/wsrep_storage_service.cc')
-rw-r--r-- | sql/wsrep_storage_service.cc | 206 |
1 files changed, 206 insertions, 0 deletions
diff --git a/sql/wsrep_storage_service.cc b/sql/wsrep_storage_service.cc new file mode 100644 index 00000000..4885fd9f --- /dev/null +++ b/sql/wsrep_storage_service.cc @@ -0,0 +1,206 @@ +/* Copyright 2018 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include "my_global.h" +#include "wsrep_storage_service.h" +#include "wsrep_trans_observer.h" /* wsrep_open() */ +#include "wsrep_schema.h" +#include "wsrep_binlog.h" + +#include "sql_class.h" +#include "mysqld.h" /* next_query_id() */ +#include "slave.h" /* opt_log_slave_updates() */ +#include "transaction.h" /* trans_commit(), trans_rollback() */ + +/* + Temporarily enable wsrep on thd + */ +class Wsrep_on +{ +public: + Wsrep_on(THD* thd) + : m_thd(thd) + , m_wsrep_on(thd->variables.wsrep_on) + { + thd->variables.wsrep_on= TRUE; + } + ~Wsrep_on() + { + m_thd->variables.wsrep_on= m_wsrep_on; + } +private: + THD* m_thd; + my_bool m_wsrep_on; +}; + +Wsrep_storage_service::Wsrep_storage_service(THD* thd) + : wsrep::storage_service() + , wsrep::high_priority_context(thd->wsrep_cs()) + , m_thd(thd) +{ + thd->security_ctx->skip_grants(); + thd->system_thread= SYSTEM_THREAD_SLAVE_SQL; + + /* No binlogging */ + + /* No general log */ + thd->variables.option_bits |= OPTION_LOG_OFF; + + /* Read committed isolation to avoid gap locking */ + thd->variables.tx_isolation = ISO_READ_COMMITTED; + + /* Keep wsrep on to enter commit ordering hooks */ + thd->variables.wsrep_on= 1; + thd->wsrep_skip_locking= true; + + wsrep_open(thd); + wsrep_before_command(thd); +} + +Wsrep_storage_service::~Wsrep_storage_service() +{ + wsrep_after_command_ignore_result(m_thd); + wsrep_close(m_thd); + m_thd->wsrep_skip_locking= false; +} + +int Wsrep_storage_service::start_transaction(const wsrep::ws_handle& ws_handle) +{ + DBUG_ENTER("Wsrep_storage_service::start_transaction"); + DBUG_ASSERT(m_thd == current_thd); + DBUG_PRINT("info", ("Wsrep_storage_service::start_transcation(%llu, %p)", + m_thd->thread_id, m_thd)); + m_thd->set_wsrep_next_trx_id(ws_handle.transaction_id().get()); + DBUG_RETURN(m_thd->wsrep_cs().start_transaction( + wsrep::transaction_id(m_thd->wsrep_next_trx_id())) || + trans_begin(m_thd, MYSQL_START_TRANS_OPT_READ_WRITE)); +} + +void Wsrep_storage_service::adopt_transaction(const wsrep::transaction& transaction) +{ + DBUG_ENTER("Wsrep_Storage_server::adopt_transaction"); + DBUG_ASSERT(m_thd == current_thd); + m_thd->wsrep_cs().adopt_transaction(transaction); + trans_begin(m_thd, MYSQL_START_TRANS_OPT_READ_WRITE); + DBUG_VOID_RETURN; +} + +int Wsrep_storage_service::append_fragment(const wsrep::id& server_id, + wsrep::transaction_id transaction_id, + int flags, + const wsrep::const_buffer& data, + const wsrep::xid& xid WSREP_UNUSED) +{ + DBUG_ENTER("Wsrep_storage_service::append_fragment"); + DBUG_ASSERT(m_thd == current_thd); + DBUG_PRINT("info", ("Wsrep_storage_service::append_fragment(%llu, %p)", + m_thd->thread_id, m_thd)); + int ret= wsrep_schema->append_fragment(m_thd, + server_id, + transaction_id, + wsrep::seqno(-1), + flags, + data); + DBUG_RETURN(ret); +} + +int Wsrep_storage_service::update_fragment_meta(const wsrep::ws_meta& ws_meta) +{ + DBUG_ENTER("Wsrep_storage_service::update_fragment_meta"); + DBUG_ASSERT(m_thd == current_thd); + DBUG_PRINT("info", ("Wsrep_storage_service::update_fragment_meta(%llu, %p)", + m_thd->thread_id, m_thd)); + int ret= wsrep_schema->update_fragment_meta(m_thd, ws_meta); + DBUG_RETURN(ret); +} + +int Wsrep_storage_service::remove_fragments() +{ + DBUG_ENTER("Wsrep_storage_service::remove_fragments"); + DBUG_ASSERT(m_thd == current_thd); + + int ret= wsrep_schema->remove_fragments(m_thd, + m_thd->wsrep_trx().server_id(), + m_thd->wsrep_trx().id(), + m_thd->wsrep_sr().fragments()); + DBUG_RETURN(ret); +} + +int Wsrep_storage_service::commit(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) +{ + DBUG_ENTER("Wsrep_storage_service::commit"); + DBUG_ASSERT(m_thd == current_thd); + DBUG_PRINT("info", ("Wsrep_storage_service::commit(%llu, %p)", + m_thd->thread_id, m_thd)); + WSREP_DEBUG("Storage service commit: %llu, %lld", + ws_meta.transaction_id().get(), ws_meta.seqno().get()); + int ret= 0; + const bool is_ordered= !ws_meta.seqno().is_undefined(); + + if (is_ordered) + { + ret= m_thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, true); + } + + ret= ret || trans_commit(m_thd); + + if (!is_ordered) + { + /* Wsrep commit was not ordered so it does not go through commit time + hooks and remains active. Roll it back to make cleanup happen + in after_applying() call. */ + m_thd->wsrep_cs().before_rollback(); + m_thd->wsrep_cs().after_rollback(); + } + else if (ret) + { + /* Commit failed, this probably means that the parent SR transaction + was BF aborted. Roll back out of order, the parent + transaction will release commit order after it has rolled back. */ + m_thd->wsrep_cs().prepare_for_ordering(wsrep::ws_handle(), + wsrep::ws_meta(), + false); + trans_rollback(m_thd); + } + m_thd->wsrep_cs().after_applying(); + m_thd->release_transactional_locks(); + DBUG_RETURN(ret); +} + +int Wsrep_storage_service::rollback(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) +{ + DBUG_ENTER("Wsrep_storage_service::rollback"); + DBUG_ASSERT(m_thd == current_thd); + DBUG_PRINT("info", ("Wsrep_storage_service::rollback(%llu, %p)", + m_thd->thread_id, m_thd)); + int ret= (m_thd->wsrep_cs().prepare_for_ordering( + ws_handle, ws_meta, false) || + trans_rollback(m_thd)); + m_thd->wsrep_cs().after_applying(); + m_thd->release_transactional_locks(); + DBUG_RETURN(ret); +} + +void Wsrep_storage_service::store_globals() +{ + wsrep_store_threadvars(m_thd); +} + +void Wsrep_storage_service::reset_globals() +{ + wsrep_reset_threadvars(m_thd); +} |