summaryrefslogtreecommitdiffstats
path: root/storage/tokudb/PerconaFT/ft/txn
diff options
context:
space:
mode:
Diffstat (limited to 'storage/tokudb/PerconaFT/ft/txn')
-rw-r--r--storage/tokudb/PerconaFT/ft/txn/roll.cc692
-rw-r--r--storage/tokudb/PerconaFT/ft/txn/rollback-apply.cc258
-rw-r--r--storage/tokudb/PerconaFT/ft/txn/rollback-apply.h47
-rw-r--r--storage/tokudb/PerconaFT/ft/txn/rollback-ct-callbacks.cc257
-rw-r--r--storage/tokudb/PerconaFT/ft/txn/rollback-ct-callbacks.h80
-rw-r--r--storage/tokudb/PerconaFT/ft/txn/rollback.cc334
-rw-r--r--storage/tokudb/PerconaFT/ft/txn/rollback.h145
-rw-r--r--storage/tokudb/PerconaFT/ft/txn/rollback_log_node_cache.cc109
-rw-r--r--storage/tokudb/PerconaFT/ft/txn/rollback_log_node_cache.h63
-rw-r--r--storage/tokudb/PerconaFT/ft/txn/txn.cc754
-rw-r--r--storage/tokudb/PerconaFT/ft/txn/txn.h362
-rw-r--r--storage/tokudb/PerconaFT/ft/txn/txn_child_manager.cc143
-rw-r--r--storage/tokudb/PerconaFT/ft/txn/txn_child_manager.h66
-rw-r--r--storage/tokudb/PerconaFT/ft/txn/txn_manager.cc1040
-rw-r--r--storage/tokudb/PerconaFT/ft/txn/txn_manager.h223
-rw-r--r--storage/tokudb/PerconaFT/ft/txn/txn_state.h50
-rw-r--r--storage/tokudb/PerconaFT/ft/txn/xids.cc247
-rw-r--r--storage/tokudb/PerconaFT/ft/txn/xids.h116
18 files changed, 4986 insertions, 0 deletions
diff --git a/storage/tokudb/PerconaFT/ft/txn/roll.cc b/storage/tokudb/PerconaFT/ft/txn/roll.cc
new file mode 100644
index 00000000..7228de06
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/txn/roll.cc
@@ -0,0 +1,692 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+/* rollback and rollforward routines. */
+
+#include <memory>
+#include "ft/ft-ops.h"
+#include "ft/ft.h"
+#include "ft/log_header.h"
+#include "ft/logger/log-internal.h"
+#include "ft/txn/rollback-apply.h"
+#include "ft/txn/xids.h"
+
+// functionality provided by roll.c is exposed by an autogenerated
+// header file, logheader.h
+//
+// this (poorly) explains the absence of "roll.h"
+
+// these flags control whether or not we send commit messages for
+// various operations
+
+// When a transaction is committed, should we send a FT_COMMIT message
+// for each FT_INSERT message sent earlier by the transaction?
+#define TOKU_DO_COMMIT_CMD_INSERT 0
+
+// When a transaction is committed, should we send a FT_COMMIT message
+// for each FT_DELETE_ANY message sent earlier by the transaction?
+#define TOKU_DO_COMMIT_CMD_DELETE 1
+
+// When a transaction is committed, should we send a FT_COMMIT message
+// for each FT_UPDATE message sent earlier by the transaction?
+#define TOKU_DO_COMMIT_CMD_UPDATE 0
+
+int
+toku_commit_fdelete (FILENUM filenum,
+ TOKUTXN txn,
+ LSN UU(oplsn)) //oplsn is the lsn of the commit
+{
+ int r;
+ CACHEFILE cf;
+ CACHETABLE ct = txn->logger->ct;
+
+ // Try to get the cachefile for this filenum. A missing file on recovery
+ // is not an error, but a missing file outside of recovery is.
+ r = toku_cachefile_of_filenum(ct, filenum, &cf);
+ if (r == ENOENT) {
+ assert(txn->for_recovery);
+ r = 0;
+ goto done;
+ }
+ assert_zero(r);
+
+ // bug fix for #4718
+ // bug was introduced in with fix for #3590
+ // Before Maxwell (and fix for #3590),
+ // the recovery log was fsynced after the xcommit was loged but
+ // before we processed rollback entries and before we released
+ // the row locks (in the lock tree). Due to performance concerns,
+ // the fsync was moved to after the release of row locks, which comes
+ // after processing rollback entries. As a result, we may be unlinking a file
+ // here as part of a transactoin that may abort if we do not fsync the log.
+ // So, we fsync the log here.
+ if (txn->logger) {
+ toku_logger_fsync_if_lsn_not_fsynced(txn->logger, txn->do_fsync_lsn);
+ }
+
+ // Mark the cachefile as unlink on close. There are two ways for close
+ // to be eventually called on the cachefile:
+ //
+ // - when this txn completes, it will release a reference on the
+ // ft and close it, UNLESS it was pinned by checkpoint
+ // - if the cf was pinned by checkpoint, an unpin will release the
+ // final reference and call close. it must be the final reference
+ // since this txn has exclusive access to dictionary (by the
+ // directory row lock for its dname) and we would not get this
+ // far if there were other live handles.
+ toku_cachefile_unlink_on_close(cf);
+done:
+ return r;
+}
+
+int
+toku_rollback_fdelete (FILENUM UU(filenum),
+ TOKUTXN UU(txn),
+ LSN UU(oplsn)) //oplsn is the lsn of the abort
+{
+ //Rolling back an fdelete is an no-op.
+ return 0;
+}
+
+int
+toku_commit_fcreate (FILENUM UU(filenum),
+ BYTESTRING UU(bs_fname),
+ TOKUTXN UU(txn),
+ LSN UU(oplsn))
+{
+ return 0;
+}
+
+int
+toku_rollback_fcreate (FILENUM filenum,
+ BYTESTRING UU(bs_fname),
+ TOKUTXN txn,
+ LSN UU(oplsn))
+{
+ int r;
+ CACHEFILE cf;
+ CACHETABLE ct = txn->logger->ct;
+
+ // Try to get the cachefile for this filenum. A missing file on recovery
+ // is not an error, but a missing file outside of recovery is.
+ r = toku_cachefile_of_filenum(ct, filenum, &cf);
+ if (r == ENOENT) {
+ r = 0;
+ goto done;
+ }
+ assert_zero(r);
+
+ // Mark the cachefile as unlink on close. There are two ways for close
+ // to be eventually called on the cachefile:
+ //
+ // - when this txn completes, it will release a reference on the
+ // ft and close it, UNLESS it was pinned by checkpoint
+ // - if the cf was pinned by checkpoint, an unpin will release the
+ // final reference and call close. it must be the final reference
+ // since this txn has exclusive access to dictionary (by the
+ // directory row lock for its dname) and we would not get this
+ // far if there were other live handles.
+ toku_cachefile_unlink_on_close(cf);
+ toku_cachefile_skip_log_recover_on_close(cf);
+done:
+ return 0;
+}
+
+int toku_commit_frename(BYTESTRING /* old_name */,
+ BYTESTRING /* new_iname */,
+ TOKUTXN /* txn */,
+ LSN UU(oplsn)) {
+ return 0;
+}
+
+int toku_rollback_frename(BYTESTRING old_iname,
+ BYTESTRING new_iname,
+ TOKUTXN txn,
+ LSN UU(oplsn)) {
+ assert(txn);
+ assert(txn->logger);
+ assert(txn->logger->ct);
+
+ CACHETABLE cachetable = txn->logger->ct;
+
+ toku_struct_stat stat;
+ bool old_exist = true;
+ bool new_exist = true;
+
+ std::unique_ptr<char[], decltype(&toku_free)> old_iname_full(
+ toku_cachetable_get_fname_in_cwd(cachetable, old_iname.data),
+ &toku_free);
+ std::unique_ptr<char[], decltype(&toku_free)> new_iname_full(
+ toku_cachetable_get_fname_in_cwd(cachetable, new_iname.data),
+ &toku_free);
+
+ if (toku_stat(old_iname_full.get(), &stat, toku_uninstrumented) == -1) {
+ if (ENOENT == errno)
+ old_exist = false;
+ else
+ return 1;
+ }
+
+ if (toku_stat(new_iname_full.get(), &stat, toku_uninstrumented) == -1) {
+ if (ENOENT == errno || ENAMETOOLONG == errno)
+ new_exist = false;
+ else
+ return 1;
+ }
+
+ // Both old and new files can exist if:
+ // - rename() is not completed
+ // - fcreate was replayed during recovery
+ // 'Stalled cachefiles' container cachefile_list::m_stale_fileid contains
+ // closed but not yet evicted cachefiles and the key of this container is
+ // fs-dependent file id - (device id, inode number) pair. To preserve the
+ // new cachefile
+ // file's id and keep it in 'stalled cachefiles' container the old file is
+ // removed
+ // and the new file is renamed.
+ if (old_exist && new_exist &&
+ (toku_os_delete(old_iname_full.get()) == -1 ||
+ toku_os_rename(new_iname_full.get(), old_iname_full.get()) == -1 ||
+ toku_fsync_directory(new_iname_full.get()) == -1 ||
+ toku_fsync_directory(old_iname_full.get()) == -1))
+ return 1;
+
+ if (!old_exist && new_exist &&
+ (!toku_create_subdirs_if_needed(old_iname_full.get()) ||
+ toku_os_rename(new_iname_full.get(), old_iname_full.get()) == -1 ||
+ toku_fsync_directory(new_iname_full.get()) == -1 ||
+ toku_fsync_directory(old_iname_full.get()) == -1))
+ return 1;
+
+ // it's ok if both files do not exist on recovery
+ if (!old_exist && !new_exist)
+ assert(txn->for_recovery);
+
+ CACHEFILE cf;
+ int r = toku_cachefile_of_iname_in_env(cachetable, new_iname.data, &cf);
+ if (r != ENOENT) {
+ char *old_fname_in_cf = toku_cachefile_fname_in_env(cf);
+ toku_cachefile_set_fname_in_env(cf, toku_xstrdup(old_iname.data));
+ toku_free(old_fname_in_cf);
+ // There is at least one case when fclose logging cause error:
+ // 1) start transaction
+ // 2) create ft 'a'(write "fcreate" in recovery log)
+ // 3) rename ft 'a' to 'b'(write "frename" in recovery log)
+ // 4) abort transaction:
+ // a) rollback rename ft (renames 'b' to 'a')
+ // b) rollback create ft (removes 'a'):
+ // invokes toku_cachefile_unlink_on_close - lazy unlink on file
+ // close,
+ // it just sets corresponding flag in cachefile object
+ // c) write "unlink" for 'a' in recovery log
+ // (when transaction is aborted all locks are released,
+ // when file lock is released the file is closed and unlinked if
+ // corresponding flag is set in cachefile object)
+ // 5) crash
+ //
+ // After this we have the following records in recovery log:
+ // - create ft 'a',
+ // - rename 'a' to 'b',
+ // - unlink 'a'
+ //
+ // On recovery:
+ // - create 'a'
+ // - rename 'a' to 'b'
+ // - unlink 'a' - as 'a' file does not exist we have crash on assert
+ // here
+ //
+ // There is no need to write "unlink" in recovery log in (4a) because
+ // 'a' will be removed
+ // on transaction rollback on recovery.
+ toku_cachefile_skip_log_recover_on_close(cf);
+ }
+
+ return 0;
+}
+
+int find_ft_from_filenum (const FT &ft, const FILENUM &filenum);
+int find_ft_from_filenum (const FT &ft, const FILENUM &filenum) {
+ FILENUM thisfnum = toku_cachefile_filenum(ft->cf);
+ if (thisfnum.fileid<filenum.fileid) return -1;
+ if (thisfnum.fileid>filenum.fileid) return +1;
+ return 0;
+}
+
+// Input arg reset_root_xid_that_created true means that this operation has changed the definition of this dictionary.
+// (Example use is for schema change committed with txn that inserted cmdupdatebroadcast message.)
+// The oplsn argument is ZERO_LSN for normal operation. When this function is called for recovery, it has the LSN of
+// the operation (insert, delete, update, etc).
+static int do_insertion (enum ft_msg_type type, FILENUM filenum, BYTESTRING key, BYTESTRING *data, TOKUTXN txn, LSN oplsn,
+ bool reset_root_xid_that_created) {
+ int r = 0;
+ //printf("%s:%d committing insert %s %s\n", __FILE__, __LINE__, key.data, data.data);
+ FT ft = nullptr;
+ r = txn->open_fts.find_zero<FILENUM, find_ft_from_filenum>(filenum, &ft, NULL);
+ if (r == DB_NOTFOUND) {
+ assert(txn->for_recovery);
+ r = 0;
+ goto done;
+ }
+ assert(r==0);
+
+ if (oplsn.lsn != 0) { // if we are executing the recovery algorithm
+ LSN treelsn = toku_ft_checkpoint_lsn(ft);
+ if (oplsn.lsn <= treelsn.lsn) { // if operation was already applied to tree ...
+ r = 0; // ... do not apply it again.
+ goto done;
+ }
+ }
+
+ DBT key_dbt,data_dbt;
+ XIDS xids;
+ xids = toku_txn_get_xids(txn);
+ {
+ const DBT *kdbt = key.len > 0 ? toku_fill_dbt(&key_dbt, key.data, key.len) :
+ toku_init_dbt(&key_dbt);
+ const DBT *vdbt = data ? toku_fill_dbt(&data_dbt, data->data, data->len) :
+ toku_init_dbt(&data_dbt);
+ ft_msg msg(kdbt, vdbt, type, ZERO_MSN, xids);
+
+ TXN_MANAGER txn_manager = toku_logger_get_txn_manager(txn->logger);
+ txn_manager_state txn_state_for_gc(txn_manager);
+
+ TXNID oldest_referenced_xid_estimate = toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager);
+ txn_gc_info gc_info(&txn_state_for_gc,
+ oldest_referenced_xid_estimate,
+ // no messages above us, we can implicitly promote uxrs based on this xid
+ oldest_referenced_xid_estimate,
+ !txn->for_recovery);
+ toku_ft_root_put_msg(ft, msg, &gc_info);
+ if (reset_root_xid_that_created) {
+ TXNID new_root_xid_that_created = toku_xids_get_outermost_xid(xids);
+ toku_reset_root_xid_that_created(ft, new_root_xid_that_created);
+ }
+ }
+done:
+ return r;
+}
+
+
+static int do_nothing_with_filenum(TOKUTXN UU(txn), FILENUM UU(filenum)) {
+ return 0;
+}
+
+
+int toku_commit_cmdinsert (FILENUM filenum, BYTESTRING UU(key), TOKUTXN txn, LSN UU(oplsn)) {
+#if TOKU_DO_COMMIT_CMD_INSERT
+ return do_insertion (FT_COMMIT_ANY, filenum, key, 0, txn, oplsn, false);
+#else
+ return do_nothing_with_filenum(txn, filenum);
+#endif
+}
+
+int
+toku_rollback_cmdinsert (FILENUM filenum,
+ BYTESTRING key,
+ TOKUTXN txn,
+ LSN oplsn)
+{
+ return do_insertion (FT_ABORT_ANY, filenum, key, 0, txn, oplsn, false);
+}
+
+int
+toku_commit_cmdupdate(FILENUM filenum,
+ BYTESTRING UU(key),
+ TOKUTXN txn,
+ LSN UU(oplsn))
+{
+#if TOKU_DO_COMMIT_CMD_UPDATE
+ return do_insertion(FT_COMMIT_ANY, filenum, key, 0, txn, oplsn, false);
+#else
+ return do_nothing_with_filenum(txn, filenum);
+#endif
+}
+
+int
+toku_rollback_cmdupdate(FILENUM filenum,
+ BYTESTRING key,
+ TOKUTXN txn,
+ LSN oplsn)
+{
+ return do_insertion(FT_ABORT_ANY, filenum, key, 0, txn, oplsn, false);
+}
+
+int
+toku_commit_cmdupdatebroadcast(FILENUM filenum,
+ bool is_resetting_op,
+ TOKUTXN txn,
+ LSN oplsn)
+{
+ // if is_resetting_op, reset root_xid_that_created in
+ // relevant ft.
+ bool reset_root_xid_that_created = (is_resetting_op ? true : false);
+ const enum ft_msg_type msg_type = (is_resetting_op
+ ? FT_COMMIT_BROADCAST_ALL
+ : FT_COMMIT_BROADCAST_TXN);
+ BYTESTRING nullkey = { 0, NULL };
+ return do_insertion(msg_type, filenum, nullkey, 0, txn, oplsn, reset_root_xid_that_created);
+}
+
+int
+toku_rollback_cmdupdatebroadcast(FILENUM filenum,
+ bool UU(is_resetting_op),
+ TOKUTXN txn,
+ LSN oplsn)
+{
+ BYTESTRING nullkey = { 0, NULL };
+ return do_insertion(FT_ABORT_BROADCAST_TXN, filenum, nullkey, 0, txn, oplsn, false);
+}
+
+int
+toku_commit_cmddelete (FILENUM filenum,
+ BYTESTRING key,
+ TOKUTXN txn,
+ LSN oplsn)
+{
+#if TOKU_DO_COMMIT_CMD_DELETE
+ return do_insertion (FT_COMMIT_ANY, filenum, key, 0, txn, oplsn, false);
+#else
+ key = key; oplsn = oplsn;
+ return do_nothing_with_filenum(txn, filenum);
+#endif
+}
+
+int
+toku_rollback_cmddelete (FILENUM filenum,
+ BYTESTRING key,
+ TOKUTXN txn,
+ LSN oplsn)
+{
+ return do_insertion (FT_ABORT_ANY, filenum, key, 0, txn, oplsn, false);
+}
+
+static int
+toku_apply_rollinclude (TXNID_PAIR xid,
+ uint64_t num_nodes,
+ BLOCKNUM spilled_head,
+ BLOCKNUM spilled_tail,
+ TOKUTXN txn,
+ LSN oplsn,
+ apply_rollback_item func) {
+ int r = 0;
+ struct roll_entry *item;
+
+ BLOCKNUM next_log = spilled_tail;
+ uint64_t last_sequence = num_nodes;
+
+ bool found_head = false;
+ assert(next_log.b != ROLLBACK_NONE.b);
+ while (next_log.b != ROLLBACK_NONE.b) {
+ //pin log
+ ROLLBACK_LOG_NODE log;
+ toku_get_and_pin_rollback_log(txn, next_log, &log);
+ toku_rollback_verify_contents(log, xid, last_sequence - 1);
+ last_sequence = log->sequence;
+
+ toku_maybe_prefetch_previous_rollback_log(txn, log);
+
+ while ((item=log->newest_logentry)) {
+ log->newest_logentry = item->prev;
+ r = func(txn, item, oplsn);
+ if (r!=0) return r;
+ }
+ if (next_log.b == spilled_head.b) {
+ assert(!found_head);
+ found_head = true;
+ assert(log->sequence == 0);
+ }
+ next_log = log->previous;
+ {
+ //Clean up transaction structure to prevent
+ //toku_txn_close from double-freeing
+ spilled_tail = next_log;
+ if (found_head) {
+ assert(next_log.b == ROLLBACK_NONE.b);
+ spilled_head = next_log;
+ }
+ }
+ toku_rollback_log_unpin_and_remove(txn, log);
+ }
+ return r;
+}
+
+int
+toku_commit_rollinclude (TXNID_PAIR xid,
+ uint64_t num_nodes,
+ BLOCKNUM spilled_head,
+ BLOCKNUM spilled_tail,
+ TOKUTXN txn,
+ LSN oplsn) {
+ int r;
+ r = toku_apply_rollinclude(xid, num_nodes,
+ spilled_head,
+ spilled_tail,
+ txn, oplsn,
+ toku_commit_rollback_item);
+ return r;
+}
+
+int
+toku_rollback_rollinclude (TXNID_PAIR xid,
+ uint64_t num_nodes,
+ BLOCKNUM spilled_head,
+ BLOCKNUM spilled_tail,
+ TOKUTXN txn,
+ LSN oplsn) {
+ int r;
+ r = toku_apply_rollinclude(xid, num_nodes,
+ spilled_head,
+ spilled_tail,
+ txn, oplsn,
+ toku_abort_rollback_item);
+ return r;
+}
+
+int
+toku_commit_load (FILENUM old_filenum,
+ BYTESTRING UU(new_iname),
+ TOKUTXN txn,
+ LSN UU(oplsn))
+{
+ int r;
+ CACHEFILE old_cf;
+ CACHETABLE ct = txn->logger->ct;
+
+ // To commit a dictionary load, we delete the old file
+ //
+ // Try to get the cachefile for the old filenum. A missing file on recovery
+ // is not an error, but a missing file outside of recovery is.
+ r = toku_cachefile_of_filenum(ct, old_filenum, &old_cf);
+ if (r == ENOENT) {
+ invariant(txn->for_recovery);
+ r = 0;
+ goto done;
+ }
+ lazy_assert(r == 0);
+
+ // bug fix for #4718
+ // bug was introduced in with fix for #3590
+ // Before Maxwell (and fix for #3590),
+ // the recovery log was fsynced after the xcommit was loged but
+ // before we processed rollback entries and before we released
+ // the row locks (in the lock tree). Due to performance concerns,
+ // the fsync was moved to after the release of row locks, which comes
+ // after processing rollback entries. As a result, we may be unlinking a file
+ // here as part of a transactoin that may abort if we do not fsync the log.
+ // So, we fsync the log here.
+ if (txn->logger) {
+ toku_logger_fsync_if_lsn_not_fsynced(txn->logger, txn->do_fsync_lsn);
+ }
+
+ // TODO: Zardosht
+ // Explain why this condition is valid, because I forget.
+ if (!toku_cachefile_is_unlink_on_close(old_cf)) {
+ toku_cachefile_unlink_on_close(old_cf);
+ }
+done:
+ return r;
+}
+
+int
+toku_rollback_load (FILENUM UU(old_filenum),
+ BYTESTRING new_iname,
+ TOKUTXN txn,
+ LSN UU(oplsn))
+{
+ int r;
+ CACHEFILE new_cf;
+ CACHETABLE ct = txn->logger->ct;
+
+ // To rollback a dictionary load, we delete the new file.
+ // Try to get the cachefile for the new fname.
+ char *fname_in_env = fixup_fname(&new_iname);
+ r = toku_cachefile_of_iname_in_env(ct, fname_in_env, &new_cf);
+ if (r == ENOENT) {
+ // It's possible the new iname was never created, so just try to
+ // unlink it if it's there and ignore the error if it's not.
+ char *fname_in_cwd = toku_cachetable_get_fname_in_cwd(ct, fname_in_env);
+ r = unlink(fname_in_cwd);
+ assert(r == 0 || get_error_errno() == ENOENT);
+ toku_free(fname_in_cwd);
+ r = 0;
+ } else {
+ assert_zero(r);
+ toku_cachefile_unlink_on_close(new_cf);
+ }
+ toku_free(fname_in_env);
+ return r;
+}
+
+//2954
+int
+toku_commit_hot_index (FILENUMS UU(hot_index_filenums),
+ TOKUTXN UU(txn),
+ LSN UU(oplsn))
+{
+ // nothing
+ return 0;
+}
+
+int
+toku_rollback_hot_index (FILENUMS UU(hot_index_filenums),
+ TOKUTXN UU(txn),
+ LSN UU(oplsn))
+{
+ return 0;
+}
+
+int
+toku_commit_dictionary_redirect (FILENUM UU(old_filenum),
+ FILENUM UU(new_filenum),
+ TOKUTXN UU(txn),
+ LSN UU(oplsn)) //oplsn is the lsn of the commit
+{
+ //Redirect only has meaning during normal operation (NOT during recovery).
+ if (!txn->for_recovery) {
+ //NO-OP
+ }
+ return 0;
+}
+
+int
+toku_rollback_dictionary_redirect (FILENUM old_filenum,
+ FILENUM new_filenum,
+ TOKUTXN txn,
+ LSN UU(oplsn)) //oplsn is the lsn of the abort
+{
+ int r = 0;
+ //Redirect only has meaning during normal operation (NOT during recovery).
+ if (!txn->for_recovery) {
+ CACHEFILE new_cf = NULL;
+ r = toku_cachefile_of_filenum(txn->logger->ct, new_filenum, &new_cf);
+ assert(r == 0);
+ FT CAST_FROM_VOIDP(new_ft, toku_cachefile_get_userdata(new_cf));
+
+ CACHEFILE old_cf = NULL;
+ r = toku_cachefile_of_filenum(txn->logger->ct, old_filenum, &old_cf);
+ assert(r == 0);
+ FT CAST_FROM_VOIDP(old_ft, toku_cachefile_get_userdata(old_cf));
+
+ //Redirect back from new to old.
+ r = toku_dictionary_redirect_abort(old_ft, new_ft, txn);
+ assert(r==0);
+ }
+ return r;
+}
+
+int
+toku_commit_change_fdescriptor(FILENUM filenum,
+ BYTESTRING UU(old_descriptor),
+ TOKUTXN txn,
+ LSN UU(oplsn))
+{
+ return do_nothing_with_filenum(txn, filenum);
+}
+
+int
+toku_rollback_change_fdescriptor(FILENUM filenum,
+ BYTESTRING old_descriptor,
+ TOKUTXN txn,
+ LSN UU(oplsn))
+{
+ CACHEFILE cf;
+ int r;
+ r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf);
+ if (r == ENOENT) { //Missing file on recovered transaction is not an error
+ assert(txn->for_recovery);
+ r = 0;
+ goto done;
+ }
+ // file must be open, because the txn that created it opened it and
+ // noted it,
+ assert(r == 0);
+
+ FT ft;
+ ft = NULL;
+ r = txn->open_fts.find_zero<FILENUM, find_ft_from_filenum>(filenum, &ft, NULL);
+ assert(r == 0);
+
+ DESCRIPTOR_S d;
+ toku_fill_dbt(&d.dbt, old_descriptor.data, old_descriptor.len);
+ toku_ft_update_descriptor(ft, &d);
+done:
+ return r;
+}
+
+
+
diff --git a/storage/tokudb/PerconaFT/ft/txn/rollback-apply.cc b/storage/tokudb/PerconaFT/ft/txn/rollback-apply.cc
new file mode 100644
index 00000000..0f19c445
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/txn/rollback-apply.cc
@@ -0,0 +1,258 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#include "ft/logger/log-internal.h"
+#include "ft/txn/rollback-apply.h"
+
+static void poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint) {
+ if (txn->progress_poll_fun) {
+ TOKU_TXN_PROGRESS_S progress = {
+ .entries_total = txn->roll_info.num_rollentries,
+ .entries_processed = txn->roll_info.num_rollentries_processed,
+ .is_commit = is_commit,
+ .stalled_on_checkpoint = stall_for_checkpoint};
+ txn->progress_poll_fun(&progress, txn->progress_poll_fun_extra);
+ }
+}
+
+int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) {
+ int r=0;
+ rolltype_dispatch_assign(item, toku_commit_, r, txn, lsn);
+ txn->roll_info.num_rollentries_processed++;
+ if (txn->roll_info.num_rollentries_processed % 1024 == 0) {
+ poll_txn_progress_function(txn, true, false);
+ }
+ return r;
+}
+
+int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) {
+ int r=0;
+ rolltype_dispatch_assign(item, toku_rollback_, r, txn, lsn);
+ txn->roll_info.num_rollentries_processed++;
+ if (txn->roll_info.num_rollentries_processed % 1024 == 0) {
+ poll_txn_progress_function(txn, false, false);
+ }
+ return r;
+}
+
+int note_ft_used_in_txns_parent(const FT &ft, uint32_t UU(index), TOKUTXN const child);
+int note_ft_used_in_txns_parent(const FT &ft, uint32_t UU(index), TOKUTXN const child) {
+ TOKUTXN parent = child->parent;
+ toku_txn_maybe_note_ft(parent, ft);
+ return 0;
+}
+
+static int apply_txn(TOKUTXN txn, LSN lsn, apply_rollback_item func) {
+ int r = 0;
+ // do the commit/abort calls and free everything
+ // we do the commit/abort calls in reverse order too.
+ struct roll_entry *item;
+ //printf("%s:%d abort\n", __FILE__, __LINE__);
+
+ BLOCKNUM next_log = ROLLBACK_NONE;
+
+ bool is_current = false;
+ if (txn_has_current_rollback_log(txn)) {
+ next_log = txn->roll_info.current_rollback;
+ is_current = true;
+ }
+ else if (txn_has_spilled_rollback_logs(txn)) {
+ next_log = txn->roll_info.spilled_rollback_tail;
+ }
+
+ uint64_t last_sequence = txn->roll_info.num_rollback_nodes;
+ bool found_head = false;
+ while (next_log.b != ROLLBACK_NONE.b) {
+ ROLLBACK_LOG_NODE log;
+ //pin log
+ toku_get_and_pin_rollback_log(txn, next_log, &log);
+ toku_rollback_verify_contents(log, txn->txnid, last_sequence - 1);
+
+ toku_maybe_prefetch_previous_rollback_log(txn, log);
+
+ last_sequence = log->sequence;
+ if (func) {
+ while ((item=log->newest_logentry)) {
+ log->newest_logentry = item->prev;
+ r = func(txn, item, lsn);
+ if (r!=0) return r;
+ }
+ }
+ if (next_log.b == txn->roll_info.spilled_rollback_head.b) {
+ assert(!found_head);
+ found_head = true;
+ assert(log->sequence == 0);
+ }
+ next_log = log->previous;
+ {
+ //Clean up transaction structure to prevent
+ //toku_txn_close from double-freeing
+ if (is_current) {
+ txn->roll_info.current_rollback = ROLLBACK_NONE;
+ is_current = false;
+ }
+ else {
+ txn->roll_info.spilled_rollback_tail = next_log;
+ }
+ if (found_head) {
+ assert(next_log.b == ROLLBACK_NONE.b);
+ txn->roll_info.spilled_rollback_head = next_log;
+ }
+ }
+ bool give_back = false;
+ // each txn tries to give back at most one rollback log node
+ // to the cache.
+ if (next_log.b == ROLLBACK_NONE.b) {
+ give_back = txn->logger->rollback_cache.give_rollback_log_node(
+ txn,
+ log
+ );
+ }
+ if (!give_back) {
+ toku_rollback_log_unpin_and_remove(txn, log);
+ }
+ }
+ return r;
+}
+
+//Commit each entry in the rollback log.
+//If the transaction has a parent, it just promotes its information to its parent.
+int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
+ int r=0;
+ if (txn->parent!=0) {
+ // First we must put a rollinclude entry into the parent if we spilled
+
+ if (txn_has_spilled_rollback_logs(txn)) {
+ uint64_t num_nodes = txn->roll_info.num_rollback_nodes;
+ if (txn_has_current_rollback_log(txn)) {
+ num_nodes--; //Don't count the in-progress rollback log.
+ }
+ toku_logger_save_rollback_rollinclude(txn->parent, txn->txnid, num_nodes,
+ txn->roll_info.spilled_rollback_head,
+ txn->roll_info.spilled_rollback_tail);
+ //Remove ownership from child.
+ txn->roll_info.spilled_rollback_head = ROLLBACK_NONE;
+ txn->roll_info.spilled_rollback_tail = ROLLBACK_NONE;
+ }
+ // if we're committing a child rollback, put its entries into the parent
+ // by pinning both child and parent and then linking the child log entry
+ // list to the end of the parent log entry list.
+ if (txn_has_current_rollback_log(txn)) {
+ //Pin parent log
+ toku_txn_lock(txn->parent);
+ ROLLBACK_LOG_NODE parent_log;
+ toku_get_and_pin_rollback_log_for_new_entry(txn->parent, &parent_log);
+
+ //Pin child log
+ ROLLBACK_LOG_NODE child_log;
+ toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, &child_log);
+ toku_rollback_verify_contents(child_log, txn->txnid, txn->roll_info.num_rollback_nodes - 1);
+
+ // Append the list to the front of the parent.
+ if (child_log->oldest_logentry) {
+ // There are some entries, so link them in.
+ parent_log->dirty = true;
+ child_log->oldest_logentry->prev = parent_log->newest_logentry;
+ if (!parent_log->oldest_logentry) {
+ parent_log->oldest_logentry = child_log->oldest_logentry;
+ }
+ parent_log->newest_logentry = child_log->newest_logentry;
+ parent_log->rollentry_resident_bytecount += child_log->rollentry_resident_bytecount;
+ txn->parent->roll_info.rollentry_raw_count += txn->roll_info.rollentry_raw_count;
+ child_log->rollentry_resident_bytecount = 0;
+ }
+ if (parent_log->oldest_logentry==NULL) {
+ parent_log->oldest_logentry = child_log->oldest_logentry;
+ }
+ child_log->newest_logentry = child_log->oldest_logentry = 0;
+ // Put all the memarena data into the parent.
+ if (child_log->rollentry_arena.total_size_in_use() > 0) {
+ // If there are no bytes to move, then just leave things alone, and let the memory be reclaimed on txn is closed.
+ child_log->rollentry_arena.move_memory(&parent_log->rollentry_arena);
+ }
+ // each txn tries to give back at most one rollback log node
+ // to the cache. All other rollback log nodes for this child
+ // transaction are included in the parent's rollback log,
+ // so this is the only node we can give back to the cache
+ bool give_back = txn->logger->rollback_cache.give_rollback_log_node(
+ txn,
+ child_log
+ );
+ if (!give_back) {
+ toku_rollback_log_unpin_and_remove(txn, child_log);
+ }
+ txn->roll_info.current_rollback = ROLLBACK_NONE;
+
+ toku_maybe_spill_rollbacks(txn->parent, parent_log);
+ toku_rollback_log_unpin(txn->parent, parent_log);
+ assert(r == 0);
+ toku_txn_unlock(txn->parent);
+ }
+
+ // Note the open FTs, the omts must be merged
+ r = txn->open_fts.iterate<struct tokutxn, note_ft_used_in_txns_parent>(txn);
+ assert(r==0);
+
+ //If this transaction needs an fsync (if it commits)
+ //save that in the parent. Since the commit really happens in the root txn.
+ toku_txn_lock(txn->parent);
+ txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit;
+ txn->parent->roll_info.num_rollentries += txn->roll_info.num_rollentries;
+ toku_txn_unlock(txn->parent);
+ } else {
+ r = apply_txn(txn, lsn, toku_commit_rollback_item);
+ assert(r==0);
+ }
+
+ return r;
+}
+
+int toku_rollback_abort(TOKUTXN txn, LSN lsn) {
+ int r;
+ r = apply_txn(txn, lsn, toku_abort_rollback_item);
+ assert(r==0);
+ return r;
+}
+
+int toku_rollback_discard(TOKUTXN txn) {
+ txn->roll_info.current_rollback = ROLLBACK_NONE;
+ txn->roll_info.spilled_rollback_head = ROLLBACK_NONE;
+ txn->roll_info.spilled_rollback_tail = ROLLBACK_NONE;
+ return 0;
+}
+
diff --git a/storage/tokudb/PerconaFT/ft/txn/rollback-apply.h b/storage/tokudb/PerconaFT/ft/txn/rollback-apply.h
new file mode 100644
index 00000000..bf87cd29
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/txn/rollback-apply.h
@@ -0,0 +1,47 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#pragma once
+
+typedef int(*apply_rollback_item)(TOKUTXN txn, struct roll_entry *item, LSN lsn);
+int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn);
+int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn);
+
+int toku_rollback_commit(TOKUTXN txn, LSN lsn);
+int toku_rollback_abort(TOKUTXN txn, LSN lsn);
+int toku_rollback_discard(TOKUTXN txn);
diff --git a/storage/tokudb/PerconaFT/ft/txn/rollback-ct-callbacks.cc b/storage/tokudb/PerconaFT/ft/txn/rollback-ct-callbacks.cc
new file mode 100644
index 00000000..08d7c887
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/txn/rollback-ct-callbacks.cc
@@ -0,0 +1,257 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#include "portability/memory.h"
+#include "portability/toku_portability.h"
+
+#include "ft/serialize/block_table.h"
+#include "ft/ft-internal.h"
+#include "ft/serialize/ft_node-serialize.h"
+#include "ft/txn/rollback.h"
+#include "ft/txn/rollback-ct-callbacks.h"
+
+#include "util/memarena.h"
+
+// Address used as a sentinel. Otherwise unused.
+static struct serialized_rollback_log_node cloned_rollback;
+
+// Cleanup the rollback memory
+static void
+rollback_log_destroy(ROLLBACK_LOG_NODE log) {
+ make_rollback_log_empty(log);
+ toku_free(log);
+}
+
+// flush an ununused log to disk, by allocating a size 0 blocknum in
+// the blocktable
+static void toku_rollback_flush_unused_log(ROLLBACK_LOG_NODE log,
+ BLOCKNUM logname,
+ int fd,
+ FT ft,
+ bool write_me,
+ bool keep_me,
+ bool for_checkpoint,
+ bool is_clone) {
+ if (write_me) {
+ DISKOFF offset;
+ ft->blocktable.realloc_on_disk(
+ logname, 0, &offset, ft, fd, for_checkpoint);
+ }
+ if (!keep_me && !is_clone) {
+ toku_free(log);
+ }
+}
+
+// flush a used log to disk by serializing and writing the node out
+static void
+toku_rollback_flush_used_log (
+ ROLLBACK_LOG_NODE log,
+ SERIALIZED_ROLLBACK_LOG_NODE serialized,
+ int fd,
+ FT ft,
+ bool write_me,
+ bool keep_me,
+ bool for_checkpoint,
+ bool is_clone
+ )
+{
+
+ if (write_me) {
+ int r = toku_serialize_rollback_log_to(fd, log, serialized, is_clone, ft, for_checkpoint);
+ assert(r == 0);
+ }
+ if (!keep_me) {
+ if (is_clone) {
+ toku_serialized_rollback_log_destroy(serialized);
+ }
+ else {
+ rollback_log_destroy(log);
+ }
+ }
+}
+
+// Write something out. Keep trying even if partial writes occur.
+// On error: Return negative with errno set.
+// On success return nbytes.
+void toku_rollback_flush_callback (
+ CACHEFILE UU(cachefile),
+ int fd,
+ BLOCKNUM logname,
+ void *rollback_v,
+ void** UU(disk_data),
+ void *extraargs,
+ PAIR_ATTR size,
+ PAIR_ATTR* new_size,
+ bool write_me,
+ bool keep_me,
+ bool for_checkpoint,
+ bool is_clone
+ )
+{
+ ROLLBACK_LOG_NODE log = nullptr;
+ SERIALIZED_ROLLBACK_LOG_NODE serialized = nullptr;
+ bool is_unused = false;
+ if (is_clone) {
+ is_unused = (rollback_v == &cloned_rollback);
+ CAST_FROM_VOIDP(serialized, rollback_v);
+ }
+ else {
+ CAST_FROM_VOIDP(log, rollback_v);
+ is_unused = rollback_log_is_unused(log);
+ }
+ *new_size = size;
+ FT ft;
+ CAST_FROM_VOIDP(ft, extraargs);
+ if (is_unused) {
+ toku_rollback_flush_unused_log(
+ log,
+ logname,
+ fd,
+ ft,
+ write_me,
+ keep_me,
+ for_checkpoint,
+ is_clone
+ );
+ }
+ else {
+ toku_rollback_flush_used_log(
+ log,
+ serialized,
+ fd,
+ ft,
+ write_me,
+ keep_me,
+ for_checkpoint,
+ is_clone
+ );
+ }
+}
+
+int toku_rollback_fetch_callback (CACHEFILE cachefile, PAIR p, int fd, BLOCKNUM logname, uint32_t fullhash UU(),
+ void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs) {
+ int r;
+ FT CAST_FROM_VOIDP(h, extraargs);
+ assert(h->cf == cachefile);
+ ROLLBACK_LOG_NODE *result = (ROLLBACK_LOG_NODE*)rollback_pv;
+ r = toku_deserialize_rollback_log_from(fd, logname, result, h);
+ if (r==0) {
+ (*result)->ct_pair = p;
+ *sizep = rollback_memory_size(*result);
+ }
+ return r;
+}
+
+void toku_rollback_pe_est_callback(
+ void* rollback_v,
+ void* UU(disk_data),
+ long* bytes_freed_estimate,
+ enum partial_eviction_cost *cost,
+ void* UU(write_extraargs)
+ )
+{
+ assert(rollback_v != NULL);
+ *bytes_freed_estimate = 0;
+ *cost = PE_CHEAP;
+}
+
+// callback for partially evicting a cachetable entry
+int toku_rollback_pe_callback (
+ void *rollback_v,
+ PAIR_ATTR old_attr,
+ void* UU(extraargs),
+ void (*finalize)(PAIR_ATTR new_attr, void * extra),
+ void *finalize_extra
+ )
+{
+ assert(rollback_v != NULL);
+ finalize(old_attr, finalize_extra);
+ return 0;
+}
+
+// partial fetch is never required for a rollback log node
+bool toku_rollback_pf_req_callback(void* UU(ftnode_pv), void* UU(read_extraargs)) {
+ return false;
+}
+
+// a rollback node should never be partial fetched,
+// because we always say it is not required.
+// (pf req callback always returns false)
+int toku_rollback_pf_callback(void* UU(ftnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* UU(sizep)) {
+ assert(false);
+ return 0;
+}
+
+// the cleaner thread should never choose a rollback node for cleaning
+int toku_rollback_cleaner_callback (
+ void* UU(ftnode_pv),
+ BLOCKNUM UU(blocknum),
+ uint32_t UU(fullhash),
+ void* UU(extraargs)
+ )
+{
+ assert(false);
+ return 0;
+}
+
+void toku_rollback_clone_callback(
+ void* value_data,
+ void** cloned_value_data,
+ long* clone_size,
+ PAIR_ATTR* new_attr,
+ bool UU(for_checkpoint),
+ void* UU(write_extraargs)
+ )
+{
+ ROLLBACK_LOG_NODE CAST_FROM_VOIDP(log, value_data);
+ SERIALIZED_ROLLBACK_LOG_NODE serialized = nullptr;
+ if (!rollback_log_is_unused(log)) {
+ XMALLOC(serialized);
+ toku_serialize_rollback_log_to_memory_uncompressed(log, serialized);
+ *cloned_value_data = serialized;
+ *clone_size = sizeof(struct serialized_rollback_log_node) + serialized->len;
+ }
+ else {
+ *cloned_value_data = &cloned_rollback;
+ *clone_size = sizeof(cloned_rollback);
+ }
+ // clear the dirty bit, because the node has been cloned
+ log->dirty = 0;
+ new_attr->is_valid = false;
+}
+
diff --git a/storage/tokudb/PerconaFT/ft/txn/rollback-ct-callbacks.h b/storage/tokudb/PerconaFT/ft/txn/rollback-ct-callbacks.h
new file mode 100644
index 00000000..5fedb0e5
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/txn/rollback-ct-callbacks.h
@@ -0,0 +1,80 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#pragma once
+
+#include "ft/cachetable/cachetable.h"
+
+void toku_rollback_flush_callback(CACHEFILE cachefile, int fd, BLOCKNUM logname, void *rollback_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size, bool write_me, bool keep_me, bool for_checkpoint, bool UU(is_clone));
+int toku_rollback_fetch_callback(CACHEFILE cachefile, PAIR p, int fd, BLOCKNUM logname, uint32_t fullhash, void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs);
+void toku_rollback_pe_est_callback(
+ void* rollback_v,
+ void* UU(disk_data),
+ long* bytes_freed_estimate,
+ enum partial_eviction_cost *cost,
+ void* UU(write_extraargs)
+ );
+int toku_rollback_pe_callback (
+ void *rollback_v,
+ PAIR_ATTR old_attr,
+ void* UU(extraargs),
+ void (*finalize)(PAIR_ATTR new_attr, void * extra),
+ void *finalize_extra
+ );
+bool toku_rollback_pf_req_callback(void* UU(ftnode_pv), void* UU(read_extraargs)) ;
+int toku_rollback_pf_callback(void* UU(ftnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* UU(sizep));
+void toku_rollback_clone_callback(void* value_data, void** cloned_value_data, long* clone_size, PAIR_ATTR* new_attr, bool for_checkpoint, void* write_extraargs);
+
+int toku_rollback_cleaner_callback (
+ void* UU(ftnode_pv),
+ BLOCKNUM UU(blocknum),
+ uint32_t UU(fullhash),
+ void* UU(extraargs)
+ );
+
+static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_rollback_log(FT ft) {
+ CACHETABLE_WRITE_CALLBACK wc;
+ wc.flush_callback = toku_rollback_flush_callback;
+ wc.pe_est_callback = toku_rollback_pe_est_callback;
+ wc.pe_callback = toku_rollback_pe_callback;
+ wc.cleaner_callback = toku_rollback_cleaner_callback;
+ wc.clone_callback = toku_rollback_clone_callback;
+ wc.checkpoint_complete_callback = nullptr;
+ wc.write_extraargs = ft;
+ return wc;
+}
diff --git a/storage/tokudb/PerconaFT/ft/txn/rollback.cc b/storage/tokudb/PerconaFT/ft/txn/rollback.cc
new file mode 100644
index 00000000..105f980d
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/txn/rollback.cc
@@ -0,0 +1,334 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#include <toku_stdint.h>
+
+#include "ft/serialize/block_table.h"
+#include "ft/ft.h"
+#include "ft/logger/log-internal.h"
+#include "ft/txn/rollback-ct-callbacks.h"
+
+extern int writing_rollback;
+
+static void rollback_unpin_remove_callback(CACHEKEY* cachekey, bool for_checkpoint, void* extra) {
+ FT CAST_FROM_VOIDP(ft, extra);
+ ft->blocktable.free_blocknum(cachekey, ft, for_checkpoint);
+}
+
+void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
+ int r;
+ CACHEFILE cf = txn->logger->rollback_cachefile;
+ FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));
+ r = toku_cachetable_unpin_and_remove (cf, log->ct_pair, rollback_unpin_remove_callback, ft);
+ assert(r == 0);
+}
+
+int
+toku_find_xid_by_xid (const TXNID &xid, const TXNID &xidfind) {
+ if (xid<xidfind) return -1;
+ if (xid>xidfind) return +1;
+ return 0;
+}
+
+// TODO: fix this name
+// toku_rollback_malloc
+void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size) {
+ return log->rollentry_arena.malloc_from_arena(size);
+}
+
+// TODO: fix this name
+// toku_rollback_memdup
+void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len) {
+ void *r = toku_malloc_in_rollback(log, len);
+ memcpy(r, v, len);
+ return r;
+}
+
+static inline PAIR_ATTR make_rollback_pair_attr(long size) {
+ PAIR_ATTR result={
+ .size = size,
+ .nonleaf_size = 0,
+ .leaf_size = 0,
+ .rollback_size = size,
+ .cache_pressure_size = 0,
+ .is_valid = true
+ };
+ return result;
+}
+
+PAIR_ATTR
+rollback_memory_size(ROLLBACK_LOG_NODE log) {
+ size_t size = sizeof(*log);
+ size += log->rollentry_arena.total_footprint();
+ return make_rollback_pair_attr(size);
+}
+
+static void toku_rollback_node_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p) {
+ ROLLBACK_LOG_NODE CAST_FROM_VOIDP(log, value_data);
+ log->ct_pair = p;
+}
+
+//
+// initializes an empty rollback log node
+// Does not touch the blocknum, that is the
+// responsibility of the caller
+//
+void rollback_empty_log_init(ROLLBACK_LOG_NODE log) {
+ // Having a txnid set to TXNID_NONE is how we determine if the
+ // rollback log node is empty or in use.
+ log->txnid.parent_id64 = TXNID_NONE;
+ log->txnid.child_id64 = TXNID_NONE;
+
+ log->layout_version = FT_LAYOUT_VERSION;
+ log->layout_version_original = FT_LAYOUT_VERSION;
+ log->layout_version_read_from_disk = FT_LAYOUT_VERSION;
+ log->dirty = true;
+ log->sequence = 0;
+ log->previous = make_blocknum(0);
+ log->oldest_logentry = NULL;
+ log->newest_logentry = NULL;
+ log->rollentry_arena.create(0);
+ log->rollentry_resident_bytecount = 0;
+}
+
+static void rollback_initialize_for_txn(
+ ROLLBACK_LOG_NODE log,
+ TOKUTXN txn,
+ BLOCKNUM previous
+ )
+{
+ log->txnid = txn->txnid;
+ log->sequence = txn->roll_info.num_rollback_nodes++;
+ log->previous = previous;
+ log->oldest_logentry = NULL;
+ log->newest_logentry = NULL;
+ log->rollentry_arena.create(1024);
+ log->rollentry_resident_bytecount = 0;
+ log->dirty = true;
+}
+
+// TODO: fix this name
+void make_rollback_log_empty(ROLLBACK_LOG_NODE log) {
+ log->rollentry_arena.destroy();
+ rollback_empty_log_init(log);
+}
+
+// create and pin a new rollback log node. chain it to the other rollback nodes
+// by providing a previous blocknum and assigning the new rollback log
+// node the next sequence number
+static void rollback_log_create (
+ TOKUTXN txn,
+ BLOCKNUM previous,
+ ROLLBACK_LOG_NODE *result
+ )
+{
+ writing_rollback++;
+ ROLLBACK_LOG_NODE XMALLOC(log);
+ rollback_empty_log_init(log);
+
+ CACHEFILE cf = txn->logger->rollback_cachefile;
+ FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));
+ rollback_initialize_for_txn(log, txn, previous);
+ ft->blocktable.allocate_blocknum(&log->blocknum, ft);
+ const uint32_t hash = toku_cachetable_hash(ft->cf, log->blocknum);
+ *result = log;
+ toku_cachetable_put(cf, log->blocknum, hash,
+ log, rollback_memory_size(log),
+ get_write_callbacks_for_rollback_log(ft),
+ toku_rollback_node_save_ct_pair);
+ txn->roll_info.current_rollback = log->blocknum;
+ writing_rollback --;
+}
+
+void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
+ int r;
+ CACHEFILE cf = txn->logger->rollback_cachefile;
+ r = toku_cachetable_unpin(
+ cf,
+ log->ct_pair,
+ (enum cachetable_dirty)log->dirty,
+ rollback_memory_size(log)
+ );
+ assert(r == 0);
+}
+
+//Requires: log is pinned
+// log is current
+//After:
+// Maybe there is no current after (if it spilled)
+void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
+ if (log->rollentry_resident_bytecount > txn->logger->write_block_size) {
+ assert(log->blocknum.b == txn->roll_info.current_rollback.b);
+ //spill
+ if (!txn_has_spilled_rollback_logs(txn)) {
+ //First spilled. Copy to head.
+ txn->roll_info.spilled_rollback_head = txn->roll_info.current_rollback;
+ }
+ //Unconditionally copy to tail. Old tail does not need to be cached anymore.
+ txn->roll_info.spilled_rollback_tail = txn->roll_info.current_rollback;
+
+ txn->roll_info.current_rollback = ROLLBACK_NONE;
+ }
+}
+
+int find_filenum (const FT &h, const FT &hfind);
+int find_filenum (const FT &h, const FT &hfind) {
+ FILENUM fnum = toku_cachefile_filenum(h->cf);
+ FILENUM fnumfind = toku_cachefile_filenum(hfind->cf);
+ if (fnum.fileid<fnumfind.fileid) return -1;
+ if (fnum.fileid>fnumfind.fileid) return +1;
+ return 0;
+}
+
+//Notify a transaction that it has touched an ft.
+void toku_txn_maybe_note_ft (TOKUTXN txn, FT ft) {
+ toku_txn_lock(txn);
+ FT ftv;
+ uint32_t idx;
+ int r = txn->open_fts.find_zero<FT, find_filenum>(ft, &ftv, &idx);
+ if (r == 0) {
+ // already there
+ assert(ftv == ft);
+ goto exit;
+ }
+ r = txn->open_fts.insert_at(ft, idx);
+ assert_zero(r);
+ // TODO(leif): if there's anything that locks the reflock and then
+ // the txn lock, this may deadlock, because it grabs the reflock.
+ toku_ft_add_txn_ref(ft);
+exit:
+ toku_txn_unlock(txn);
+}
+
+// Return the number of bytes that went into the rollback data structure (the uncompressed count if there is compression)
+int toku_logger_txn_rollback_stats(TOKUTXN txn, struct txn_stat *txn_stat)
+{
+ toku_txn_lock(txn);
+ txn_stat->rollback_raw_count = txn->roll_info.rollentry_raw_count;
+ txn_stat->rollback_num_entries = txn->roll_info.num_rollentries;
+ toku_txn_unlock(txn);
+ return 0;
+}
+
+void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
+ //Currently processing 'log'. Prefetch the next (previous) log node.
+
+ BLOCKNUM name = log->previous;
+ int r = 0;
+ if (name.b != ROLLBACK_NONE.b) {
+ CACHEFILE cf = txn->logger->rollback_cachefile;
+ uint32_t hash = toku_cachetable_hash(cf, name);
+ FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf));
+ bool doing_prefetch = false;
+ r = toku_cachefile_prefetch(cf, name, hash,
+ get_write_callbacks_for_rollback_log(h),
+ toku_rollback_fetch_callback,
+ toku_rollback_pf_req_callback,
+ toku_rollback_pf_callback,
+ h,
+ &doing_prefetch);
+ assert(r == 0);
+ }
+}
+
+void toku_rollback_verify_contents(ROLLBACK_LOG_NODE log,
+ TXNID_PAIR txnid, uint64_t sequence)
+{
+ assert(log->txnid.parent_id64 == txnid.parent_id64);
+ assert(log->txnid.child_id64 == txnid.child_id64);
+ assert(log->sequence == sequence);
+}
+
+void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log) {
+ void * value;
+ CACHEFILE cf = txn->logger->rollback_cachefile;
+ FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf));
+ uint32_t hash = toku_cachetable_hash(cf, blocknum);
+ int r = toku_cachetable_get_and_pin_with_dep_pairs(cf, blocknum, hash,
+ &value,
+ get_write_callbacks_for_rollback_log(h),
+ toku_rollback_fetch_callback,
+ toku_rollback_pf_req_callback,
+ toku_rollback_pf_callback,
+ PL_WRITE_CHEAP, // lock_type
+ h,
+ 0, NULL, NULL
+ );
+ assert(r == 0);
+ ROLLBACK_LOG_NODE CAST_FROM_VOIDP(pinned_log, value);
+ assert(pinned_log->blocknum.b == blocknum.b);
+ *log = pinned_log;
+}
+
+void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE *log) {
+ ROLLBACK_LOG_NODE pinned_log = NULL;
+ invariant(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING); // hot indexing may call this function for prepared transactions
+ if (txn_has_current_rollback_log(txn)) {
+ toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, &pinned_log);
+ toku_rollback_verify_contents(pinned_log, txn->txnid, txn->roll_info.num_rollback_nodes - 1);
+ } else {
+ // For each transaction, we try to acquire the first rollback log
+ // from the rollback log node cache, so that we avoid
+ // putting something new into the cachetable. However,
+ // if transaction has spilled rollbacks, that means we
+ // have already done a lot of work for this transaction,
+ // and subsequent rollback log nodes are created
+ // and put into the cachetable. The idea is for
+ // transactions that don't do a lot of work to (hopefully)
+ // get a rollback log node from a cache, as opposed to
+ // taking the more expensive route of creating a new one.
+ if (!txn_has_spilled_rollback_logs(txn)) {
+ txn->logger->rollback_cache.get_rollback_log_node(txn, &pinned_log);
+ if (pinned_log != NULL) {
+ rollback_initialize_for_txn(
+ pinned_log,
+ txn,
+ txn->roll_info.spilled_rollback_tail
+ );
+ txn->roll_info.current_rollback = pinned_log->blocknum;
+ }
+ }
+ if (pinned_log == NULL) {
+ rollback_log_create(txn, txn->roll_info.spilled_rollback_tail, &pinned_log);
+ }
+ }
+ assert(pinned_log->txnid.parent_id64 == txn->txnid.parent_id64);
+ assert(pinned_log->txnid.child_id64 == txn->txnid.child_id64);
+ assert(pinned_log->blocknum.b != ROLLBACK_NONE.b);
+ *log = pinned_log;
+}
diff --git a/storage/tokudb/PerconaFT/ft/txn/rollback.h b/storage/tokudb/PerconaFT/ft/txn/rollback.h
new file mode 100644
index 00000000..359f2317
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/txn/rollback.h
@@ -0,0 +1,145 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#pragma once
+
+#include "ft/cachetable/cachetable.h"
+#include "ft/serialize/sub_block.h"
+#include "ft/txn/txn.h"
+
+#include "util/memarena.h"
+
+typedef struct rollback_log_node *ROLLBACK_LOG_NODE;
+typedef struct serialized_rollback_log_node *SERIALIZED_ROLLBACK_LOG_NODE;
+
+void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint);
+
+// these functions assert internally that they succeed
+
+// get a rollback node this txn may use for a new entry. if there
+// is a current rollback node to use, pin it, otherwise create one.
+void toku_get_and_pin_rollback_log_for_new_entry(TOKUTXN txn, ROLLBACK_LOG_NODE *log);
+
+// get a specific rollback by blocknum
+void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log);
+
+// unpin a rollback node from the cachetable
+void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log);
+
+// assert that the given log's txnid and sequence match the ones given
+void toku_rollback_verify_contents(ROLLBACK_LOG_NODE log, TXNID_PAIR txnid, uint64_t sequence);
+
+// if there is a previous rollback log for the given log node, prefetch it
+void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log);
+
+// unpin and rmove a rollback log from the cachetable
+void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log);
+
+void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size);
+void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len);
+
+// given a transaction and a log node, and if the log is too full,
+// set the current rollback log to ROLLBACK_NONE and move the current
+// node onto the tail of the rollback node chain. further insertions
+// into the rollback log for this transaction will force the creation
+// of a new rollback log.
+//
+// this never unpins the rollback log if a spill occurs. the caller
+// is responsible for ensuring the given rollback node is unpinned
+// if necessary.
+void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log);
+
+void toku_txn_maybe_note_ft (TOKUTXN txn, struct ft *ft);
+int toku_logger_txn_rollback_stats(TOKUTXN txn, struct txn_stat *txn_stat);
+
+int toku_find_xid_by_xid (const TXNID &xid, const TXNID &xidfind);
+
+PAIR_ATTR rollback_memory_size(ROLLBACK_LOG_NODE log);
+
+// A high-level rollback log is made up of a chain of rollback log nodes.
+// Each rollback log node is represented (separately) in the cachetable by
+// this structure. Each portion of the rollback log chain has a block num
+// and a hash to identify it.
+struct rollback_log_node {
+ int layout_version;
+ int layout_version_original;
+ int layout_version_read_from_disk;
+ uint32_t build_id; // build_id (svn rev number) of software that wrote this node to disk
+ int dirty;
+ // to which transaction does this node belong?
+ TXNID_PAIR txnid;
+ // sequentially, where in the rollback log chain is this node?
+ // the sequence is between 0 and totalnodes-1
+ uint64_t sequence;
+ BLOCKNUM blocknum; // on which block does this node live?
+ // which block number is the previous in the chain of rollback nodes
+ // that make up this rollback log?
+ BLOCKNUM previous;
+ struct roll_entry *oldest_logentry;
+ struct roll_entry *newest_logentry;
+ memarena rollentry_arena;
+ size_t rollentry_resident_bytecount; // How many bytes for the rollentries that are stored in main memory.
+ PAIR ct_pair;
+};
+
+struct serialized_rollback_log_node {
+ char *data;
+ uint32_t len;
+ int n_sub_blocks;
+ BLOCKNUM blocknum;
+ struct sub_block sub_block[max_sub_blocks];
+};
+typedef struct serialized_rollback_log_node *SERIALIZED_ROLLBACK_LOG_NODE;
+
+static inline void
+toku_static_serialized_rollback_log_destroy(SERIALIZED_ROLLBACK_LOG_NODE log) {
+ toku_free(log->data);
+}
+
+static inline void
+toku_serialized_rollback_log_destroy(SERIALIZED_ROLLBACK_LOG_NODE log) {
+ toku_static_serialized_rollback_log_destroy(log);
+ toku_free(log);
+}
+
+void rollback_empty_log_init(ROLLBACK_LOG_NODE log);
+void make_rollback_log_empty(ROLLBACK_LOG_NODE log);
+
+static inline bool rollback_log_is_unused(ROLLBACK_LOG_NODE log) {
+ return (log->txnid.parent_id64 == TXNID_NONE);
+}
diff --git a/storage/tokudb/PerconaFT/ft/txn/rollback_log_node_cache.cc b/storage/tokudb/PerconaFT/ft/txn/rollback_log_node_cache.cc
new file mode 100644
index 00000000..5e1ab746
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/txn/rollback_log_node_cache.cc
@@ -0,0 +1,109 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#include <memory.h>
+#include <portability/toku_portability.h>
+
+#include "txn/rollback_log_node_cache.h"
+
+toku_instr_key* rollback_log_node_cache_mutex_key;
+
+void rollback_log_node_cache::init(uint32_t max_num_avail_nodes) {
+ XMALLOC_N(max_num_avail_nodes, m_avail_blocknums);
+ m_max_num_avail = max_num_avail_nodes;
+ m_first = 0;
+ m_num_avail = 0;
+ toku_pthread_mutexattr_t attr;
+ toku_mutexattr_init(&attr);
+ toku_mutexattr_settype(&attr, TOKU_MUTEX_ADAPTIVE);
+ toku_mutex_init(*rollback_log_node_cache_mutex_key, &m_mutex, &attr);
+ toku_mutexattr_destroy(&attr);
+}
+
+void rollback_log_node_cache::destroy() {
+ toku_mutex_destroy(&m_mutex);
+ toku_free(m_avail_blocknums);
+}
+
+// returns true if rollback log node was successfully added,
+// false otherwise
+bool rollback_log_node_cache::give_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE log){
+ bool retval = false;
+ toku_mutex_lock(&m_mutex);
+ if (m_num_avail < m_max_num_avail) {
+ retval = true;
+ uint32_t index = m_first + m_num_avail;
+ if (index >= m_max_num_avail) {
+ index -= m_max_num_avail;
+ }
+ m_avail_blocknums[index].b = log->blocknum.b;
+ m_num_avail++;
+ }
+ toku_mutex_unlock(&m_mutex);
+ //
+ // now unpin the rollback log node
+ //
+ if (retval) {
+ make_rollback_log_empty(log);
+ toku_rollback_log_unpin(txn, log);
+ }
+ return retval;
+}
+
+// if a rollback log node is available, will set log to it,
+// otherwise, will set log to NULL and caller is on his own
+// for getting a rollback log node
+void rollback_log_node_cache::get_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE* log){
+ BLOCKNUM b = ROLLBACK_NONE;
+ toku_mutex_lock(&m_mutex);
+ if (m_num_avail > 0) {
+ b.b = m_avail_blocknums[m_first].b;
+ m_num_avail--;
+ if (++m_first >= m_max_num_avail) {
+ m_first = 0;
+ }
+ }
+ toku_mutex_unlock(&m_mutex);
+ if (b.b != ROLLBACK_NONE.b) {
+ toku_get_and_pin_rollback_log(txn, b, log);
+ invariant(rollback_log_is_unused(*log));
+ } else {
+ *log = NULL;
+ }
+}
+
diff --git a/storage/tokudb/PerconaFT/ft/txn/rollback_log_node_cache.h b/storage/tokudb/PerconaFT/ft/txn/rollback_log_node_cache.h
new file mode 100644
index 00000000..c7f1b9a2
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/txn/rollback_log_node_cache.h
@@ -0,0 +1,63 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#pragma once
+
+#include "ft/txn/rollback.h"
+
+class rollback_log_node_cache {
+public:
+ void init (uint32_t max_num_avail_nodes);
+ void destroy();
+ // returns true if rollback log node was successfully added,
+ // false otherwise
+ bool give_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE log);
+ // if a rollback log node is available, will set log to it,
+ // otherwise, will set log to NULL and caller is on his own
+ // for getting a rollback log node
+ void get_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE* log);
+
+private:
+ BLOCKNUM* m_avail_blocknums;
+ uint32_t m_first;
+ uint32_t m_num_avail;
+ uint32_t m_max_num_avail;
+ toku_mutex_t m_mutex;
+};
+
+ENSURE_POD(rollback_log_node_cache);
diff --git a/storage/tokudb/PerconaFT/ft/txn/txn.cc b/storage/tokudb/PerconaFT/ft/txn/txn.cc
new file mode 100644
index 00000000..7152833d
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/txn/txn.cc
@@ -0,0 +1,754 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#include "ft/cachetable/checkpoint.h"
+#include "ft/ft.h"
+#include "ft/logger/log-internal.h"
+#include "ft/ule.h"
+#include "ft/txn/rollback-apply.h"
+#include "ft/txn/txn.h"
+#include "ft/txn/txn_manager.h"
+#include "util/status.h"
+
+toku_instr_key *txn_lock_mutex_key;
+toku_instr_key *txn_state_lock_mutex_key;
+toku_instr_key *result_state_cond_key;
+
+void toku_txn_get_status(TXN_STATUS s) {
+ txn_status.init();
+ *s = txn_status;
+}
+
+void
+toku_txn_lock(TOKUTXN txn)
+{
+ toku_mutex_lock(&txn->txn_lock);
+}
+
+void
+toku_txn_unlock(TOKUTXN txn)
+{
+ toku_mutex_unlock(&txn->txn_lock);
+}
+
+uint64_t
+toku_txn_get_root_id(TOKUTXN txn)
+{
+ return txn->txnid.parent_id64;
+}
+
+bool txn_declared_read_only(TOKUTXN txn) {
+ return txn->declared_read_only;
+}
+
+int
+toku_txn_begin_txn (
+ DB_TXN *container_db_txn,
+ TOKUTXN parent_tokutxn,
+ TOKUTXN *tokutxn,
+ TOKULOGGER logger,
+ TXN_SNAPSHOT_TYPE snapshot_type,
+ bool read_only
+ )
+{
+ int r = toku_txn_begin_with_xid(
+ parent_tokutxn,
+ tokutxn,
+ logger,
+ TXNID_PAIR_NONE,
+ snapshot_type,
+ container_db_txn,
+ false, // for_recovery
+ read_only
+ );
+ return r;
+}
+
+
+static void
+txn_create_xids(TOKUTXN txn, TOKUTXN parent) {
+ XIDS xids;
+ XIDS parent_xids;
+ if (parent == NULL) {
+ parent_xids = toku_xids_get_root_xids();
+ } else {
+ parent_xids = parent->xids;
+ }
+ toku_xids_create_unknown_child(parent_xids, &xids);
+ TXNID finalized_xid = (parent == NULL) ? txn->txnid.parent_id64 : txn->txnid.child_id64;
+ toku_xids_finalize_with_child(xids, finalized_xid);
+ txn->xids = xids;
+}
+
+// Allocate and initialize a txn
+static void toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXN_SNAPSHOT_TYPE snapshot_type, DB_TXN *container_db_txn, bool for_checkpoint, bool read_only);
+
+int
+toku_txn_begin_with_xid (
+ TOKUTXN parent,
+ TOKUTXN *txnp,
+ TOKULOGGER logger,
+ TXNID_PAIR xid,
+ TXN_SNAPSHOT_TYPE snapshot_type,
+ DB_TXN *container_db_txn,
+ bool for_recovery,
+ bool read_only
+ )
+{
+ int r = 0;
+ TOKUTXN txn;
+ // check for case where we are trying to
+ // create too many nested transactions
+ if (!read_only && parent && !toku_xids_can_create_child(parent->xids)) {
+ r = EINVAL;
+ goto exit;
+ }
+ if (read_only && parent) {
+ invariant(txn_declared_read_only(parent));
+ }
+ toku_txn_create_txn(&txn, parent, logger, snapshot_type, container_db_txn, for_recovery, read_only);
+ // txnid64, snapshot_txnid64
+ // will be set in here.
+ if (for_recovery) {
+ if (parent == NULL) {
+ invariant(xid.child_id64 == TXNID_NONE);
+ toku_txn_manager_start_txn_for_recovery(
+ txn,
+ logger->txn_manager,
+ xid.parent_id64
+ );
+ }
+ else {
+ parent->child_manager->start_child_txn_for_recovery(txn, parent, xid);
+ }
+ }
+ else {
+ assert(xid.parent_id64 == TXNID_NONE);
+ assert(xid.child_id64 == TXNID_NONE);
+ if (parent == NULL) {
+ toku_txn_manager_start_txn(
+ txn,
+ logger->txn_manager,
+ snapshot_type,
+ read_only
+ );
+ }
+ else {
+ parent->child_manager->start_child_txn(txn, parent);
+ toku_txn_manager_handle_snapshot_create_for_child_txn(
+ txn,
+ logger->txn_manager,
+ snapshot_type
+ );
+ }
+ }
+ if (!read_only) {
+ // this call will set txn->xids
+ txn_create_xids(txn, parent);
+ }
+ toku_unsafe_set(txnp, txn);
+exit:
+ return r;
+}
+
+DB_TXN *
+toku_txn_get_container_db_txn (TOKUTXN tokutxn) {
+ DB_TXN * container = tokutxn->container_db_txn;
+ return container;
+}
+
+void toku_txn_set_container_db_txn (TOKUTXN tokutxn, DB_TXN*container) {
+ tokutxn->container_db_txn = container;
+}
+
+static void invalidate_xa_xid (TOKU_XA_XID *xid) {
+ TOKU_ANNOTATE_NEW_MEMORY(xid, sizeof(*xid)); // consider it to be all invalid for valgrind
+ xid->formatID = -1; // According to the XA spec, -1 means "invalid data"
+}
+
+static void toku_txn_create_txn (
+ TOKUTXN *tokutxn,
+ TOKUTXN parent_tokutxn,
+ TOKULOGGER logger,
+ TXN_SNAPSHOT_TYPE snapshot_type,
+ DB_TXN *container_db_txn,
+ bool for_recovery,
+ bool read_only
+ )
+{
+ assert(logger->rollback_cachefile);
+
+ omt<FT> open_fts;
+ open_fts.create_no_array();
+
+ struct txn_roll_info roll_info = {
+ .num_rollback_nodes = 0,
+ .num_rollentries = 0,
+ .num_rollentries_processed = 0,
+ .rollentry_raw_count = 0,
+ .spilled_rollback_head = ROLLBACK_NONE,
+ .spilled_rollback_tail = ROLLBACK_NONE,
+ .current_rollback = ROLLBACK_NONE,
+ };
+
+static txn_child_manager tcm;
+
+struct tokutxn new_txn = {
+ .txnid = {.parent_id64 = TXNID_NONE, .child_id64 = TXNID_NONE },
+ .snapshot_txnid64 = TXNID_NONE,
+ .snapshot_type = for_recovery ? TXN_SNAPSHOT_NONE : snapshot_type,
+ .for_recovery = for_recovery,
+ .logger = logger,
+ .parent = parent_tokutxn,
+ .child = NULL,
+ .child_manager_s = tcm,
+ .child_manager = NULL,
+ .container_db_txn = container_db_txn,
+ .live_root_txn_list = nullptr,
+ .xids = NULL,
+ .snapshot_next = NULL,
+ .snapshot_prev = NULL,
+ .begin_was_logged = false,
+ .declared_read_only = read_only,
+ .do_fsync = false,
+ .force_fsync_on_commit = false,
+ .do_fsync_lsn = ZERO_LSN,
+ .xa_xid = {0, 0, 0, ""},
+ .progress_poll_fun = NULL,
+ .progress_poll_fun_extra = NULL,
+
+ // You cannot initialize txn_lock a TOKU_MUTEX_INITIALIZER, because we
+ // will initialize it in the code below, and it cannot already
+ // be initialized at that point. Also, in general, you don't
+ // get to use PTHREAD_MUTEX_INITALIZER (which is what is inside
+ // TOKU_MUTEX_INITIALIZER) except in static variables, and this
+ // is initializing an auto variable.
+ //
+ // And we cannot simply avoid initializing these fields
+ // because, although it avoids -Wmissing-field-initializer
+ // errors under gcc, it gets other errors about non-trivial
+ // designated initializers not being supported.
+
+ .txn_lock = ZERO_MUTEX_INITIALIZER, // Not TOKU_MUTEX_INITIALIZER
+ .open_fts = open_fts,
+ .roll_info = roll_info,
+ .state_lock = ZERO_MUTEX_INITIALIZER, // Not TOKU_MUTEX_INITIALIZER
+ .state_cond = ZERO_COND_INITIALIZER, // Not TOKU_COND_INITIALIZER
+ .state = TOKUTXN_LIVE,
+ .num_pin = 0,
+ .client_id = 0,
+ .client_extra = nullptr,
+ .start_time = time(NULL),
+};
+
+TOKUTXN result = NULL;
+XMEMDUP(result, &new_txn);
+invalidate_xa_xid(&result->xa_xid);
+if (parent_tokutxn == NULL) {
+ result->child_manager = &result->child_manager_s;
+ result->child_manager->init(result);
+ }
+ else {
+ result->child_manager = parent_tokutxn->child_manager;
+ }
+
+ toku_mutex_init(*txn_lock_mutex_key, &result->txn_lock, nullptr);
+
+ toku_pthread_mutexattr_t attr;
+ toku_mutexattr_init(&attr);
+ toku_mutexattr_settype(&attr, TOKU_MUTEX_ADAPTIVE);
+ toku_mutex_init(*txn_state_lock_mutex_key, &result->state_lock, &attr);
+ toku_mutexattr_destroy(&attr);
+
+ toku_cond_init(*result_state_cond_key, &result->state_cond, nullptr);
+
+ *tokutxn = result;
+
+ if (read_only) {
+ TXN_STATUS_INC(TXN_READ_BEGIN, 1);
+ }
+ else {
+ TXN_STATUS_INC(TXN_BEGIN, 1);
+ }
+}
+
+void
+toku_txn_update_xids_in_txn(TOKUTXN txn, TXNID xid)
+{
+ // these should not have been set yet
+ invariant(txn->txnid.parent_id64 == TXNID_NONE);
+ invariant(txn->txnid.child_id64 == TXNID_NONE);
+ txn->txnid.parent_id64 = xid;
+ txn->txnid.child_id64 = TXNID_NONE;
+}
+
+//Used on recovery to recover a transaction.
+int
+toku_txn_load_txninfo (TOKUTXN txn, struct txninfo *info) {
+ txn->roll_info.rollentry_raw_count = info->rollentry_raw_count;
+ uint32_t i;
+ for (i = 0; i < info->num_fts; i++) {
+ FT ft = info->open_fts[i];
+ toku_txn_maybe_note_ft(txn, ft);
+ }
+ txn->force_fsync_on_commit = info->force_fsync_on_commit;
+ txn->roll_info.num_rollback_nodes = info->num_rollback_nodes;
+ txn->roll_info.num_rollentries = info->num_rollentries;
+
+ txn->roll_info.spilled_rollback_head = info->spilled_rollback_head;
+ txn->roll_info.spilled_rollback_tail = info->spilled_rollback_tail;
+ txn->roll_info.current_rollback = info->current_rollback;
+ return 0;
+}
+
+int toku_txn_commit_txn(TOKUTXN txn, int nosync,
+ TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
+// Effect: Doesn't close the txn, just performs the commit operations.
+// If release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
+{
+ return toku_txn_commit_with_lsn(txn, nosync, ZERO_LSN,
+ poll, poll_extra);
+}
+
+struct xcommit_info {
+ int r;
+ TOKUTXN txn;
+};
+
+static void txn_note_commit(TOKUTXN txn) {
+ // Purpose:
+ // Delay until any indexer is done pinning this transaction.
+ // Update status of a transaction from live->committing (or prepared->committing)
+ // Do so in a thread-safe manner that does not conflict with hot indexing or
+ // begin checkpoint.
+ if (toku_txn_is_read_only(txn)) {
+ // Neither hot indexing nor checkpoint do any work with readonly txns,
+ // so we can skip taking the txn_manager lock here.
+ invariant(txn->state==TOKUTXN_LIVE);
+ txn->state = TOKUTXN_COMMITTING;
+ goto done;
+ }
+ if (txn->state==TOKUTXN_PREPARING) {
+ invalidate_xa_xid(&txn->xa_xid);
+ }
+ // for hot indexing, if hot index is processing
+ // this transaction in some leafentry, then we cannot change
+ // the state to commit or abort until
+ // hot index is done with that leafentry
+ toku_txn_lock_state(txn);
+ while (txn->num_pin > 0) {
+ toku_cond_wait(
+ &txn->state_cond,
+ &txn->state_lock
+ );
+ }
+ txn->state = TOKUTXN_COMMITTING;
+ toku_txn_unlock_state(txn);
+done:
+ return;
+}
+
+int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn,
+ TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
+{
+ // there should be no child when we commit or abort a TOKUTXN
+ invariant(txn->child == NULL);
+ txn_note_commit(txn);
+
+ // Child transactions do not actually 'commit'. They promote their
+ // changes to parent, so no need to fsync if this txn has a parent. The
+ // do_sync state is captured in the txn for txn_maybe_fsync_log function
+ // Additionally, if the transaction was first prepared, we do not need to
+ // fsync because the prepare caused an fsync of the log. In this case,
+ // we do not need an additional of the log. We rely on the client running
+ // recovery to properly recommit this transaction if the commit
+ // does not make it to disk. In the case of MySQL, that would be the
+ // binary log.
+ txn->do_fsync = !txn->parent && (txn->force_fsync_on_commit || (!nosync && txn->roll_info.num_rollentries>0));
+
+ txn->progress_poll_fun = poll;
+ txn->progress_poll_fun_extra = poll_extra;
+
+ if (!toku_txn_is_read_only(txn)) {
+ toku_log_xcommit(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid);
+ }
+ // If !txn->begin_was_logged, we could skip toku_rollback_commit
+ // but it's cheap (only a number of function calls that return immediately)
+ // since there were no writes. Skipping it would mean we would need to be careful
+ // in case we added any additional required cleanup into those functions in the future.
+ int r = toku_rollback_commit(txn, oplsn);
+ TXN_STATUS_INC(TXN_COMMIT, 1);
+ return r;
+}
+
+int toku_txn_abort_txn(TOKUTXN txn,
+ TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
+// Effect: Doesn't close the txn, just performs the abort operations.
+// If release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
+{
+ return toku_txn_abort_with_lsn(txn, ZERO_LSN, poll, poll_extra);
+}
+
+static void txn_note_abort(TOKUTXN txn) {
+ // Purpose:
+ // Delay until any indexer is done pinning this transaction.
+ // Update status of a transaction from live->aborting (or prepared->aborting)
+ // Do so in a thread-safe manner that does not conflict with hot indexing or
+ // begin checkpoint.
+ if (toku_txn_is_read_only(txn)) {
+ // Neither hot indexing nor checkpoint do any work with readonly txns,
+ // so we can skip taking the state lock here.
+ invariant(txn->state==TOKUTXN_LIVE);
+ txn->state = TOKUTXN_ABORTING;
+ goto done;
+ }
+ if (txn->state==TOKUTXN_PREPARING) {
+ invalidate_xa_xid(&txn->xa_xid);
+ }
+ // for hot indexing, if hot index is processing
+ // this transaction in some leafentry, then we cannot change
+ // the state to commit or abort until
+ // hot index is done with that leafentry
+ toku_txn_lock_state(txn);
+ while (txn->num_pin > 0) {
+ toku_cond_wait(
+ &txn->state_cond,
+ &txn->state_lock
+ );
+ }
+ txn->state = TOKUTXN_ABORTING;
+ toku_txn_unlock_state(txn);
+done:
+ return;
+}
+
+int toku_txn_abort_with_lsn(TOKUTXN txn, LSN oplsn,
+ TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
+{
+ // there should be no child when we commit or abort a TOKUTXN
+ invariant(txn->child == NULL);
+ txn_note_abort(txn);
+
+ txn->progress_poll_fun = poll;
+ txn->progress_poll_fun_extra = poll_extra;
+ txn->do_fsync = false;
+
+ if (!toku_txn_is_read_only(txn)) {
+ toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid);
+ }
+ // If !txn->begin_was_logged, we could skip toku_rollback_abort
+ // but it's cheap (only a number of function calls that return immediately)
+ // since there were no writes. Skipping it would mean we would need to be careful
+ // in case we added any additional required cleanup into those functions in the future.
+ int r = toku_rollback_abort(txn, oplsn);
+ TXN_STATUS_INC(TXN_ABORT, 1);
+ return r;
+}
+
+static void copy_xid (TOKU_XA_XID *dest, TOKU_XA_XID *source) {
+ TOKU_ANNOTATE_NEW_MEMORY(dest, sizeof(*dest));
+ dest->formatID = source->formatID;
+ dest->gtrid_length = source->gtrid_length;
+ dest->bqual_length = source->bqual_length;
+ memcpy(dest->data, source->data, source->gtrid_length+source->bqual_length);
+}
+
+void toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xa_xid, int nosync) {
+ if (txn->parent || toku_txn_is_read_only(txn)) {
+ // We do not prepare children.
+ //
+ // Readonly transactions do the same if they commit or abort, so
+ // XA guarantees are free. No need to pay for overhead of prepare.
+ return;
+ }
+ assert(txn->state==TOKUTXN_LIVE);
+ // This state transition must be protected against begin_checkpoint
+ // Therefore, the caller must have the mo lock held
+ toku_txn_lock_state(txn);
+ txn->state = TOKUTXN_PREPARING;
+ toku_txn_unlock_state(txn);
+ // Do we need to do an fsync?
+ txn->do_fsync = txn->force_fsync_on_commit || (!nosync && txn->roll_info.num_rollentries>0);
+ copy_xid(&txn->xa_xid, xa_xid);
+ // This list will go away with #4683, so we wn't need the ydb lock for this anymore.
+ toku_log_xprepare(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid, xa_xid);
+}
+
+void toku_txn_get_prepared_xa_xid (TOKUTXN txn, TOKU_XA_XID *xid) {
+ copy_xid(xid, &txn->xa_xid);
+}
+
+int toku_logger_recover_txn (TOKULOGGER logger, struct tokulogger_preplist preplist[/*count*/], long count, /*out*/ long *retp, uint32_t flags) {
+ return toku_txn_manager_recover_root_txn(
+ logger->txn_manager,
+ preplist,
+ count,
+ retp,
+ flags
+ );
+}
+
+void toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, bool do_fsync) {
+ if (logger && do_fsync) {
+ toku_logger_fsync_if_lsn_not_fsynced(logger, do_fsync_lsn);
+ }
+}
+
+void toku_txn_get_fsync_info(TOKUTXN ttxn, bool* do_fsync, LSN* do_fsync_lsn) {
+ *do_fsync = ttxn->do_fsync;
+ *do_fsync_lsn = ttxn->do_fsync_lsn;
+}
+
+void toku_txn_close_txn(TOKUTXN txn) {
+ toku_txn_complete_txn(txn);
+ toku_txn_destroy_txn(txn);
+}
+
+int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const txn);
+int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const UU(txn))
+// Effect: This function is called on every open FT that a transaction used.
+// This function removes the transaction from that FT.
+{
+ toku_ft_remove_txn_ref(h);
+
+ return 0;
+}
+
+// for every ft in txn, remove it.
+static void note_txn_closing (TOKUTXN txn) {
+ txn->open_fts.iterate<struct tokutxn, remove_txn>(txn);
+}
+
+void toku_txn_complete_txn(TOKUTXN txn) {
+ assert(txn->roll_info.spilled_rollback_head.b == ROLLBACK_NONE.b);
+ assert(txn->roll_info.spilled_rollback_tail.b == ROLLBACK_NONE.b);
+ assert(txn->roll_info.current_rollback.b == ROLLBACK_NONE.b);
+ assert(txn->num_pin == 0);
+ assert(txn->state == TOKUTXN_COMMITTING || txn->state == TOKUTXN_ABORTING || txn->state == TOKUTXN_PREPARING);
+ if (txn->parent) {
+ toku_txn_manager_handle_snapshot_destroy_for_child_txn(
+ txn,
+ txn->logger->txn_manager,
+ txn->snapshot_type
+ );
+ txn->parent->child_manager->finish_child_txn(txn);
+ }
+ else {
+ toku_txn_manager_finish_txn(txn->logger->txn_manager, txn);
+ txn->child_manager->destroy();
+ }
+ // note that here is another place we depend on
+ // this function being called with the multi operation lock
+ note_txn_closing(txn);
+}
+
+void toku_txn_destroy_txn(TOKUTXN txn) {
+ txn->open_fts.destroy();
+ if (txn->xids) {
+ toku_xids_destroy(&txn->xids);
+ }
+ toku_mutex_destroy(&txn->txn_lock);
+ toku_mutex_destroy(&txn->state_lock);
+ toku_cond_destroy(&txn->state_cond);
+ toku_free(txn);
+}
+
+XIDS toku_txn_get_xids (TOKUTXN txn) {
+ if (txn==0) return toku_xids_get_root_xids();
+ else return txn->xids;
+}
+
+void toku_txn_force_fsync_on_commit(TOKUTXN txn) {
+ txn->force_fsync_on_commit = true;
+}
+
+TXNID toku_get_oldest_in_live_root_txn_list(TOKUTXN txn) {
+ TXNID xid;
+ if (txn->live_root_txn_list->size()>0) {
+ int r = txn->live_root_txn_list->fetch(0, &xid);
+ assert_zero(r);
+ }
+ else {
+ xid = TXNID_NONE;
+ }
+ return xid;
+}
+
+bool toku_is_txn_in_live_root_txn_list(const xid_omt_t &live_root_txn_list, TXNID xid) {
+ TXNID txnid;
+ bool retval = false;
+ int r = live_root_txn_list.find_zero<TXNID, toku_find_xid_by_xid>(xid, &txnid, nullptr);
+ if (r==0) {
+ invariant(txnid == xid);
+ retval = true;
+ }
+ else {
+ invariant(r==DB_NOTFOUND);
+ }
+ return retval;
+}
+
+TOKUTXN_STATE
+toku_txn_get_state(TOKUTXN txn) {
+ return txn->state;
+}
+
+static void
+maybe_log_begin_txn_for_write_operation_unlocked(TOKUTXN txn) {
+ // We now hold the lock.
+ if (txn->begin_was_logged) {
+ return;
+ }
+ TOKUTXN parent;
+ parent = txn->parent;
+ TXNID_PAIR xid;
+ xid = txn->txnid;
+ TXNID_PAIR pxid;
+ pxid = TXNID_PAIR_NONE;
+ if (parent) {
+ // Recursively log parent first if necessary.
+ // Transactions cannot do work if they have children,
+ // so the lowest level child's lock is sufficient for ancestors.
+ maybe_log_begin_txn_for_write_operation_unlocked(parent);
+ pxid = parent->txnid;
+ }
+
+ toku_log_xbegin(txn->logger, NULL, 0, xid, pxid);
+ txn->begin_was_logged = true;
+}
+
+void
+toku_maybe_log_begin_txn_for_write_operation(TOKUTXN txn) {
+ toku_txn_lock(txn);
+ maybe_log_begin_txn_for_write_operation_unlocked(txn);
+ toku_txn_unlock(txn);
+}
+
+bool
+toku_txn_is_read_only(TOKUTXN txn) {
+ // No need to recursively check children because parents are
+ // recursively logged before children.
+ if (!txn->begin_was_logged) {
+ // Did no work.
+ invariant(txn->roll_info.num_rollentries == 0);
+ invariant(txn->do_fsync_lsn.lsn == ZERO_LSN.lsn);
+ invariant(txn->open_fts.size() == 0);
+ invariant(txn->num_pin==0);
+ return true;
+ }
+ return false;
+}
+
+// needed for hot indexing
+void toku_txn_lock_state(TOKUTXN txn) {
+ toku_mutex_lock(&txn->state_lock);
+}
+void toku_txn_unlock_state(TOKUTXN txn){
+ toku_mutex_unlock(&txn->state_lock);
+}
+
+
+// prevents a client thread from transitioning txn from LIVE|PREPARING -> COMMITTING|ABORTING
+// hot indexing may need a transactions to stay in the LIVE|PREPARING state while it processes
+// a leafentry.
+void toku_txn_pin_live_txn_unlocked(TOKUTXN txn) {
+ assert(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING);
+ assert(!toku_txn_is_read_only(txn));
+ txn->num_pin++;
+}
+
+// allows a client thread to go back to being able to transition txn
+// from LIVE|PREPARING -> COMMITTING|ABORTING
+void toku_txn_unpin_live_txn(TOKUTXN txn) {
+ assert(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING);
+ assert(txn->num_pin > 0);
+ toku_txn_lock_state(txn);
+ txn->num_pin--;
+ if (txn->num_pin == 0) {
+ toku_cond_broadcast(&txn->state_cond);
+ }
+ toku_txn_unlock_state(txn);
+}
+
+bool toku_txn_has_spilled_rollback(TOKUTXN txn) {
+ return txn_has_spilled_rollback_logs(txn);
+}
+
+void toku_txn_get_client_id(TOKUTXN txn, uint64_t *client_id, void **client_extra) {
+ if (client_id) *client_id = txn->client_id;
+ if (client_extra) *client_extra = txn->client_extra;
+}
+
+void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id, void *client_extra) {
+ txn->client_id = client_id;
+ txn->client_extra = client_extra;
+}
+
+time_t toku_txn_get_start_time(struct tokutxn *txn) {
+ return txn->start_time;
+}
+
+extern uint force_recovery;
+int toku_txn_reads_txnid(TXNID txnid, TOKUTXN txn, bool is_provisional UU()) {
+ if(force_recovery) {
+ return TOKUDB_ACCEPT;
+ }
+ int r = 0;
+ TXNID oldest_live_in_snapshot = toku_get_oldest_in_live_root_txn_list(txn);
+ if (oldest_live_in_snapshot == TXNID_NONE && txnid < txn->snapshot_txnid64) {
+ r = TOKUDB_ACCEPT;
+ } else if (txnid < oldest_live_in_snapshot || txnid == txn->txnid.parent_id64) {
+ r = TOKUDB_ACCEPT;
+ } else if (txnid > txn->snapshot_txnid64 || toku_is_txn_in_live_root_txn_list(*txn->live_root_txn_list, txnid)) {
+ r = 0;
+ } else {
+ r = TOKUDB_ACCEPT;
+ }
+ return r;
+}
+
+int toku_txn_discard_txn(TOKUTXN txn) {
+ int r = toku_rollback_discard(txn);
+ return r;
+}
+
+#include <toku_race_tools.h>
+void __attribute__((__constructor__)) toku_txn_status_helgrind_ignore(void);
+void toku_txn_status_helgrind_ignore(void) {
+ TOKU_VALGRIND_HG_DISABLE_CHECKING(&txn_status, sizeof txn_status);
+}
diff --git a/storage/tokudb/PerconaFT/ft/txn/txn.h b/storage/tokudb/PerconaFT/ft/txn/txn.h
new file mode 100644
index 00000000..34a76aa9
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/txn/txn.h
@@ -0,0 +1,362 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#pragma once
+
+#include "portability/toku_stdint.h"
+
+#include "ft/txn/txn_state.h"
+#include "ft/serialize/block_table.h"
+#include "ft/ft-status.h"
+#include "util/omt.h"
+
+typedef uint64_t TXNID;
+
+typedef struct tokutxn *TOKUTXN;
+
+#define TXNID_NONE_LIVING ((TXNID)0)
+#define TXNID_NONE ((TXNID)0)
+#define TXNID_MAX ((TXNID)-1)
+
+typedef struct txnid_pair_s {
+ TXNID parent_id64;
+ TXNID child_id64;
+} TXNID_PAIR;
+
+static const TXNID_PAIR TXNID_PAIR_NONE = { .parent_id64 = TXNID_NONE, .child_id64 = TXNID_NONE };
+
+// We include the child manager here beacuse it uses the TXNID / TOKUTXN types
+#include "ft/txn/txn_child_manager.h"
+
+/* Log Sequence Number (LSN)
+ * Make the LSN be a struct instead of an integer so that we get better type checking. */
+typedef struct __toku_lsn { uint64_t lsn; } LSN;
+static const LSN ZERO_LSN = { .lsn = 0 };
+static const LSN MAX_LSN = { .lsn = UINT64_MAX };
+
+//
+// Types of snapshots that can be taken by a tokutxn
+// - TXN_SNAPSHOT_NONE: means that there is no snapshot. Reads do not use snapshot reads.
+// used for SERIALIZABLE and READ UNCOMMITTED
+// - TXN_SNAPSHOT_ROOT: means that all tokutxns use their root transaction's snapshot
+// used for REPEATABLE READ
+// - TXN_SNAPSHOT_CHILD: means that each child tokutxn creates its own snapshot
+// used for READ COMMITTED
+//
+
+typedef enum __TXN_SNAPSHOT_TYPE {
+ TXN_SNAPSHOT_NONE=0,
+ TXN_SNAPSHOT_ROOT=1,
+ TXN_SNAPSHOT_CHILD=2,
+ TXN_COPIES_SNAPSHOT=3
+} TXN_SNAPSHOT_TYPE;
+
+typedef toku::omt<struct tokutxn *> txn_omt_t;
+typedef toku::omt<TXNID> xid_omt_t;
+typedef toku::omt<struct referenced_xid_tuple, struct referenced_xid_tuple *> rx_omt_t;
+
+inline bool txn_pair_is_none(TXNID_PAIR txnid) {
+ return txnid.parent_id64 == TXNID_NONE && txnid.child_id64 == TXNID_NONE;
+}
+
+struct tokulogger;
+
+struct txn_roll_info {
+ // these are number of rollback nodes and rollback entries for this txn.
+ //
+ // the current rollback node below has sequence number num_rollback_nodes - 1
+ // (because they are numbered 0...num-1). often, the current rollback is
+ // already set to this block num, which means it exists and is available to
+ // log some entries. if the current rollback is NONE and the number of
+ // rollback nodes for this transaction is non-zero, then we will use
+ // the number of rollback nodes to know which sequence number to assign
+ // to a new one we create
+ uint64_t num_rollback_nodes;
+ uint64_t num_rollentries;
+ uint64_t num_rollentries_processed;
+ uint64_t rollentry_raw_count; // the total count of every byte in the transaction and all its children.
+
+ // spilled rollback nodes are rollback nodes that were gorged by this
+ // transaction, retired, and saved in a list.
+
+ // the spilled rollback head is the block number of the first rollback node
+ // that makes up the rollback log chain
+ BLOCKNUM spilled_rollback_head;
+
+ // the spilled rollback is the block number of the last rollback node that
+ // makes up the rollback log chain.
+ BLOCKNUM spilled_rollback_tail;
+
+ // the current rollback node block number we may use. if this is ROLLBACK_NONE,
+ // then we need to create one and set it here before using it.
+ BLOCKNUM current_rollback;
+};
+
+struct tokutxn {
+ // These don't change after create:
+
+ TXNID_PAIR txnid;
+
+ uint64_t snapshot_txnid64; // this is the lsn of the snapshot
+ const TXN_SNAPSHOT_TYPE snapshot_type;
+ const bool for_recovery;
+ struct tokulogger *const logger;
+ struct tokutxn *const parent;
+ // The child txn is protected by the child_txn_manager lock
+ // and by the user contract. The user contract states (and is
+ // enforced at the ydb layer) that a child txn should not be created
+ // while another child exists. The txn_child_manager will protect
+ // other threads from trying to read this value while another
+ // thread commits/aborts the child
+ struct tokutxn *child;
+
+ // statically allocated child manager, if this
+ // txn is a root txn, this manager will be used and set to
+ // child_manager for this transaction and all of its children
+ txn_child_manager child_manager_s;
+
+ // child manager for this transaction, all of its children,
+ // and all of its ancestors
+ txn_child_manager* child_manager;
+
+ // These don't change but they're created in a way that's hard to make
+ // strictly const.
+ DB_TXN *container_db_txn; // reference to DB_TXN that contains this tokutxn
+ xid_omt_t *live_root_txn_list; // the root txns live when the root ancestor (self if a root) started.
+ struct XIDS_S *xids; // Represents the xid list
+
+ struct tokutxn *snapshot_next;
+ struct tokutxn *snapshot_prev;
+
+ bool begin_was_logged;
+ bool declared_read_only; // true if the txn was declared read only when began
+
+ // These are not read until a commit, prepare, or abort starts, and
+ // they're "monotonic" (only go false->true) during operation:
+ bool do_fsync;
+ bool force_fsync_on_commit; //This transaction NEEDS an fsync once (if) it commits. (commit means root txn)
+
+ // Not used until commit, prepare, or abort starts:
+ LSN do_fsync_lsn;
+ TOKU_XA_XID xa_xid; // for prepared transactions
+ TXN_PROGRESS_POLL_FUNCTION progress_poll_fun;
+ void *progress_poll_fun_extra;
+
+ toku_mutex_t txn_lock;
+ // Protected by the txn lock:
+ toku::omt<struct ft*> open_fts; // a collection of the fts that we touched. Indexed by filenum.
+ struct txn_roll_info roll_info; // Info used to manage rollback entries
+
+ // mutex that protects the transition of the state variable
+ // the rest of the variables are used by the txn code and
+ // hot indexing to ensure that when hot indexing is processing a
+ // leafentry, a TOKUTXN cannot dissappear or change state out from
+ // underneath it
+ toku_mutex_t state_lock;
+ toku_cond_t state_cond;
+ TOKUTXN_STATE state;
+ uint32_t num_pin; // number of threads (all hot indexes) that want this
+ // txn to not transition to commit or abort
+ uint64_t client_id;
+ void *client_extra;
+ time_t start_time;
+};
+typedef struct tokutxn *TOKUTXN;
+
+void toku_txn_lock(struct tokutxn *txn);
+void toku_txn_unlock(struct tokutxn *txn);
+
+uint64_t toku_txn_get_root_id(struct tokutxn *txn);
+bool txn_declared_read_only(struct tokutxn *txn);
+
+int toku_txn_begin_txn (
+ DB_TXN *container_db_txn,
+ struct tokutxn *parent_tokutxn,
+ struct tokutxn **tokutxn,
+ struct tokulogger *logger,
+ TXN_SNAPSHOT_TYPE snapshot_type,
+ bool read_only
+ );
+
+DB_TXN * toku_txn_get_container_db_txn (struct tokutxn *tokutxn);
+void toku_txn_set_container_db_txn(struct tokutxn *txn, DB_TXN *db_txn);
+
+// toku_txn_begin_with_xid is called from recovery and has no containing DB_TXN
+int toku_txn_begin_with_xid (
+ struct tokutxn *parent_tokutxn,
+ struct tokutxn **tokutxn,
+ struct tokulogger *logger,
+ TXNID_PAIR xid,
+ TXN_SNAPSHOT_TYPE snapshot_type,
+ DB_TXN *container_db_txn,
+ bool for_recovery,
+ bool read_only
+ );
+
+void toku_txn_update_xids_in_txn(struct tokutxn *txn, TXNID xid);
+
+int toku_txn_load_txninfo (struct tokutxn *txn, struct txninfo *info);
+
+int toku_txn_commit_txn (struct tokutxn *txn, int nosync,
+ TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
+int toku_txn_commit_with_lsn(struct tokutxn *txn, int nosync, LSN oplsn,
+ TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
+
+int toku_txn_abort_txn(struct tokutxn *txn,
+ TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
+int toku_txn_abort_with_lsn(struct tokutxn *txn, LSN oplsn,
+ TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
+
+int toku_txn_discard_txn(struct tokutxn *txn);
+
+void toku_txn_prepare_txn (struct tokutxn *txn, TOKU_XA_XID *xid, int nosync);
+// Effect: Do the internal work of preparing a transaction (does not log the prepare record).
+
+void toku_txn_get_prepared_xa_xid(struct tokutxn *txn, TOKU_XA_XID *xa_xid);
+// Effect: Fill in the XID information for a transaction. The caller allocates the XID and the function fills in values.
+
+void toku_txn_maybe_fsync_log(struct tokulogger *logger, LSN do_fsync_lsn, bool do_fsync);
+
+void toku_txn_get_fsync_info(struct tokutxn *ttxn, bool* do_fsync, LSN* do_fsync_lsn);
+
+// Complete and destroy a txn
+void toku_txn_close_txn(struct tokutxn *txn);
+
+// Remove a txn from any live txn lists
+void toku_txn_complete_txn(struct tokutxn *txn);
+
+// Free the memory of a txn
+void toku_txn_destroy_txn(struct tokutxn *txn);
+
+struct XIDS_S *toku_txn_get_xids(struct tokutxn *txn);
+
+// Force fsync on commit
+void toku_txn_force_fsync_on_commit(struct tokutxn *txn);
+
+void toku_txn_get_status(TXN_STATUS s);
+
+bool toku_is_txn_in_live_root_txn_list(const xid_omt_t &live_root_txn_list, TXNID xid);
+
+TXNID toku_get_oldest_in_live_root_txn_list(struct tokutxn *txn);
+
+TOKUTXN_STATE toku_txn_get_state(struct tokutxn *txn);
+
+struct tokulogger_preplist {
+ TOKU_XA_XID xid;
+ DB_TXN *txn;
+};
+int toku_logger_recover_txn (struct tokulogger *logger, struct tokulogger_preplist preplist[/*count*/], long count, /*out*/ long *retp, uint32_t flags);
+
+void toku_maybe_log_begin_txn_for_write_operation(struct tokutxn *txn);
+
+// Return whether txn (or it's descendents) have done no work.
+bool toku_txn_is_read_only(struct tokutxn *txn);
+
+void toku_txn_lock_state(struct tokutxn *txn);
+void toku_txn_unlock_state(struct tokutxn *txn);
+void toku_txn_pin_live_txn_unlocked(struct tokutxn *txn);
+void toku_txn_unpin_live_txn(struct tokutxn *txn);
+
+bool toku_txn_has_spilled_rollback(struct tokutxn *txn);
+
+void toku_txn_get_client_id(struct tokutxn *txn, uint64_t *client_id, void **client_extra);
+void toku_txn_set_client_id(struct tokutxn *txn, uint64_t client_id, void *client_extra);
+
+time_t toku_txn_get_start_time(struct tokutxn *txn);
+
+//
+// This function is used by the leafentry iterators.
+// returns TOKUDB_ACCEPT if live transaction context is allowed to read a value
+// that is written by transaction with LSN of id
+// live transaction context may read value if either id is the root ancestor of context, or if
+// id was committed before context's snapshot was taken.
+// For id to be committed before context's snapshot was taken, the following must be true:
+// - id < context->snapshot_txnid64 AND id is not in context's live root transaction list
+// For the above to NOT be true:
+// - id > context->snapshot_txnid64 OR id is in context's live root transaction list
+//
+int toku_txn_reads_txnid(TXNID txnid, struct tokutxn *txn, bool is_provisional UU());
+
+// For serialize / deserialize
+
+#include "ft/serialize/wbuf.h"
+
+static inline void wbuf_TXNID(struct wbuf *wb, TXNID txnid) {
+ wbuf_ulonglong(wb, txnid);
+}
+
+static inline void wbuf_nocrc_TXNID(struct wbuf *wb, TXNID txnid) {
+ wbuf_nocrc_ulonglong(wb, txnid);
+}
+
+static inline void wbuf_nocrc_TXNID_PAIR(struct wbuf *wb, TXNID_PAIR txnid) {
+ wbuf_nocrc_ulonglong(wb, txnid.parent_id64);
+ wbuf_nocrc_ulonglong(wb, txnid.child_id64);
+}
+
+static inline void wbuf_nocrc_LSN(struct wbuf *wb, LSN lsn) {
+ wbuf_nocrc_ulonglong(wb, lsn.lsn);
+}
+
+static inline void wbuf_LSN(struct wbuf *wb, LSN lsn) {
+ wbuf_ulonglong(wb, lsn.lsn);
+}
+
+#include "ft/serialize/rbuf.h"
+
+static inline void rbuf_TXNID(struct rbuf *rb, TXNID *txnid) {
+ *txnid = rbuf_ulonglong(rb);
+}
+
+static inline void rbuf_TXNID_PAIR(struct rbuf *rb, TXNID_PAIR *txnid) {
+ txnid->parent_id64 = rbuf_ulonglong(rb);
+ txnid->child_id64 = rbuf_ulonglong(rb);
+}
+
+static inline void rbuf_ma_TXNID(struct rbuf *rb, memarena *UU(ma), TXNID *txnid) {
+ rbuf_TXNID(rb, txnid);
+}
+
+static inline void rbuf_ma_TXNID_PAIR (struct rbuf *r, memarena *ma __attribute__((__unused__)), TXNID_PAIR *txnid) {
+ rbuf_TXNID_PAIR(r, txnid);
+}
+
+static inline LSN rbuf_LSN(struct rbuf *rb) {
+ LSN lsn = { .lsn = rbuf_ulonglong(rb) };
+ return lsn;
+}
diff --git a/storage/tokudb/PerconaFT/ft/txn/txn_child_manager.cc b/storage/tokudb/PerconaFT/ft/txn/txn_child_manager.cc
new file mode 100644
index 00000000..99a21331
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/txn/txn_child_manager.cc
@@ -0,0 +1,143 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#include "ft/logger/log-internal.h"
+#include "ft/txn/txn_child_manager.h"
+
+toku_instr_key *txn_child_manager_mutex_key;
+
+//
+// initialized a txn_child_manager,
+// when called, root->txnid.parent_id64 may not yet be set
+//
+void txn_child_manager::init(TOKUTXN root) {
+ invariant(root->txnid.child_id64 == TXNID_NONE);
+ invariant(root->parent == NULL);
+ m_root = root;
+ m_last_xid = TXNID_NONE;
+ ZERO_STRUCT(m_mutex);
+
+ toku_pthread_mutexattr_t attr;
+ toku_mutexattr_init(&attr);
+ toku_mutexattr_settype(&attr, TOKU_MUTEX_ADAPTIVE);
+ toku_mutex_init(*txn_child_manager_mutex_key, &m_mutex, &attr);
+ toku_mutexattr_destroy(&attr);
+}
+
+void txn_child_manager::destroy() {
+ toku_mutex_destroy(&m_mutex);
+}
+
+void txn_child_manager::start_child_txn_for_recovery(TOKUTXN child, TOKUTXN parent, TXNID_PAIR txnid) {
+ invariant(parent->txnid.parent_id64 == m_root->txnid.parent_id64);
+ invariant(txnid.parent_id64 == m_root->txnid.parent_id64);
+
+ child->txnid = txnid;
+ toku_mutex_lock(&m_mutex);
+ if (txnid.child_id64 > m_last_xid) {
+ m_last_xid = txnid.child_id64;
+ }
+ parent->child = child;
+ toku_mutex_unlock(&m_mutex);
+}
+
+void txn_child_manager::start_child_txn(TOKUTXN child, TOKUTXN parent) {
+ invariant(parent->txnid.parent_id64 == m_root->txnid.parent_id64);
+ child->txnid.parent_id64 = m_root->txnid.parent_id64;
+ toku_mutex_lock(&m_mutex);
+
+ ++m_last_xid;
+ // Here we ensure that the child_id64 is never equal to the parent_id64
+ // We do this to make this feature work more easily with the XIDs
+ // struct and message application. The XIDs struct stores the parent id
+ // as the first TXNID, and subsequent TXNIDs store child ids. So, if we
+ // have a case where the parent id is the same as the child id, we will
+ // have to do some tricky maneuvering in the message application code
+ // in ule.cc. So, to lessen the probability of bugs, we ensure that the
+ // parent id is not the same as the child id.
+ if (m_last_xid == m_root->txnid.parent_id64) {
+ ++m_last_xid;
+ }
+ child->txnid.child_id64 = m_last_xid;
+
+ parent->child = child;
+ toku_mutex_unlock(&m_mutex);
+}
+
+void txn_child_manager::finish_child_txn(TOKUTXN child) {
+ invariant(child->txnid.parent_id64 == m_root->txnid.parent_id64);
+ toku_mutex_lock(&m_mutex);
+ child->parent->child = NULL;
+ toku_mutex_unlock(&m_mutex);
+}
+
+void txn_child_manager::suspend() {
+ toku_mutex_lock(&m_mutex);
+}
+
+void txn_child_manager::resume() {
+ toku_mutex_unlock(&m_mutex);
+}
+
+void txn_child_manager::find_tokutxn_by_xid_unlocked(TXNID_PAIR xid, TOKUTXN* result) {
+ invariant(xid.parent_id64 == m_root->txnid.parent_id64);
+ TOKUTXN curr_txn = m_root;
+ while (curr_txn != NULL) {
+ if (xid.child_id64 == curr_txn->txnid.child_id64) {
+ *result = curr_txn;
+ break;
+ }
+ curr_txn = curr_txn->child;
+ }
+}
+
+int txn_child_manager::iterate(txn_mgr_iter_callback cb, void* extra) {
+ TOKUTXN curr_txn = m_root;
+ int ret = 0;
+ toku_mutex_lock(&m_mutex);
+ while (curr_txn != NULL) {
+ ret = cb(curr_txn, extra);
+ if (ret != 0) {
+ break;
+ }
+ curr_txn = curr_txn->child;
+ }
+ toku_mutex_unlock(&m_mutex);
+ return ret;
+}
+
diff --git a/storage/tokudb/PerconaFT/ft/txn/txn_child_manager.h b/storage/tokudb/PerconaFT/ft/txn/txn_child_manager.h
new file mode 100644
index 00000000..76db3705
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/txn/txn_child_manager.h
@@ -0,0 +1,66 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#pragma once
+
+// We should be including ft/txn/txn.h here but that header includes this one,
+// so we don't.
+#include "portability/toku_pthread.h"
+
+class txn_child_manager {
+public:
+ void init (TOKUTXN root);
+ void destroy();
+ void start_child_txn_for_recovery(TOKUTXN child, TOKUTXN parent, TXNID_PAIR txnid);
+ void start_child_txn(TOKUTXN child, TOKUTXN parent);
+ void finish_child_txn(TOKUTXN child);
+ void suspend();
+ void resume();
+ void find_tokutxn_by_xid_unlocked(TXNID_PAIR xid, TOKUTXN* result);
+ int iterate(int (*cb)(TOKUTXN txn, void *extra), void* extra);
+
+private:
+ TXNID m_last_xid;
+ TOKUTXN m_root;
+ toku_mutex_t m_mutex;
+
+ friend class txn_child_manager_unit_test;
+};
+
+
+ENSURE_POD(txn_child_manager);
diff --git a/storage/tokudb/PerconaFT/ft/txn/txn_manager.cc b/storage/tokudb/PerconaFT/ft/txn/txn_manager.cc
new file mode 100644
index 00000000..1b55844b
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/txn/txn_manager.cc
@@ -0,0 +1,1040 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#include "portability/toku_race_tools.h"
+
+#include "ft/cachetable/checkpoint.h"
+#include "ft/logger/log-internal.h"
+#include "ft/ule.h"
+#include "ft/txn/txn.h"
+#include "ft/txn/txn_manager.h"
+#include "ft/txn/rollback.h"
+#include "util/omt.h"
+//this is only for testing
+
+static void (* test_txn_sync_callback) (pthread_t, void *) = NULL;
+static void * test_txn_sync_callback_extra = NULL;
+
+void set_test_txn_sync_callback(void (*cb) (pthread_t, void *), void *extra) {
+ test_txn_sync_callback = cb;
+ test_txn_sync_callback_extra = extra;
+}
+bool garbage_collection_debug = false;
+
+toku_instr_key *txn_manager_lock_mutex_key;
+
+static bool txn_records_snapshot(TXN_SNAPSHOT_TYPE snapshot_type,
+ struct tokutxn *parent) {
+ if (snapshot_type == TXN_COPIES_SNAPSHOT) {
+ return false;
+ }
+ // we need a snapshot if the snapshot type is a child or
+ // if the snapshot type is root and we have no parent.
+ // Cases that we don't need a snapshot: when snapshot type is NONE
+ // or when it is ROOT and we have a parent
+ return (snapshot_type != TXN_SNAPSHOT_NONE && (parent==NULL || snapshot_type == TXN_SNAPSHOT_CHILD));
+}
+
+static bool txn_copies_snapshot(TXN_SNAPSHOT_TYPE snapshot_type, struct tokutxn *parent) {
+ return (snapshot_type == TXN_COPIES_SNAPSHOT) || txn_records_snapshot(snapshot_type, parent);
+}
+
+// internal locking functions, should use this instead of accessing lock directly
+static void txn_manager_lock(TXN_MANAGER txn_manager);
+static void txn_manager_unlock(TXN_MANAGER txn_manager);
+
+#if 0
+static bool is_txnid_live(TXN_MANAGER txn_manager, TXNID txnid) {
+ TOKUTXN result = NULL;
+ toku_txn_manager_id2txn_unlocked(txn_manager, txnid, &result);
+ return (result != NULL);
+}
+#endif
+
+//Heaviside function to search through an OMT by a TXNID
+int find_by_xid (const TOKUTXN &txn, const TXNID &txnidfind);
+
+static bool is_txnid_live(TXN_MANAGER txn_manager, TXNID txnid) {
+ TOKUTXN result = NULL;
+ TXNID_PAIR id = { .parent_id64 = txnid, .child_id64 = TXNID_NONE };
+ toku_txn_manager_id2txn_unlocked(txn_manager, id, &result);
+ return (result != NULL);
+}
+
+static void toku_txn_manager_clone_state_for_gc_unlocked(
+ TXN_MANAGER txn_manager,
+ xid_omt_t* snapshot_xids,
+ rx_omt_t* referenced_xids,
+ xid_omt_t* live_root_txns
+ );
+
+static void
+verify_snapshot_system(TXN_MANAGER txn_manager UU()) {
+ uint32_t num_snapshot_txnids = txn_manager->num_snapshots;
+ TXNID snapshot_txnids[num_snapshot_txnids];
+ TOKUTXN snapshot_txns[num_snapshot_txnids];
+ uint32_t num_live_txns = txn_manager->live_root_txns.size();
+ TOKUTXN live_txns[num_live_txns];
+ uint32_t num_referenced_xid_tuples = txn_manager->referenced_xids.size();
+ struct referenced_xid_tuple *referenced_xid_tuples[num_referenced_xid_tuples];
+
+ // do this to get an omt of snapshot_txnids
+ xid_omt_t snapshot_txnids_omt;
+ rx_omt_t referenced_xids_omt;
+ xid_omt_t live_root_txns_omt;
+ toku_txn_manager_clone_state_for_gc_unlocked(
+ txn_manager,
+ &snapshot_txnids_omt,
+ &referenced_xids_omt,
+ &live_root_txns_omt
+ );
+
+ int r;
+ uint32_t i;
+ uint32_t j;
+ //set up arrays for easier access
+ {
+ TOKUTXN curr_txn = txn_manager->snapshot_head;
+ uint32_t curr_index = 0;
+ while (curr_txn != NULL) {
+ snapshot_txns[curr_index] = curr_txn;
+ snapshot_txnids[curr_index] = curr_txn->snapshot_txnid64;
+ curr_txn = curr_txn->snapshot_next;
+ curr_index++;
+ }
+ }
+
+ for (i = 0; i < num_live_txns; i++) {
+ r = txn_manager->live_root_txns.fetch(i, &live_txns[i]);
+ assert_zero(r);
+ }
+ for (i = 0; i < num_referenced_xid_tuples; i++) {
+ r = txn_manager->referenced_xids.fetch(i, &referenced_xid_tuples[i]);
+ assert_zero(r);
+ }
+
+ {
+ //Verify snapshot_txnids
+ for (i = 0; i < num_snapshot_txnids; i++) {
+ TXNID snapshot_xid = snapshot_txnids[i];
+ TOKUTXN snapshot_txn = snapshot_txns[i];
+ uint32_t num_live_root_txn_list = snapshot_txn->live_root_txn_list->size();
+ TXNID live_root_txn_list[num_live_root_txn_list];
+ {
+ for (j = 0; j < num_live_root_txn_list; j++) {
+ r = snapshot_txn->live_root_txn_list->fetch(j, &live_root_txn_list[j]);
+ assert_zero(r);
+ }
+ }
+ {
+ // Only committed entries have return a youngest.
+ TXNID youngest = toku_get_youngest_live_list_txnid_for(
+ snapshot_xid,
+ snapshot_txnids_omt,
+ txn_manager->referenced_xids
+ );
+ invariant(youngest == TXNID_NONE);
+ }
+ for (j = 0; j < num_live_root_txn_list; j++) {
+ TXNID live_xid = live_root_txn_list[j];
+ invariant(live_xid <= snapshot_xid);
+ TXNID youngest = toku_get_youngest_live_list_txnid_for(
+ live_xid,
+ snapshot_txnids_omt,
+ txn_manager->referenced_xids
+ );
+ if (is_txnid_live(txn_manager, live_xid)) {
+ // Only committed entries have return a youngest.
+ invariant(youngest == TXNID_NONE);
+ }
+ else {
+ invariant(youngest != TXNID_NONE);
+ // A committed entry might have been read-only, in which case it won't return anything.
+ // This snapshot reads 'live_xid' so it's youngest cannot be older than snapshot_xid.
+ invariant(youngest >= snapshot_xid);
+ }
+ }
+ }
+ }
+ {
+ // Verify referenced_xids.
+ for (i = 0; i < num_referenced_xid_tuples; i++) {
+ struct referenced_xid_tuple *tuple = referenced_xid_tuples[i];
+ invariant(tuple->begin_id < tuple->end_id);
+ invariant(tuple->references > 0);
+
+ {
+ //verify neither pair->begin_id nor end_id is in live_list
+ r = txn_manager->live_root_txns.find_zero<TXNID, find_by_xid>(tuple->begin_id, nullptr, nullptr);
+ invariant(r == DB_NOTFOUND);
+ r = txn_manager->live_root_txns.find_zero<TXNID, find_by_xid>(tuple->end_id, nullptr, nullptr);
+ invariant(r == DB_NOTFOUND);
+ }
+ {
+ //verify neither pair->begin_id nor end_id is in snapshot_xids
+ TOKUTXN curr_txn = txn_manager->snapshot_head;
+ uint32_t curr_index = 0;
+ while (curr_txn != NULL) {
+ invariant(tuple->begin_id != curr_txn->txnid.parent_id64);
+ invariant(tuple->end_id != curr_txn->txnid.parent_id64);
+ curr_txn = curr_txn->snapshot_next;
+ curr_index++;
+ }
+ }
+ {
+ // Verify number of references is correct
+ uint32_t refs_found = 0;
+ for (j = 0; j < num_snapshot_txnids; j++) {
+ TOKUTXN snapshot_txn = snapshot_txns[j];
+ if (toku_is_txn_in_live_root_txn_list(*snapshot_txn->live_root_txn_list, tuple->begin_id)) {
+ refs_found++;
+ }
+ invariant(!toku_is_txn_in_live_root_txn_list(
+ *snapshot_txn->live_root_txn_list,
+ tuple->end_id));
+ }
+ invariant(refs_found == tuple->references);
+ }
+ {
+ // Verify youngest makes sense.
+ TXNID youngest = toku_get_youngest_live_list_txnid_for(
+ tuple->begin_id,
+ snapshot_txnids_omt,
+ txn_manager->referenced_xids
+ );
+ invariant(youngest != TXNID_NONE);
+ invariant(youngest > tuple->begin_id);
+ invariant(youngest < tuple->end_id);
+ // Youngest must be found, and must be a snapshot txn
+ r = snapshot_txnids_omt.find_zero<TXNID, toku_find_xid_by_xid>(youngest, nullptr, nullptr);
+ invariant_zero(r);
+ }
+ }
+ }
+ snapshot_txnids_omt.destroy();
+ referenced_xids_omt.destroy();
+ live_root_txns_omt.destroy();
+}
+
+void toku_txn_manager_init(TXN_MANAGER *txn_managerp) {
+ TXN_MANAGER XCALLOC(txn_manager);
+ toku_mutex_init(
+ *txn_manager_lock_mutex_key, &txn_manager->txn_manager_lock, nullptr);
+ txn_manager->live_root_txns.create();
+ txn_manager->live_root_ids.create();
+ txn_manager->snapshot_head = NULL;
+ txn_manager->snapshot_tail = NULL;
+ txn_manager->num_snapshots = 0;
+ txn_manager->referenced_xids.create();
+ txn_manager->last_xid = 0;
+
+ txn_manager->last_xid_seen_for_recover = TXNID_NONE;
+ txn_manager->last_calculated_oldest_referenced_xid = TXNID_NONE;
+
+ *txn_managerp = txn_manager;
+}
+
+void toku_txn_manager_destroy(TXN_MANAGER txn_manager) {
+ toku_mutex_destroy(&txn_manager->txn_manager_lock);
+ invariant(txn_manager->live_root_txns.size() == 0);
+ txn_manager->live_root_txns.destroy();
+ invariant(txn_manager->live_root_ids.size() == 0);
+ txn_manager->live_root_ids.destroy();
+ invariant(txn_manager->snapshot_head == NULL);
+ invariant(txn_manager->referenced_xids.size() == 0);
+ txn_manager->referenced_xids.destroy();
+ toku_free(txn_manager);
+}
+
+TXNID
+toku_txn_manager_get_oldest_living_xid(TXN_MANAGER txn_manager) {
+ TOKUTXN rtxn = NULL;
+ TXNID rval = TXNID_NONE_LIVING;
+ txn_manager_lock(txn_manager);
+
+ if (txn_manager->live_root_txns.size() > 0) {
+ int r = txn_manager->live_root_txns.fetch(0, &rtxn);
+ invariant_zero(r);
+ }
+ if (rtxn) {
+ rval = rtxn->txnid.parent_id64;
+ }
+ txn_manager_unlock(txn_manager);
+ return rval;
+}
+
+TXNID toku_txn_manager_get_oldest_referenced_xid_estimate(TXN_MANAGER txn_manager) {
+ return toku_unsafe_fetch(&txn_manager->last_calculated_oldest_referenced_xid);
+}
+
+int live_root_txn_list_iter(const TOKUTXN &live_xid, const uint32_t UU(index), TXNID **const referenced_xids);
+int live_root_txn_list_iter(const TOKUTXN &live_xid, const uint32_t UU(index), TXNID **const referenced_xids){
+ (*referenced_xids)[index] = live_xid->txnid.parent_id64;
+ return 0;
+}
+
+
+// Create list of root transactions that were live when this txn began.
+static inline void
+setup_live_root_txn_list(xid_omt_t* live_root_txnid, xid_omt_t* live_root_txn_list) {
+ if (live_root_txnid->size() > 0) {
+ live_root_txn_list->clone(*live_root_txnid);
+ } else {
+ live_root_txn_list->create_no_array();
+ }
+}
+
+//Heaviside function to search through an OMT by a TXNID
+int
+find_by_xid (const TOKUTXN &txn, const TXNID &txnidfind) {
+ if (txn->txnid.parent_id64 < txnidfind) return -1;
+ if (txn->txnid.parent_id64 > txnidfind) return +1;
+ return 0;
+}
+
+static TXNID
+max_xid(TXNID a, TXNID b) {
+ return a < b ? b : a;
+}
+
+static void set_oldest_referenced_xid(TXN_MANAGER txn_manager) {
+ TXNID oldest_referenced_xid = TXNID_MAX;
+ int r;
+ if (txn_manager->live_root_ids.size() > 0) {
+ r = txn_manager->live_root_ids.fetch(0, &oldest_referenced_xid);
+ // this function should only be called when we know there is at least
+ // one live transaction
+ invariant_zero(r);
+ }
+
+ if (txn_manager->referenced_xids.size() > 0) {
+ struct referenced_xid_tuple* tuple;
+ r = txn_manager->referenced_xids.fetch(0, &tuple);
+ if (r == 0 && tuple->begin_id < oldest_referenced_xid) {
+ oldest_referenced_xid = tuple->begin_id;
+ }
+ }
+ if (txn_manager->snapshot_head != NULL) {
+ TXNID id = txn_manager->snapshot_head->snapshot_txnid64;
+ if (id < oldest_referenced_xid) {
+ oldest_referenced_xid = id;
+ }
+ }
+ if (txn_manager->last_xid < oldest_referenced_xid) {
+ oldest_referenced_xid = txn_manager->last_xid;
+ }
+ invariant(oldest_referenced_xid != TXNID_MAX);
+ toku_unsafe_set(&txn_manager->last_calculated_oldest_referenced_xid, oldest_referenced_xid);
+}
+
+//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
+// template-only function, but must be extern
+int find_xid (const TOKUTXN &txn, const TOKUTXN &txnfind);
+int
+find_xid (const TOKUTXN &txn, const TOKUTXN &txnfind)
+{
+ if (txn->txnid.parent_id64 < txnfind->txnid.parent_id64) return -1;
+ if (txn->txnid.parent_id64 > txnfind->txnid.parent_id64) return +1;
+ return 0;
+}
+
+static inline void txn_manager_create_snapshot_unlocked(
+ TXN_MANAGER txn_manager,
+ TOKUTXN txn
+ )
+{
+ txn->snapshot_txnid64 = ++txn_manager->last_xid;
+ // Add this txn to the global list of txns that have their own snapshots.
+ // (Note, if a txn is a child that creates its own snapshot, then that child xid
+ // is the xid stored in the global list.)
+ if (txn_manager->snapshot_head == NULL) {
+ invariant(txn_manager->snapshot_tail == NULL);
+ txn_manager->snapshot_head = txn;
+ txn_manager->snapshot_tail = txn;
+ }
+ else {
+ txn_manager->snapshot_tail->snapshot_next = txn;
+ txn->snapshot_prev = txn_manager->snapshot_tail;
+ txn_manager->snapshot_tail = txn;
+ }
+ txn_manager->num_snapshots++;
+}
+
+// template-only function, but must be extern
+int find_tuple_by_xid (const struct referenced_xid_tuple &tuple, const TXNID &xidfind);
+int
+find_tuple_by_xid (const struct referenced_xid_tuple &tuple, const TXNID &xidfind)
+{
+ if (tuple.begin_id < xidfind) return -1;
+ if (tuple.begin_id > xidfind) return +1;
+ return 0;
+}
+
+// template-only function, but must be extern
+int referenced_xids_note_snapshot_txn_end_iter(const TXNID &live_xid, const uint32_t UU(index), rx_omt_t *const referenced_xids)
+ __attribute__((nonnull(3)));
+int referenced_xids_note_snapshot_txn_end_iter(const TXNID &live_xid, const uint32_t UU(index), rx_omt_t *const referenced_xids)
+{
+ int r;
+ uint32_t idx;
+ struct referenced_xid_tuple *tuple;
+
+ r = referenced_xids->find_zero<TXNID, find_tuple_by_xid>(live_xid, &tuple, &idx);
+ if (r == DB_NOTFOUND) {
+ goto done;
+ }
+ invariant_zero(r);
+ invariant(tuple->references > 0);
+ if (--tuple->references == 0) {
+ r = referenced_xids->delete_at(idx);
+ lazy_assert_zero(r);
+ }
+done:
+ return 0;
+}
+
+// When txn ends, update reverse live list. To do that, examine each txn in this (closing) txn's live list.
+static inline int
+note_snapshot_txn_end_by_ref_xids(TXN_MANAGER mgr, const xid_omt_t &live_root_txn_list) {
+ int r;
+ r = live_root_txn_list.iterate<rx_omt_t, referenced_xids_note_snapshot_txn_end_iter>(&mgr->referenced_xids);
+ invariant_zero(r);
+ return r;
+}
+
+typedef struct snapshot_iter_extra {
+ uint32_t* indexes_to_delete;
+ uint32_t num_indexes;
+ xid_omt_t* live_root_txn_list;
+} SNAPSHOT_ITER_EXTRA;
+
+// template-only function, but must be extern
+int note_snapshot_txn_end_by_txn_live_list_iter(referenced_xid_tuple* tuple, const uint32_t index, SNAPSHOT_ITER_EXTRA *const sie)
+ __attribute__((nonnull(3)));
+int note_snapshot_txn_end_by_txn_live_list_iter(
+ referenced_xid_tuple* tuple,
+ const uint32_t index,
+ SNAPSHOT_ITER_EXTRA *const sie
+ )
+{
+ int r;
+ uint32_t idx;
+ TXNID txnid;
+ r = sie->live_root_txn_list->find_zero<TXNID, toku_find_xid_by_xid>(tuple->begin_id, &txnid, &idx);
+ if (r == DB_NOTFOUND) {
+ goto done;
+ }
+ invariant_zero(r);
+ invariant(txnid == tuple->begin_id);
+ invariant(tuple->references > 0);
+ if (--tuple->references == 0) {
+ sie->indexes_to_delete[sie->num_indexes] = index;
+ sie->num_indexes++;
+ }
+done:
+ return 0;
+}
+
+static inline int
+note_snapshot_txn_end_by_txn_live_list(TXN_MANAGER mgr, xid_omt_t* live_root_txn_list) {
+ uint32_t size = mgr->referenced_xids.size();
+ uint32_t indexes_to_delete[size];
+ SNAPSHOT_ITER_EXTRA sie = { .indexes_to_delete = indexes_to_delete, .num_indexes = 0, .live_root_txn_list = live_root_txn_list};
+ mgr->referenced_xids.iterate_ptr<SNAPSHOT_ITER_EXTRA, note_snapshot_txn_end_by_txn_live_list_iter>(&sie);
+ for (uint32_t i = 0; i < sie.num_indexes; i++) {
+ uint32_t curr_index = sie.indexes_to_delete[sie.num_indexes-i-1];
+ mgr->referenced_xids.delete_at(curr_index);
+ }
+ return 0;
+}
+
+static inline void txn_manager_remove_snapshot_unlocked(
+ TOKUTXN txn,
+ TXN_MANAGER txn_manager
+ )
+{
+ // Remove from linked list of snapshot txns
+ if (txn_manager->snapshot_head == txn) {
+ txn_manager->snapshot_head = txn->snapshot_next;
+ }
+ if (txn_manager->snapshot_tail == txn) {
+ txn_manager->snapshot_tail = txn->snapshot_prev;
+ }
+ if (txn->snapshot_next) {
+ txn->snapshot_next->snapshot_prev = txn->snapshot_prev;
+ }
+ if (txn->snapshot_prev) {
+ txn->snapshot_prev->snapshot_next = txn->snapshot_next;
+ }
+ txn_manager->num_snapshots--;
+ uint32_t ref_xids_size = txn_manager->referenced_xids.size();
+ uint32_t live_list_size = txn->live_root_txn_list->size();
+ if (ref_xids_size > 0 && live_list_size > 0) {
+ if (live_list_size > ref_xids_size && ref_xids_size < 2000) {
+ note_snapshot_txn_end_by_txn_live_list(txn_manager, txn->live_root_txn_list);
+ }
+ else {
+ note_snapshot_txn_end_by_ref_xids(txn_manager, *txn->live_root_txn_list);
+ }
+ }
+}
+
+static inline void inherit_snapshot_from_parent(TOKUTXN child) {
+ if (child->parent) {
+ child->snapshot_txnid64 = child->parent->snapshot_txnid64;
+ child->live_root_txn_list = child->parent->live_root_txn_list;
+ }
+}
+void toku_txn_manager_handle_snapshot_create_for_child_txn(
+ TOKUTXN txn,
+ TXN_MANAGER txn_manager,
+ TXN_SNAPSHOT_TYPE snapshot_type
+ )
+{
+ // this is a function for child txns, so just doint a sanity check
+ invariant(txn->parent != NULL);
+ bool copies_snapshot = txn_copies_snapshot(snapshot_type, txn->parent);
+ bool records_snapshot = txn_records_snapshot(snapshot_type, txn->parent);
+ // assert that if records_snapshot is true, then copies_snapshot is true
+ invariant(!records_snapshot || copies_snapshot);
+ if (records_snapshot) {
+ invariant(txn->live_root_txn_list == nullptr);
+ XMALLOC(txn->live_root_txn_list);
+ txn_manager_lock(txn_manager);
+ txn_manager_create_snapshot_unlocked(txn_manager, txn);
+ }
+ else {
+ inherit_snapshot_from_parent(txn);
+ }
+
+ toku_debug_txn_sync(pthread_self());
+
+ if (copies_snapshot) {
+ if(!records_snapshot)
+ txn_manager_lock(txn_manager);
+ setup_live_root_txn_list(&txn_manager->live_root_ids, txn->live_root_txn_list);
+ txn_manager_unlock(txn_manager);
+ }
+}
+
+void toku_txn_manager_handle_snapshot_destroy_for_child_txn(
+ TOKUTXN txn,
+ TXN_MANAGER txn_manager,
+ TXN_SNAPSHOT_TYPE snapshot_type
+ )
+{
+ // this is a function for child txns, so just doint a sanity check
+ invariant(txn->parent != NULL);
+ bool copies_snapshot = txn_copies_snapshot(snapshot_type, txn->parent);
+ bool records_snapshot = txn_records_snapshot(snapshot_type, txn->parent);
+ if (records_snapshot) {
+ txn_manager_lock(txn_manager);
+ txn_manager_remove_snapshot_unlocked(txn, txn_manager);
+ txn_manager_unlock(txn_manager);
+ }
+ if (copies_snapshot) {
+ invariant(txn->live_root_txn_list != nullptr);
+ txn->live_root_txn_list->destroy();
+ toku_free(txn->live_root_txn_list);
+ }
+}
+
+void toku_txn_manager_start_txn_for_recovery(
+ TOKUTXN txn,
+ TXN_MANAGER txn_manager,
+ TXNID xid
+ )
+{
+ txn_manager_lock(txn_manager);
+ // using xid that is passed in
+ txn_manager->last_xid = max_xid(txn_manager->last_xid, xid);
+ toku_txn_update_xids_in_txn(txn, xid);
+
+ uint32_t idx;
+ int r = txn_manager->live_root_txns.find_zero<TOKUTXN, find_xid>(txn, nullptr, &idx);
+ invariant(r == DB_NOTFOUND);
+ r = txn_manager->live_root_txns.insert_at(txn, idx);
+ invariant_zero(r);
+ r = txn_manager->live_root_ids.insert_at(txn->txnid.parent_id64, idx);
+ invariant_zero(r);
+
+ txn_manager_unlock(txn_manager);
+}
+
+void toku_txn_manager_start_txn(
+ TOKUTXN txn,
+ TXN_MANAGER txn_manager,
+ TXN_SNAPSHOT_TYPE snapshot_type,
+ bool read_only
+ )
+{
+ int r;
+ TXNID xid = TXNID_NONE;
+ // if we are running in recovery, we don't need to make snapshots
+ bool copies_snapshot = txn_copies_snapshot(snapshot_type, NULL);
+ bool records_snapshot = txn_records_snapshot(snapshot_type, NULL);
+ // assert that if records_snapshot is true, then copies_snapshot is true
+ invariant(!records_snapshot || copies_snapshot);
+
+ // perform a malloc outside of the txn_manager lock
+ // will be used in txn_manager_create_snapshot_unlocked below
+ if (copies_snapshot) {
+ invariant(txn->live_root_txn_list == nullptr);
+ XMALLOC(txn->live_root_txn_list);
+ }
+ // the act of getting a transaction ID and adding the
+ // txn to the proper OMTs must be atomic. MVCC depends
+ // on this.
+ txn_manager_lock(txn_manager);
+ if (garbage_collection_debug) {
+ verify_snapshot_system(txn_manager);
+ }
+
+ //
+ // maintain the data structures necessary for MVCC:
+ // 1. add txn to list of live_root_txns if this is a root transaction
+ // 2. if the transaction is creating a snapshot:
+ // - create a live list for the transaction
+ // - add the id to the list of snapshot ids
+ //
+ // The order of operations is important here, and must be taken
+ // into account when the transaction is closed. The txn is added
+ // to the live_root_txns first (if it is a root txn). This has the implication
+ // that a root level snapshot transaction is in its own live list. This fact
+ // is taken into account when the transaction is closed.
+
+ // add ancestor information, and maintain global live root txn list
+ xid = ++txn_manager->last_xid; // we always need an ID, needed for lock tree
+ toku_txn_update_xids_in_txn(txn, xid);
+ if (!read_only) {
+ uint32_t idx = txn_manager->live_root_txns.size();
+ r = txn_manager->live_root_txns.insert_at(txn, idx);
+ invariant_zero(r);
+ r = txn_manager->live_root_ids.insert_at(txn->txnid.parent_id64, idx);
+ invariant_zero(r);
+ }
+ set_oldest_referenced_xid(txn_manager);
+
+ if (records_snapshot) {
+ txn_manager_create_snapshot_unlocked(
+ txn_manager,
+ txn
+ );
+ }
+ if (copies_snapshot) {
+ setup_live_root_txn_list(&txn_manager->live_root_ids, txn->live_root_txn_list);
+ }
+
+ if (garbage_collection_debug) {
+ verify_snapshot_system(txn_manager);
+ }
+ txn_manager_unlock(txn_manager);
+ return;
+}
+
+TXNID
+toku_get_youngest_live_list_txnid_for(TXNID xc, const xid_omt_t &snapshot_txnids, const rx_omt_t &referenced_xids) {
+ struct referenced_xid_tuple *tuple;
+ int r;
+ TXNID rval = TXNID_NONE;
+
+ r = referenced_xids.find_zero<TXNID, find_tuple_by_xid>(xc, &tuple, nullptr);
+ if (r == DB_NOTFOUND) {
+ goto done;
+ }
+ TXNID live;
+
+ r = snapshot_txnids.find<TXNID, toku_find_xid_by_xid>(tuple->end_id, -1, &live, nullptr);
+ if (r == DB_NOTFOUND) {
+ goto done;
+ }
+ invariant(live < tuple->end_id);
+ if (live > tuple->begin_id) {
+ rval = live;
+ }
+done:
+ return rval;
+}
+
+void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
+ int r;
+ invariant(txn->parent == NULL);
+ bool records_snapshot = txn_records_snapshot(txn->snapshot_type, NULL);
+ txn_manager_lock(txn_manager);
+
+ if (garbage_collection_debug) {
+ verify_snapshot_system(txn_manager);
+ }
+
+ if (records_snapshot) {
+ txn_manager_remove_snapshot_unlocked(
+ txn,
+ txn_manager
+ );
+ }
+
+ if (!txn_declared_read_only(txn)) {
+ uint32_t idx;
+ //Remove txn from list of live root txns
+ TOKUTXN txnagain;
+ r = txn_manager->live_root_txns.find_zero<TOKUTXN, find_xid>(txn, &txnagain, &idx);
+ invariant_zero(r);
+ invariant(txn==txnagain);
+
+ r = txn_manager->live_root_txns.delete_at(idx);
+ invariant_zero(r);
+ r = txn_manager->live_root_ids.delete_at(idx);
+ invariant_zero(r);
+
+ if (!toku_txn_is_read_only(txn) || garbage_collection_debug) {
+ uint32_t num_references = 0;
+ TOKUTXN curr_txn = txn_manager->snapshot_tail;
+ while(curr_txn != NULL) {
+ if (curr_txn->snapshot_txnid64 > txn->txnid.parent_id64) {
+ num_references++;
+ }
+ else {
+ break;
+ }
+ curr_txn = curr_txn->snapshot_prev;
+ }
+
+ if (num_references > 0) {
+ // This transaction exists in a live list of another transaction.
+ struct referenced_xid_tuple tuple = {
+ .begin_id = txn->txnid.parent_id64,
+ .end_id = ++txn_manager->last_xid,
+ .references = num_references
+ };
+ r = txn_manager->referenced_xids.insert<TXNID, find_tuple_by_xid>(tuple, txn->txnid.parent_id64, nullptr);
+ lazy_assert_zero(r);
+ }
+ }
+ }
+
+ if (garbage_collection_debug) {
+ verify_snapshot_system(txn_manager);
+ }
+ txn_manager_unlock(txn_manager);
+
+ //Cleanup that does not require the txn_manager lock
+ if (txn->live_root_txn_list) {
+ txn->live_root_txn_list->destroy();
+ toku_free(txn->live_root_txn_list);
+ }
+ return;
+}
+
+static void toku_txn_manager_clone_state_for_gc_unlocked(
+ TXN_MANAGER txn_manager,
+ xid_omt_t* snapshot_xids,
+ rx_omt_t* referenced_xids,
+ xid_omt_t* live_root_txns
+ )
+{
+ TXNID* snapshot_xids_array = NULL;
+ XMALLOC_N(txn_manager->num_snapshots, snapshot_xids_array);
+ TOKUTXN curr_txn = txn_manager->snapshot_head;
+ uint32_t curr_index = 0;
+ while (curr_txn != NULL) {
+ snapshot_xids_array[curr_index] = curr_txn->snapshot_txnid64;
+ curr_txn = curr_txn->snapshot_next;
+ curr_index++;
+ }
+ snapshot_xids->create_steal_sorted_array(
+ &snapshot_xids_array,
+ txn_manager->num_snapshots,
+ txn_manager->num_snapshots
+ );
+
+ referenced_xids->clone(txn_manager->referenced_xids);
+ setup_live_root_txn_list(&txn_manager->live_root_ids, live_root_txns);
+}
+
+void toku_txn_manager_clone_state_for_gc(
+ TXN_MANAGER txn_manager,
+ xid_omt_t* snapshot_xids,
+ rx_omt_t* referenced_xids,
+ xid_omt_t* live_root_txns
+ )
+{
+ txn_manager_lock(txn_manager);
+ toku_txn_manager_clone_state_for_gc_unlocked(
+ txn_manager,
+ snapshot_xids,
+ referenced_xids,
+ live_root_txns
+ );
+ txn_manager_unlock(txn_manager);
+}
+
+void txn_manager_state::init() {
+ invariant(!initialized);
+ invariant_notnull(txn_manager);
+ toku_txn_manager_clone_state_for_gc(
+ txn_manager,
+ &snapshot_xids,
+ &referenced_xids,
+ &live_root_txns
+ );
+ initialized = true;
+}
+
+void toku_txn_manager_id2txn_unlocked(TXN_MANAGER txn_manager, TXNID_PAIR txnid, TOKUTXN *result) {
+ TOKUTXN txn;
+ int r = txn_manager->live_root_txns.find_zero<TXNID, find_by_xid>(txnid.parent_id64, &txn, nullptr);
+ if (r==0) {
+ assert(txn->txnid.parent_id64 == txnid.parent_id64);
+ *result = txn;
+ }
+ else {
+ assert(r==DB_NOTFOUND);
+ // If there is no txn, then we treat it as the null txn.
+ *result = NULL;
+ }
+}
+
+int toku_txn_manager_get_root_txn_from_xid (TXN_MANAGER txn_manager, TOKU_XA_XID *xid, DB_TXN **txnp) {
+ txn_manager_lock(txn_manager);
+ int ret_val = 0;
+ int num_live_txns = txn_manager->live_root_txns.size();
+ for (int i = 0; i < num_live_txns; i++) {
+ TOKUTXN txn;
+ {
+ int r = txn_manager->live_root_txns.fetch(i, &txn);
+ assert_zero(r);
+ }
+ if (txn->xa_xid.formatID == xid->formatID
+ && txn->xa_xid.gtrid_length == xid->gtrid_length
+ && txn->xa_xid.bqual_length == xid->bqual_length
+ && 0==memcmp(txn->xa_xid.data, xid->data, xid->gtrid_length + xid->bqual_length)) {
+ *txnp = txn->container_db_txn;
+ ret_val = 0;
+ goto exit;
+ }
+ }
+ ret_val = DB_NOTFOUND;
+exit:
+ txn_manager_unlock(txn_manager);
+ return ret_val;
+}
+
+uint32_t toku_txn_manager_num_live_root_txns(TXN_MANAGER txn_manager) {
+ int ret_val = 0;
+ txn_manager_lock(txn_manager);
+ ret_val = txn_manager->live_root_txns.size();
+ txn_manager_unlock(txn_manager);
+ return ret_val;
+}
+
+static int txn_manager_iter(
+ TXN_MANAGER txn_manager,
+ txn_mgr_iter_callback cb,
+ void* extra,
+ bool just_root_txns
+ )
+{
+ int r = 0;
+ toku_mutex_lock(&txn_manager->txn_manager_lock);
+ uint32_t size = txn_manager->live_root_txns.size();
+ for (uint32_t i = 0; i < size; i++) {
+ TOKUTXN curr_txn = NULL;
+ r = txn_manager->live_root_txns.fetch(i, &curr_txn);
+ assert_zero(r);
+ if (just_root_txns) {
+ r = cb(curr_txn, extra);
+ }
+ else {
+ r = curr_txn->child_manager->iterate(cb, extra);
+ }
+ if (r) {
+ break;
+ }
+ }
+ toku_mutex_unlock(&txn_manager->txn_manager_lock);
+ return r;
+}
+
+int toku_txn_manager_iter_over_live_txns(
+ TXN_MANAGER txn_manager,
+ txn_mgr_iter_callback cb,
+ void* extra
+ )
+{
+ return txn_manager_iter(
+ txn_manager,
+ cb,
+ extra,
+ false
+ );
+}
+
+int toku_txn_manager_iter_over_live_root_txns(
+ TXN_MANAGER txn_manager,
+ txn_mgr_iter_callback cb,
+ void* extra
+ )
+{
+ return txn_manager_iter(
+ txn_manager,
+ cb,
+ extra,
+ true
+ );
+}
+
+
+//
+// This function is called only via env_txn_xa_recover and env_txn_recover.
+// See comments for those functions to understand assumptions that
+// can be made when calling this function. Namely, that the system is
+// quiescant, in that we are right after recovery and before user operations
+// commence.
+//
+// Another key assumption made here is that only root transactions
+// may be prepared and that child transactions cannot be prepared.
+// This assumption is made by the fact that we iterate over the live root txns
+// to find prepared transactions.
+//
+// I (Zardosht), don't think we take advantage of this fact, as we are holding
+// the txn_manager_lock in this function, but in the future we might want
+// to take these assumptions into account.
+//
+int toku_txn_manager_recover_root_txn (
+ TXN_MANAGER txn_manager,
+ struct tokulogger_preplist preplist[/*count*/],
+ long count,
+ long *retp, /*out*/
+ uint32_t flags
+ )
+{
+ int ret_val = 0;
+ txn_manager_lock(txn_manager);
+ uint32_t num_txns_returned = 0;
+ // scan through live root txns to find
+ // prepared transactions and return them
+ uint32_t size = txn_manager->live_root_txns.size();
+ if (flags==DB_FIRST) {
+ txn_manager->last_xid_seen_for_recover = TXNID_NONE;
+ }
+ else if (flags!=DB_NEXT) {
+ ret_val = EINVAL;
+ goto exit;
+ }
+ for (uint32_t i = 0; i < size; i++) {
+ TOKUTXN curr_txn = NULL;
+ txn_manager->live_root_txns.fetch(i, &curr_txn);
+ // skip over TOKUTXNs whose txnid64 is too small, meaning
+ // we have already processed them.
+ if (curr_txn->txnid.parent_id64 <= txn_manager->last_xid_seen_for_recover) {
+ continue;
+ }
+ if (curr_txn->state == TOKUTXN_PREPARING) {
+ assert(curr_txn->container_db_txn);
+ preplist[num_txns_returned].txn = curr_txn->container_db_txn;
+ preplist[num_txns_returned].xid = curr_txn->xa_xid;
+ txn_manager->last_xid_seen_for_recover = curr_txn->txnid.parent_id64;
+ num_txns_returned++;
+ }
+ txn_manager->last_xid_seen_for_recover = curr_txn->txnid.parent_id64;
+ // if we found the maximum number of prepared transactions we are
+ // allowed to find, then break
+ if ((long) num_txns_returned >= count) {
+ break;
+ }
+ }
+ invariant((long) num_txns_returned <= count);
+ *retp = num_txns_returned;
+ ret_val = 0;
+exit:
+ txn_manager_unlock(txn_manager);
+ return ret_val;
+}
+
+static void txn_manager_lock(TXN_MANAGER txn_manager) {
+ toku_mutex_lock(&txn_manager->txn_manager_lock);
+}
+
+static void txn_manager_unlock(TXN_MANAGER txn_manager) {
+ toku_mutex_unlock(&txn_manager->txn_manager_lock);
+}
+
+void toku_txn_manager_suspend(TXN_MANAGER txn_manager) {
+ txn_manager_lock(txn_manager);
+}
+
+void toku_txn_manager_resume(TXN_MANAGER txn_manager) {
+ txn_manager_unlock(txn_manager);
+}
+
+void
+toku_txn_manager_set_last_xid_from_logger(TXN_MANAGER txn_manager, TXNID last_xid) {
+ invariant(txn_manager->last_xid == TXNID_NONE);
+ txn_manager->last_xid = last_xid;
+}
+
+void
+toku_txn_manager_set_last_xid_from_recovered_checkpoint(TXN_MANAGER txn_manager, TXNID last_xid) {
+ txn_manager->last_xid = last_xid;
+}
+
+TXNID
+toku_txn_manager_get_last_xid(TXN_MANAGER mgr) {
+ txn_manager_lock(mgr);
+ TXNID last_xid = mgr->last_xid;
+ txn_manager_unlock(mgr);
+ return last_xid;
+}
+
+bool
+toku_txn_manager_txns_exist(TXN_MANAGER mgr) {
+ txn_manager_lock(mgr);
+ bool retval = mgr->live_root_txns.size() > 0;
+ txn_manager_unlock(mgr);
+ return retval;
+}
+
+
+// Test-only function
+void
+toku_txn_manager_increase_last_xid(TXN_MANAGER mgr, uint64_t increment) {
+ txn_manager_lock(mgr);
+ mgr->last_xid += increment;
+ txn_manager_unlock(mgr);
+}
+
diff --git a/storage/tokudb/PerconaFT/ft/txn/txn_manager.h b/storage/tokudb/PerconaFT/ft/txn/txn_manager.h
new file mode 100644
index 00000000..25fa6032
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/txn/txn_manager.h
@@ -0,0 +1,223 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#pragma once
+
+#include "portability/toku_portability.h"
+#include "portability/toku_pthread.h"
+
+#include "ft/txn/txn.h"
+
+void set_test_txn_sync_callback(void (*) (pthread_t, void*), void*);
+#define toku_test_txn_sync_callback(a) ((test_txn_sync_callback)? test_txn_sync_callback( a,test_txn_sync_callback_extra) : (void) 0)
+
+#if defined(TOKU_DEBUG_TXN_SYNC)
+#define toku_debug_txn_sync(a) toku_test_txn_sync_callback(a)
+#else
+#define toku_debug_txn_sync(a) ((void) 0)
+#endif // defined(TOKU_DEBUG_TXN_SYNC)
+
+typedef struct txn_manager *TXN_MANAGER;
+
+struct referenced_xid_tuple {
+ TXNID begin_id;
+ TXNID end_id;
+ uint32_t references;
+};
+
+struct txn_manager {
+ toku_mutex_t txn_manager_lock; // a lock protecting this object
+ txn_omt_t live_root_txns; // a sorted tree.
+ xid_omt_t live_root_ids; //contains TXNID x | x is snapshot txn
+ TOKUTXN snapshot_head;
+ TOKUTXN snapshot_tail;
+ uint32_t num_snapshots;
+ // Contains 3-tuples: (TXNID begin_id, TXNID end_id, uint64_t num_live_list_references)
+ // for committed root transaction ids that are still referenced by a live list.
+ rx_omt_t referenced_xids;
+
+ TXNID last_xid;
+ TXNID last_xid_seen_for_recover;
+ TXNID last_calculated_oldest_referenced_xid;
+};
+typedef struct txn_manager *TXN_MANAGER;
+
+struct txn_manager_state {
+ txn_manager_state(TXN_MANAGER mgr) :
+ txn_manager(mgr),
+ initialized(false) {
+ snapshot_xids.create_no_array();
+ referenced_xids.create_no_array();
+ live_root_txns.create_no_array();
+ }
+
+ // should not copy construct
+ txn_manager_state &operator=(txn_manager_state &rhs) = delete;
+ txn_manager_state(txn_manager_state &rhs) = delete;
+
+ ~txn_manager_state() {
+ snapshot_xids.destroy();
+ referenced_xids.destroy();
+ live_root_txns.destroy();
+ }
+
+ void init();
+
+ TXN_MANAGER txn_manager;
+ bool initialized;
+
+ // a snapshot of the txn manager's mvcc state
+ // only valid if initialized = true
+ xid_omt_t snapshot_xids;
+ rx_omt_t referenced_xids;
+ xid_omt_t live_root_txns;
+};
+
+// represents all of the information needed to run garbage collection
+struct txn_gc_info {
+ txn_gc_info(txn_manager_state *st, TXNID xid_sgc, TXNID xid_ip, bool mvcc)
+ : txn_state_for_gc(st),
+ oldest_referenced_xid_for_simple_gc(xid_sgc),
+ oldest_referenced_xid_for_implicit_promotion(xid_ip),
+ mvcc_needed(mvcc) {
+ }
+
+ // a snapshot of the transcation system. may be null.
+ txn_manager_state *txn_state_for_gc;
+
+ // the oldest xid in any live list
+ //
+ // suitible for simple garbage collection that cleans up multiple committed
+ // transaction records into one. not suitible for implicit promotions, which
+ // must be correct in the face of abort messages - see ftnode->oldest_referenced_xid
+ TXNID oldest_referenced_xid_for_simple_gc;
+
+ // lower bound on the oldest xid in any live when the messages to be cleaned
+ // had no messages above them. suitable for implicitly promoting a provisonal uxr.
+ TXNID oldest_referenced_xid_for_implicit_promotion;
+
+ // whether or not mvcc is actually needed - false during recovery and non-transactional systems
+ const bool mvcc_needed;
+};
+
+void toku_txn_manager_init(TXN_MANAGER* txn_manager);
+void toku_txn_manager_destroy(TXN_MANAGER txn_manager);
+
+TXNID toku_txn_manager_get_oldest_living_xid(TXN_MANAGER txn_manager);
+
+TXNID toku_txn_manager_get_oldest_referenced_xid_estimate(TXN_MANAGER txn_manager);
+
+void toku_txn_manager_handle_snapshot_create_for_child_txn(
+ TOKUTXN txn,
+ TXN_MANAGER txn_manager,
+ TXN_SNAPSHOT_TYPE snapshot_type
+ );
+void toku_txn_manager_handle_snapshot_destroy_for_child_txn(
+ TOKUTXN txn,
+ TXN_MANAGER txn_manager,
+ TXN_SNAPSHOT_TYPE snapshot_type
+ );
+
+
+// Assign a txnid. Log the txn begin in the recovery log. Initialize the txn live lists.
+void toku_txn_manager_start_txn(
+ TOKUTXN txn,
+ TXN_MANAGER txn_manager,
+ TXN_SNAPSHOT_TYPE snapshot_type,
+ bool read_only
+ );
+
+void toku_txn_manager_start_txn_for_recovery(
+ TOKUTXN txn,
+ TXN_MANAGER txn_manager,
+ TXNID xid
+ );
+
+void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn);
+
+void toku_txn_manager_clone_state_for_gc(
+ TXN_MANAGER txn_manager,
+ xid_omt_t* snapshot_xids,
+ rx_omt_t* referenced_xids,
+ xid_omt_t* live_root_txns
+ );
+
+void toku_txn_manager_id2txn_unlocked(TXN_MANAGER txn_manager, TXNID_PAIR txnid, TOKUTXN *result);
+
+// Returns a root txn associated with xid. The system as a whole
+// assumes that only root txns get prepared, adn therefore only
+// root txns will have XIDs associated with them.
+int toku_txn_manager_get_root_txn_from_xid (TXN_MANAGER txn_manager, TOKU_XA_XID *xid, DB_TXN **txnp);
+
+uint32_t toku_txn_manager_num_live_root_txns(TXN_MANAGER txn_manager);
+
+typedef int (*txn_mgr_iter_callback)(TOKUTXN txn, void* extra);
+
+int toku_txn_manager_iter_over_live_txns(
+ TXN_MANAGER txn_manager,
+ txn_mgr_iter_callback cb,
+ void* extra
+ );
+
+int toku_txn_manager_iter_over_live_root_txns(
+ TXN_MANAGER txn_manager,
+ txn_mgr_iter_callback cb,
+ void* extra
+ );
+
+int toku_txn_manager_recover_root_txn(
+ TXN_MANAGER txn_manager,
+ struct tokulogger_preplist preplist[/*count*/],
+ long count,
+ long *retp, /*out*/
+ uint32_t flags
+ );
+
+void toku_txn_manager_suspend(TXN_MANAGER txn_manager);
+void toku_txn_manager_resume(TXN_MANAGER txn_manager);
+
+void toku_txn_manager_set_last_xid_from_logger(TXN_MANAGER txn_manager, TXNID last_xid);
+void toku_txn_manager_set_last_xid_from_recovered_checkpoint(TXN_MANAGER txn_manager, TXNID last_xid);
+TXNID toku_txn_manager_get_last_xid(TXN_MANAGER mgr);
+
+bool toku_txn_manager_txns_exist(TXN_MANAGER mgr);
+
+// Test-only function
+void toku_txn_manager_increase_last_xid(TXN_MANAGER mgr, uint64_t increment);
+
+TXNID toku_get_youngest_live_list_txnid_for(TXNID xc, const xid_omt_t &snapshot_txnids, const rx_omt_t &referenced_xids);
diff --git a/storage/tokudb/PerconaFT/ft/txn/txn_state.h b/storage/tokudb/PerconaFT/ft/txn/txn_state.h
new file mode 100644
index 00000000..3301cc68
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/txn/txn_state.h
@@ -0,0 +1,50 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#pragma once
+
+// this is a separate file so that the hotindexing tests can see the txn states
+
+enum tokutxn_state {
+ TOKUTXN_LIVE, // initial txn state
+ TOKUTXN_PREPARING, // txn is preparing (or prepared)
+ TOKUTXN_COMMITTING, // txn in the process of committing
+ TOKUTXN_ABORTING, // txn in the process of aborting
+ TOKUTXN_RETIRED, // txn no longer exists
+};
+typedef enum tokutxn_state TOKUTXN_STATE;
diff --git a/storage/tokudb/PerconaFT/ft/txn/xids.cc b/storage/tokudb/PerconaFT/ft/txn/xids.cc
new file mode 100644
index 00000000..59bf3c9b
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/txn/xids.cc
@@ -0,0 +1,247 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+/* Purpose of this file is to implement xids list of nested transactions
+ * ids.
+ *
+ * See design documentation for nested transactions at
+ * TokuWiki/Imp/TransactionsOverview.
+ *
+ * NOTE: xids are always stored in disk byte order.
+ * Accessors are responsible for transposing bytes to
+ * host order.
+ */
+
+#include <errno.h>
+#include <string.h>
+
+#include "portability/memory.h"
+#include "portability/toku_assert.h"
+#include "portability/toku_htod.h"
+#include "portability/toku_portability.h"
+
+#include "ft/txn/xids.h"
+
+/////////////////////////////////////////////////////////////////////////////////
+// This layer of abstraction (xids_xxx) understands xids<> and nothing else.
+// It contains all the functions that understand xids<>
+//
+// xids<> do not store the implicit transaction id of 0 at index 0.
+// The accessor functions make the id of 0 explicit at index 0.
+// The number of xids physically stored in the xids array is in
+// the variable num_xids.
+//
+// The xids struct is immutable. The caller gets an initial version of XIDS
+// by calling toku_xids_get_root_xids(), which returns the constant struct
+// representing the root transaction (id 0). When a transaction begins,
+// a new XIDS is created with the id of the current transaction appended to
+// the list.
+//
+//
+
+// This is the xids list for a transactionless environment.
+// It is also the initial state of any xids list created for
+// nested transactions.
+
+XIDS
+toku_xids_get_root_xids(void) {
+ static const struct XIDS_S root_xids = {
+ .num_xids = 0
+ };
+
+ XIDS rval = (XIDS)&root_xids;
+ return rval;
+}
+
+bool
+toku_xids_can_create_child(XIDS xids) {
+ invariant(xids->num_xids < MAX_TRANSACTION_RECORDS);
+ return (xids->num_xids + 1) != MAX_TRANSACTION_RECORDS;
+}
+
+int
+toku_xids_create_unknown_child(XIDS parent_xids, XIDS *xids_p) {
+ // Postcondition:
+ // xids_p points to an xids that is an exact copy of parent_xids, but with room for one more xid.
+ int rval;
+ invariant(parent_xids);
+ uint32_t num_child_xids = parent_xids->num_xids + 1;
+ // assumes that caller has verified that num_child_xids will
+ // be less than MAX_TRANSACTIN_RECORDS
+ invariant(num_child_xids < MAX_TRANSACTION_RECORDS);
+ size_t new_size = sizeof(*parent_xids) + num_child_xids*sizeof(parent_xids->ids[0]);
+ XIDS CAST_FROM_VOIDP(xids, toku_xmalloc(new_size));
+ // Clone everything (parent does not have the newest xid).
+ memcpy(xids, parent_xids, new_size - sizeof(xids->ids[0]));
+ *xids_p = xids;
+ rval = 0;
+ return rval;
+}
+
+void
+toku_xids_finalize_with_child(XIDS xids, TXNID this_xid) {
+ // Precondition:
+ // - xids was created by toku_xids_create_unknown_child
+ TXNID this_xid_disk = toku_htod64(this_xid);
+ uint32_t num_child_xids = ++xids->num_xids;
+ xids->ids[num_child_xids - 1] = this_xid_disk;
+}
+
+// xids is immutable. This function creates a new xids by copying the
+// parent's list and then appending the xid of the new transaction.
+int
+toku_xids_create_child(XIDS parent_xids, // xids list for parent transaction
+ XIDS *xids_p, // xids list created
+ TXNID this_xid) { // xid of this transaction (new innermost)
+ bool can_create_child = toku_xids_can_create_child(parent_xids);
+ if (!can_create_child) {
+ return EINVAL;
+ }
+ toku_xids_create_unknown_child(parent_xids, xids_p);
+ toku_xids_finalize_with_child(*xids_p, this_xid);
+ return 0;
+}
+
+void
+toku_xids_create_from_buffer(struct rbuf *rb, // xids list for parent transaction
+ XIDS *xids_p) { // xids list created
+ uint8_t num_xids = rbuf_char(rb);
+ invariant(num_xids < MAX_TRANSACTION_RECORDS);
+ XIDS CAST_FROM_VOIDP(xids, toku_xmalloc(sizeof(*xids) + num_xids*sizeof(xids->ids[0])));
+ xids->num_xids = num_xids;
+ uint8_t index;
+ for (index = 0; index < xids->num_xids; index++) {
+ rbuf_TXNID(rb, &xids->ids[index]);
+ }
+ *xids_p = xids;
+}
+
+void
+toku_xids_destroy(XIDS *xids_p) {
+ if (*xids_p != toku_xids_get_root_xids()) toku_free(*xids_p);
+ *xids_p = NULL;
+}
+
+// Return xid at requested position.
+// If requesting an xid out of range (which will be the case if xids array is empty)
+// then return 0, the xid of the root transaction.
+TXNID
+toku_xids_get_xid(XIDS xids, uint8_t index) {
+ invariant(index < toku_xids_get_num_xids(xids));
+ TXNID rval = xids->ids[index];
+ rval = toku_dtoh64(rval);
+ return rval;
+}
+
+uint8_t
+toku_xids_get_num_xids(XIDS xids) {
+ uint8_t rval = xids->num_xids;
+ return rval;
+}
+
+// Return innermost xid
+TXNID
+toku_xids_get_innermost_xid(XIDS xids) {
+ TXNID rval = TXNID_NONE;
+ if (toku_xids_get_num_xids(xids)) {
+ // if clause above makes this cast ok
+ uint8_t innermost_xid = (uint8_t) (toku_xids_get_num_xids(xids) - 1);
+ rval = toku_xids_get_xid(xids, innermost_xid);
+ }
+ return rval;
+}
+
+TXNID
+toku_xids_get_outermost_xid(XIDS xids) {
+ TXNID rval = TXNID_NONE;
+ if (toku_xids_get_num_xids(xids)) {
+ rval = toku_xids_get_xid(xids, 0);
+ }
+ return rval;
+}
+
+void
+toku_xids_cpy(XIDS target, XIDS source) {
+ size_t size = toku_xids_get_size(source);
+ memcpy(target, source, size);
+}
+
+// return size in bytes
+uint32_t
+toku_xids_get_size(XIDS xids) {
+ uint32_t rval;
+ uint8_t num_xids = xids->num_xids;
+ rval = sizeof(*xids) + num_xids * sizeof(xids->ids[0]);
+ return rval;
+}
+
+uint32_t
+toku_xids_get_serialize_size(XIDS xids) {
+ uint32_t rval;
+ uint8_t num_xids = xids->num_xids;
+ rval = 1 + //num xids
+ 8 * num_xids;
+ return rval;
+}
+
+unsigned char *
+toku_xids_get_end_of_array(XIDS xids) {
+ TXNID *r = xids->ids + xids->num_xids;
+ return (unsigned char*)r;
+}
+
+void wbuf_nocrc_xids(struct wbuf *wb, XIDS xids) {
+ wbuf_nocrc_char(wb, (unsigned char)xids->num_xids);
+ uint8_t index;
+ for (index = 0; index < xids->num_xids; index++) {
+ wbuf_nocrc_TXNID(wb, xids->ids[index]);
+ }
+}
+
+void
+toku_xids_fprintf(FILE *fp, XIDS xids) {
+ uint8_t index;
+ unsigned num_xids = toku_xids_get_num_xids(xids);
+ fprintf(fp, "[|%u| ", num_xids);
+ for (index = 0; index < toku_xids_get_num_xids(xids); index++) {
+ if (index) fprintf(fp, ",");
+ fprintf(fp, "%" PRIx64, toku_xids_get_xid(xids, index));
+ }
+ fprintf(fp, "]");
+}
+
diff --git a/storage/tokudb/PerconaFT/ft/txn/xids.h b/storage/tokudb/PerconaFT/ft/txn/xids.h
new file mode 100644
index 00000000..83ad5e57
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/txn/xids.h
@@ -0,0 +1,116 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+/* Purpose of this file is to provide the world with everything necessary
+ * to use the xids and nothing else.
+ * Internal requirements of the xids logic do not belong here.
+ *
+ * xids is (abstractly) an immutable list of nested transaction ids, accessed only
+ * via the functions in this file.
+ *
+ * See design documentation for nested transactions at
+ * TokuWiki/Imp/TransactionsOverview.
+ */
+
+#pragma once
+
+#include "ft/txn/txn.h"
+#include "ft/serialize/rbuf.h"
+#include "ft/serialize/wbuf.h"
+
+/* The number of transaction ids stored in the xids structure is
+ * represented by an 8-bit value. The value 255 is reserved.
+ * The constant MAX_NESTED_TRANSACTIONS is one less because
+ * one slot in the packed leaf entry is used for the implicit
+ * root transaction (id 0).
+ */
+enum {
+ MAX_NESTED_TRANSACTIONS = 253,
+ MAX_TRANSACTION_RECORDS = MAX_NESTED_TRANSACTIONS + 1
+};
+
+// Variable size list of transaction ids (known in design doc as xids<>).
+// ids[0] is the outermost transaction.
+// ids[num_xids - 1] is the innermost transaction.
+// Should only be accessed by accessor functions toku_xids_xxx, not directly.
+
+// If the xids struct is unpacked, the compiler aligns the ids[] and we waste a lot of space
+struct __attribute__((__packed__)) XIDS_S {
+ // maximum value of MAX_TRANSACTION_RECORDS - 1 because transaction 0 is implicit
+ uint8_t num_xids;
+ TXNID ids[];
+};
+typedef struct XIDS_S *XIDS;
+
+// Retrieve an XIDS representing the root transaction.
+XIDS toku_xids_get_root_xids(void);
+
+bool toku_xids_can_create_child(XIDS xids);
+
+void toku_xids_cpy(XIDS target, XIDS source);
+
+//Creates an XIDS representing this transaction.
+//You must pass in an XIDS representing the parent of this transaction.
+int toku_xids_create_child(XIDS parent_xids, XIDS *xids_p, TXNID this_xid);
+
+// The following two functions (in order) are equivalent to toku_xids_create child,
+// but allow you to do most of the work without knowing the new xid.
+int toku_xids_create_unknown_child(XIDS parent_xids, XIDS *xids_p);
+void toku_xids_finalize_with_child(XIDS xids, TXNID this_xid);
+
+void toku_xids_create_from_buffer(struct rbuf *rb, XIDS *xids_p);
+
+void toku_xids_destroy(XIDS *xids_p);
+
+TXNID toku_xids_get_xid(XIDS xids, uint8_t index);
+
+uint8_t toku_xids_get_num_xids(XIDS xids);
+
+TXNID toku_xids_get_innermost_xid(XIDS xids);
+TXNID toku_xids_get_outermost_xid(XIDS xids);
+
+// return size in bytes
+uint32_t toku_xids_get_size(XIDS xids);
+
+uint32_t toku_xids_get_serialize_size(XIDS xids);
+
+unsigned char *toku_xids_get_end_of_array(XIDS xids);
+
+void wbuf_nocrc_xids(struct wbuf *wb, XIDS xids);
+
+void toku_xids_fprintf(FILE* fp, XIDS xids);