diff options
Diffstat (limited to 'storage/tokudb/PerconaFT/ft/txn')
18 files changed, 4986 insertions, 0 deletions
diff --git a/storage/tokudb/PerconaFT/ft/txn/roll.cc b/storage/tokudb/PerconaFT/ft/txn/roll.cc new file mode 100644 index 00000000..7228de06 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/txn/roll.cc @@ -0,0 +1,692 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +/* rollback and rollforward routines. */ + +#include <memory> +#include "ft/ft-ops.h" +#include "ft/ft.h" +#include "ft/log_header.h" +#include "ft/logger/log-internal.h" +#include "ft/txn/rollback-apply.h" +#include "ft/txn/xids.h" + +// functionality provided by roll.c is exposed by an autogenerated +// header file, logheader.h +// +// this (poorly) explains the absence of "roll.h" + +// these flags control whether or not we send commit messages for +// various operations + +// When a transaction is committed, should we send a FT_COMMIT message +// for each FT_INSERT message sent earlier by the transaction? +#define TOKU_DO_COMMIT_CMD_INSERT 0 + +// When a transaction is committed, should we send a FT_COMMIT message +// for each FT_DELETE_ANY message sent earlier by the transaction? +#define TOKU_DO_COMMIT_CMD_DELETE 1 + +// When a transaction is committed, should we send a FT_COMMIT message +// for each FT_UPDATE message sent earlier by the transaction? +#define TOKU_DO_COMMIT_CMD_UPDATE 0 + +int +toku_commit_fdelete (FILENUM filenum, + TOKUTXN txn, + LSN UU(oplsn)) //oplsn is the lsn of the commit +{ + int r; + CACHEFILE cf; + CACHETABLE ct = txn->logger->ct; + + // Try to get the cachefile for this filenum. A missing file on recovery + // is not an error, but a missing file outside of recovery is. + r = toku_cachefile_of_filenum(ct, filenum, &cf); + if (r == ENOENT) { + assert(txn->for_recovery); + r = 0; + goto done; + } + assert_zero(r); + + // bug fix for #4718 + // bug was introduced in with fix for #3590 + // Before Maxwell (and fix for #3590), + // the recovery log was fsynced after the xcommit was loged but + // before we processed rollback entries and before we released + // the row locks (in the lock tree). Due to performance concerns, + // the fsync was moved to after the release of row locks, which comes + // after processing rollback entries. As a result, we may be unlinking a file + // here as part of a transactoin that may abort if we do not fsync the log. + // So, we fsync the log here. + if (txn->logger) { + toku_logger_fsync_if_lsn_not_fsynced(txn->logger, txn->do_fsync_lsn); + } + + // Mark the cachefile as unlink on close. There are two ways for close + // to be eventually called on the cachefile: + // + // - when this txn completes, it will release a reference on the + // ft and close it, UNLESS it was pinned by checkpoint + // - if the cf was pinned by checkpoint, an unpin will release the + // final reference and call close. it must be the final reference + // since this txn has exclusive access to dictionary (by the + // directory row lock for its dname) and we would not get this + // far if there were other live handles. + toku_cachefile_unlink_on_close(cf); +done: + return r; +} + +int +toku_rollback_fdelete (FILENUM UU(filenum), + TOKUTXN UU(txn), + LSN UU(oplsn)) //oplsn is the lsn of the abort +{ + //Rolling back an fdelete is an no-op. + return 0; +} + +int +toku_commit_fcreate (FILENUM UU(filenum), + BYTESTRING UU(bs_fname), + TOKUTXN UU(txn), + LSN UU(oplsn)) +{ + return 0; +} + +int +toku_rollback_fcreate (FILENUM filenum, + BYTESTRING UU(bs_fname), + TOKUTXN txn, + LSN UU(oplsn)) +{ + int r; + CACHEFILE cf; + CACHETABLE ct = txn->logger->ct; + + // Try to get the cachefile for this filenum. A missing file on recovery + // is not an error, but a missing file outside of recovery is. + r = toku_cachefile_of_filenum(ct, filenum, &cf); + if (r == ENOENT) { + r = 0; + goto done; + } + assert_zero(r); + + // Mark the cachefile as unlink on close. There are two ways for close + // to be eventually called on the cachefile: + // + // - when this txn completes, it will release a reference on the + // ft and close it, UNLESS it was pinned by checkpoint + // - if the cf was pinned by checkpoint, an unpin will release the + // final reference and call close. it must be the final reference + // since this txn has exclusive access to dictionary (by the + // directory row lock for its dname) and we would not get this + // far if there were other live handles. + toku_cachefile_unlink_on_close(cf); + toku_cachefile_skip_log_recover_on_close(cf); +done: + return 0; +} + +int toku_commit_frename(BYTESTRING /* old_name */, + BYTESTRING /* new_iname */, + TOKUTXN /* txn */, + LSN UU(oplsn)) { + return 0; +} + +int toku_rollback_frename(BYTESTRING old_iname, + BYTESTRING new_iname, + TOKUTXN txn, + LSN UU(oplsn)) { + assert(txn); + assert(txn->logger); + assert(txn->logger->ct); + + CACHETABLE cachetable = txn->logger->ct; + + toku_struct_stat stat; + bool old_exist = true; + bool new_exist = true; + + std::unique_ptr<char[], decltype(&toku_free)> old_iname_full( + toku_cachetable_get_fname_in_cwd(cachetable, old_iname.data), + &toku_free); + std::unique_ptr<char[], decltype(&toku_free)> new_iname_full( + toku_cachetable_get_fname_in_cwd(cachetable, new_iname.data), + &toku_free); + + if (toku_stat(old_iname_full.get(), &stat, toku_uninstrumented) == -1) { + if (ENOENT == errno) + old_exist = false; + else + return 1; + } + + if (toku_stat(new_iname_full.get(), &stat, toku_uninstrumented) == -1) { + if (ENOENT == errno || ENAMETOOLONG == errno) + new_exist = false; + else + return 1; + } + + // Both old and new files can exist if: + // - rename() is not completed + // - fcreate was replayed during recovery + // 'Stalled cachefiles' container cachefile_list::m_stale_fileid contains + // closed but not yet evicted cachefiles and the key of this container is + // fs-dependent file id - (device id, inode number) pair. To preserve the + // new cachefile + // file's id and keep it in 'stalled cachefiles' container the old file is + // removed + // and the new file is renamed. + if (old_exist && new_exist && + (toku_os_delete(old_iname_full.get()) == -1 || + toku_os_rename(new_iname_full.get(), old_iname_full.get()) == -1 || + toku_fsync_directory(new_iname_full.get()) == -1 || + toku_fsync_directory(old_iname_full.get()) == -1)) + return 1; + + if (!old_exist && new_exist && + (!toku_create_subdirs_if_needed(old_iname_full.get()) || + toku_os_rename(new_iname_full.get(), old_iname_full.get()) == -1 || + toku_fsync_directory(new_iname_full.get()) == -1 || + toku_fsync_directory(old_iname_full.get()) == -1)) + return 1; + + // it's ok if both files do not exist on recovery + if (!old_exist && !new_exist) + assert(txn->for_recovery); + + CACHEFILE cf; + int r = toku_cachefile_of_iname_in_env(cachetable, new_iname.data, &cf); + if (r != ENOENT) { + char *old_fname_in_cf = toku_cachefile_fname_in_env(cf); + toku_cachefile_set_fname_in_env(cf, toku_xstrdup(old_iname.data)); + toku_free(old_fname_in_cf); + // There is at least one case when fclose logging cause error: + // 1) start transaction + // 2) create ft 'a'(write "fcreate" in recovery log) + // 3) rename ft 'a' to 'b'(write "frename" in recovery log) + // 4) abort transaction: + // a) rollback rename ft (renames 'b' to 'a') + // b) rollback create ft (removes 'a'): + // invokes toku_cachefile_unlink_on_close - lazy unlink on file + // close, + // it just sets corresponding flag in cachefile object + // c) write "unlink" for 'a' in recovery log + // (when transaction is aborted all locks are released, + // when file lock is released the file is closed and unlinked if + // corresponding flag is set in cachefile object) + // 5) crash + // + // After this we have the following records in recovery log: + // - create ft 'a', + // - rename 'a' to 'b', + // - unlink 'a' + // + // On recovery: + // - create 'a' + // - rename 'a' to 'b' + // - unlink 'a' - as 'a' file does not exist we have crash on assert + // here + // + // There is no need to write "unlink" in recovery log in (4a) because + // 'a' will be removed + // on transaction rollback on recovery. + toku_cachefile_skip_log_recover_on_close(cf); + } + + return 0; +} + +int find_ft_from_filenum (const FT &ft, const FILENUM &filenum); +int find_ft_from_filenum (const FT &ft, const FILENUM &filenum) { + FILENUM thisfnum = toku_cachefile_filenum(ft->cf); + if (thisfnum.fileid<filenum.fileid) return -1; + if (thisfnum.fileid>filenum.fileid) return +1; + return 0; +} + +// Input arg reset_root_xid_that_created true means that this operation has changed the definition of this dictionary. +// (Example use is for schema change committed with txn that inserted cmdupdatebroadcast message.) +// The oplsn argument is ZERO_LSN for normal operation. When this function is called for recovery, it has the LSN of +// the operation (insert, delete, update, etc). +static int do_insertion (enum ft_msg_type type, FILENUM filenum, BYTESTRING key, BYTESTRING *data, TOKUTXN txn, LSN oplsn, + bool reset_root_xid_that_created) { + int r = 0; + //printf("%s:%d committing insert %s %s\n", __FILE__, __LINE__, key.data, data.data); + FT ft = nullptr; + r = txn->open_fts.find_zero<FILENUM, find_ft_from_filenum>(filenum, &ft, NULL); + if (r == DB_NOTFOUND) { + assert(txn->for_recovery); + r = 0; + goto done; + } + assert(r==0); + + if (oplsn.lsn != 0) { // if we are executing the recovery algorithm + LSN treelsn = toku_ft_checkpoint_lsn(ft); + if (oplsn.lsn <= treelsn.lsn) { // if operation was already applied to tree ... + r = 0; // ... do not apply it again. + goto done; + } + } + + DBT key_dbt,data_dbt; + XIDS xids; + xids = toku_txn_get_xids(txn); + { + const DBT *kdbt = key.len > 0 ? toku_fill_dbt(&key_dbt, key.data, key.len) : + toku_init_dbt(&key_dbt); + const DBT *vdbt = data ? toku_fill_dbt(&data_dbt, data->data, data->len) : + toku_init_dbt(&data_dbt); + ft_msg msg(kdbt, vdbt, type, ZERO_MSN, xids); + + TXN_MANAGER txn_manager = toku_logger_get_txn_manager(txn->logger); + txn_manager_state txn_state_for_gc(txn_manager); + + TXNID oldest_referenced_xid_estimate = toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager); + txn_gc_info gc_info(&txn_state_for_gc, + oldest_referenced_xid_estimate, + // no messages above us, we can implicitly promote uxrs based on this xid + oldest_referenced_xid_estimate, + !txn->for_recovery); + toku_ft_root_put_msg(ft, msg, &gc_info); + if (reset_root_xid_that_created) { + TXNID new_root_xid_that_created = toku_xids_get_outermost_xid(xids); + toku_reset_root_xid_that_created(ft, new_root_xid_that_created); + } + } +done: + return r; +} + + +static int do_nothing_with_filenum(TOKUTXN UU(txn), FILENUM UU(filenum)) { + return 0; +} + + +int toku_commit_cmdinsert (FILENUM filenum, BYTESTRING UU(key), TOKUTXN txn, LSN UU(oplsn)) { +#if TOKU_DO_COMMIT_CMD_INSERT + return do_insertion (FT_COMMIT_ANY, filenum, key, 0, txn, oplsn, false); +#else + return do_nothing_with_filenum(txn, filenum); +#endif +} + +int +toku_rollback_cmdinsert (FILENUM filenum, + BYTESTRING key, + TOKUTXN txn, + LSN oplsn) +{ + return do_insertion (FT_ABORT_ANY, filenum, key, 0, txn, oplsn, false); +} + +int +toku_commit_cmdupdate(FILENUM filenum, + BYTESTRING UU(key), + TOKUTXN txn, + LSN UU(oplsn)) +{ +#if TOKU_DO_COMMIT_CMD_UPDATE + return do_insertion(FT_COMMIT_ANY, filenum, key, 0, txn, oplsn, false); +#else + return do_nothing_with_filenum(txn, filenum); +#endif +} + +int +toku_rollback_cmdupdate(FILENUM filenum, + BYTESTRING key, + TOKUTXN txn, + LSN oplsn) +{ + return do_insertion(FT_ABORT_ANY, filenum, key, 0, txn, oplsn, false); +} + +int +toku_commit_cmdupdatebroadcast(FILENUM filenum, + bool is_resetting_op, + TOKUTXN txn, + LSN oplsn) +{ + // if is_resetting_op, reset root_xid_that_created in + // relevant ft. + bool reset_root_xid_that_created = (is_resetting_op ? true : false); + const enum ft_msg_type msg_type = (is_resetting_op + ? FT_COMMIT_BROADCAST_ALL + : FT_COMMIT_BROADCAST_TXN); + BYTESTRING nullkey = { 0, NULL }; + return do_insertion(msg_type, filenum, nullkey, 0, txn, oplsn, reset_root_xid_that_created); +} + +int +toku_rollback_cmdupdatebroadcast(FILENUM filenum, + bool UU(is_resetting_op), + TOKUTXN txn, + LSN oplsn) +{ + BYTESTRING nullkey = { 0, NULL }; + return do_insertion(FT_ABORT_BROADCAST_TXN, filenum, nullkey, 0, txn, oplsn, false); +} + +int +toku_commit_cmddelete (FILENUM filenum, + BYTESTRING key, + TOKUTXN txn, + LSN oplsn) +{ +#if TOKU_DO_COMMIT_CMD_DELETE + return do_insertion (FT_COMMIT_ANY, filenum, key, 0, txn, oplsn, false); +#else + key = key; oplsn = oplsn; + return do_nothing_with_filenum(txn, filenum); +#endif +} + +int +toku_rollback_cmddelete (FILENUM filenum, + BYTESTRING key, + TOKUTXN txn, + LSN oplsn) +{ + return do_insertion (FT_ABORT_ANY, filenum, key, 0, txn, oplsn, false); +} + +static int +toku_apply_rollinclude (TXNID_PAIR xid, + uint64_t num_nodes, + BLOCKNUM spilled_head, + BLOCKNUM spilled_tail, + TOKUTXN txn, + LSN oplsn, + apply_rollback_item func) { + int r = 0; + struct roll_entry *item; + + BLOCKNUM next_log = spilled_tail; + uint64_t last_sequence = num_nodes; + + bool found_head = false; + assert(next_log.b != ROLLBACK_NONE.b); + while (next_log.b != ROLLBACK_NONE.b) { + //pin log + ROLLBACK_LOG_NODE log; + toku_get_and_pin_rollback_log(txn, next_log, &log); + toku_rollback_verify_contents(log, xid, last_sequence - 1); + last_sequence = log->sequence; + + toku_maybe_prefetch_previous_rollback_log(txn, log); + + while ((item=log->newest_logentry)) { + log->newest_logentry = item->prev; + r = func(txn, item, oplsn); + if (r!=0) return r; + } + if (next_log.b == spilled_head.b) { + assert(!found_head); + found_head = true; + assert(log->sequence == 0); + } + next_log = log->previous; + { + //Clean up transaction structure to prevent + //toku_txn_close from double-freeing + spilled_tail = next_log; + if (found_head) { + assert(next_log.b == ROLLBACK_NONE.b); + spilled_head = next_log; + } + } + toku_rollback_log_unpin_and_remove(txn, log); + } + return r; +} + +int +toku_commit_rollinclude (TXNID_PAIR xid, + uint64_t num_nodes, + BLOCKNUM spilled_head, + BLOCKNUM spilled_tail, + TOKUTXN txn, + LSN oplsn) { + int r; + r = toku_apply_rollinclude(xid, num_nodes, + spilled_head, + spilled_tail, + txn, oplsn, + toku_commit_rollback_item); + return r; +} + +int +toku_rollback_rollinclude (TXNID_PAIR xid, + uint64_t num_nodes, + BLOCKNUM spilled_head, + BLOCKNUM spilled_tail, + TOKUTXN txn, + LSN oplsn) { + int r; + r = toku_apply_rollinclude(xid, num_nodes, + spilled_head, + spilled_tail, + txn, oplsn, + toku_abort_rollback_item); + return r; +} + +int +toku_commit_load (FILENUM old_filenum, + BYTESTRING UU(new_iname), + TOKUTXN txn, + LSN UU(oplsn)) +{ + int r; + CACHEFILE old_cf; + CACHETABLE ct = txn->logger->ct; + + // To commit a dictionary load, we delete the old file + // + // Try to get the cachefile for the old filenum. A missing file on recovery + // is not an error, but a missing file outside of recovery is. + r = toku_cachefile_of_filenum(ct, old_filenum, &old_cf); + if (r == ENOENT) { + invariant(txn->for_recovery); + r = 0; + goto done; + } + lazy_assert(r == 0); + + // bug fix for #4718 + // bug was introduced in with fix for #3590 + // Before Maxwell (and fix for #3590), + // the recovery log was fsynced after the xcommit was loged but + // before we processed rollback entries and before we released + // the row locks (in the lock tree). Due to performance concerns, + // the fsync was moved to after the release of row locks, which comes + // after processing rollback entries. As a result, we may be unlinking a file + // here as part of a transactoin that may abort if we do not fsync the log. + // So, we fsync the log here. + if (txn->logger) { + toku_logger_fsync_if_lsn_not_fsynced(txn->logger, txn->do_fsync_lsn); + } + + // TODO: Zardosht + // Explain why this condition is valid, because I forget. + if (!toku_cachefile_is_unlink_on_close(old_cf)) { + toku_cachefile_unlink_on_close(old_cf); + } +done: + return r; +} + +int +toku_rollback_load (FILENUM UU(old_filenum), + BYTESTRING new_iname, + TOKUTXN txn, + LSN UU(oplsn)) +{ + int r; + CACHEFILE new_cf; + CACHETABLE ct = txn->logger->ct; + + // To rollback a dictionary load, we delete the new file. + // Try to get the cachefile for the new fname. + char *fname_in_env = fixup_fname(&new_iname); + r = toku_cachefile_of_iname_in_env(ct, fname_in_env, &new_cf); + if (r == ENOENT) { + // It's possible the new iname was never created, so just try to + // unlink it if it's there and ignore the error if it's not. + char *fname_in_cwd = toku_cachetable_get_fname_in_cwd(ct, fname_in_env); + r = unlink(fname_in_cwd); + assert(r == 0 || get_error_errno() == ENOENT); + toku_free(fname_in_cwd); + r = 0; + } else { + assert_zero(r); + toku_cachefile_unlink_on_close(new_cf); + } + toku_free(fname_in_env); + return r; +} + +//2954 +int +toku_commit_hot_index (FILENUMS UU(hot_index_filenums), + TOKUTXN UU(txn), + LSN UU(oplsn)) +{ + // nothing + return 0; +} + +int +toku_rollback_hot_index (FILENUMS UU(hot_index_filenums), + TOKUTXN UU(txn), + LSN UU(oplsn)) +{ + return 0; +} + +int +toku_commit_dictionary_redirect (FILENUM UU(old_filenum), + FILENUM UU(new_filenum), + TOKUTXN UU(txn), + LSN UU(oplsn)) //oplsn is the lsn of the commit +{ + //Redirect only has meaning during normal operation (NOT during recovery). + if (!txn->for_recovery) { + //NO-OP + } + return 0; +} + +int +toku_rollback_dictionary_redirect (FILENUM old_filenum, + FILENUM new_filenum, + TOKUTXN txn, + LSN UU(oplsn)) //oplsn is the lsn of the abort +{ + int r = 0; + //Redirect only has meaning during normal operation (NOT during recovery). + if (!txn->for_recovery) { + CACHEFILE new_cf = NULL; + r = toku_cachefile_of_filenum(txn->logger->ct, new_filenum, &new_cf); + assert(r == 0); + FT CAST_FROM_VOIDP(new_ft, toku_cachefile_get_userdata(new_cf)); + + CACHEFILE old_cf = NULL; + r = toku_cachefile_of_filenum(txn->logger->ct, old_filenum, &old_cf); + assert(r == 0); + FT CAST_FROM_VOIDP(old_ft, toku_cachefile_get_userdata(old_cf)); + + //Redirect back from new to old. + r = toku_dictionary_redirect_abort(old_ft, new_ft, txn); + assert(r==0); + } + return r; +} + +int +toku_commit_change_fdescriptor(FILENUM filenum, + BYTESTRING UU(old_descriptor), + TOKUTXN txn, + LSN UU(oplsn)) +{ + return do_nothing_with_filenum(txn, filenum); +} + +int +toku_rollback_change_fdescriptor(FILENUM filenum, + BYTESTRING old_descriptor, + TOKUTXN txn, + LSN UU(oplsn)) +{ + CACHEFILE cf; + int r; + r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf); + if (r == ENOENT) { //Missing file on recovered transaction is not an error + assert(txn->for_recovery); + r = 0; + goto done; + } + // file must be open, because the txn that created it opened it and + // noted it, + assert(r == 0); + + FT ft; + ft = NULL; + r = txn->open_fts.find_zero<FILENUM, find_ft_from_filenum>(filenum, &ft, NULL); + assert(r == 0); + + DESCRIPTOR_S d; + toku_fill_dbt(&d.dbt, old_descriptor.data, old_descriptor.len); + toku_ft_update_descriptor(ft, &d); +done: + return r; +} + + + diff --git a/storage/tokudb/PerconaFT/ft/txn/rollback-apply.cc b/storage/tokudb/PerconaFT/ft/txn/rollback-apply.cc new file mode 100644 index 00000000..0f19c445 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/txn/rollback-apply.cc @@ -0,0 +1,258 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#include "ft/logger/log-internal.h" +#include "ft/txn/rollback-apply.h" + +static void poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint) { + if (txn->progress_poll_fun) { + TOKU_TXN_PROGRESS_S progress = { + .entries_total = txn->roll_info.num_rollentries, + .entries_processed = txn->roll_info.num_rollentries_processed, + .is_commit = is_commit, + .stalled_on_checkpoint = stall_for_checkpoint}; + txn->progress_poll_fun(&progress, txn->progress_poll_fun_extra); + } +} + +int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) { + int r=0; + rolltype_dispatch_assign(item, toku_commit_, r, txn, lsn); + txn->roll_info.num_rollentries_processed++; + if (txn->roll_info.num_rollentries_processed % 1024 == 0) { + poll_txn_progress_function(txn, true, false); + } + return r; +} + +int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) { + int r=0; + rolltype_dispatch_assign(item, toku_rollback_, r, txn, lsn); + txn->roll_info.num_rollentries_processed++; + if (txn->roll_info.num_rollentries_processed % 1024 == 0) { + poll_txn_progress_function(txn, false, false); + } + return r; +} + +int note_ft_used_in_txns_parent(const FT &ft, uint32_t UU(index), TOKUTXN const child); +int note_ft_used_in_txns_parent(const FT &ft, uint32_t UU(index), TOKUTXN const child) { + TOKUTXN parent = child->parent; + toku_txn_maybe_note_ft(parent, ft); + return 0; +} + +static int apply_txn(TOKUTXN txn, LSN lsn, apply_rollback_item func) { + int r = 0; + // do the commit/abort calls and free everything + // we do the commit/abort calls in reverse order too. + struct roll_entry *item; + //printf("%s:%d abort\n", __FILE__, __LINE__); + + BLOCKNUM next_log = ROLLBACK_NONE; + + bool is_current = false; + if (txn_has_current_rollback_log(txn)) { + next_log = txn->roll_info.current_rollback; + is_current = true; + } + else if (txn_has_spilled_rollback_logs(txn)) { + next_log = txn->roll_info.spilled_rollback_tail; + } + + uint64_t last_sequence = txn->roll_info.num_rollback_nodes; + bool found_head = false; + while (next_log.b != ROLLBACK_NONE.b) { + ROLLBACK_LOG_NODE log; + //pin log + toku_get_and_pin_rollback_log(txn, next_log, &log); + toku_rollback_verify_contents(log, txn->txnid, last_sequence - 1); + + toku_maybe_prefetch_previous_rollback_log(txn, log); + + last_sequence = log->sequence; + if (func) { + while ((item=log->newest_logentry)) { + log->newest_logentry = item->prev; + r = func(txn, item, lsn); + if (r!=0) return r; + } + } + if (next_log.b == txn->roll_info.spilled_rollback_head.b) { + assert(!found_head); + found_head = true; + assert(log->sequence == 0); + } + next_log = log->previous; + { + //Clean up transaction structure to prevent + //toku_txn_close from double-freeing + if (is_current) { + txn->roll_info.current_rollback = ROLLBACK_NONE; + is_current = false; + } + else { + txn->roll_info.spilled_rollback_tail = next_log; + } + if (found_head) { + assert(next_log.b == ROLLBACK_NONE.b); + txn->roll_info.spilled_rollback_head = next_log; + } + } + bool give_back = false; + // each txn tries to give back at most one rollback log node + // to the cache. + if (next_log.b == ROLLBACK_NONE.b) { + give_back = txn->logger->rollback_cache.give_rollback_log_node( + txn, + log + ); + } + if (!give_back) { + toku_rollback_log_unpin_and_remove(txn, log); + } + } + return r; +} + +//Commit each entry in the rollback log. +//If the transaction has a parent, it just promotes its information to its parent. +int toku_rollback_commit(TOKUTXN txn, LSN lsn) { + int r=0; + if (txn->parent!=0) { + // First we must put a rollinclude entry into the parent if we spilled + + if (txn_has_spilled_rollback_logs(txn)) { + uint64_t num_nodes = txn->roll_info.num_rollback_nodes; + if (txn_has_current_rollback_log(txn)) { + num_nodes--; //Don't count the in-progress rollback log. + } + toku_logger_save_rollback_rollinclude(txn->parent, txn->txnid, num_nodes, + txn->roll_info.spilled_rollback_head, + txn->roll_info.spilled_rollback_tail); + //Remove ownership from child. + txn->roll_info.spilled_rollback_head = ROLLBACK_NONE; + txn->roll_info.spilled_rollback_tail = ROLLBACK_NONE; + } + // if we're committing a child rollback, put its entries into the parent + // by pinning both child and parent and then linking the child log entry + // list to the end of the parent log entry list. + if (txn_has_current_rollback_log(txn)) { + //Pin parent log + toku_txn_lock(txn->parent); + ROLLBACK_LOG_NODE parent_log; + toku_get_and_pin_rollback_log_for_new_entry(txn->parent, &parent_log); + + //Pin child log + ROLLBACK_LOG_NODE child_log; + toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, &child_log); + toku_rollback_verify_contents(child_log, txn->txnid, txn->roll_info.num_rollback_nodes - 1); + + // Append the list to the front of the parent. + if (child_log->oldest_logentry) { + // There are some entries, so link them in. + parent_log->dirty = true; + child_log->oldest_logentry->prev = parent_log->newest_logentry; + if (!parent_log->oldest_logentry) { + parent_log->oldest_logentry = child_log->oldest_logentry; + } + parent_log->newest_logentry = child_log->newest_logentry; + parent_log->rollentry_resident_bytecount += child_log->rollentry_resident_bytecount; + txn->parent->roll_info.rollentry_raw_count += txn->roll_info.rollentry_raw_count; + child_log->rollentry_resident_bytecount = 0; + } + if (parent_log->oldest_logentry==NULL) { + parent_log->oldest_logentry = child_log->oldest_logentry; + } + child_log->newest_logentry = child_log->oldest_logentry = 0; + // Put all the memarena data into the parent. + if (child_log->rollentry_arena.total_size_in_use() > 0) { + // If there are no bytes to move, then just leave things alone, and let the memory be reclaimed on txn is closed. + child_log->rollentry_arena.move_memory(&parent_log->rollentry_arena); + } + // each txn tries to give back at most one rollback log node + // to the cache. All other rollback log nodes for this child + // transaction are included in the parent's rollback log, + // so this is the only node we can give back to the cache + bool give_back = txn->logger->rollback_cache.give_rollback_log_node( + txn, + child_log + ); + if (!give_back) { + toku_rollback_log_unpin_and_remove(txn, child_log); + } + txn->roll_info.current_rollback = ROLLBACK_NONE; + + toku_maybe_spill_rollbacks(txn->parent, parent_log); + toku_rollback_log_unpin(txn->parent, parent_log); + assert(r == 0); + toku_txn_unlock(txn->parent); + } + + // Note the open FTs, the omts must be merged + r = txn->open_fts.iterate<struct tokutxn, note_ft_used_in_txns_parent>(txn); + assert(r==0); + + //If this transaction needs an fsync (if it commits) + //save that in the parent. Since the commit really happens in the root txn. + toku_txn_lock(txn->parent); + txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit; + txn->parent->roll_info.num_rollentries += txn->roll_info.num_rollentries; + toku_txn_unlock(txn->parent); + } else { + r = apply_txn(txn, lsn, toku_commit_rollback_item); + assert(r==0); + } + + return r; +} + +int toku_rollback_abort(TOKUTXN txn, LSN lsn) { + int r; + r = apply_txn(txn, lsn, toku_abort_rollback_item); + assert(r==0); + return r; +} + +int toku_rollback_discard(TOKUTXN txn) { + txn->roll_info.current_rollback = ROLLBACK_NONE; + txn->roll_info.spilled_rollback_head = ROLLBACK_NONE; + txn->roll_info.spilled_rollback_tail = ROLLBACK_NONE; + return 0; +} + diff --git a/storage/tokudb/PerconaFT/ft/txn/rollback-apply.h b/storage/tokudb/PerconaFT/ft/txn/rollback-apply.h new file mode 100644 index 00000000..bf87cd29 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/txn/rollback-apply.h @@ -0,0 +1,47 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#pragma once + +typedef int(*apply_rollback_item)(TOKUTXN txn, struct roll_entry *item, LSN lsn); +int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn); +int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn); + +int toku_rollback_commit(TOKUTXN txn, LSN lsn); +int toku_rollback_abort(TOKUTXN txn, LSN lsn); +int toku_rollback_discard(TOKUTXN txn); diff --git a/storage/tokudb/PerconaFT/ft/txn/rollback-ct-callbacks.cc b/storage/tokudb/PerconaFT/ft/txn/rollback-ct-callbacks.cc new file mode 100644 index 00000000..08d7c887 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/txn/rollback-ct-callbacks.cc @@ -0,0 +1,257 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#include "portability/memory.h" +#include "portability/toku_portability.h" + +#include "ft/serialize/block_table.h" +#include "ft/ft-internal.h" +#include "ft/serialize/ft_node-serialize.h" +#include "ft/txn/rollback.h" +#include "ft/txn/rollback-ct-callbacks.h" + +#include "util/memarena.h" + +// Address used as a sentinel. Otherwise unused. +static struct serialized_rollback_log_node cloned_rollback; + +// Cleanup the rollback memory +static void +rollback_log_destroy(ROLLBACK_LOG_NODE log) { + make_rollback_log_empty(log); + toku_free(log); +} + +// flush an ununused log to disk, by allocating a size 0 blocknum in +// the blocktable +static void toku_rollback_flush_unused_log(ROLLBACK_LOG_NODE log, + BLOCKNUM logname, + int fd, + FT ft, + bool write_me, + bool keep_me, + bool for_checkpoint, + bool is_clone) { + if (write_me) { + DISKOFF offset; + ft->blocktable.realloc_on_disk( + logname, 0, &offset, ft, fd, for_checkpoint); + } + if (!keep_me && !is_clone) { + toku_free(log); + } +} + +// flush a used log to disk by serializing and writing the node out +static void +toku_rollback_flush_used_log ( + ROLLBACK_LOG_NODE log, + SERIALIZED_ROLLBACK_LOG_NODE serialized, + int fd, + FT ft, + bool write_me, + bool keep_me, + bool for_checkpoint, + bool is_clone + ) +{ + + if (write_me) { + int r = toku_serialize_rollback_log_to(fd, log, serialized, is_clone, ft, for_checkpoint); + assert(r == 0); + } + if (!keep_me) { + if (is_clone) { + toku_serialized_rollback_log_destroy(serialized); + } + else { + rollback_log_destroy(log); + } + } +} + +// Write something out. Keep trying even if partial writes occur. +// On error: Return negative with errno set. +// On success return nbytes. +void toku_rollback_flush_callback ( + CACHEFILE UU(cachefile), + int fd, + BLOCKNUM logname, + void *rollback_v, + void** UU(disk_data), + void *extraargs, + PAIR_ATTR size, + PAIR_ATTR* new_size, + bool write_me, + bool keep_me, + bool for_checkpoint, + bool is_clone + ) +{ + ROLLBACK_LOG_NODE log = nullptr; + SERIALIZED_ROLLBACK_LOG_NODE serialized = nullptr; + bool is_unused = false; + if (is_clone) { + is_unused = (rollback_v == &cloned_rollback); + CAST_FROM_VOIDP(serialized, rollback_v); + } + else { + CAST_FROM_VOIDP(log, rollback_v); + is_unused = rollback_log_is_unused(log); + } + *new_size = size; + FT ft; + CAST_FROM_VOIDP(ft, extraargs); + if (is_unused) { + toku_rollback_flush_unused_log( + log, + logname, + fd, + ft, + write_me, + keep_me, + for_checkpoint, + is_clone + ); + } + else { + toku_rollback_flush_used_log( + log, + serialized, + fd, + ft, + write_me, + keep_me, + for_checkpoint, + is_clone + ); + } +} + +int toku_rollback_fetch_callback (CACHEFILE cachefile, PAIR p, int fd, BLOCKNUM logname, uint32_t fullhash UU(), + void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs) { + int r; + FT CAST_FROM_VOIDP(h, extraargs); + assert(h->cf == cachefile); + ROLLBACK_LOG_NODE *result = (ROLLBACK_LOG_NODE*)rollback_pv; + r = toku_deserialize_rollback_log_from(fd, logname, result, h); + if (r==0) { + (*result)->ct_pair = p; + *sizep = rollback_memory_size(*result); + } + return r; +} + +void toku_rollback_pe_est_callback( + void* rollback_v, + void* UU(disk_data), + long* bytes_freed_estimate, + enum partial_eviction_cost *cost, + void* UU(write_extraargs) + ) +{ + assert(rollback_v != NULL); + *bytes_freed_estimate = 0; + *cost = PE_CHEAP; +} + +// callback for partially evicting a cachetable entry +int toku_rollback_pe_callback ( + void *rollback_v, + PAIR_ATTR old_attr, + void* UU(extraargs), + void (*finalize)(PAIR_ATTR new_attr, void * extra), + void *finalize_extra + ) +{ + assert(rollback_v != NULL); + finalize(old_attr, finalize_extra); + return 0; +} + +// partial fetch is never required for a rollback log node +bool toku_rollback_pf_req_callback(void* UU(ftnode_pv), void* UU(read_extraargs)) { + return false; +} + +// a rollback node should never be partial fetched, +// because we always say it is not required. +// (pf req callback always returns false) +int toku_rollback_pf_callback(void* UU(ftnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* UU(sizep)) { + assert(false); + return 0; +} + +// the cleaner thread should never choose a rollback node for cleaning +int toku_rollback_cleaner_callback ( + void* UU(ftnode_pv), + BLOCKNUM UU(blocknum), + uint32_t UU(fullhash), + void* UU(extraargs) + ) +{ + assert(false); + return 0; +} + +void toku_rollback_clone_callback( + void* value_data, + void** cloned_value_data, + long* clone_size, + PAIR_ATTR* new_attr, + bool UU(for_checkpoint), + void* UU(write_extraargs) + ) +{ + ROLLBACK_LOG_NODE CAST_FROM_VOIDP(log, value_data); + SERIALIZED_ROLLBACK_LOG_NODE serialized = nullptr; + if (!rollback_log_is_unused(log)) { + XMALLOC(serialized); + toku_serialize_rollback_log_to_memory_uncompressed(log, serialized); + *cloned_value_data = serialized; + *clone_size = sizeof(struct serialized_rollback_log_node) + serialized->len; + } + else { + *cloned_value_data = &cloned_rollback; + *clone_size = sizeof(cloned_rollback); + } + // clear the dirty bit, because the node has been cloned + log->dirty = 0; + new_attr->is_valid = false; +} + diff --git a/storage/tokudb/PerconaFT/ft/txn/rollback-ct-callbacks.h b/storage/tokudb/PerconaFT/ft/txn/rollback-ct-callbacks.h new file mode 100644 index 00000000..5fedb0e5 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/txn/rollback-ct-callbacks.h @@ -0,0 +1,80 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#pragma once + +#include "ft/cachetable/cachetable.h" + +void toku_rollback_flush_callback(CACHEFILE cachefile, int fd, BLOCKNUM logname, void *rollback_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size, bool write_me, bool keep_me, bool for_checkpoint, bool UU(is_clone)); +int toku_rollback_fetch_callback(CACHEFILE cachefile, PAIR p, int fd, BLOCKNUM logname, uint32_t fullhash, void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs); +void toku_rollback_pe_est_callback( + void* rollback_v, + void* UU(disk_data), + long* bytes_freed_estimate, + enum partial_eviction_cost *cost, + void* UU(write_extraargs) + ); +int toku_rollback_pe_callback ( + void *rollback_v, + PAIR_ATTR old_attr, + void* UU(extraargs), + void (*finalize)(PAIR_ATTR new_attr, void * extra), + void *finalize_extra + ); +bool toku_rollback_pf_req_callback(void* UU(ftnode_pv), void* UU(read_extraargs)) ; +int toku_rollback_pf_callback(void* UU(ftnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* UU(sizep)); +void toku_rollback_clone_callback(void* value_data, void** cloned_value_data, long* clone_size, PAIR_ATTR* new_attr, bool for_checkpoint, void* write_extraargs); + +int toku_rollback_cleaner_callback ( + void* UU(ftnode_pv), + BLOCKNUM UU(blocknum), + uint32_t UU(fullhash), + void* UU(extraargs) + ); + +static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_rollback_log(FT ft) { + CACHETABLE_WRITE_CALLBACK wc; + wc.flush_callback = toku_rollback_flush_callback; + wc.pe_est_callback = toku_rollback_pe_est_callback; + wc.pe_callback = toku_rollback_pe_callback; + wc.cleaner_callback = toku_rollback_cleaner_callback; + wc.clone_callback = toku_rollback_clone_callback; + wc.checkpoint_complete_callback = nullptr; + wc.write_extraargs = ft; + return wc; +} diff --git a/storage/tokudb/PerconaFT/ft/txn/rollback.cc b/storage/tokudb/PerconaFT/ft/txn/rollback.cc new file mode 100644 index 00000000..105f980d --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/txn/rollback.cc @@ -0,0 +1,334 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#include <toku_stdint.h> + +#include "ft/serialize/block_table.h" +#include "ft/ft.h" +#include "ft/logger/log-internal.h" +#include "ft/txn/rollback-ct-callbacks.h" + +extern int writing_rollback; + +static void rollback_unpin_remove_callback(CACHEKEY* cachekey, bool for_checkpoint, void* extra) { + FT CAST_FROM_VOIDP(ft, extra); + ft->blocktable.free_blocknum(cachekey, ft, for_checkpoint); +} + +void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log) { + int r; + CACHEFILE cf = txn->logger->rollback_cachefile; + FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf)); + r = toku_cachetable_unpin_and_remove (cf, log->ct_pair, rollback_unpin_remove_callback, ft); + assert(r == 0); +} + +int +toku_find_xid_by_xid (const TXNID &xid, const TXNID &xidfind) { + if (xid<xidfind) return -1; + if (xid>xidfind) return +1; + return 0; +} + +// TODO: fix this name +// toku_rollback_malloc +void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size) { + return log->rollentry_arena.malloc_from_arena(size); +} + +// TODO: fix this name +// toku_rollback_memdup +void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len) { + void *r = toku_malloc_in_rollback(log, len); + memcpy(r, v, len); + return r; +} + +static inline PAIR_ATTR make_rollback_pair_attr(long size) { + PAIR_ATTR result={ + .size = size, + .nonleaf_size = 0, + .leaf_size = 0, + .rollback_size = size, + .cache_pressure_size = 0, + .is_valid = true + }; + return result; +} + +PAIR_ATTR +rollback_memory_size(ROLLBACK_LOG_NODE log) { + size_t size = sizeof(*log); + size += log->rollentry_arena.total_footprint(); + return make_rollback_pair_attr(size); +} + +static void toku_rollback_node_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p) { + ROLLBACK_LOG_NODE CAST_FROM_VOIDP(log, value_data); + log->ct_pair = p; +} + +// +// initializes an empty rollback log node +// Does not touch the blocknum, that is the +// responsibility of the caller +// +void rollback_empty_log_init(ROLLBACK_LOG_NODE log) { + // Having a txnid set to TXNID_NONE is how we determine if the + // rollback log node is empty or in use. + log->txnid.parent_id64 = TXNID_NONE; + log->txnid.child_id64 = TXNID_NONE; + + log->layout_version = FT_LAYOUT_VERSION; + log->layout_version_original = FT_LAYOUT_VERSION; + log->layout_version_read_from_disk = FT_LAYOUT_VERSION; + log->dirty = true; + log->sequence = 0; + log->previous = make_blocknum(0); + log->oldest_logentry = NULL; + log->newest_logentry = NULL; + log->rollentry_arena.create(0); + log->rollentry_resident_bytecount = 0; +} + +static void rollback_initialize_for_txn( + ROLLBACK_LOG_NODE log, + TOKUTXN txn, + BLOCKNUM previous + ) +{ + log->txnid = txn->txnid; + log->sequence = txn->roll_info.num_rollback_nodes++; + log->previous = previous; + log->oldest_logentry = NULL; + log->newest_logentry = NULL; + log->rollentry_arena.create(1024); + log->rollentry_resident_bytecount = 0; + log->dirty = true; +} + +// TODO: fix this name +void make_rollback_log_empty(ROLLBACK_LOG_NODE log) { + log->rollentry_arena.destroy(); + rollback_empty_log_init(log); +} + +// create and pin a new rollback log node. chain it to the other rollback nodes +// by providing a previous blocknum and assigning the new rollback log +// node the next sequence number +static void rollback_log_create ( + TOKUTXN txn, + BLOCKNUM previous, + ROLLBACK_LOG_NODE *result + ) +{ + writing_rollback++; + ROLLBACK_LOG_NODE XMALLOC(log); + rollback_empty_log_init(log); + + CACHEFILE cf = txn->logger->rollback_cachefile; + FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf)); + rollback_initialize_for_txn(log, txn, previous); + ft->blocktable.allocate_blocknum(&log->blocknum, ft); + const uint32_t hash = toku_cachetable_hash(ft->cf, log->blocknum); + *result = log; + toku_cachetable_put(cf, log->blocknum, hash, + log, rollback_memory_size(log), + get_write_callbacks_for_rollback_log(ft), + toku_rollback_node_save_ct_pair); + txn->roll_info.current_rollback = log->blocknum; + writing_rollback --; +} + +void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) { + int r; + CACHEFILE cf = txn->logger->rollback_cachefile; + r = toku_cachetable_unpin( + cf, + log->ct_pair, + (enum cachetable_dirty)log->dirty, + rollback_memory_size(log) + ); + assert(r == 0); +} + +//Requires: log is pinned +// log is current +//After: +// Maybe there is no current after (if it spilled) +void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log) { + if (log->rollentry_resident_bytecount > txn->logger->write_block_size) { + assert(log->blocknum.b == txn->roll_info.current_rollback.b); + //spill + if (!txn_has_spilled_rollback_logs(txn)) { + //First spilled. Copy to head. + txn->roll_info.spilled_rollback_head = txn->roll_info.current_rollback; + } + //Unconditionally copy to tail. Old tail does not need to be cached anymore. + txn->roll_info.spilled_rollback_tail = txn->roll_info.current_rollback; + + txn->roll_info.current_rollback = ROLLBACK_NONE; + } +} + +int find_filenum (const FT &h, const FT &hfind); +int find_filenum (const FT &h, const FT &hfind) { + FILENUM fnum = toku_cachefile_filenum(h->cf); + FILENUM fnumfind = toku_cachefile_filenum(hfind->cf); + if (fnum.fileid<fnumfind.fileid) return -1; + if (fnum.fileid>fnumfind.fileid) return +1; + return 0; +} + +//Notify a transaction that it has touched an ft. +void toku_txn_maybe_note_ft (TOKUTXN txn, FT ft) { + toku_txn_lock(txn); + FT ftv; + uint32_t idx; + int r = txn->open_fts.find_zero<FT, find_filenum>(ft, &ftv, &idx); + if (r == 0) { + // already there + assert(ftv == ft); + goto exit; + } + r = txn->open_fts.insert_at(ft, idx); + assert_zero(r); + // TODO(leif): if there's anything that locks the reflock and then + // the txn lock, this may deadlock, because it grabs the reflock. + toku_ft_add_txn_ref(ft); +exit: + toku_txn_unlock(txn); +} + +// Return the number of bytes that went into the rollback data structure (the uncompressed count if there is compression) +int toku_logger_txn_rollback_stats(TOKUTXN txn, struct txn_stat *txn_stat) +{ + toku_txn_lock(txn); + txn_stat->rollback_raw_count = txn->roll_info.rollentry_raw_count; + txn_stat->rollback_num_entries = txn->roll_info.num_rollentries; + toku_txn_unlock(txn); + return 0; +} + +void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) { + //Currently processing 'log'. Prefetch the next (previous) log node. + + BLOCKNUM name = log->previous; + int r = 0; + if (name.b != ROLLBACK_NONE.b) { + CACHEFILE cf = txn->logger->rollback_cachefile; + uint32_t hash = toku_cachetable_hash(cf, name); + FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf)); + bool doing_prefetch = false; + r = toku_cachefile_prefetch(cf, name, hash, + get_write_callbacks_for_rollback_log(h), + toku_rollback_fetch_callback, + toku_rollback_pf_req_callback, + toku_rollback_pf_callback, + h, + &doing_prefetch); + assert(r == 0); + } +} + +void toku_rollback_verify_contents(ROLLBACK_LOG_NODE log, + TXNID_PAIR txnid, uint64_t sequence) +{ + assert(log->txnid.parent_id64 == txnid.parent_id64); + assert(log->txnid.child_id64 == txnid.child_id64); + assert(log->sequence == sequence); +} + +void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log) { + void * value; + CACHEFILE cf = txn->logger->rollback_cachefile; + FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf)); + uint32_t hash = toku_cachetable_hash(cf, blocknum); + int r = toku_cachetable_get_and_pin_with_dep_pairs(cf, blocknum, hash, + &value, + get_write_callbacks_for_rollback_log(h), + toku_rollback_fetch_callback, + toku_rollback_pf_req_callback, + toku_rollback_pf_callback, + PL_WRITE_CHEAP, // lock_type + h, + 0, NULL, NULL + ); + assert(r == 0); + ROLLBACK_LOG_NODE CAST_FROM_VOIDP(pinned_log, value); + assert(pinned_log->blocknum.b == blocknum.b); + *log = pinned_log; +} + +void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE *log) { + ROLLBACK_LOG_NODE pinned_log = NULL; + invariant(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING); // hot indexing may call this function for prepared transactions + if (txn_has_current_rollback_log(txn)) { + toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, &pinned_log); + toku_rollback_verify_contents(pinned_log, txn->txnid, txn->roll_info.num_rollback_nodes - 1); + } else { + // For each transaction, we try to acquire the first rollback log + // from the rollback log node cache, so that we avoid + // putting something new into the cachetable. However, + // if transaction has spilled rollbacks, that means we + // have already done a lot of work for this transaction, + // and subsequent rollback log nodes are created + // and put into the cachetable. The idea is for + // transactions that don't do a lot of work to (hopefully) + // get a rollback log node from a cache, as opposed to + // taking the more expensive route of creating a new one. + if (!txn_has_spilled_rollback_logs(txn)) { + txn->logger->rollback_cache.get_rollback_log_node(txn, &pinned_log); + if (pinned_log != NULL) { + rollback_initialize_for_txn( + pinned_log, + txn, + txn->roll_info.spilled_rollback_tail + ); + txn->roll_info.current_rollback = pinned_log->blocknum; + } + } + if (pinned_log == NULL) { + rollback_log_create(txn, txn->roll_info.spilled_rollback_tail, &pinned_log); + } + } + assert(pinned_log->txnid.parent_id64 == txn->txnid.parent_id64); + assert(pinned_log->txnid.child_id64 == txn->txnid.child_id64); + assert(pinned_log->blocknum.b != ROLLBACK_NONE.b); + *log = pinned_log; +} diff --git a/storage/tokudb/PerconaFT/ft/txn/rollback.h b/storage/tokudb/PerconaFT/ft/txn/rollback.h new file mode 100644 index 00000000..359f2317 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/txn/rollback.h @@ -0,0 +1,145 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#pragma once + +#include "ft/cachetable/cachetable.h" +#include "ft/serialize/sub_block.h" +#include "ft/txn/txn.h" + +#include "util/memarena.h" + +typedef struct rollback_log_node *ROLLBACK_LOG_NODE; +typedef struct serialized_rollback_log_node *SERIALIZED_ROLLBACK_LOG_NODE; + +void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint); + +// these functions assert internally that they succeed + +// get a rollback node this txn may use for a new entry. if there +// is a current rollback node to use, pin it, otherwise create one. +void toku_get_and_pin_rollback_log_for_new_entry(TOKUTXN txn, ROLLBACK_LOG_NODE *log); + +// get a specific rollback by blocknum +void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log); + +// unpin a rollback node from the cachetable +void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log); + +// assert that the given log's txnid and sequence match the ones given +void toku_rollback_verify_contents(ROLLBACK_LOG_NODE log, TXNID_PAIR txnid, uint64_t sequence); + +// if there is a previous rollback log for the given log node, prefetch it +void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log); + +// unpin and rmove a rollback log from the cachetable +void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log); + +void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size); +void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len); + +// given a transaction and a log node, and if the log is too full, +// set the current rollback log to ROLLBACK_NONE and move the current +// node onto the tail of the rollback node chain. further insertions +// into the rollback log for this transaction will force the creation +// of a new rollback log. +// +// this never unpins the rollback log if a spill occurs. the caller +// is responsible for ensuring the given rollback node is unpinned +// if necessary. +void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log); + +void toku_txn_maybe_note_ft (TOKUTXN txn, struct ft *ft); +int toku_logger_txn_rollback_stats(TOKUTXN txn, struct txn_stat *txn_stat); + +int toku_find_xid_by_xid (const TXNID &xid, const TXNID &xidfind); + +PAIR_ATTR rollback_memory_size(ROLLBACK_LOG_NODE log); + +// A high-level rollback log is made up of a chain of rollback log nodes. +// Each rollback log node is represented (separately) in the cachetable by +// this structure. Each portion of the rollback log chain has a block num +// and a hash to identify it. +struct rollback_log_node { + int layout_version; + int layout_version_original; + int layout_version_read_from_disk; + uint32_t build_id; // build_id (svn rev number) of software that wrote this node to disk + int dirty; + // to which transaction does this node belong? + TXNID_PAIR txnid; + // sequentially, where in the rollback log chain is this node? + // the sequence is between 0 and totalnodes-1 + uint64_t sequence; + BLOCKNUM blocknum; // on which block does this node live? + // which block number is the previous in the chain of rollback nodes + // that make up this rollback log? + BLOCKNUM previous; + struct roll_entry *oldest_logentry; + struct roll_entry *newest_logentry; + memarena rollentry_arena; + size_t rollentry_resident_bytecount; // How many bytes for the rollentries that are stored in main memory. + PAIR ct_pair; +}; + +struct serialized_rollback_log_node { + char *data; + uint32_t len; + int n_sub_blocks; + BLOCKNUM blocknum; + struct sub_block sub_block[max_sub_blocks]; +}; +typedef struct serialized_rollback_log_node *SERIALIZED_ROLLBACK_LOG_NODE; + +static inline void +toku_static_serialized_rollback_log_destroy(SERIALIZED_ROLLBACK_LOG_NODE log) { + toku_free(log->data); +} + +static inline void +toku_serialized_rollback_log_destroy(SERIALIZED_ROLLBACK_LOG_NODE log) { + toku_static_serialized_rollback_log_destroy(log); + toku_free(log); +} + +void rollback_empty_log_init(ROLLBACK_LOG_NODE log); +void make_rollback_log_empty(ROLLBACK_LOG_NODE log); + +static inline bool rollback_log_is_unused(ROLLBACK_LOG_NODE log) { + return (log->txnid.parent_id64 == TXNID_NONE); +} diff --git a/storage/tokudb/PerconaFT/ft/txn/rollback_log_node_cache.cc b/storage/tokudb/PerconaFT/ft/txn/rollback_log_node_cache.cc new file mode 100644 index 00000000..5e1ab746 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/txn/rollback_log_node_cache.cc @@ -0,0 +1,109 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#include <memory.h> +#include <portability/toku_portability.h> + +#include "txn/rollback_log_node_cache.h" + +toku_instr_key* rollback_log_node_cache_mutex_key; + +void rollback_log_node_cache::init(uint32_t max_num_avail_nodes) { + XMALLOC_N(max_num_avail_nodes, m_avail_blocknums); + m_max_num_avail = max_num_avail_nodes; + m_first = 0; + m_num_avail = 0; + toku_pthread_mutexattr_t attr; + toku_mutexattr_init(&attr); + toku_mutexattr_settype(&attr, TOKU_MUTEX_ADAPTIVE); + toku_mutex_init(*rollback_log_node_cache_mutex_key, &m_mutex, &attr); + toku_mutexattr_destroy(&attr); +} + +void rollback_log_node_cache::destroy() { + toku_mutex_destroy(&m_mutex); + toku_free(m_avail_blocknums); +} + +// returns true if rollback log node was successfully added, +// false otherwise +bool rollback_log_node_cache::give_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE log){ + bool retval = false; + toku_mutex_lock(&m_mutex); + if (m_num_avail < m_max_num_avail) { + retval = true; + uint32_t index = m_first + m_num_avail; + if (index >= m_max_num_avail) { + index -= m_max_num_avail; + } + m_avail_blocknums[index].b = log->blocknum.b; + m_num_avail++; + } + toku_mutex_unlock(&m_mutex); + // + // now unpin the rollback log node + // + if (retval) { + make_rollback_log_empty(log); + toku_rollback_log_unpin(txn, log); + } + return retval; +} + +// if a rollback log node is available, will set log to it, +// otherwise, will set log to NULL and caller is on his own +// for getting a rollback log node +void rollback_log_node_cache::get_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE* log){ + BLOCKNUM b = ROLLBACK_NONE; + toku_mutex_lock(&m_mutex); + if (m_num_avail > 0) { + b.b = m_avail_blocknums[m_first].b; + m_num_avail--; + if (++m_first >= m_max_num_avail) { + m_first = 0; + } + } + toku_mutex_unlock(&m_mutex); + if (b.b != ROLLBACK_NONE.b) { + toku_get_and_pin_rollback_log(txn, b, log); + invariant(rollback_log_is_unused(*log)); + } else { + *log = NULL; + } +} + diff --git a/storage/tokudb/PerconaFT/ft/txn/rollback_log_node_cache.h b/storage/tokudb/PerconaFT/ft/txn/rollback_log_node_cache.h new file mode 100644 index 00000000..c7f1b9a2 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/txn/rollback_log_node_cache.h @@ -0,0 +1,63 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#pragma once + +#include "ft/txn/rollback.h" + +class rollback_log_node_cache { +public: + void init (uint32_t max_num_avail_nodes); + void destroy(); + // returns true if rollback log node was successfully added, + // false otherwise + bool give_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE log); + // if a rollback log node is available, will set log to it, + // otherwise, will set log to NULL and caller is on his own + // for getting a rollback log node + void get_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE* log); + +private: + BLOCKNUM* m_avail_blocknums; + uint32_t m_first; + uint32_t m_num_avail; + uint32_t m_max_num_avail; + toku_mutex_t m_mutex; +}; + +ENSURE_POD(rollback_log_node_cache); diff --git a/storage/tokudb/PerconaFT/ft/txn/txn.cc b/storage/tokudb/PerconaFT/ft/txn/txn.cc new file mode 100644 index 00000000..7152833d --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/txn/txn.cc @@ -0,0 +1,754 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#include "ft/cachetable/checkpoint.h" +#include "ft/ft.h" +#include "ft/logger/log-internal.h" +#include "ft/ule.h" +#include "ft/txn/rollback-apply.h" +#include "ft/txn/txn.h" +#include "ft/txn/txn_manager.h" +#include "util/status.h" + +toku_instr_key *txn_lock_mutex_key; +toku_instr_key *txn_state_lock_mutex_key; +toku_instr_key *result_state_cond_key; + +void toku_txn_get_status(TXN_STATUS s) { + txn_status.init(); + *s = txn_status; +} + +void +toku_txn_lock(TOKUTXN txn) +{ + toku_mutex_lock(&txn->txn_lock); +} + +void +toku_txn_unlock(TOKUTXN txn) +{ + toku_mutex_unlock(&txn->txn_lock); +} + +uint64_t +toku_txn_get_root_id(TOKUTXN txn) +{ + return txn->txnid.parent_id64; +} + +bool txn_declared_read_only(TOKUTXN txn) { + return txn->declared_read_only; +} + +int +toku_txn_begin_txn ( + DB_TXN *container_db_txn, + TOKUTXN parent_tokutxn, + TOKUTXN *tokutxn, + TOKULOGGER logger, + TXN_SNAPSHOT_TYPE snapshot_type, + bool read_only + ) +{ + int r = toku_txn_begin_with_xid( + parent_tokutxn, + tokutxn, + logger, + TXNID_PAIR_NONE, + snapshot_type, + container_db_txn, + false, // for_recovery + read_only + ); + return r; +} + + +static void +txn_create_xids(TOKUTXN txn, TOKUTXN parent) { + XIDS xids; + XIDS parent_xids; + if (parent == NULL) { + parent_xids = toku_xids_get_root_xids(); + } else { + parent_xids = parent->xids; + } + toku_xids_create_unknown_child(parent_xids, &xids); + TXNID finalized_xid = (parent == NULL) ? txn->txnid.parent_id64 : txn->txnid.child_id64; + toku_xids_finalize_with_child(xids, finalized_xid); + txn->xids = xids; +} + +// Allocate and initialize a txn +static void toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXN_SNAPSHOT_TYPE snapshot_type, DB_TXN *container_db_txn, bool for_checkpoint, bool read_only); + +int +toku_txn_begin_with_xid ( + TOKUTXN parent, + TOKUTXN *txnp, + TOKULOGGER logger, + TXNID_PAIR xid, + TXN_SNAPSHOT_TYPE snapshot_type, + DB_TXN *container_db_txn, + bool for_recovery, + bool read_only + ) +{ + int r = 0; + TOKUTXN txn; + // check for case where we are trying to + // create too many nested transactions + if (!read_only && parent && !toku_xids_can_create_child(parent->xids)) { + r = EINVAL; + goto exit; + } + if (read_only && parent) { + invariant(txn_declared_read_only(parent)); + } + toku_txn_create_txn(&txn, parent, logger, snapshot_type, container_db_txn, for_recovery, read_only); + // txnid64, snapshot_txnid64 + // will be set in here. + if (for_recovery) { + if (parent == NULL) { + invariant(xid.child_id64 == TXNID_NONE); + toku_txn_manager_start_txn_for_recovery( + txn, + logger->txn_manager, + xid.parent_id64 + ); + } + else { + parent->child_manager->start_child_txn_for_recovery(txn, parent, xid); + } + } + else { + assert(xid.parent_id64 == TXNID_NONE); + assert(xid.child_id64 == TXNID_NONE); + if (parent == NULL) { + toku_txn_manager_start_txn( + txn, + logger->txn_manager, + snapshot_type, + read_only + ); + } + else { + parent->child_manager->start_child_txn(txn, parent); + toku_txn_manager_handle_snapshot_create_for_child_txn( + txn, + logger->txn_manager, + snapshot_type + ); + } + } + if (!read_only) { + // this call will set txn->xids + txn_create_xids(txn, parent); + } + toku_unsafe_set(txnp, txn); +exit: + return r; +} + +DB_TXN * +toku_txn_get_container_db_txn (TOKUTXN tokutxn) { + DB_TXN * container = tokutxn->container_db_txn; + return container; +} + +void toku_txn_set_container_db_txn (TOKUTXN tokutxn, DB_TXN*container) { + tokutxn->container_db_txn = container; +} + +static void invalidate_xa_xid (TOKU_XA_XID *xid) { + TOKU_ANNOTATE_NEW_MEMORY(xid, sizeof(*xid)); // consider it to be all invalid for valgrind + xid->formatID = -1; // According to the XA spec, -1 means "invalid data" +} + +static void toku_txn_create_txn ( + TOKUTXN *tokutxn, + TOKUTXN parent_tokutxn, + TOKULOGGER logger, + TXN_SNAPSHOT_TYPE snapshot_type, + DB_TXN *container_db_txn, + bool for_recovery, + bool read_only + ) +{ + assert(logger->rollback_cachefile); + + omt<FT> open_fts; + open_fts.create_no_array(); + + struct txn_roll_info roll_info = { + .num_rollback_nodes = 0, + .num_rollentries = 0, + .num_rollentries_processed = 0, + .rollentry_raw_count = 0, + .spilled_rollback_head = ROLLBACK_NONE, + .spilled_rollback_tail = ROLLBACK_NONE, + .current_rollback = ROLLBACK_NONE, + }; + +static txn_child_manager tcm; + +struct tokutxn new_txn = { + .txnid = {.parent_id64 = TXNID_NONE, .child_id64 = TXNID_NONE }, + .snapshot_txnid64 = TXNID_NONE, + .snapshot_type = for_recovery ? TXN_SNAPSHOT_NONE : snapshot_type, + .for_recovery = for_recovery, + .logger = logger, + .parent = parent_tokutxn, + .child = NULL, + .child_manager_s = tcm, + .child_manager = NULL, + .container_db_txn = container_db_txn, + .live_root_txn_list = nullptr, + .xids = NULL, + .snapshot_next = NULL, + .snapshot_prev = NULL, + .begin_was_logged = false, + .declared_read_only = read_only, + .do_fsync = false, + .force_fsync_on_commit = false, + .do_fsync_lsn = ZERO_LSN, + .xa_xid = {0, 0, 0, ""}, + .progress_poll_fun = NULL, + .progress_poll_fun_extra = NULL, + + // You cannot initialize txn_lock a TOKU_MUTEX_INITIALIZER, because we + // will initialize it in the code below, and it cannot already + // be initialized at that point. Also, in general, you don't + // get to use PTHREAD_MUTEX_INITALIZER (which is what is inside + // TOKU_MUTEX_INITIALIZER) except in static variables, and this + // is initializing an auto variable. + // + // And we cannot simply avoid initializing these fields + // because, although it avoids -Wmissing-field-initializer + // errors under gcc, it gets other errors about non-trivial + // designated initializers not being supported. + + .txn_lock = ZERO_MUTEX_INITIALIZER, // Not TOKU_MUTEX_INITIALIZER + .open_fts = open_fts, + .roll_info = roll_info, + .state_lock = ZERO_MUTEX_INITIALIZER, // Not TOKU_MUTEX_INITIALIZER + .state_cond = ZERO_COND_INITIALIZER, // Not TOKU_COND_INITIALIZER + .state = TOKUTXN_LIVE, + .num_pin = 0, + .client_id = 0, + .client_extra = nullptr, + .start_time = time(NULL), +}; + +TOKUTXN result = NULL; +XMEMDUP(result, &new_txn); +invalidate_xa_xid(&result->xa_xid); +if (parent_tokutxn == NULL) { + result->child_manager = &result->child_manager_s; + result->child_manager->init(result); + } + else { + result->child_manager = parent_tokutxn->child_manager; + } + + toku_mutex_init(*txn_lock_mutex_key, &result->txn_lock, nullptr); + + toku_pthread_mutexattr_t attr; + toku_mutexattr_init(&attr); + toku_mutexattr_settype(&attr, TOKU_MUTEX_ADAPTIVE); + toku_mutex_init(*txn_state_lock_mutex_key, &result->state_lock, &attr); + toku_mutexattr_destroy(&attr); + + toku_cond_init(*result_state_cond_key, &result->state_cond, nullptr); + + *tokutxn = result; + + if (read_only) { + TXN_STATUS_INC(TXN_READ_BEGIN, 1); + } + else { + TXN_STATUS_INC(TXN_BEGIN, 1); + } +} + +void +toku_txn_update_xids_in_txn(TOKUTXN txn, TXNID xid) +{ + // these should not have been set yet + invariant(txn->txnid.parent_id64 == TXNID_NONE); + invariant(txn->txnid.child_id64 == TXNID_NONE); + txn->txnid.parent_id64 = xid; + txn->txnid.child_id64 = TXNID_NONE; +} + +//Used on recovery to recover a transaction. +int +toku_txn_load_txninfo (TOKUTXN txn, struct txninfo *info) { + txn->roll_info.rollentry_raw_count = info->rollentry_raw_count; + uint32_t i; + for (i = 0; i < info->num_fts; i++) { + FT ft = info->open_fts[i]; + toku_txn_maybe_note_ft(txn, ft); + } + txn->force_fsync_on_commit = info->force_fsync_on_commit; + txn->roll_info.num_rollback_nodes = info->num_rollback_nodes; + txn->roll_info.num_rollentries = info->num_rollentries; + + txn->roll_info.spilled_rollback_head = info->spilled_rollback_head; + txn->roll_info.spilled_rollback_tail = info->spilled_rollback_tail; + txn->roll_info.current_rollback = info->current_rollback; + return 0; +} + +int toku_txn_commit_txn(TOKUTXN txn, int nosync, + TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) +// Effect: Doesn't close the txn, just performs the commit operations. +// If release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken) +{ + return toku_txn_commit_with_lsn(txn, nosync, ZERO_LSN, + poll, poll_extra); +} + +struct xcommit_info { + int r; + TOKUTXN txn; +}; + +static void txn_note_commit(TOKUTXN txn) { + // Purpose: + // Delay until any indexer is done pinning this transaction. + // Update status of a transaction from live->committing (or prepared->committing) + // Do so in a thread-safe manner that does not conflict with hot indexing or + // begin checkpoint. + if (toku_txn_is_read_only(txn)) { + // Neither hot indexing nor checkpoint do any work with readonly txns, + // so we can skip taking the txn_manager lock here. + invariant(txn->state==TOKUTXN_LIVE); + txn->state = TOKUTXN_COMMITTING; + goto done; + } + if (txn->state==TOKUTXN_PREPARING) { + invalidate_xa_xid(&txn->xa_xid); + } + // for hot indexing, if hot index is processing + // this transaction in some leafentry, then we cannot change + // the state to commit or abort until + // hot index is done with that leafentry + toku_txn_lock_state(txn); + while (txn->num_pin > 0) { + toku_cond_wait( + &txn->state_cond, + &txn->state_lock + ); + } + txn->state = TOKUTXN_COMMITTING; + toku_txn_unlock_state(txn); +done: + return; +} + +int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn, + TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) +{ + // there should be no child when we commit or abort a TOKUTXN + invariant(txn->child == NULL); + txn_note_commit(txn); + + // Child transactions do not actually 'commit'. They promote their + // changes to parent, so no need to fsync if this txn has a parent. The + // do_sync state is captured in the txn for txn_maybe_fsync_log function + // Additionally, if the transaction was first prepared, we do not need to + // fsync because the prepare caused an fsync of the log. In this case, + // we do not need an additional of the log. We rely on the client running + // recovery to properly recommit this transaction if the commit + // does not make it to disk. In the case of MySQL, that would be the + // binary log. + txn->do_fsync = !txn->parent && (txn->force_fsync_on_commit || (!nosync && txn->roll_info.num_rollentries>0)); + + txn->progress_poll_fun = poll; + txn->progress_poll_fun_extra = poll_extra; + + if (!toku_txn_is_read_only(txn)) { + toku_log_xcommit(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid); + } + // If !txn->begin_was_logged, we could skip toku_rollback_commit + // but it's cheap (only a number of function calls that return immediately) + // since there were no writes. Skipping it would mean we would need to be careful + // in case we added any additional required cleanup into those functions in the future. + int r = toku_rollback_commit(txn, oplsn); + TXN_STATUS_INC(TXN_COMMIT, 1); + return r; +} + +int toku_txn_abort_txn(TOKUTXN txn, + TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) +// Effect: Doesn't close the txn, just performs the abort operations. +// If release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken) +{ + return toku_txn_abort_with_lsn(txn, ZERO_LSN, poll, poll_extra); +} + +static void txn_note_abort(TOKUTXN txn) { + // Purpose: + // Delay until any indexer is done pinning this transaction. + // Update status of a transaction from live->aborting (or prepared->aborting) + // Do so in a thread-safe manner that does not conflict with hot indexing or + // begin checkpoint. + if (toku_txn_is_read_only(txn)) { + // Neither hot indexing nor checkpoint do any work with readonly txns, + // so we can skip taking the state lock here. + invariant(txn->state==TOKUTXN_LIVE); + txn->state = TOKUTXN_ABORTING; + goto done; + } + if (txn->state==TOKUTXN_PREPARING) { + invalidate_xa_xid(&txn->xa_xid); + } + // for hot indexing, if hot index is processing + // this transaction in some leafentry, then we cannot change + // the state to commit or abort until + // hot index is done with that leafentry + toku_txn_lock_state(txn); + while (txn->num_pin > 0) { + toku_cond_wait( + &txn->state_cond, + &txn->state_lock + ); + } + txn->state = TOKUTXN_ABORTING; + toku_txn_unlock_state(txn); +done: + return; +} + +int toku_txn_abort_with_lsn(TOKUTXN txn, LSN oplsn, + TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) +{ + // there should be no child when we commit or abort a TOKUTXN + invariant(txn->child == NULL); + txn_note_abort(txn); + + txn->progress_poll_fun = poll; + txn->progress_poll_fun_extra = poll_extra; + txn->do_fsync = false; + + if (!toku_txn_is_read_only(txn)) { + toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid); + } + // If !txn->begin_was_logged, we could skip toku_rollback_abort + // but it's cheap (only a number of function calls that return immediately) + // since there were no writes. Skipping it would mean we would need to be careful + // in case we added any additional required cleanup into those functions in the future. + int r = toku_rollback_abort(txn, oplsn); + TXN_STATUS_INC(TXN_ABORT, 1); + return r; +} + +static void copy_xid (TOKU_XA_XID *dest, TOKU_XA_XID *source) { + TOKU_ANNOTATE_NEW_MEMORY(dest, sizeof(*dest)); + dest->formatID = source->formatID; + dest->gtrid_length = source->gtrid_length; + dest->bqual_length = source->bqual_length; + memcpy(dest->data, source->data, source->gtrid_length+source->bqual_length); +} + +void toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xa_xid, int nosync) { + if (txn->parent || toku_txn_is_read_only(txn)) { + // We do not prepare children. + // + // Readonly transactions do the same if they commit or abort, so + // XA guarantees are free. No need to pay for overhead of prepare. + return; + } + assert(txn->state==TOKUTXN_LIVE); + // This state transition must be protected against begin_checkpoint + // Therefore, the caller must have the mo lock held + toku_txn_lock_state(txn); + txn->state = TOKUTXN_PREPARING; + toku_txn_unlock_state(txn); + // Do we need to do an fsync? + txn->do_fsync = txn->force_fsync_on_commit || (!nosync && txn->roll_info.num_rollentries>0); + copy_xid(&txn->xa_xid, xa_xid); + // This list will go away with #4683, so we wn't need the ydb lock for this anymore. + toku_log_xprepare(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid, xa_xid); +} + +void toku_txn_get_prepared_xa_xid (TOKUTXN txn, TOKU_XA_XID *xid) { + copy_xid(xid, &txn->xa_xid); +} + +int toku_logger_recover_txn (TOKULOGGER logger, struct tokulogger_preplist preplist[/*count*/], long count, /*out*/ long *retp, uint32_t flags) { + return toku_txn_manager_recover_root_txn( + logger->txn_manager, + preplist, + count, + retp, + flags + ); +} + +void toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, bool do_fsync) { + if (logger && do_fsync) { + toku_logger_fsync_if_lsn_not_fsynced(logger, do_fsync_lsn); + } +} + +void toku_txn_get_fsync_info(TOKUTXN ttxn, bool* do_fsync, LSN* do_fsync_lsn) { + *do_fsync = ttxn->do_fsync; + *do_fsync_lsn = ttxn->do_fsync_lsn; +} + +void toku_txn_close_txn(TOKUTXN txn) { + toku_txn_complete_txn(txn); + toku_txn_destroy_txn(txn); +} + +int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const txn); +int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const UU(txn)) +// Effect: This function is called on every open FT that a transaction used. +// This function removes the transaction from that FT. +{ + toku_ft_remove_txn_ref(h); + + return 0; +} + +// for every ft in txn, remove it. +static void note_txn_closing (TOKUTXN txn) { + txn->open_fts.iterate<struct tokutxn, remove_txn>(txn); +} + +void toku_txn_complete_txn(TOKUTXN txn) { + assert(txn->roll_info.spilled_rollback_head.b == ROLLBACK_NONE.b); + assert(txn->roll_info.spilled_rollback_tail.b == ROLLBACK_NONE.b); + assert(txn->roll_info.current_rollback.b == ROLLBACK_NONE.b); + assert(txn->num_pin == 0); + assert(txn->state == TOKUTXN_COMMITTING || txn->state == TOKUTXN_ABORTING || txn->state == TOKUTXN_PREPARING); + if (txn->parent) { + toku_txn_manager_handle_snapshot_destroy_for_child_txn( + txn, + txn->logger->txn_manager, + txn->snapshot_type + ); + txn->parent->child_manager->finish_child_txn(txn); + } + else { + toku_txn_manager_finish_txn(txn->logger->txn_manager, txn); + txn->child_manager->destroy(); + } + // note that here is another place we depend on + // this function being called with the multi operation lock + note_txn_closing(txn); +} + +void toku_txn_destroy_txn(TOKUTXN txn) { + txn->open_fts.destroy(); + if (txn->xids) { + toku_xids_destroy(&txn->xids); + } + toku_mutex_destroy(&txn->txn_lock); + toku_mutex_destroy(&txn->state_lock); + toku_cond_destroy(&txn->state_cond); + toku_free(txn); +} + +XIDS toku_txn_get_xids (TOKUTXN txn) { + if (txn==0) return toku_xids_get_root_xids(); + else return txn->xids; +} + +void toku_txn_force_fsync_on_commit(TOKUTXN txn) { + txn->force_fsync_on_commit = true; +} + +TXNID toku_get_oldest_in_live_root_txn_list(TOKUTXN txn) { + TXNID xid; + if (txn->live_root_txn_list->size()>0) { + int r = txn->live_root_txn_list->fetch(0, &xid); + assert_zero(r); + } + else { + xid = TXNID_NONE; + } + return xid; +} + +bool toku_is_txn_in_live_root_txn_list(const xid_omt_t &live_root_txn_list, TXNID xid) { + TXNID txnid; + bool retval = false; + int r = live_root_txn_list.find_zero<TXNID, toku_find_xid_by_xid>(xid, &txnid, nullptr); + if (r==0) { + invariant(txnid == xid); + retval = true; + } + else { + invariant(r==DB_NOTFOUND); + } + return retval; +} + +TOKUTXN_STATE +toku_txn_get_state(TOKUTXN txn) { + return txn->state; +} + +static void +maybe_log_begin_txn_for_write_operation_unlocked(TOKUTXN txn) { + // We now hold the lock. + if (txn->begin_was_logged) { + return; + } + TOKUTXN parent; + parent = txn->parent; + TXNID_PAIR xid; + xid = txn->txnid; + TXNID_PAIR pxid; + pxid = TXNID_PAIR_NONE; + if (parent) { + // Recursively log parent first if necessary. + // Transactions cannot do work if they have children, + // so the lowest level child's lock is sufficient for ancestors. + maybe_log_begin_txn_for_write_operation_unlocked(parent); + pxid = parent->txnid; + } + + toku_log_xbegin(txn->logger, NULL, 0, xid, pxid); + txn->begin_was_logged = true; +} + +void +toku_maybe_log_begin_txn_for_write_operation(TOKUTXN txn) { + toku_txn_lock(txn); + maybe_log_begin_txn_for_write_operation_unlocked(txn); + toku_txn_unlock(txn); +} + +bool +toku_txn_is_read_only(TOKUTXN txn) { + // No need to recursively check children because parents are + // recursively logged before children. + if (!txn->begin_was_logged) { + // Did no work. + invariant(txn->roll_info.num_rollentries == 0); + invariant(txn->do_fsync_lsn.lsn == ZERO_LSN.lsn); + invariant(txn->open_fts.size() == 0); + invariant(txn->num_pin==0); + return true; + } + return false; +} + +// needed for hot indexing +void toku_txn_lock_state(TOKUTXN txn) { + toku_mutex_lock(&txn->state_lock); +} +void toku_txn_unlock_state(TOKUTXN txn){ + toku_mutex_unlock(&txn->state_lock); +} + + +// prevents a client thread from transitioning txn from LIVE|PREPARING -> COMMITTING|ABORTING +// hot indexing may need a transactions to stay in the LIVE|PREPARING state while it processes +// a leafentry. +void toku_txn_pin_live_txn_unlocked(TOKUTXN txn) { + assert(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING); + assert(!toku_txn_is_read_only(txn)); + txn->num_pin++; +} + +// allows a client thread to go back to being able to transition txn +// from LIVE|PREPARING -> COMMITTING|ABORTING +void toku_txn_unpin_live_txn(TOKUTXN txn) { + assert(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING); + assert(txn->num_pin > 0); + toku_txn_lock_state(txn); + txn->num_pin--; + if (txn->num_pin == 0) { + toku_cond_broadcast(&txn->state_cond); + } + toku_txn_unlock_state(txn); +} + +bool toku_txn_has_spilled_rollback(TOKUTXN txn) { + return txn_has_spilled_rollback_logs(txn); +} + +void toku_txn_get_client_id(TOKUTXN txn, uint64_t *client_id, void **client_extra) { + if (client_id) *client_id = txn->client_id; + if (client_extra) *client_extra = txn->client_extra; +} + +void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id, void *client_extra) { + txn->client_id = client_id; + txn->client_extra = client_extra; +} + +time_t toku_txn_get_start_time(struct tokutxn *txn) { + return txn->start_time; +} + +extern uint force_recovery; +int toku_txn_reads_txnid(TXNID txnid, TOKUTXN txn, bool is_provisional UU()) { + if(force_recovery) { + return TOKUDB_ACCEPT; + } + int r = 0; + TXNID oldest_live_in_snapshot = toku_get_oldest_in_live_root_txn_list(txn); + if (oldest_live_in_snapshot == TXNID_NONE && txnid < txn->snapshot_txnid64) { + r = TOKUDB_ACCEPT; + } else if (txnid < oldest_live_in_snapshot || txnid == txn->txnid.parent_id64) { + r = TOKUDB_ACCEPT; + } else if (txnid > txn->snapshot_txnid64 || toku_is_txn_in_live_root_txn_list(*txn->live_root_txn_list, txnid)) { + r = 0; + } else { + r = TOKUDB_ACCEPT; + } + return r; +} + +int toku_txn_discard_txn(TOKUTXN txn) { + int r = toku_rollback_discard(txn); + return r; +} + +#include <toku_race_tools.h> +void __attribute__((__constructor__)) toku_txn_status_helgrind_ignore(void); +void toku_txn_status_helgrind_ignore(void) { + TOKU_VALGRIND_HG_DISABLE_CHECKING(&txn_status, sizeof txn_status); +} diff --git a/storage/tokudb/PerconaFT/ft/txn/txn.h b/storage/tokudb/PerconaFT/ft/txn/txn.h new file mode 100644 index 00000000..34a76aa9 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/txn/txn.h @@ -0,0 +1,362 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#pragma once + +#include "portability/toku_stdint.h" + +#include "ft/txn/txn_state.h" +#include "ft/serialize/block_table.h" +#include "ft/ft-status.h" +#include "util/omt.h" + +typedef uint64_t TXNID; + +typedef struct tokutxn *TOKUTXN; + +#define TXNID_NONE_LIVING ((TXNID)0) +#define TXNID_NONE ((TXNID)0) +#define TXNID_MAX ((TXNID)-1) + +typedef struct txnid_pair_s { + TXNID parent_id64; + TXNID child_id64; +} TXNID_PAIR; + +static const TXNID_PAIR TXNID_PAIR_NONE = { .parent_id64 = TXNID_NONE, .child_id64 = TXNID_NONE }; + +// We include the child manager here beacuse it uses the TXNID / TOKUTXN types +#include "ft/txn/txn_child_manager.h" + +/* Log Sequence Number (LSN) + * Make the LSN be a struct instead of an integer so that we get better type checking. */ +typedef struct __toku_lsn { uint64_t lsn; } LSN; +static const LSN ZERO_LSN = { .lsn = 0 }; +static const LSN MAX_LSN = { .lsn = UINT64_MAX }; + +// +// Types of snapshots that can be taken by a tokutxn +// - TXN_SNAPSHOT_NONE: means that there is no snapshot. Reads do not use snapshot reads. +// used for SERIALIZABLE and READ UNCOMMITTED +// - TXN_SNAPSHOT_ROOT: means that all tokutxns use their root transaction's snapshot +// used for REPEATABLE READ +// - TXN_SNAPSHOT_CHILD: means that each child tokutxn creates its own snapshot +// used for READ COMMITTED +// + +typedef enum __TXN_SNAPSHOT_TYPE { + TXN_SNAPSHOT_NONE=0, + TXN_SNAPSHOT_ROOT=1, + TXN_SNAPSHOT_CHILD=2, + TXN_COPIES_SNAPSHOT=3 +} TXN_SNAPSHOT_TYPE; + +typedef toku::omt<struct tokutxn *> txn_omt_t; +typedef toku::omt<TXNID> xid_omt_t; +typedef toku::omt<struct referenced_xid_tuple, struct referenced_xid_tuple *> rx_omt_t; + +inline bool txn_pair_is_none(TXNID_PAIR txnid) { + return txnid.parent_id64 == TXNID_NONE && txnid.child_id64 == TXNID_NONE; +} + +struct tokulogger; + +struct txn_roll_info { + // these are number of rollback nodes and rollback entries for this txn. + // + // the current rollback node below has sequence number num_rollback_nodes - 1 + // (because they are numbered 0...num-1). often, the current rollback is + // already set to this block num, which means it exists and is available to + // log some entries. if the current rollback is NONE and the number of + // rollback nodes for this transaction is non-zero, then we will use + // the number of rollback nodes to know which sequence number to assign + // to a new one we create + uint64_t num_rollback_nodes; + uint64_t num_rollentries; + uint64_t num_rollentries_processed; + uint64_t rollentry_raw_count; // the total count of every byte in the transaction and all its children. + + // spilled rollback nodes are rollback nodes that were gorged by this + // transaction, retired, and saved in a list. + + // the spilled rollback head is the block number of the first rollback node + // that makes up the rollback log chain + BLOCKNUM spilled_rollback_head; + + // the spilled rollback is the block number of the last rollback node that + // makes up the rollback log chain. + BLOCKNUM spilled_rollback_tail; + + // the current rollback node block number we may use. if this is ROLLBACK_NONE, + // then we need to create one and set it here before using it. + BLOCKNUM current_rollback; +}; + +struct tokutxn { + // These don't change after create: + + TXNID_PAIR txnid; + + uint64_t snapshot_txnid64; // this is the lsn of the snapshot + const TXN_SNAPSHOT_TYPE snapshot_type; + const bool for_recovery; + struct tokulogger *const logger; + struct tokutxn *const parent; + // The child txn is protected by the child_txn_manager lock + // and by the user contract. The user contract states (and is + // enforced at the ydb layer) that a child txn should not be created + // while another child exists. The txn_child_manager will protect + // other threads from trying to read this value while another + // thread commits/aborts the child + struct tokutxn *child; + + // statically allocated child manager, if this + // txn is a root txn, this manager will be used and set to + // child_manager for this transaction and all of its children + txn_child_manager child_manager_s; + + // child manager for this transaction, all of its children, + // and all of its ancestors + txn_child_manager* child_manager; + + // These don't change but they're created in a way that's hard to make + // strictly const. + DB_TXN *container_db_txn; // reference to DB_TXN that contains this tokutxn + xid_omt_t *live_root_txn_list; // the root txns live when the root ancestor (self if a root) started. + struct XIDS_S *xids; // Represents the xid list + + struct tokutxn *snapshot_next; + struct tokutxn *snapshot_prev; + + bool begin_was_logged; + bool declared_read_only; // true if the txn was declared read only when began + + // These are not read until a commit, prepare, or abort starts, and + // they're "monotonic" (only go false->true) during operation: + bool do_fsync; + bool force_fsync_on_commit; //This transaction NEEDS an fsync once (if) it commits. (commit means root txn) + + // Not used until commit, prepare, or abort starts: + LSN do_fsync_lsn; + TOKU_XA_XID xa_xid; // for prepared transactions + TXN_PROGRESS_POLL_FUNCTION progress_poll_fun; + void *progress_poll_fun_extra; + + toku_mutex_t txn_lock; + // Protected by the txn lock: + toku::omt<struct ft*> open_fts; // a collection of the fts that we touched. Indexed by filenum. + struct txn_roll_info roll_info; // Info used to manage rollback entries + + // mutex that protects the transition of the state variable + // the rest of the variables are used by the txn code and + // hot indexing to ensure that when hot indexing is processing a + // leafentry, a TOKUTXN cannot dissappear or change state out from + // underneath it + toku_mutex_t state_lock; + toku_cond_t state_cond; + TOKUTXN_STATE state; + uint32_t num_pin; // number of threads (all hot indexes) that want this + // txn to not transition to commit or abort + uint64_t client_id; + void *client_extra; + time_t start_time; +}; +typedef struct tokutxn *TOKUTXN; + +void toku_txn_lock(struct tokutxn *txn); +void toku_txn_unlock(struct tokutxn *txn); + +uint64_t toku_txn_get_root_id(struct tokutxn *txn); +bool txn_declared_read_only(struct tokutxn *txn); + +int toku_txn_begin_txn ( + DB_TXN *container_db_txn, + struct tokutxn *parent_tokutxn, + struct tokutxn **tokutxn, + struct tokulogger *logger, + TXN_SNAPSHOT_TYPE snapshot_type, + bool read_only + ); + +DB_TXN * toku_txn_get_container_db_txn (struct tokutxn *tokutxn); +void toku_txn_set_container_db_txn(struct tokutxn *txn, DB_TXN *db_txn); + +// toku_txn_begin_with_xid is called from recovery and has no containing DB_TXN +int toku_txn_begin_with_xid ( + struct tokutxn *parent_tokutxn, + struct tokutxn **tokutxn, + struct tokulogger *logger, + TXNID_PAIR xid, + TXN_SNAPSHOT_TYPE snapshot_type, + DB_TXN *container_db_txn, + bool for_recovery, + bool read_only + ); + +void toku_txn_update_xids_in_txn(struct tokutxn *txn, TXNID xid); + +int toku_txn_load_txninfo (struct tokutxn *txn, struct txninfo *info); + +int toku_txn_commit_txn (struct tokutxn *txn, int nosync, + TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); +int toku_txn_commit_with_lsn(struct tokutxn *txn, int nosync, LSN oplsn, + TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); + +int toku_txn_abort_txn(struct tokutxn *txn, + TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); +int toku_txn_abort_with_lsn(struct tokutxn *txn, LSN oplsn, + TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); + +int toku_txn_discard_txn(struct tokutxn *txn); + +void toku_txn_prepare_txn (struct tokutxn *txn, TOKU_XA_XID *xid, int nosync); +// Effect: Do the internal work of preparing a transaction (does not log the prepare record). + +void toku_txn_get_prepared_xa_xid(struct tokutxn *txn, TOKU_XA_XID *xa_xid); +// Effect: Fill in the XID information for a transaction. The caller allocates the XID and the function fills in values. + +void toku_txn_maybe_fsync_log(struct tokulogger *logger, LSN do_fsync_lsn, bool do_fsync); + +void toku_txn_get_fsync_info(struct tokutxn *ttxn, bool* do_fsync, LSN* do_fsync_lsn); + +// Complete and destroy a txn +void toku_txn_close_txn(struct tokutxn *txn); + +// Remove a txn from any live txn lists +void toku_txn_complete_txn(struct tokutxn *txn); + +// Free the memory of a txn +void toku_txn_destroy_txn(struct tokutxn *txn); + +struct XIDS_S *toku_txn_get_xids(struct tokutxn *txn); + +// Force fsync on commit +void toku_txn_force_fsync_on_commit(struct tokutxn *txn); + +void toku_txn_get_status(TXN_STATUS s); + +bool toku_is_txn_in_live_root_txn_list(const xid_omt_t &live_root_txn_list, TXNID xid); + +TXNID toku_get_oldest_in_live_root_txn_list(struct tokutxn *txn); + +TOKUTXN_STATE toku_txn_get_state(struct tokutxn *txn); + +struct tokulogger_preplist { + TOKU_XA_XID xid; + DB_TXN *txn; +}; +int toku_logger_recover_txn (struct tokulogger *logger, struct tokulogger_preplist preplist[/*count*/], long count, /*out*/ long *retp, uint32_t flags); + +void toku_maybe_log_begin_txn_for_write_operation(struct tokutxn *txn); + +// Return whether txn (or it's descendents) have done no work. +bool toku_txn_is_read_only(struct tokutxn *txn); + +void toku_txn_lock_state(struct tokutxn *txn); +void toku_txn_unlock_state(struct tokutxn *txn); +void toku_txn_pin_live_txn_unlocked(struct tokutxn *txn); +void toku_txn_unpin_live_txn(struct tokutxn *txn); + +bool toku_txn_has_spilled_rollback(struct tokutxn *txn); + +void toku_txn_get_client_id(struct tokutxn *txn, uint64_t *client_id, void **client_extra); +void toku_txn_set_client_id(struct tokutxn *txn, uint64_t client_id, void *client_extra); + +time_t toku_txn_get_start_time(struct tokutxn *txn); + +// +// This function is used by the leafentry iterators. +// returns TOKUDB_ACCEPT if live transaction context is allowed to read a value +// that is written by transaction with LSN of id +// live transaction context may read value if either id is the root ancestor of context, or if +// id was committed before context's snapshot was taken. +// For id to be committed before context's snapshot was taken, the following must be true: +// - id < context->snapshot_txnid64 AND id is not in context's live root transaction list +// For the above to NOT be true: +// - id > context->snapshot_txnid64 OR id is in context's live root transaction list +// +int toku_txn_reads_txnid(TXNID txnid, struct tokutxn *txn, bool is_provisional UU()); + +// For serialize / deserialize + +#include "ft/serialize/wbuf.h" + +static inline void wbuf_TXNID(struct wbuf *wb, TXNID txnid) { + wbuf_ulonglong(wb, txnid); +} + +static inline void wbuf_nocrc_TXNID(struct wbuf *wb, TXNID txnid) { + wbuf_nocrc_ulonglong(wb, txnid); +} + +static inline void wbuf_nocrc_TXNID_PAIR(struct wbuf *wb, TXNID_PAIR txnid) { + wbuf_nocrc_ulonglong(wb, txnid.parent_id64); + wbuf_nocrc_ulonglong(wb, txnid.child_id64); +} + +static inline void wbuf_nocrc_LSN(struct wbuf *wb, LSN lsn) { + wbuf_nocrc_ulonglong(wb, lsn.lsn); +} + +static inline void wbuf_LSN(struct wbuf *wb, LSN lsn) { + wbuf_ulonglong(wb, lsn.lsn); +} + +#include "ft/serialize/rbuf.h" + +static inline void rbuf_TXNID(struct rbuf *rb, TXNID *txnid) { + *txnid = rbuf_ulonglong(rb); +} + +static inline void rbuf_TXNID_PAIR(struct rbuf *rb, TXNID_PAIR *txnid) { + txnid->parent_id64 = rbuf_ulonglong(rb); + txnid->child_id64 = rbuf_ulonglong(rb); +} + +static inline void rbuf_ma_TXNID(struct rbuf *rb, memarena *UU(ma), TXNID *txnid) { + rbuf_TXNID(rb, txnid); +} + +static inline void rbuf_ma_TXNID_PAIR (struct rbuf *r, memarena *ma __attribute__((__unused__)), TXNID_PAIR *txnid) { + rbuf_TXNID_PAIR(r, txnid); +} + +static inline LSN rbuf_LSN(struct rbuf *rb) { + LSN lsn = { .lsn = rbuf_ulonglong(rb) }; + return lsn; +} diff --git a/storage/tokudb/PerconaFT/ft/txn/txn_child_manager.cc b/storage/tokudb/PerconaFT/ft/txn/txn_child_manager.cc new file mode 100644 index 00000000..99a21331 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/txn/txn_child_manager.cc @@ -0,0 +1,143 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#include "ft/logger/log-internal.h" +#include "ft/txn/txn_child_manager.h" + +toku_instr_key *txn_child_manager_mutex_key; + +// +// initialized a txn_child_manager, +// when called, root->txnid.parent_id64 may not yet be set +// +void txn_child_manager::init(TOKUTXN root) { + invariant(root->txnid.child_id64 == TXNID_NONE); + invariant(root->parent == NULL); + m_root = root; + m_last_xid = TXNID_NONE; + ZERO_STRUCT(m_mutex); + + toku_pthread_mutexattr_t attr; + toku_mutexattr_init(&attr); + toku_mutexattr_settype(&attr, TOKU_MUTEX_ADAPTIVE); + toku_mutex_init(*txn_child_manager_mutex_key, &m_mutex, &attr); + toku_mutexattr_destroy(&attr); +} + +void txn_child_manager::destroy() { + toku_mutex_destroy(&m_mutex); +} + +void txn_child_manager::start_child_txn_for_recovery(TOKUTXN child, TOKUTXN parent, TXNID_PAIR txnid) { + invariant(parent->txnid.parent_id64 == m_root->txnid.parent_id64); + invariant(txnid.parent_id64 == m_root->txnid.parent_id64); + + child->txnid = txnid; + toku_mutex_lock(&m_mutex); + if (txnid.child_id64 > m_last_xid) { + m_last_xid = txnid.child_id64; + } + parent->child = child; + toku_mutex_unlock(&m_mutex); +} + +void txn_child_manager::start_child_txn(TOKUTXN child, TOKUTXN parent) { + invariant(parent->txnid.parent_id64 == m_root->txnid.parent_id64); + child->txnid.parent_id64 = m_root->txnid.parent_id64; + toku_mutex_lock(&m_mutex); + + ++m_last_xid; + // Here we ensure that the child_id64 is never equal to the parent_id64 + // We do this to make this feature work more easily with the XIDs + // struct and message application. The XIDs struct stores the parent id + // as the first TXNID, and subsequent TXNIDs store child ids. So, if we + // have a case where the parent id is the same as the child id, we will + // have to do some tricky maneuvering in the message application code + // in ule.cc. So, to lessen the probability of bugs, we ensure that the + // parent id is not the same as the child id. + if (m_last_xid == m_root->txnid.parent_id64) { + ++m_last_xid; + } + child->txnid.child_id64 = m_last_xid; + + parent->child = child; + toku_mutex_unlock(&m_mutex); +} + +void txn_child_manager::finish_child_txn(TOKUTXN child) { + invariant(child->txnid.parent_id64 == m_root->txnid.parent_id64); + toku_mutex_lock(&m_mutex); + child->parent->child = NULL; + toku_mutex_unlock(&m_mutex); +} + +void txn_child_manager::suspend() { + toku_mutex_lock(&m_mutex); +} + +void txn_child_manager::resume() { + toku_mutex_unlock(&m_mutex); +} + +void txn_child_manager::find_tokutxn_by_xid_unlocked(TXNID_PAIR xid, TOKUTXN* result) { + invariant(xid.parent_id64 == m_root->txnid.parent_id64); + TOKUTXN curr_txn = m_root; + while (curr_txn != NULL) { + if (xid.child_id64 == curr_txn->txnid.child_id64) { + *result = curr_txn; + break; + } + curr_txn = curr_txn->child; + } +} + +int txn_child_manager::iterate(txn_mgr_iter_callback cb, void* extra) { + TOKUTXN curr_txn = m_root; + int ret = 0; + toku_mutex_lock(&m_mutex); + while (curr_txn != NULL) { + ret = cb(curr_txn, extra); + if (ret != 0) { + break; + } + curr_txn = curr_txn->child; + } + toku_mutex_unlock(&m_mutex); + return ret; +} + diff --git a/storage/tokudb/PerconaFT/ft/txn/txn_child_manager.h b/storage/tokudb/PerconaFT/ft/txn/txn_child_manager.h new file mode 100644 index 00000000..76db3705 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/txn/txn_child_manager.h @@ -0,0 +1,66 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#pragma once + +// We should be including ft/txn/txn.h here but that header includes this one, +// so we don't. +#include "portability/toku_pthread.h" + +class txn_child_manager { +public: + void init (TOKUTXN root); + void destroy(); + void start_child_txn_for_recovery(TOKUTXN child, TOKUTXN parent, TXNID_PAIR txnid); + void start_child_txn(TOKUTXN child, TOKUTXN parent); + void finish_child_txn(TOKUTXN child); + void suspend(); + void resume(); + void find_tokutxn_by_xid_unlocked(TXNID_PAIR xid, TOKUTXN* result); + int iterate(int (*cb)(TOKUTXN txn, void *extra), void* extra); + +private: + TXNID m_last_xid; + TOKUTXN m_root; + toku_mutex_t m_mutex; + + friend class txn_child_manager_unit_test; +}; + + +ENSURE_POD(txn_child_manager); diff --git a/storage/tokudb/PerconaFT/ft/txn/txn_manager.cc b/storage/tokudb/PerconaFT/ft/txn/txn_manager.cc new file mode 100644 index 00000000..1b55844b --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/txn/txn_manager.cc @@ -0,0 +1,1040 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#include "portability/toku_race_tools.h" + +#include "ft/cachetable/checkpoint.h" +#include "ft/logger/log-internal.h" +#include "ft/ule.h" +#include "ft/txn/txn.h" +#include "ft/txn/txn_manager.h" +#include "ft/txn/rollback.h" +#include "util/omt.h" +//this is only for testing + +static void (* test_txn_sync_callback) (pthread_t, void *) = NULL; +static void * test_txn_sync_callback_extra = NULL; + +void set_test_txn_sync_callback(void (*cb) (pthread_t, void *), void *extra) { + test_txn_sync_callback = cb; + test_txn_sync_callback_extra = extra; +} +bool garbage_collection_debug = false; + +toku_instr_key *txn_manager_lock_mutex_key; + +static bool txn_records_snapshot(TXN_SNAPSHOT_TYPE snapshot_type, + struct tokutxn *parent) { + if (snapshot_type == TXN_COPIES_SNAPSHOT) { + return false; + } + // we need a snapshot if the snapshot type is a child or + // if the snapshot type is root and we have no parent. + // Cases that we don't need a snapshot: when snapshot type is NONE + // or when it is ROOT and we have a parent + return (snapshot_type != TXN_SNAPSHOT_NONE && (parent==NULL || snapshot_type == TXN_SNAPSHOT_CHILD)); +} + +static bool txn_copies_snapshot(TXN_SNAPSHOT_TYPE snapshot_type, struct tokutxn *parent) { + return (snapshot_type == TXN_COPIES_SNAPSHOT) || txn_records_snapshot(snapshot_type, parent); +} + +// internal locking functions, should use this instead of accessing lock directly +static void txn_manager_lock(TXN_MANAGER txn_manager); +static void txn_manager_unlock(TXN_MANAGER txn_manager); + +#if 0 +static bool is_txnid_live(TXN_MANAGER txn_manager, TXNID txnid) { + TOKUTXN result = NULL; + toku_txn_manager_id2txn_unlocked(txn_manager, txnid, &result); + return (result != NULL); +} +#endif + +//Heaviside function to search through an OMT by a TXNID +int find_by_xid (const TOKUTXN &txn, const TXNID &txnidfind); + +static bool is_txnid_live(TXN_MANAGER txn_manager, TXNID txnid) { + TOKUTXN result = NULL; + TXNID_PAIR id = { .parent_id64 = txnid, .child_id64 = TXNID_NONE }; + toku_txn_manager_id2txn_unlocked(txn_manager, id, &result); + return (result != NULL); +} + +static void toku_txn_manager_clone_state_for_gc_unlocked( + TXN_MANAGER txn_manager, + xid_omt_t* snapshot_xids, + rx_omt_t* referenced_xids, + xid_omt_t* live_root_txns + ); + +static void +verify_snapshot_system(TXN_MANAGER txn_manager UU()) { + uint32_t num_snapshot_txnids = txn_manager->num_snapshots; + TXNID snapshot_txnids[num_snapshot_txnids]; + TOKUTXN snapshot_txns[num_snapshot_txnids]; + uint32_t num_live_txns = txn_manager->live_root_txns.size(); + TOKUTXN live_txns[num_live_txns]; + uint32_t num_referenced_xid_tuples = txn_manager->referenced_xids.size(); + struct referenced_xid_tuple *referenced_xid_tuples[num_referenced_xid_tuples]; + + // do this to get an omt of snapshot_txnids + xid_omt_t snapshot_txnids_omt; + rx_omt_t referenced_xids_omt; + xid_omt_t live_root_txns_omt; + toku_txn_manager_clone_state_for_gc_unlocked( + txn_manager, + &snapshot_txnids_omt, + &referenced_xids_omt, + &live_root_txns_omt + ); + + int r; + uint32_t i; + uint32_t j; + //set up arrays for easier access + { + TOKUTXN curr_txn = txn_manager->snapshot_head; + uint32_t curr_index = 0; + while (curr_txn != NULL) { + snapshot_txns[curr_index] = curr_txn; + snapshot_txnids[curr_index] = curr_txn->snapshot_txnid64; + curr_txn = curr_txn->snapshot_next; + curr_index++; + } + } + + for (i = 0; i < num_live_txns; i++) { + r = txn_manager->live_root_txns.fetch(i, &live_txns[i]); + assert_zero(r); + } + for (i = 0; i < num_referenced_xid_tuples; i++) { + r = txn_manager->referenced_xids.fetch(i, &referenced_xid_tuples[i]); + assert_zero(r); + } + + { + //Verify snapshot_txnids + for (i = 0; i < num_snapshot_txnids; i++) { + TXNID snapshot_xid = snapshot_txnids[i]; + TOKUTXN snapshot_txn = snapshot_txns[i]; + uint32_t num_live_root_txn_list = snapshot_txn->live_root_txn_list->size(); + TXNID live_root_txn_list[num_live_root_txn_list]; + { + for (j = 0; j < num_live_root_txn_list; j++) { + r = snapshot_txn->live_root_txn_list->fetch(j, &live_root_txn_list[j]); + assert_zero(r); + } + } + { + // Only committed entries have return a youngest. + TXNID youngest = toku_get_youngest_live_list_txnid_for( + snapshot_xid, + snapshot_txnids_omt, + txn_manager->referenced_xids + ); + invariant(youngest == TXNID_NONE); + } + for (j = 0; j < num_live_root_txn_list; j++) { + TXNID live_xid = live_root_txn_list[j]; + invariant(live_xid <= snapshot_xid); + TXNID youngest = toku_get_youngest_live_list_txnid_for( + live_xid, + snapshot_txnids_omt, + txn_manager->referenced_xids + ); + if (is_txnid_live(txn_manager, live_xid)) { + // Only committed entries have return a youngest. + invariant(youngest == TXNID_NONE); + } + else { + invariant(youngest != TXNID_NONE); + // A committed entry might have been read-only, in which case it won't return anything. + // This snapshot reads 'live_xid' so it's youngest cannot be older than snapshot_xid. + invariant(youngest >= snapshot_xid); + } + } + } + } + { + // Verify referenced_xids. + for (i = 0; i < num_referenced_xid_tuples; i++) { + struct referenced_xid_tuple *tuple = referenced_xid_tuples[i]; + invariant(tuple->begin_id < tuple->end_id); + invariant(tuple->references > 0); + + { + //verify neither pair->begin_id nor end_id is in live_list + r = txn_manager->live_root_txns.find_zero<TXNID, find_by_xid>(tuple->begin_id, nullptr, nullptr); + invariant(r == DB_NOTFOUND); + r = txn_manager->live_root_txns.find_zero<TXNID, find_by_xid>(tuple->end_id, nullptr, nullptr); + invariant(r == DB_NOTFOUND); + } + { + //verify neither pair->begin_id nor end_id is in snapshot_xids + TOKUTXN curr_txn = txn_manager->snapshot_head; + uint32_t curr_index = 0; + while (curr_txn != NULL) { + invariant(tuple->begin_id != curr_txn->txnid.parent_id64); + invariant(tuple->end_id != curr_txn->txnid.parent_id64); + curr_txn = curr_txn->snapshot_next; + curr_index++; + } + } + { + // Verify number of references is correct + uint32_t refs_found = 0; + for (j = 0; j < num_snapshot_txnids; j++) { + TOKUTXN snapshot_txn = snapshot_txns[j]; + if (toku_is_txn_in_live_root_txn_list(*snapshot_txn->live_root_txn_list, tuple->begin_id)) { + refs_found++; + } + invariant(!toku_is_txn_in_live_root_txn_list( + *snapshot_txn->live_root_txn_list, + tuple->end_id)); + } + invariant(refs_found == tuple->references); + } + { + // Verify youngest makes sense. + TXNID youngest = toku_get_youngest_live_list_txnid_for( + tuple->begin_id, + snapshot_txnids_omt, + txn_manager->referenced_xids + ); + invariant(youngest != TXNID_NONE); + invariant(youngest > tuple->begin_id); + invariant(youngest < tuple->end_id); + // Youngest must be found, and must be a snapshot txn + r = snapshot_txnids_omt.find_zero<TXNID, toku_find_xid_by_xid>(youngest, nullptr, nullptr); + invariant_zero(r); + } + } + } + snapshot_txnids_omt.destroy(); + referenced_xids_omt.destroy(); + live_root_txns_omt.destroy(); +} + +void toku_txn_manager_init(TXN_MANAGER *txn_managerp) { + TXN_MANAGER XCALLOC(txn_manager); + toku_mutex_init( + *txn_manager_lock_mutex_key, &txn_manager->txn_manager_lock, nullptr); + txn_manager->live_root_txns.create(); + txn_manager->live_root_ids.create(); + txn_manager->snapshot_head = NULL; + txn_manager->snapshot_tail = NULL; + txn_manager->num_snapshots = 0; + txn_manager->referenced_xids.create(); + txn_manager->last_xid = 0; + + txn_manager->last_xid_seen_for_recover = TXNID_NONE; + txn_manager->last_calculated_oldest_referenced_xid = TXNID_NONE; + + *txn_managerp = txn_manager; +} + +void toku_txn_manager_destroy(TXN_MANAGER txn_manager) { + toku_mutex_destroy(&txn_manager->txn_manager_lock); + invariant(txn_manager->live_root_txns.size() == 0); + txn_manager->live_root_txns.destroy(); + invariant(txn_manager->live_root_ids.size() == 0); + txn_manager->live_root_ids.destroy(); + invariant(txn_manager->snapshot_head == NULL); + invariant(txn_manager->referenced_xids.size() == 0); + txn_manager->referenced_xids.destroy(); + toku_free(txn_manager); +} + +TXNID +toku_txn_manager_get_oldest_living_xid(TXN_MANAGER txn_manager) { + TOKUTXN rtxn = NULL; + TXNID rval = TXNID_NONE_LIVING; + txn_manager_lock(txn_manager); + + if (txn_manager->live_root_txns.size() > 0) { + int r = txn_manager->live_root_txns.fetch(0, &rtxn); + invariant_zero(r); + } + if (rtxn) { + rval = rtxn->txnid.parent_id64; + } + txn_manager_unlock(txn_manager); + return rval; +} + +TXNID toku_txn_manager_get_oldest_referenced_xid_estimate(TXN_MANAGER txn_manager) { + return toku_unsafe_fetch(&txn_manager->last_calculated_oldest_referenced_xid); +} + +int live_root_txn_list_iter(const TOKUTXN &live_xid, const uint32_t UU(index), TXNID **const referenced_xids); +int live_root_txn_list_iter(const TOKUTXN &live_xid, const uint32_t UU(index), TXNID **const referenced_xids){ + (*referenced_xids)[index] = live_xid->txnid.parent_id64; + return 0; +} + + +// Create list of root transactions that were live when this txn began. +static inline void +setup_live_root_txn_list(xid_omt_t* live_root_txnid, xid_omt_t* live_root_txn_list) { + if (live_root_txnid->size() > 0) { + live_root_txn_list->clone(*live_root_txnid); + } else { + live_root_txn_list->create_no_array(); + } +} + +//Heaviside function to search through an OMT by a TXNID +int +find_by_xid (const TOKUTXN &txn, const TXNID &txnidfind) { + if (txn->txnid.parent_id64 < txnidfind) return -1; + if (txn->txnid.parent_id64 > txnidfind) return +1; + return 0; +} + +static TXNID +max_xid(TXNID a, TXNID b) { + return a < b ? b : a; +} + +static void set_oldest_referenced_xid(TXN_MANAGER txn_manager) { + TXNID oldest_referenced_xid = TXNID_MAX; + int r; + if (txn_manager->live_root_ids.size() > 0) { + r = txn_manager->live_root_ids.fetch(0, &oldest_referenced_xid); + // this function should only be called when we know there is at least + // one live transaction + invariant_zero(r); + } + + if (txn_manager->referenced_xids.size() > 0) { + struct referenced_xid_tuple* tuple; + r = txn_manager->referenced_xids.fetch(0, &tuple); + if (r == 0 && tuple->begin_id < oldest_referenced_xid) { + oldest_referenced_xid = tuple->begin_id; + } + } + if (txn_manager->snapshot_head != NULL) { + TXNID id = txn_manager->snapshot_head->snapshot_txnid64; + if (id < oldest_referenced_xid) { + oldest_referenced_xid = id; + } + } + if (txn_manager->last_xid < oldest_referenced_xid) { + oldest_referenced_xid = txn_manager->last_xid; + } + invariant(oldest_referenced_xid != TXNID_MAX); + toku_unsafe_set(&txn_manager->last_calculated_oldest_referenced_xid, oldest_referenced_xid); +} + +//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index) +// template-only function, but must be extern +int find_xid (const TOKUTXN &txn, const TOKUTXN &txnfind); +int +find_xid (const TOKUTXN &txn, const TOKUTXN &txnfind) +{ + if (txn->txnid.parent_id64 < txnfind->txnid.parent_id64) return -1; + if (txn->txnid.parent_id64 > txnfind->txnid.parent_id64) return +1; + return 0; +} + +static inline void txn_manager_create_snapshot_unlocked( + TXN_MANAGER txn_manager, + TOKUTXN txn + ) +{ + txn->snapshot_txnid64 = ++txn_manager->last_xid; + // Add this txn to the global list of txns that have their own snapshots. + // (Note, if a txn is a child that creates its own snapshot, then that child xid + // is the xid stored in the global list.) + if (txn_manager->snapshot_head == NULL) { + invariant(txn_manager->snapshot_tail == NULL); + txn_manager->snapshot_head = txn; + txn_manager->snapshot_tail = txn; + } + else { + txn_manager->snapshot_tail->snapshot_next = txn; + txn->snapshot_prev = txn_manager->snapshot_tail; + txn_manager->snapshot_tail = txn; + } + txn_manager->num_snapshots++; +} + +// template-only function, but must be extern +int find_tuple_by_xid (const struct referenced_xid_tuple &tuple, const TXNID &xidfind); +int +find_tuple_by_xid (const struct referenced_xid_tuple &tuple, const TXNID &xidfind) +{ + if (tuple.begin_id < xidfind) return -1; + if (tuple.begin_id > xidfind) return +1; + return 0; +} + +// template-only function, but must be extern +int referenced_xids_note_snapshot_txn_end_iter(const TXNID &live_xid, const uint32_t UU(index), rx_omt_t *const referenced_xids) + __attribute__((nonnull(3))); +int referenced_xids_note_snapshot_txn_end_iter(const TXNID &live_xid, const uint32_t UU(index), rx_omt_t *const referenced_xids) +{ + int r; + uint32_t idx; + struct referenced_xid_tuple *tuple; + + r = referenced_xids->find_zero<TXNID, find_tuple_by_xid>(live_xid, &tuple, &idx); + if (r == DB_NOTFOUND) { + goto done; + } + invariant_zero(r); + invariant(tuple->references > 0); + if (--tuple->references == 0) { + r = referenced_xids->delete_at(idx); + lazy_assert_zero(r); + } +done: + return 0; +} + +// When txn ends, update reverse live list. To do that, examine each txn in this (closing) txn's live list. +static inline int +note_snapshot_txn_end_by_ref_xids(TXN_MANAGER mgr, const xid_omt_t &live_root_txn_list) { + int r; + r = live_root_txn_list.iterate<rx_omt_t, referenced_xids_note_snapshot_txn_end_iter>(&mgr->referenced_xids); + invariant_zero(r); + return r; +} + +typedef struct snapshot_iter_extra { + uint32_t* indexes_to_delete; + uint32_t num_indexes; + xid_omt_t* live_root_txn_list; +} SNAPSHOT_ITER_EXTRA; + +// template-only function, but must be extern +int note_snapshot_txn_end_by_txn_live_list_iter(referenced_xid_tuple* tuple, const uint32_t index, SNAPSHOT_ITER_EXTRA *const sie) + __attribute__((nonnull(3))); +int note_snapshot_txn_end_by_txn_live_list_iter( + referenced_xid_tuple* tuple, + const uint32_t index, + SNAPSHOT_ITER_EXTRA *const sie + ) +{ + int r; + uint32_t idx; + TXNID txnid; + r = sie->live_root_txn_list->find_zero<TXNID, toku_find_xid_by_xid>(tuple->begin_id, &txnid, &idx); + if (r == DB_NOTFOUND) { + goto done; + } + invariant_zero(r); + invariant(txnid == tuple->begin_id); + invariant(tuple->references > 0); + if (--tuple->references == 0) { + sie->indexes_to_delete[sie->num_indexes] = index; + sie->num_indexes++; + } +done: + return 0; +} + +static inline int +note_snapshot_txn_end_by_txn_live_list(TXN_MANAGER mgr, xid_omt_t* live_root_txn_list) { + uint32_t size = mgr->referenced_xids.size(); + uint32_t indexes_to_delete[size]; + SNAPSHOT_ITER_EXTRA sie = { .indexes_to_delete = indexes_to_delete, .num_indexes = 0, .live_root_txn_list = live_root_txn_list}; + mgr->referenced_xids.iterate_ptr<SNAPSHOT_ITER_EXTRA, note_snapshot_txn_end_by_txn_live_list_iter>(&sie); + for (uint32_t i = 0; i < sie.num_indexes; i++) { + uint32_t curr_index = sie.indexes_to_delete[sie.num_indexes-i-1]; + mgr->referenced_xids.delete_at(curr_index); + } + return 0; +} + +static inline void txn_manager_remove_snapshot_unlocked( + TOKUTXN txn, + TXN_MANAGER txn_manager + ) +{ + // Remove from linked list of snapshot txns + if (txn_manager->snapshot_head == txn) { + txn_manager->snapshot_head = txn->snapshot_next; + } + if (txn_manager->snapshot_tail == txn) { + txn_manager->snapshot_tail = txn->snapshot_prev; + } + if (txn->snapshot_next) { + txn->snapshot_next->snapshot_prev = txn->snapshot_prev; + } + if (txn->snapshot_prev) { + txn->snapshot_prev->snapshot_next = txn->snapshot_next; + } + txn_manager->num_snapshots--; + uint32_t ref_xids_size = txn_manager->referenced_xids.size(); + uint32_t live_list_size = txn->live_root_txn_list->size(); + if (ref_xids_size > 0 && live_list_size > 0) { + if (live_list_size > ref_xids_size && ref_xids_size < 2000) { + note_snapshot_txn_end_by_txn_live_list(txn_manager, txn->live_root_txn_list); + } + else { + note_snapshot_txn_end_by_ref_xids(txn_manager, *txn->live_root_txn_list); + } + } +} + +static inline void inherit_snapshot_from_parent(TOKUTXN child) { + if (child->parent) { + child->snapshot_txnid64 = child->parent->snapshot_txnid64; + child->live_root_txn_list = child->parent->live_root_txn_list; + } +} +void toku_txn_manager_handle_snapshot_create_for_child_txn( + TOKUTXN txn, + TXN_MANAGER txn_manager, + TXN_SNAPSHOT_TYPE snapshot_type + ) +{ + // this is a function for child txns, so just doint a sanity check + invariant(txn->parent != NULL); + bool copies_snapshot = txn_copies_snapshot(snapshot_type, txn->parent); + bool records_snapshot = txn_records_snapshot(snapshot_type, txn->parent); + // assert that if records_snapshot is true, then copies_snapshot is true + invariant(!records_snapshot || copies_snapshot); + if (records_snapshot) { + invariant(txn->live_root_txn_list == nullptr); + XMALLOC(txn->live_root_txn_list); + txn_manager_lock(txn_manager); + txn_manager_create_snapshot_unlocked(txn_manager, txn); + } + else { + inherit_snapshot_from_parent(txn); + } + + toku_debug_txn_sync(pthread_self()); + + if (copies_snapshot) { + if(!records_snapshot) + txn_manager_lock(txn_manager); + setup_live_root_txn_list(&txn_manager->live_root_ids, txn->live_root_txn_list); + txn_manager_unlock(txn_manager); + } +} + +void toku_txn_manager_handle_snapshot_destroy_for_child_txn( + TOKUTXN txn, + TXN_MANAGER txn_manager, + TXN_SNAPSHOT_TYPE snapshot_type + ) +{ + // this is a function for child txns, so just doint a sanity check + invariant(txn->parent != NULL); + bool copies_snapshot = txn_copies_snapshot(snapshot_type, txn->parent); + bool records_snapshot = txn_records_snapshot(snapshot_type, txn->parent); + if (records_snapshot) { + txn_manager_lock(txn_manager); + txn_manager_remove_snapshot_unlocked(txn, txn_manager); + txn_manager_unlock(txn_manager); + } + if (copies_snapshot) { + invariant(txn->live_root_txn_list != nullptr); + txn->live_root_txn_list->destroy(); + toku_free(txn->live_root_txn_list); + } +} + +void toku_txn_manager_start_txn_for_recovery( + TOKUTXN txn, + TXN_MANAGER txn_manager, + TXNID xid + ) +{ + txn_manager_lock(txn_manager); + // using xid that is passed in + txn_manager->last_xid = max_xid(txn_manager->last_xid, xid); + toku_txn_update_xids_in_txn(txn, xid); + + uint32_t idx; + int r = txn_manager->live_root_txns.find_zero<TOKUTXN, find_xid>(txn, nullptr, &idx); + invariant(r == DB_NOTFOUND); + r = txn_manager->live_root_txns.insert_at(txn, idx); + invariant_zero(r); + r = txn_manager->live_root_ids.insert_at(txn->txnid.parent_id64, idx); + invariant_zero(r); + + txn_manager_unlock(txn_manager); +} + +void toku_txn_manager_start_txn( + TOKUTXN txn, + TXN_MANAGER txn_manager, + TXN_SNAPSHOT_TYPE snapshot_type, + bool read_only + ) +{ + int r; + TXNID xid = TXNID_NONE; + // if we are running in recovery, we don't need to make snapshots + bool copies_snapshot = txn_copies_snapshot(snapshot_type, NULL); + bool records_snapshot = txn_records_snapshot(snapshot_type, NULL); + // assert that if records_snapshot is true, then copies_snapshot is true + invariant(!records_snapshot || copies_snapshot); + + // perform a malloc outside of the txn_manager lock + // will be used in txn_manager_create_snapshot_unlocked below + if (copies_snapshot) { + invariant(txn->live_root_txn_list == nullptr); + XMALLOC(txn->live_root_txn_list); + } + // the act of getting a transaction ID and adding the + // txn to the proper OMTs must be atomic. MVCC depends + // on this. + txn_manager_lock(txn_manager); + if (garbage_collection_debug) { + verify_snapshot_system(txn_manager); + } + + // + // maintain the data structures necessary for MVCC: + // 1. add txn to list of live_root_txns if this is a root transaction + // 2. if the transaction is creating a snapshot: + // - create a live list for the transaction + // - add the id to the list of snapshot ids + // + // The order of operations is important here, and must be taken + // into account when the transaction is closed. The txn is added + // to the live_root_txns first (if it is a root txn). This has the implication + // that a root level snapshot transaction is in its own live list. This fact + // is taken into account when the transaction is closed. + + // add ancestor information, and maintain global live root txn list + xid = ++txn_manager->last_xid; // we always need an ID, needed for lock tree + toku_txn_update_xids_in_txn(txn, xid); + if (!read_only) { + uint32_t idx = txn_manager->live_root_txns.size(); + r = txn_manager->live_root_txns.insert_at(txn, idx); + invariant_zero(r); + r = txn_manager->live_root_ids.insert_at(txn->txnid.parent_id64, idx); + invariant_zero(r); + } + set_oldest_referenced_xid(txn_manager); + + if (records_snapshot) { + txn_manager_create_snapshot_unlocked( + txn_manager, + txn + ); + } + if (copies_snapshot) { + setup_live_root_txn_list(&txn_manager->live_root_ids, txn->live_root_txn_list); + } + + if (garbage_collection_debug) { + verify_snapshot_system(txn_manager); + } + txn_manager_unlock(txn_manager); + return; +} + +TXNID +toku_get_youngest_live_list_txnid_for(TXNID xc, const xid_omt_t &snapshot_txnids, const rx_omt_t &referenced_xids) { + struct referenced_xid_tuple *tuple; + int r; + TXNID rval = TXNID_NONE; + + r = referenced_xids.find_zero<TXNID, find_tuple_by_xid>(xc, &tuple, nullptr); + if (r == DB_NOTFOUND) { + goto done; + } + TXNID live; + + r = snapshot_txnids.find<TXNID, toku_find_xid_by_xid>(tuple->end_id, -1, &live, nullptr); + if (r == DB_NOTFOUND) { + goto done; + } + invariant(live < tuple->end_id); + if (live > tuple->begin_id) { + rval = live; + } +done: + return rval; +} + +void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn) { + int r; + invariant(txn->parent == NULL); + bool records_snapshot = txn_records_snapshot(txn->snapshot_type, NULL); + txn_manager_lock(txn_manager); + + if (garbage_collection_debug) { + verify_snapshot_system(txn_manager); + } + + if (records_snapshot) { + txn_manager_remove_snapshot_unlocked( + txn, + txn_manager + ); + } + + if (!txn_declared_read_only(txn)) { + uint32_t idx; + //Remove txn from list of live root txns + TOKUTXN txnagain; + r = txn_manager->live_root_txns.find_zero<TOKUTXN, find_xid>(txn, &txnagain, &idx); + invariant_zero(r); + invariant(txn==txnagain); + + r = txn_manager->live_root_txns.delete_at(idx); + invariant_zero(r); + r = txn_manager->live_root_ids.delete_at(idx); + invariant_zero(r); + + if (!toku_txn_is_read_only(txn) || garbage_collection_debug) { + uint32_t num_references = 0; + TOKUTXN curr_txn = txn_manager->snapshot_tail; + while(curr_txn != NULL) { + if (curr_txn->snapshot_txnid64 > txn->txnid.parent_id64) { + num_references++; + } + else { + break; + } + curr_txn = curr_txn->snapshot_prev; + } + + if (num_references > 0) { + // This transaction exists in a live list of another transaction. + struct referenced_xid_tuple tuple = { + .begin_id = txn->txnid.parent_id64, + .end_id = ++txn_manager->last_xid, + .references = num_references + }; + r = txn_manager->referenced_xids.insert<TXNID, find_tuple_by_xid>(tuple, txn->txnid.parent_id64, nullptr); + lazy_assert_zero(r); + } + } + } + + if (garbage_collection_debug) { + verify_snapshot_system(txn_manager); + } + txn_manager_unlock(txn_manager); + + //Cleanup that does not require the txn_manager lock + if (txn->live_root_txn_list) { + txn->live_root_txn_list->destroy(); + toku_free(txn->live_root_txn_list); + } + return; +} + +static void toku_txn_manager_clone_state_for_gc_unlocked( + TXN_MANAGER txn_manager, + xid_omt_t* snapshot_xids, + rx_omt_t* referenced_xids, + xid_omt_t* live_root_txns + ) +{ + TXNID* snapshot_xids_array = NULL; + XMALLOC_N(txn_manager->num_snapshots, snapshot_xids_array); + TOKUTXN curr_txn = txn_manager->snapshot_head; + uint32_t curr_index = 0; + while (curr_txn != NULL) { + snapshot_xids_array[curr_index] = curr_txn->snapshot_txnid64; + curr_txn = curr_txn->snapshot_next; + curr_index++; + } + snapshot_xids->create_steal_sorted_array( + &snapshot_xids_array, + txn_manager->num_snapshots, + txn_manager->num_snapshots + ); + + referenced_xids->clone(txn_manager->referenced_xids); + setup_live_root_txn_list(&txn_manager->live_root_ids, live_root_txns); +} + +void toku_txn_manager_clone_state_for_gc( + TXN_MANAGER txn_manager, + xid_omt_t* snapshot_xids, + rx_omt_t* referenced_xids, + xid_omt_t* live_root_txns + ) +{ + txn_manager_lock(txn_manager); + toku_txn_manager_clone_state_for_gc_unlocked( + txn_manager, + snapshot_xids, + referenced_xids, + live_root_txns + ); + txn_manager_unlock(txn_manager); +} + +void txn_manager_state::init() { + invariant(!initialized); + invariant_notnull(txn_manager); + toku_txn_manager_clone_state_for_gc( + txn_manager, + &snapshot_xids, + &referenced_xids, + &live_root_txns + ); + initialized = true; +} + +void toku_txn_manager_id2txn_unlocked(TXN_MANAGER txn_manager, TXNID_PAIR txnid, TOKUTXN *result) { + TOKUTXN txn; + int r = txn_manager->live_root_txns.find_zero<TXNID, find_by_xid>(txnid.parent_id64, &txn, nullptr); + if (r==0) { + assert(txn->txnid.parent_id64 == txnid.parent_id64); + *result = txn; + } + else { + assert(r==DB_NOTFOUND); + // If there is no txn, then we treat it as the null txn. + *result = NULL; + } +} + +int toku_txn_manager_get_root_txn_from_xid (TXN_MANAGER txn_manager, TOKU_XA_XID *xid, DB_TXN **txnp) { + txn_manager_lock(txn_manager); + int ret_val = 0; + int num_live_txns = txn_manager->live_root_txns.size(); + for (int i = 0; i < num_live_txns; i++) { + TOKUTXN txn; + { + int r = txn_manager->live_root_txns.fetch(i, &txn); + assert_zero(r); + } + if (txn->xa_xid.formatID == xid->formatID + && txn->xa_xid.gtrid_length == xid->gtrid_length + && txn->xa_xid.bqual_length == xid->bqual_length + && 0==memcmp(txn->xa_xid.data, xid->data, xid->gtrid_length + xid->bqual_length)) { + *txnp = txn->container_db_txn; + ret_val = 0; + goto exit; + } + } + ret_val = DB_NOTFOUND; +exit: + txn_manager_unlock(txn_manager); + return ret_val; +} + +uint32_t toku_txn_manager_num_live_root_txns(TXN_MANAGER txn_manager) { + int ret_val = 0; + txn_manager_lock(txn_manager); + ret_val = txn_manager->live_root_txns.size(); + txn_manager_unlock(txn_manager); + return ret_val; +} + +static int txn_manager_iter( + TXN_MANAGER txn_manager, + txn_mgr_iter_callback cb, + void* extra, + bool just_root_txns + ) +{ + int r = 0; + toku_mutex_lock(&txn_manager->txn_manager_lock); + uint32_t size = txn_manager->live_root_txns.size(); + for (uint32_t i = 0; i < size; i++) { + TOKUTXN curr_txn = NULL; + r = txn_manager->live_root_txns.fetch(i, &curr_txn); + assert_zero(r); + if (just_root_txns) { + r = cb(curr_txn, extra); + } + else { + r = curr_txn->child_manager->iterate(cb, extra); + } + if (r) { + break; + } + } + toku_mutex_unlock(&txn_manager->txn_manager_lock); + return r; +} + +int toku_txn_manager_iter_over_live_txns( + TXN_MANAGER txn_manager, + txn_mgr_iter_callback cb, + void* extra + ) +{ + return txn_manager_iter( + txn_manager, + cb, + extra, + false + ); +} + +int toku_txn_manager_iter_over_live_root_txns( + TXN_MANAGER txn_manager, + txn_mgr_iter_callback cb, + void* extra + ) +{ + return txn_manager_iter( + txn_manager, + cb, + extra, + true + ); +} + + +// +// This function is called only via env_txn_xa_recover and env_txn_recover. +// See comments for those functions to understand assumptions that +// can be made when calling this function. Namely, that the system is +// quiescant, in that we are right after recovery and before user operations +// commence. +// +// Another key assumption made here is that only root transactions +// may be prepared and that child transactions cannot be prepared. +// This assumption is made by the fact that we iterate over the live root txns +// to find prepared transactions. +// +// I (Zardosht), don't think we take advantage of this fact, as we are holding +// the txn_manager_lock in this function, but in the future we might want +// to take these assumptions into account. +// +int toku_txn_manager_recover_root_txn ( + TXN_MANAGER txn_manager, + struct tokulogger_preplist preplist[/*count*/], + long count, + long *retp, /*out*/ + uint32_t flags + ) +{ + int ret_val = 0; + txn_manager_lock(txn_manager); + uint32_t num_txns_returned = 0; + // scan through live root txns to find + // prepared transactions and return them + uint32_t size = txn_manager->live_root_txns.size(); + if (flags==DB_FIRST) { + txn_manager->last_xid_seen_for_recover = TXNID_NONE; + } + else if (flags!=DB_NEXT) { + ret_val = EINVAL; + goto exit; + } + for (uint32_t i = 0; i < size; i++) { + TOKUTXN curr_txn = NULL; + txn_manager->live_root_txns.fetch(i, &curr_txn); + // skip over TOKUTXNs whose txnid64 is too small, meaning + // we have already processed them. + if (curr_txn->txnid.parent_id64 <= txn_manager->last_xid_seen_for_recover) { + continue; + } + if (curr_txn->state == TOKUTXN_PREPARING) { + assert(curr_txn->container_db_txn); + preplist[num_txns_returned].txn = curr_txn->container_db_txn; + preplist[num_txns_returned].xid = curr_txn->xa_xid; + txn_manager->last_xid_seen_for_recover = curr_txn->txnid.parent_id64; + num_txns_returned++; + } + txn_manager->last_xid_seen_for_recover = curr_txn->txnid.parent_id64; + // if we found the maximum number of prepared transactions we are + // allowed to find, then break + if ((long) num_txns_returned >= count) { + break; + } + } + invariant((long) num_txns_returned <= count); + *retp = num_txns_returned; + ret_val = 0; +exit: + txn_manager_unlock(txn_manager); + return ret_val; +} + +static void txn_manager_lock(TXN_MANAGER txn_manager) { + toku_mutex_lock(&txn_manager->txn_manager_lock); +} + +static void txn_manager_unlock(TXN_MANAGER txn_manager) { + toku_mutex_unlock(&txn_manager->txn_manager_lock); +} + +void toku_txn_manager_suspend(TXN_MANAGER txn_manager) { + txn_manager_lock(txn_manager); +} + +void toku_txn_manager_resume(TXN_MANAGER txn_manager) { + txn_manager_unlock(txn_manager); +} + +void +toku_txn_manager_set_last_xid_from_logger(TXN_MANAGER txn_manager, TXNID last_xid) { + invariant(txn_manager->last_xid == TXNID_NONE); + txn_manager->last_xid = last_xid; +} + +void +toku_txn_manager_set_last_xid_from_recovered_checkpoint(TXN_MANAGER txn_manager, TXNID last_xid) { + txn_manager->last_xid = last_xid; +} + +TXNID +toku_txn_manager_get_last_xid(TXN_MANAGER mgr) { + txn_manager_lock(mgr); + TXNID last_xid = mgr->last_xid; + txn_manager_unlock(mgr); + return last_xid; +} + +bool +toku_txn_manager_txns_exist(TXN_MANAGER mgr) { + txn_manager_lock(mgr); + bool retval = mgr->live_root_txns.size() > 0; + txn_manager_unlock(mgr); + return retval; +} + + +// Test-only function +void +toku_txn_manager_increase_last_xid(TXN_MANAGER mgr, uint64_t increment) { + txn_manager_lock(mgr); + mgr->last_xid += increment; + txn_manager_unlock(mgr); +} + diff --git a/storage/tokudb/PerconaFT/ft/txn/txn_manager.h b/storage/tokudb/PerconaFT/ft/txn/txn_manager.h new file mode 100644 index 00000000..25fa6032 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/txn/txn_manager.h @@ -0,0 +1,223 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#pragma once + +#include "portability/toku_portability.h" +#include "portability/toku_pthread.h" + +#include "ft/txn/txn.h" + +void set_test_txn_sync_callback(void (*) (pthread_t, void*), void*); +#define toku_test_txn_sync_callback(a) ((test_txn_sync_callback)? test_txn_sync_callback( a,test_txn_sync_callback_extra) : (void) 0) + +#if defined(TOKU_DEBUG_TXN_SYNC) +#define toku_debug_txn_sync(a) toku_test_txn_sync_callback(a) +#else +#define toku_debug_txn_sync(a) ((void) 0) +#endif // defined(TOKU_DEBUG_TXN_SYNC) + +typedef struct txn_manager *TXN_MANAGER; + +struct referenced_xid_tuple { + TXNID begin_id; + TXNID end_id; + uint32_t references; +}; + +struct txn_manager { + toku_mutex_t txn_manager_lock; // a lock protecting this object + txn_omt_t live_root_txns; // a sorted tree. + xid_omt_t live_root_ids; //contains TXNID x | x is snapshot txn + TOKUTXN snapshot_head; + TOKUTXN snapshot_tail; + uint32_t num_snapshots; + // Contains 3-tuples: (TXNID begin_id, TXNID end_id, uint64_t num_live_list_references) + // for committed root transaction ids that are still referenced by a live list. + rx_omt_t referenced_xids; + + TXNID last_xid; + TXNID last_xid_seen_for_recover; + TXNID last_calculated_oldest_referenced_xid; +}; +typedef struct txn_manager *TXN_MANAGER; + +struct txn_manager_state { + txn_manager_state(TXN_MANAGER mgr) : + txn_manager(mgr), + initialized(false) { + snapshot_xids.create_no_array(); + referenced_xids.create_no_array(); + live_root_txns.create_no_array(); + } + + // should not copy construct + txn_manager_state &operator=(txn_manager_state &rhs) = delete; + txn_manager_state(txn_manager_state &rhs) = delete; + + ~txn_manager_state() { + snapshot_xids.destroy(); + referenced_xids.destroy(); + live_root_txns.destroy(); + } + + void init(); + + TXN_MANAGER txn_manager; + bool initialized; + + // a snapshot of the txn manager's mvcc state + // only valid if initialized = true + xid_omt_t snapshot_xids; + rx_omt_t referenced_xids; + xid_omt_t live_root_txns; +}; + +// represents all of the information needed to run garbage collection +struct txn_gc_info { + txn_gc_info(txn_manager_state *st, TXNID xid_sgc, TXNID xid_ip, bool mvcc) + : txn_state_for_gc(st), + oldest_referenced_xid_for_simple_gc(xid_sgc), + oldest_referenced_xid_for_implicit_promotion(xid_ip), + mvcc_needed(mvcc) { + } + + // a snapshot of the transcation system. may be null. + txn_manager_state *txn_state_for_gc; + + // the oldest xid in any live list + // + // suitible for simple garbage collection that cleans up multiple committed + // transaction records into one. not suitible for implicit promotions, which + // must be correct in the face of abort messages - see ftnode->oldest_referenced_xid + TXNID oldest_referenced_xid_for_simple_gc; + + // lower bound on the oldest xid in any live when the messages to be cleaned + // had no messages above them. suitable for implicitly promoting a provisonal uxr. + TXNID oldest_referenced_xid_for_implicit_promotion; + + // whether or not mvcc is actually needed - false during recovery and non-transactional systems + const bool mvcc_needed; +}; + +void toku_txn_manager_init(TXN_MANAGER* txn_manager); +void toku_txn_manager_destroy(TXN_MANAGER txn_manager); + +TXNID toku_txn_manager_get_oldest_living_xid(TXN_MANAGER txn_manager); + +TXNID toku_txn_manager_get_oldest_referenced_xid_estimate(TXN_MANAGER txn_manager); + +void toku_txn_manager_handle_snapshot_create_for_child_txn( + TOKUTXN txn, + TXN_MANAGER txn_manager, + TXN_SNAPSHOT_TYPE snapshot_type + ); +void toku_txn_manager_handle_snapshot_destroy_for_child_txn( + TOKUTXN txn, + TXN_MANAGER txn_manager, + TXN_SNAPSHOT_TYPE snapshot_type + ); + + +// Assign a txnid. Log the txn begin in the recovery log. Initialize the txn live lists. +void toku_txn_manager_start_txn( + TOKUTXN txn, + TXN_MANAGER txn_manager, + TXN_SNAPSHOT_TYPE snapshot_type, + bool read_only + ); + +void toku_txn_manager_start_txn_for_recovery( + TOKUTXN txn, + TXN_MANAGER txn_manager, + TXNID xid + ); + +void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn); + +void toku_txn_manager_clone_state_for_gc( + TXN_MANAGER txn_manager, + xid_omt_t* snapshot_xids, + rx_omt_t* referenced_xids, + xid_omt_t* live_root_txns + ); + +void toku_txn_manager_id2txn_unlocked(TXN_MANAGER txn_manager, TXNID_PAIR txnid, TOKUTXN *result); + +// Returns a root txn associated with xid. The system as a whole +// assumes that only root txns get prepared, adn therefore only +// root txns will have XIDs associated with them. +int toku_txn_manager_get_root_txn_from_xid (TXN_MANAGER txn_manager, TOKU_XA_XID *xid, DB_TXN **txnp); + +uint32_t toku_txn_manager_num_live_root_txns(TXN_MANAGER txn_manager); + +typedef int (*txn_mgr_iter_callback)(TOKUTXN txn, void* extra); + +int toku_txn_manager_iter_over_live_txns( + TXN_MANAGER txn_manager, + txn_mgr_iter_callback cb, + void* extra + ); + +int toku_txn_manager_iter_over_live_root_txns( + TXN_MANAGER txn_manager, + txn_mgr_iter_callback cb, + void* extra + ); + +int toku_txn_manager_recover_root_txn( + TXN_MANAGER txn_manager, + struct tokulogger_preplist preplist[/*count*/], + long count, + long *retp, /*out*/ + uint32_t flags + ); + +void toku_txn_manager_suspend(TXN_MANAGER txn_manager); +void toku_txn_manager_resume(TXN_MANAGER txn_manager); + +void toku_txn_manager_set_last_xid_from_logger(TXN_MANAGER txn_manager, TXNID last_xid); +void toku_txn_manager_set_last_xid_from_recovered_checkpoint(TXN_MANAGER txn_manager, TXNID last_xid); +TXNID toku_txn_manager_get_last_xid(TXN_MANAGER mgr); + +bool toku_txn_manager_txns_exist(TXN_MANAGER mgr); + +// Test-only function +void toku_txn_manager_increase_last_xid(TXN_MANAGER mgr, uint64_t increment); + +TXNID toku_get_youngest_live_list_txnid_for(TXNID xc, const xid_omt_t &snapshot_txnids, const rx_omt_t &referenced_xids); diff --git a/storage/tokudb/PerconaFT/ft/txn/txn_state.h b/storage/tokudb/PerconaFT/ft/txn/txn_state.h new file mode 100644 index 00000000..3301cc68 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/txn/txn_state.h @@ -0,0 +1,50 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#pragma once + +// this is a separate file so that the hotindexing tests can see the txn states + +enum tokutxn_state { + TOKUTXN_LIVE, // initial txn state + TOKUTXN_PREPARING, // txn is preparing (or prepared) + TOKUTXN_COMMITTING, // txn in the process of committing + TOKUTXN_ABORTING, // txn in the process of aborting + TOKUTXN_RETIRED, // txn no longer exists +}; +typedef enum tokutxn_state TOKUTXN_STATE; diff --git a/storage/tokudb/PerconaFT/ft/txn/xids.cc b/storage/tokudb/PerconaFT/ft/txn/xids.cc new file mode 100644 index 00000000..59bf3c9b --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/txn/xids.cc @@ -0,0 +1,247 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +/* Purpose of this file is to implement xids list of nested transactions + * ids. + * + * See design documentation for nested transactions at + * TokuWiki/Imp/TransactionsOverview. + * + * NOTE: xids are always stored in disk byte order. + * Accessors are responsible for transposing bytes to + * host order. + */ + +#include <errno.h> +#include <string.h> + +#include "portability/memory.h" +#include "portability/toku_assert.h" +#include "portability/toku_htod.h" +#include "portability/toku_portability.h" + +#include "ft/txn/xids.h" + +///////////////////////////////////////////////////////////////////////////////// +// This layer of abstraction (xids_xxx) understands xids<> and nothing else. +// It contains all the functions that understand xids<> +// +// xids<> do not store the implicit transaction id of 0 at index 0. +// The accessor functions make the id of 0 explicit at index 0. +// The number of xids physically stored in the xids array is in +// the variable num_xids. +// +// The xids struct is immutable. The caller gets an initial version of XIDS +// by calling toku_xids_get_root_xids(), which returns the constant struct +// representing the root transaction (id 0). When a transaction begins, +// a new XIDS is created with the id of the current transaction appended to +// the list. +// +// + +// This is the xids list for a transactionless environment. +// It is also the initial state of any xids list created for +// nested transactions. + +XIDS +toku_xids_get_root_xids(void) { + static const struct XIDS_S root_xids = { + .num_xids = 0 + }; + + XIDS rval = (XIDS)&root_xids; + return rval; +} + +bool +toku_xids_can_create_child(XIDS xids) { + invariant(xids->num_xids < MAX_TRANSACTION_RECORDS); + return (xids->num_xids + 1) != MAX_TRANSACTION_RECORDS; +} + +int +toku_xids_create_unknown_child(XIDS parent_xids, XIDS *xids_p) { + // Postcondition: + // xids_p points to an xids that is an exact copy of parent_xids, but with room for one more xid. + int rval; + invariant(parent_xids); + uint32_t num_child_xids = parent_xids->num_xids + 1; + // assumes that caller has verified that num_child_xids will + // be less than MAX_TRANSACTIN_RECORDS + invariant(num_child_xids < MAX_TRANSACTION_RECORDS); + size_t new_size = sizeof(*parent_xids) + num_child_xids*sizeof(parent_xids->ids[0]); + XIDS CAST_FROM_VOIDP(xids, toku_xmalloc(new_size)); + // Clone everything (parent does not have the newest xid). + memcpy(xids, parent_xids, new_size - sizeof(xids->ids[0])); + *xids_p = xids; + rval = 0; + return rval; +} + +void +toku_xids_finalize_with_child(XIDS xids, TXNID this_xid) { + // Precondition: + // - xids was created by toku_xids_create_unknown_child + TXNID this_xid_disk = toku_htod64(this_xid); + uint32_t num_child_xids = ++xids->num_xids; + xids->ids[num_child_xids - 1] = this_xid_disk; +} + +// xids is immutable. This function creates a new xids by copying the +// parent's list and then appending the xid of the new transaction. +int +toku_xids_create_child(XIDS parent_xids, // xids list for parent transaction + XIDS *xids_p, // xids list created + TXNID this_xid) { // xid of this transaction (new innermost) + bool can_create_child = toku_xids_can_create_child(parent_xids); + if (!can_create_child) { + return EINVAL; + } + toku_xids_create_unknown_child(parent_xids, xids_p); + toku_xids_finalize_with_child(*xids_p, this_xid); + return 0; +} + +void +toku_xids_create_from_buffer(struct rbuf *rb, // xids list for parent transaction + XIDS *xids_p) { // xids list created + uint8_t num_xids = rbuf_char(rb); + invariant(num_xids < MAX_TRANSACTION_RECORDS); + XIDS CAST_FROM_VOIDP(xids, toku_xmalloc(sizeof(*xids) + num_xids*sizeof(xids->ids[0]))); + xids->num_xids = num_xids; + uint8_t index; + for (index = 0; index < xids->num_xids; index++) { + rbuf_TXNID(rb, &xids->ids[index]); + } + *xids_p = xids; +} + +void +toku_xids_destroy(XIDS *xids_p) { + if (*xids_p != toku_xids_get_root_xids()) toku_free(*xids_p); + *xids_p = NULL; +} + +// Return xid at requested position. +// If requesting an xid out of range (which will be the case if xids array is empty) +// then return 0, the xid of the root transaction. +TXNID +toku_xids_get_xid(XIDS xids, uint8_t index) { + invariant(index < toku_xids_get_num_xids(xids)); + TXNID rval = xids->ids[index]; + rval = toku_dtoh64(rval); + return rval; +} + +uint8_t +toku_xids_get_num_xids(XIDS xids) { + uint8_t rval = xids->num_xids; + return rval; +} + +// Return innermost xid +TXNID +toku_xids_get_innermost_xid(XIDS xids) { + TXNID rval = TXNID_NONE; + if (toku_xids_get_num_xids(xids)) { + // if clause above makes this cast ok + uint8_t innermost_xid = (uint8_t) (toku_xids_get_num_xids(xids) - 1); + rval = toku_xids_get_xid(xids, innermost_xid); + } + return rval; +} + +TXNID +toku_xids_get_outermost_xid(XIDS xids) { + TXNID rval = TXNID_NONE; + if (toku_xids_get_num_xids(xids)) { + rval = toku_xids_get_xid(xids, 0); + } + return rval; +} + +void +toku_xids_cpy(XIDS target, XIDS source) { + size_t size = toku_xids_get_size(source); + memcpy(target, source, size); +} + +// return size in bytes +uint32_t +toku_xids_get_size(XIDS xids) { + uint32_t rval; + uint8_t num_xids = xids->num_xids; + rval = sizeof(*xids) + num_xids * sizeof(xids->ids[0]); + return rval; +} + +uint32_t +toku_xids_get_serialize_size(XIDS xids) { + uint32_t rval; + uint8_t num_xids = xids->num_xids; + rval = 1 + //num xids + 8 * num_xids; + return rval; +} + +unsigned char * +toku_xids_get_end_of_array(XIDS xids) { + TXNID *r = xids->ids + xids->num_xids; + return (unsigned char*)r; +} + +void wbuf_nocrc_xids(struct wbuf *wb, XIDS xids) { + wbuf_nocrc_char(wb, (unsigned char)xids->num_xids); + uint8_t index; + for (index = 0; index < xids->num_xids; index++) { + wbuf_nocrc_TXNID(wb, xids->ids[index]); + } +} + +void +toku_xids_fprintf(FILE *fp, XIDS xids) { + uint8_t index; + unsigned num_xids = toku_xids_get_num_xids(xids); + fprintf(fp, "[|%u| ", num_xids); + for (index = 0; index < toku_xids_get_num_xids(xids); index++) { + if (index) fprintf(fp, ","); + fprintf(fp, "%" PRIx64, toku_xids_get_xid(xids, index)); + } + fprintf(fp, "]"); +} + diff --git a/storage/tokudb/PerconaFT/ft/txn/xids.h b/storage/tokudb/PerconaFT/ft/txn/xids.h new file mode 100644 index 00000000..83ad5e57 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/txn/xids.h @@ -0,0 +1,116 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +/* Purpose of this file is to provide the world with everything necessary + * to use the xids and nothing else. + * Internal requirements of the xids logic do not belong here. + * + * xids is (abstractly) an immutable list of nested transaction ids, accessed only + * via the functions in this file. + * + * See design documentation for nested transactions at + * TokuWiki/Imp/TransactionsOverview. + */ + +#pragma once + +#include "ft/txn/txn.h" +#include "ft/serialize/rbuf.h" +#include "ft/serialize/wbuf.h" + +/* The number of transaction ids stored in the xids structure is + * represented by an 8-bit value. The value 255 is reserved. + * The constant MAX_NESTED_TRANSACTIONS is one less because + * one slot in the packed leaf entry is used for the implicit + * root transaction (id 0). + */ +enum { + MAX_NESTED_TRANSACTIONS = 253, + MAX_TRANSACTION_RECORDS = MAX_NESTED_TRANSACTIONS + 1 +}; + +// Variable size list of transaction ids (known in design doc as xids<>). +// ids[0] is the outermost transaction. +// ids[num_xids - 1] is the innermost transaction. +// Should only be accessed by accessor functions toku_xids_xxx, not directly. + +// If the xids struct is unpacked, the compiler aligns the ids[] and we waste a lot of space +struct __attribute__((__packed__)) XIDS_S { + // maximum value of MAX_TRANSACTION_RECORDS - 1 because transaction 0 is implicit + uint8_t num_xids; + TXNID ids[]; +}; +typedef struct XIDS_S *XIDS; + +// Retrieve an XIDS representing the root transaction. +XIDS toku_xids_get_root_xids(void); + +bool toku_xids_can_create_child(XIDS xids); + +void toku_xids_cpy(XIDS target, XIDS source); + +//Creates an XIDS representing this transaction. +//You must pass in an XIDS representing the parent of this transaction. +int toku_xids_create_child(XIDS parent_xids, XIDS *xids_p, TXNID this_xid); + +// The following two functions (in order) are equivalent to toku_xids_create child, +// but allow you to do most of the work without knowing the new xid. +int toku_xids_create_unknown_child(XIDS parent_xids, XIDS *xids_p); +void toku_xids_finalize_with_child(XIDS xids, TXNID this_xid); + +void toku_xids_create_from_buffer(struct rbuf *rb, XIDS *xids_p); + +void toku_xids_destroy(XIDS *xids_p); + +TXNID toku_xids_get_xid(XIDS xids, uint8_t index); + +uint8_t toku_xids_get_num_xids(XIDS xids); + +TXNID toku_xids_get_innermost_xid(XIDS xids); +TXNID toku_xids_get_outermost_xid(XIDS xids); + +// return size in bytes +uint32_t toku_xids_get_size(XIDS xids); + +uint32_t toku_xids_get_serialize_size(XIDS xids); + +unsigned char *toku_xids_get_end_of_array(XIDS xids); + +void wbuf_nocrc_xids(struct wbuf *wb, XIDS xids); + +void toku_xids_fprintf(FILE* fp, XIDS xids); |