diff options
Diffstat (limited to '')
-rw-r--r-- | wsrep-lib/include/wsrep/transaction.hpp | 325 |
1 files changed, 325 insertions, 0 deletions
diff --git a/wsrep-lib/include/wsrep/transaction.hpp b/wsrep-lib/include/wsrep/transaction.hpp new file mode 100644 index 00000000..3328c093 --- /dev/null +++ b/wsrep-lib/include/wsrep/transaction.hpp @@ -0,0 +1,325 @@ +/* + * Copyright (C) 2018 Codership Oy <info@codership.com> + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. + */ + +/** @file transaction.hpp */ +#ifndef WSREP_TRANSACTION_HPP +#define WSREP_TRANSACTION_HPP + +#include "provider.hpp" +#include "transaction_id.hpp" +#include "streaming_context.hpp" +#include "lock.hpp" +#include "sr_key_set.hpp" +#include "buffer.hpp" +#include "xid.hpp" + +#include <iosfwd> +#include <vector> + +namespace wsrep +{ + class client_service; + class client_state; + class key; + class const_buffer; + class server_service; + + class transaction + { + public: + enum state + { + s_executing, + s_preparing, + s_prepared, + s_certifying, + s_committing, + s_ordered_commit, + s_committed, + s_cert_failed, + s_must_abort, + s_aborting, + s_aborted, + s_must_replay, + s_replaying + }; + static const int n_states = s_replaying + 1; + enum state state() const + { return state_; } + + transaction(wsrep::client_state& client_state); + ~transaction(); + // Accessors + wsrep::transaction_id id() const + { return id_; } + + const wsrep::id& server_id() const + { return server_id_; } + + bool active() const + { return (id_ != wsrep::transaction_id::undefined()); } + + + void state(wsrep::unique_lock<wsrep::mutex>&, enum state); + + // Return true if the certification of the last + // fragment succeeded + bool certified() const { return certified_; } + + wsrep::seqno seqno() const + { + return ws_meta_.seqno(); + } + // Return true if the last fragment was ordered by the + // provider + bool ordered() const + { return (ws_meta_.seqno().is_undefined() == false); } + + /** + * Return true if any fragments have been successfully certified + * for the transaction. + */ + bool is_streaming() const + { + return (streaming_context_.fragments_certified() > 0); + } + + /** + * Return number of fragments certified for current statement. + * + * This counts fragments which have been successfully certified + * since the construction of object or last after_statement() + * call. + * + * @return Number of fragments certified for current statement. + */ + size_t fragments_certified_for_statement() const + { + return fragments_certified_for_statement_; + } + + /** + * Return true if transaction has not generated any changes. + */ + bool is_empty() const + { + return sr_keys_.empty(); + } + + bool is_xa() const + { + return !xid_.is_null(); + } + + void assign_xid(const wsrep::xid& xid); + + const wsrep::xid& xid() const + { + return xid_; + } + + int restore_to_prepared_state(const wsrep::xid& xid); + + int commit_or_rollback_by_xid(const wsrep::xid& xid, bool commit); + + void xa_detach(); + + int xa_replay(wsrep::unique_lock<wsrep::mutex>&); + + bool pa_unsafe() const { return (flags() & wsrep::provider::flag::pa_unsafe); } + void pa_unsafe(bool pa_unsafe) { + if (pa_unsafe) { + flags(flags() | wsrep::provider::flag::pa_unsafe); + } else { + flags(flags() & ~wsrep::provider::flag::pa_unsafe); + } + } + bool implicit_deps() const { return implicit_deps_; } + void implicit_deps(bool implicit) { implicit_deps_ = implicit; } + + int start_transaction(const wsrep::transaction_id& id); + + int start_transaction(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta); + + int next_fragment(const wsrep::ws_meta& ws_meta); + + void adopt(const transaction& transaction); + void fragment_applied(wsrep::seqno seqno); + + int prepare_for_ordering(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + bool is_commit); + + int assign_read_view(const wsrep::gtid* gtid); + + int append_key(const wsrep::key&); + + int append_data(const wsrep::const_buffer&); + + int after_row(); + + int before_prepare(wsrep::unique_lock<wsrep::mutex>&); + + int after_prepare(wsrep::unique_lock<wsrep::mutex>&); + + int before_commit(); + + int ordered_commit(); + + int after_commit(); + + int before_rollback(); + + int after_rollback(); + + int before_statement(); + + int after_statement(); + int after_statement(wsrep::unique_lock<wsrep::mutex>&); + + void after_command_must_abort(wsrep::unique_lock<wsrep::mutex>&); + + void after_applying(); + + bool bf_abort(wsrep::unique_lock<wsrep::mutex>& lock, + wsrep::seqno bf_seqno); + bool total_order_bf_abort(wsrep::unique_lock<wsrep::mutex>&, + wsrep::seqno bf_seqno); + + void clone_for_replay(const wsrep::transaction& other); + + bool bf_aborted() const + { + return (bf_abort_client_state_ != 0); + } + + bool bf_aborted_in_total_order() const + { + return bf_aborted_in_total_order_; + } + + int flags() const + { + return flags_; + } + + // wsrep::mutex& mutex(); + wsrep::ws_handle& ws_handle() { return ws_handle_; } + const wsrep::ws_handle& ws_handle() const { return ws_handle_; } + const wsrep::ws_meta& ws_meta() const { return ws_meta_; } + const wsrep::streaming_context& streaming_context() const + { return streaming_context_; } + wsrep::streaming_context& streaming_context() + { return streaming_context_; } + void adopt_apply_error(wsrep::mutable_buffer& buf) + { + apply_error_buf_ = std::move(buf); + } + const wsrep::mutable_buffer& apply_error() const + { return apply_error_buf_; } + private: + transaction(const transaction&); + transaction operator=(const transaction&); + + wsrep::provider& provider(); + void flags(int flags) { flags_ = flags; } + // Return true if the transaction must abort, is aborting, + // or has been aborted, or has been interrupted by DBMS + // as indicated by client_service::interrupted() call. + // The call will adjust transaction state and set client_state + // error status accordingly. + bool abort_or_interrupt(wsrep::unique_lock<wsrep::mutex>&); + int streaming_step(wsrep::unique_lock<wsrep::mutex>&, bool force = false); + int certify_fragment(wsrep::unique_lock<wsrep::mutex>&); + int certify_commit(wsrep::unique_lock<wsrep::mutex>&); + int append_sr_keys_for_commit(); + int release_commit_order(wsrep::unique_lock<wsrep::mutex>&); + void remove_fragments_in_storage_service_scope( + wsrep::unique_lock<wsrep::mutex>&); + void streaming_rollback(wsrep::unique_lock<wsrep::mutex>&); + int replay(wsrep::unique_lock<wsrep::mutex>&); + void xa_replay_common(wsrep::unique_lock<wsrep::mutex>&); + int xa_replay_commit(wsrep::unique_lock<wsrep::mutex>&); + void cleanup(); + void debug_log_state(const char*) const; + void debug_log_key_append(const wsrep::key& key) const; + + wsrep::server_service& server_service_; + wsrep::client_service& client_service_; + wsrep::client_state& client_state_; + wsrep::id server_id_; + wsrep::transaction_id id_; + enum state state_; + std::vector<enum state> state_hist_; + enum state bf_abort_state_; + enum wsrep::provider::status bf_abort_provider_status_; + int bf_abort_client_state_; + bool bf_aborted_in_total_order_; + wsrep::ws_handle ws_handle_; + wsrep::ws_meta ws_meta_; + int flags_; + bool implicit_deps_; + bool certified_; + size_t fragments_certified_for_statement_; + wsrep::streaming_context streaming_context_; + wsrep::sr_key_set sr_keys_; + wsrep::mutable_buffer apply_error_buf_; + wsrep::xid xid_; + bool streaming_rollback_in_progress_; + /* This flag tells that the transaction has become immutable + against BF aborts. Ideally this would be deduced from transaction + state, i.e. s_ordered_commit or s_committed, but the current + version of wsrep-lib changes the transaction state to + s_ordered_commit too late. Fixing this appears to require + too many changes to application using the lib, so boolean flag + must do. */ + bool is_bf_immutable_; + }; + + static inline const char* to_c_string(enum wsrep::transaction::state state) + { + switch (state) + { + case wsrep::transaction::s_executing: return "executing"; + case wsrep::transaction::s_preparing: return "preparing"; + case wsrep::transaction::s_prepared: return "prepared"; + case wsrep::transaction::s_certifying: return "certifying"; + case wsrep::transaction::s_committing: return "committing"; + case wsrep::transaction::s_ordered_commit: return "ordered_commit"; + case wsrep::transaction::s_committed: return "committed"; + case wsrep::transaction::s_cert_failed: return "cert_failed"; + case wsrep::transaction::s_must_abort: return "must_abort"; + case wsrep::transaction::s_aborting: return "aborting"; + case wsrep::transaction::s_aborted: return "aborted"; + case wsrep::transaction::s_must_replay: return "must_replay"; + case wsrep::transaction::s_replaying: return "replaying"; + } + return "unknown"; + } + static inline std::string to_string(enum wsrep::transaction::state state) + { + return to_c_string(state); + } + + std::ostream& operator<<(std::ostream& os, enum wsrep::transaction::state); + +} + +#endif // WSREP_TRANSACTION_HPP |