summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc1023
1 files changed, 1023 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc b/src/rocksdb/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc
new file mode 100644
index 000000000..3d6a590c7
--- /dev/null
+++ b/src/rocksdb/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc
@@ -0,0 +1,1023 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=2:softtabstop=2:
+#ifndef ROCKSDB_LITE
+#ifndef OS_WIN
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+======= */
+
+#ident \
+ "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#include "locktree.h"
+
+#include <memory.h>
+
+#include "../portability/toku_pthread.h"
+#include "../portability/toku_time.h"
+#include "../util/growable_array.h"
+#include "range_buffer.h"
+
+// including the concurrent_tree here expands the templates
+// and "defines" the implementation, so we do it here in
+// the locktree source file instead of the header.
+#include "concurrent_tree.h"
+
+namespace toku {
+// A locktree represents the set of row locks owned by all transactions
+// over an open dictionary. Read and write ranges are represented as
+// a left and right key which are compared with the given descriptor
+// and comparison fn.
+//
+// Each locktree has a reference count which it manages
+// but does nothing based on the value of the reference count - it is
+// up to the user of the locktree to destroy it when it sees fit.
+
+void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id,
+ const comparator &cmp,
+ toku_external_mutex_factory_t mutex_factory) {
+ m_mgr = mgr;
+ m_dict_id = dict_id;
+
+ m_cmp.create_from(cmp);
+ m_reference_count = 1;
+ m_userdata = nullptr;
+
+ XCALLOC(m_rangetree);
+ m_rangetree->create(&m_cmp);
+
+ m_sto_txnid = TXNID_NONE;
+ m_sto_buffer.create();
+ m_sto_score = STO_SCORE_THRESHOLD;
+ m_sto_end_early_count = 0;
+ m_sto_end_early_time = 0;
+
+ m_escalation_barrier = [](const DBT *, const DBT *, void *) -> bool {
+ return false;
+ };
+
+ m_lock_request_info.init(mutex_factory);
+}
+
+void locktree::set_escalation_barrier_func(
+ lt_escalation_barrier_check_func func, void *extra) {
+ m_escalation_barrier = func;
+ m_escalation_barrier_arg = extra;
+}
+
+void lt_lock_request_info::init(toku_external_mutex_factory_t mutex_factory) {
+ pending_lock_requests.create();
+ pending_is_empty = true;
+ toku_external_mutex_init(mutex_factory, &mutex);
+ retry_want = retry_done = 0;
+ ZERO_STRUCT(counters);
+ ZERO_STRUCT(retry_mutex);
+ toku_mutex_init(locktree_request_info_retry_mutex_key, &retry_mutex, nullptr);
+ toku_cond_init(locktree_request_info_retry_cv_key, &retry_cv, nullptr);
+ running_retry = false;
+
+ TOKU_VALGRIND_HG_DISABLE_CHECKING(&pending_is_empty,
+ sizeof(pending_is_empty));
+ TOKU_DRD_IGNORE_VAR(pending_is_empty);
+}
+
+void locktree::destroy(void) {
+ invariant(m_reference_count == 0);
+ invariant(m_lock_request_info.pending_lock_requests.size() == 0);
+ m_cmp.destroy();
+ m_rangetree->destroy();
+ toku_free(m_rangetree);
+ m_sto_buffer.destroy();
+ m_lock_request_info.destroy();
+}
+
+void lt_lock_request_info::destroy(void) {
+ pending_lock_requests.destroy();
+ toku_external_mutex_destroy(&mutex);
+ toku_mutex_destroy(&retry_mutex);
+ toku_cond_destroy(&retry_cv);
+}
+
+void locktree::add_reference(void) {
+ (void)toku_sync_add_and_fetch(&m_reference_count, 1);
+}
+
+uint32_t locktree::release_reference(void) {
+ return toku_sync_sub_and_fetch(&m_reference_count, 1);
+}
+
+uint32_t locktree::get_reference_count(void) { return m_reference_count; }
+
+// a container for a range/txnid pair
+struct row_lock {
+ keyrange range;
+ TXNID txnid;
+ bool is_shared;
+ TxnidVector *owners;
+};
+
+// iterate over a locked keyrange and copy out all of the data,
+// storing each row lock into the given growable array. the
+// caller does not own the range inside the returned row locks,
+// so remove from the tree with care using them as keys.
+static void iterate_and_get_overlapping_row_locks(
+ const concurrent_tree::locked_keyrange *lkr,
+ GrowableArray<row_lock> *row_locks) {
+ struct copy_fn_obj {
+ GrowableArray<row_lock> *row_locks;
+ bool fn(const keyrange &range, TXNID txnid, bool is_shared,
+ TxnidVector *owners) {
+ row_lock lock = {.range = range,
+ .txnid = txnid,
+ .is_shared = is_shared,
+ .owners = owners};
+ row_locks->push(lock);
+ return true;
+ }
+ } copy_fn;
+ copy_fn.row_locks = row_locks;
+ lkr->iterate(&copy_fn);
+}
+
+// given a txnid and a set of overlapping row locks, determine
+// which txnids are conflicting, and store them in the conflicts
+// set, if given.
+static bool determine_conflicting_txnids(
+ const GrowableArray<row_lock> &row_locks, const TXNID &txnid,
+ txnid_set *conflicts) {
+ bool conflicts_exist = false;
+ const size_t num_overlaps = row_locks.get_size();
+ for (size_t i = 0; i < num_overlaps; i++) {
+ const row_lock lock = row_locks.fetch_unchecked(i);
+ const TXNID other_txnid = lock.txnid;
+ if (other_txnid != txnid) {
+ if (conflicts) {
+ if (other_txnid == TXNID_SHARED) {
+ // Add all shared lock owners, except this transaction.
+ for (TXNID shared_id : *lock.owners) {
+ if (shared_id != txnid) conflicts->add(shared_id);
+ }
+ } else {
+ conflicts->add(other_txnid);
+ }
+ }
+ conflicts_exist = true;
+ }
+ }
+ return conflicts_exist;
+}
+
+// how much memory does a row lock take up in a concurrent tree?
+static uint64_t row_lock_size_in_tree(const row_lock &lock) {
+ const uint64_t overhead = concurrent_tree::get_insertion_memory_overhead();
+ return lock.range.get_memory_size() + overhead;
+}
+
+// remove and destroy the given row lock from the locked keyrange,
+// then notify the memory tracker of the newly freed lock.
+static void remove_row_lock_from_tree(concurrent_tree::locked_keyrange *lkr,
+ const row_lock &lock, TXNID txnid,
+ locktree_manager *mgr) {
+ const uint64_t mem_released = row_lock_size_in_tree(lock);
+ lkr->remove(lock.range, txnid);
+ if (mgr != nullptr) {
+ mgr->note_mem_released(mem_released);
+ }
+}
+
+// insert a row lock into the locked keyrange, then notify
+// the memory tracker of this newly acquired lock.
+static void insert_row_lock_into_tree(concurrent_tree::locked_keyrange *lkr,
+ const row_lock &lock,
+ locktree_manager *mgr) {
+ uint64_t mem_used = row_lock_size_in_tree(lock);
+ lkr->insert(lock.range, lock.txnid, lock.is_shared);
+ if (mgr != nullptr) {
+ mgr->note_mem_used(mem_used);
+ }
+}
+
+void locktree::sto_begin(TXNID txnid) {
+ invariant(m_sto_txnid == TXNID_NONE);
+ invariant(m_sto_buffer.is_empty());
+ m_sto_txnid = txnid;
+}
+
+void locktree::sto_append(const DBT *left_key, const DBT *right_key,
+ bool is_write_request) {
+ uint64_t buffer_mem, delta;
+
+ // psergey: the below two lines do not make any sense
+ // (and it's the same in upstream TokuDB)
+ keyrange range;
+ range.create(left_key, right_key);
+
+ buffer_mem = m_sto_buffer.total_memory_size();
+ m_sto_buffer.append(left_key, right_key, is_write_request);
+ delta = m_sto_buffer.total_memory_size() - buffer_mem;
+ if (m_mgr != nullptr) {
+ m_mgr->note_mem_used(delta);
+ }
+}
+
+void locktree::sto_end(void) {
+ uint64_t mem_size = m_sto_buffer.total_memory_size();
+ if (m_mgr != nullptr) {
+ m_mgr->note_mem_released(mem_size);
+ }
+ m_sto_buffer.destroy();
+ m_sto_buffer.create();
+ m_sto_txnid = TXNID_NONE;
+}
+
+void locktree::sto_end_early_no_accounting(void *prepared_lkr) {
+ sto_migrate_buffer_ranges_to_tree(prepared_lkr);
+ sto_end();
+ toku_unsafe_set(m_sto_score, 0);
+}
+
+void locktree::sto_end_early(void *prepared_lkr) {
+ m_sto_end_early_count++;
+
+ tokutime_t t0 = toku_time_now();
+ sto_end_early_no_accounting(prepared_lkr);
+ tokutime_t t1 = toku_time_now();
+
+ m_sto_end_early_time += (t1 - t0);
+}
+
+void locktree::sto_migrate_buffer_ranges_to_tree(void *prepared_lkr) {
+ // There should be something to migrate, and nothing in the rangetree.
+ invariant(!m_sto_buffer.is_empty());
+ invariant(m_rangetree->is_empty());
+
+ concurrent_tree sto_rangetree;
+ concurrent_tree::locked_keyrange sto_lkr;
+ sto_rangetree.create(&m_cmp);
+
+ // insert all of the ranges from the single txnid buffer into a new rangtree
+ range_buffer::iterator iter(&m_sto_buffer);
+ range_buffer::iterator::record rec;
+ while (iter.current(&rec)) {
+ sto_lkr.prepare(&sto_rangetree);
+ int r = acquire_lock_consolidated(&sto_lkr, m_sto_txnid, rec.get_left_key(),
+ rec.get_right_key(),
+ rec.get_exclusive_flag(), nullptr);
+ invariant_zero(r);
+ sto_lkr.release();
+ iter.next();
+ }
+
+ // Iterate the newly created rangetree and insert each range into the
+ // locktree's rangetree, on behalf of the old single txnid.
+ struct migrate_fn_obj {
+ concurrent_tree::locked_keyrange *dst_lkr;
+ bool fn(const keyrange &range, TXNID txnid, bool is_shared,
+ TxnidVector *owners) {
+ // There can't be multiple owners in STO mode
+ invariant_zero(owners);
+ dst_lkr->insert(range, txnid, is_shared);
+ return true;
+ }
+ } migrate_fn;
+ migrate_fn.dst_lkr =
+ static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr);
+ sto_lkr.prepare(&sto_rangetree);
+ sto_lkr.iterate(&migrate_fn);
+ sto_lkr.remove_all();
+ sto_lkr.release();
+ sto_rangetree.destroy();
+ invariant(!m_rangetree->is_empty());
+}
+
+bool locktree::sto_try_acquire(void *prepared_lkr, TXNID txnid,
+ const DBT *left_key, const DBT *right_key,
+ bool is_write_request) {
+ if (m_rangetree->is_empty() && m_sto_buffer.is_empty() &&
+ toku_unsafe_fetch(m_sto_score) >= STO_SCORE_THRESHOLD) {
+ // We can do the optimization because the rangetree is empty, and
+ // we know its worth trying because the sto score is big enough.
+ sto_begin(txnid);
+ } else if (m_sto_txnid != TXNID_NONE) {
+ // We are currently doing the optimization. Check if we need to cancel
+ // it because a new txnid appeared, or if the current single txnid has
+ // taken too many locks already.
+ if (m_sto_txnid != txnid ||
+ m_sto_buffer.get_num_ranges() > STO_BUFFER_MAX_SIZE) {
+ sto_end_early(prepared_lkr);
+ }
+ }
+
+ // At this point the sto txnid is properly set. If it is valid, then
+ // this txnid can append its lock to the sto buffer successfully.
+ if (m_sto_txnid != TXNID_NONE) {
+ invariant(m_sto_txnid == txnid);
+ sto_append(left_key, right_key, is_write_request);
+ return true;
+ } else {
+ invariant(m_sto_buffer.is_empty());
+ return false;
+ }
+}
+
+/*
+ Do the same as iterate_and_get_overlapping_row_locks does, but also check for
+ this:
+ The set of overlapping rows locks consists of just one read-only shared
+ lock with the same endpoints as specified (in that case, we can just add
+ ourselves into that list)
+
+ @return true - One compatible shared lock
+ false - Otherwise
+*/
+static bool iterate_and_get_overlapping_row_locks2(
+ const concurrent_tree::locked_keyrange *lkr, const DBT *left_key,
+ const DBT *right_key, comparator *cmp, TXNID,
+ GrowableArray<row_lock> *row_locks) {
+ struct copy_fn_obj {
+ GrowableArray<row_lock> *row_locks;
+ bool first_call = true;
+ bool matching_lock_found = false;
+ const DBT *left_key, *right_key;
+ comparator *cmp;
+
+ bool fn(const keyrange &range, TXNID txnid, bool is_shared,
+ TxnidVector *owners) {
+ if (first_call) {
+ first_call = false;
+ if (is_shared && !(*cmp)(left_key, range.get_left_key()) &&
+ !(*cmp)(right_key, range.get_right_key())) {
+ matching_lock_found = true;
+ }
+ } else {
+ // if we see multiple matching locks, it doesn't matter whether
+ // the first one was matching.
+ matching_lock_found = false;
+ }
+ row_lock lock = {.range = range,
+ .txnid = txnid,
+ .is_shared = is_shared,
+ .owners = owners};
+ row_locks->push(lock);
+ return true;
+ }
+ } copy_fn;
+ copy_fn.row_locks = row_locks;
+ copy_fn.left_key = left_key;
+ copy_fn.right_key = right_key;
+ copy_fn.cmp = cmp;
+ lkr->iterate(&copy_fn);
+ return copy_fn.matching_lock_found;
+}
+
+// try to acquire a lock and consolidate it with existing locks if possible
+// param: lkr, a prepared locked keyrange
+// return: 0 on success, DB_LOCK_NOTGRANTED if conflicting locks exist.
+int locktree::acquire_lock_consolidated(void *prepared_lkr, TXNID txnid,
+ const DBT *left_key,
+ const DBT *right_key,
+ bool is_write_request,
+ txnid_set *conflicts) {
+ int r = 0;
+ concurrent_tree::locked_keyrange *lkr;
+
+ keyrange requested_range;
+ requested_range.create(left_key, right_key);
+ lkr = static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr);
+ lkr->acquire(requested_range);
+
+ // copy out the set of overlapping row locks.
+ GrowableArray<row_lock> overlapping_row_locks;
+ overlapping_row_locks.init();
+ bool matching_shared_lock_found = false;
+
+ if (is_write_request)
+ iterate_and_get_overlapping_row_locks(lkr, &overlapping_row_locks);
+ else {
+ matching_shared_lock_found = iterate_and_get_overlapping_row_locks2(
+ lkr, left_key, right_key, &m_cmp, txnid, &overlapping_row_locks);
+ // psergey-todo: what to do now? So, we have figured we have just one
+ // shareable lock. Need to add us into it as an owner but the lock
+ // pointer cannot be kept?
+ // A: use find_node_with_overlapping_child(key_range, nullptr);
+ // then, add ourselves to the owner list.
+ // Dont' foreget to release the subtree after that.
+ }
+
+ if (matching_shared_lock_found) {
+ // there is just one non-confliting matching shared lock.
+ // we are hilding a lock on it (see acquire() call above).
+ // we need to modify it to indicate there is another locker...
+ if (lkr->add_shared_owner(requested_range, txnid)) {
+ // Pretend shared lock uses as much memory.
+ row_lock new_lock = {.range = requested_range,
+ .txnid = txnid,
+ .is_shared = false,
+ .owners = nullptr};
+ uint64_t mem_used = row_lock_size_in_tree(new_lock);
+ if (m_mgr) {
+ m_mgr->note_mem_used(mem_used);
+ }
+ }
+ requested_range.destroy();
+ overlapping_row_locks.deinit();
+ return 0;
+ }
+
+ size_t num_overlapping_row_locks = overlapping_row_locks.get_size();
+
+ // if any overlapping row locks conflict with this request, bail out.
+
+ bool conflicts_exist =
+ determine_conflicting_txnids(overlapping_row_locks, txnid, conflicts);
+ if (!conflicts_exist) {
+ // there are no conflicts, so all of the overlaps are for the requesting
+ // txnid. so, we must consolidate all existing overlapping ranges and the
+ // requested range into one dominating range. then we insert the dominating
+ // range.
+ bool all_shared = !is_write_request;
+ for (size_t i = 0; i < num_overlapping_row_locks; i++) {
+ row_lock overlapping_lock = overlapping_row_locks.fetch_unchecked(i);
+ invariant(overlapping_lock.txnid == txnid);
+ requested_range.extend(m_cmp, overlapping_lock.range);
+ remove_row_lock_from_tree(lkr, overlapping_lock, TXNID_ANY, m_mgr);
+ all_shared = all_shared && overlapping_lock.is_shared;
+ }
+
+ row_lock new_lock = {.range = requested_range,
+ .txnid = txnid,
+ .is_shared = all_shared,
+ .owners = nullptr};
+ insert_row_lock_into_tree(lkr, new_lock, m_mgr);
+ } else {
+ r = DB_LOCK_NOTGRANTED;
+ }
+
+ requested_range.destroy();
+ overlapping_row_locks.deinit();
+ return r;
+}
+
+// acquire a lock in the given key range, inclusive. if successful,
+// return 0. otherwise, populate the conflicts txnid_set with the set of
+// transactions that conflict with this request.
+int locktree::acquire_lock(bool is_write_request, TXNID txnid,
+ const DBT *left_key, const DBT *right_key,
+ txnid_set *conflicts) {
+ int r = 0;
+
+ // we are only supporting write locks for simplicity
+ // invariant(is_write_request);
+
+ // acquire and prepare a locked keyrange over the requested range.
+ // prepare is a serialzation point, so we take the opportunity to
+ // try the single txnid optimization first.
+ concurrent_tree::locked_keyrange lkr;
+ lkr.prepare(m_rangetree);
+
+ bool acquired =
+ sto_try_acquire(&lkr, txnid, left_key, right_key, is_write_request);
+ if (!acquired) {
+ r = acquire_lock_consolidated(&lkr, txnid, left_key, right_key,
+ is_write_request, conflicts);
+ }
+
+ lkr.release();
+ return r;
+}
+
+int locktree::try_acquire_lock(bool is_write_request, TXNID txnid,
+ const DBT *left_key, const DBT *right_key,
+ txnid_set *conflicts, bool big_txn) {
+ // All ranges in the locktree must have left endpoints <= right endpoints.
+ // Range comparisons rely on this fact, so we make a paranoid invariant here.
+ paranoid_invariant(m_cmp(left_key, right_key) <= 0);
+ int r = m_mgr == nullptr ? 0 : m_mgr->check_current_lock_constraints(big_txn);
+ if (r == 0) {
+ r = acquire_lock(is_write_request, txnid, left_key, right_key, conflicts);
+ }
+ return r;
+}
+
+// the locktree silently upgrades read locks to write locks for simplicity
+int locktree::acquire_read_lock(TXNID txnid, const DBT *left_key,
+ const DBT *right_key, txnid_set *conflicts,
+ bool big_txn) {
+ return try_acquire_lock(false, txnid, left_key, right_key, conflicts,
+ big_txn);
+}
+
+int locktree::acquire_write_lock(TXNID txnid, const DBT *left_key,
+ const DBT *right_key, txnid_set *conflicts,
+ bool big_txn) {
+ return try_acquire_lock(true, txnid, left_key, right_key, conflicts, big_txn);
+}
+
+// typedef void (*dump_callback)(void *cdata, const DBT *left, const DBT *right,
+// TXNID txnid);
+void locktree::dump_locks(void *cdata, dump_callback cb) {
+ concurrent_tree::locked_keyrange lkr;
+ keyrange range;
+ range.create(toku_dbt_negative_infinity(), toku_dbt_positive_infinity());
+
+ lkr.prepare(m_rangetree);
+ lkr.acquire(range);
+
+ TXNID sto_txn;
+ if ((sto_txn = toku_unsafe_fetch(m_sto_txnid)) != TXNID_NONE) {
+ // insert all of the ranges from the single txnid buffer into a new rangtree
+ range_buffer::iterator iter(&m_sto_buffer);
+ range_buffer::iterator::record rec;
+ while (iter.current(&rec)) {
+ (*cb)(cdata, rec.get_left_key(), rec.get_right_key(), sto_txn,
+ !rec.get_exclusive_flag(), nullptr);
+ iter.next();
+ }
+ } else {
+ GrowableArray<row_lock> all_locks;
+ all_locks.init();
+ iterate_and_get_overlapping_row_locks(&lkr, &all_locks);
+
+ const size_t n_locks = all_locks.get_size();
+ for (size_t i = 0; i < n_locks; i++) {
+ const row_lock lock = all_locks.fetch_unchecked(i);
+ (*cb)(cdata, lock.range.get_left_key(), lock.range.get_right_key(),
+ lock.txnid, lock.is_shared, lock.owners);
+ }
+ all_locks.deinit();
+ }
+ lkr.release();
+ range.destroy();
+}
+
+void locktree::get_conflicts(bool is_write_request, TXNID txnid,
+ const DBT *left_key, const DBT *right_key,
+ txnid_set *conflicts) {
+ // because we only support write locks, ignore this bit for now.
+ (void)is_write_request;
+
+ // preparing and acquire a locked keyrange over the range
+ keyrange range;
+ range.create(left_key, right_key);
+ concurrent_tree::locked_keyrange lkr;
+ lkr.prepare(m_rangetree);
+ lkr.acquire(range);
+
+ // copy out the set of overlapping row locks and determine the conflicts
+ GrowableArray<row_lock> overlapping_row_locks;
+ overlapping_row_locks.init();
+ iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks);
+
+ // we don't care if conflicts exist. we just want the conflicts set populated.
+ (void)determine_conflicting_txnids(overlapping_row_locks, txnid, conflicts);
+
+ lkr.release();
+ overlapping_row_locks.deinit();
+ range.destroy();
+}
+
+// Effect:
+// For each range in the lock tree that overlaps the given range and has
+// the given txnid, remove it.
+// Rationale:
+// In the common case, there is only the range [left_key, right_key] and
+// it is associated with txnid, so this is a single tree delete.
+//
+// However, consolidation and escalation change the objects in the tree
+// without telling the txn anything. In this case, the txn may own a
+// large range lock that represents its ownership of many smaller range
+// locks. For example, the txn may think it owns point locks on keys 1,
+// 2, and 3, but due to escalation, only the object [1,3] exists in the
+// tree.
+//
+// The first call for a small lock will remove the large range lock, and
+// the rest of the calls should do nothing. After the first release,
+// another thread can acquire one of the locks that the txn thinks it
+// still owns. That's ok, because the txn doesn't want it anymore (it
+// unlocks everything at once), but it may find a lock that it does not
+// own.
+//
+// In our example, the txn unlocks key 1, which actually removes the
+// whole lock [1,3]. Now, someone else can lock 2 before our txn gets
+// around to unlocking 2, so we should not remove that lock.
+void locktree::remove_overlapping_locks_for_txnid(TXNID txnid,
+ const DBT *left_key,
+ const DBT *right_key) {
+ keyrange release_range;
+ release_range.create(left_key, right_key);
+
+ // acquire and prepare a locked keyrange over the release range
+ concurrent_tree::locked_keyrange lkr;
+ lkr.prepare(m_rangetree);
+ lkr.acquire(release_range);
+
+ // copy out the set of overlapping row locks.
+ GrowableArray<row_lock> overlapping_row_locks;
+ overlapping_row_locks.init();
+ iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks);
+ size_t num_overlapping_row_locks = overlapping_row_locks.get_size();
+
+ for (size_t i = 0; i < num_overlapping_row_locks; i++) {
+ row_lock lock = overlapping_row_locks.fetch_unchecked(i);
+ // If this isn't our lock, that's ok, just don't remove it.
+ // See rationale above.
+ // psergey-todo: for shared locks, just remove ourselves from the
+ // owners.
+ if (lock.txnid == txnid || (lock.owners && lock.owners->contains(txnid))) {
+ remove_row_lock_from_tree(&lkr, lock, txnid, m_mgr);
+ }
+ }
+
+ lkr.release();
+ overlapping_row_locks.deinit();
+ release_range.destroy();
+}
+
+bool locktree::sto_txnid_is_valid_unsafe(void) const {
+ return toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE;
+}
+
+int locktree::sto_get_score_unsafe(void) const {
+ return toku_unsafe_fetch(m_sto_score);
+}
+
+bool locktree::sto_try_release(TXNID txnid) {
+ bool released = false;
+ if (toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE) {
+ // check the bit again with a prepared locked keyrange,
+ // which protects the optimization bits and rangetree data
+ concurrent_tree::locked_keyrange lkr;
+ lkr.prepare(m_rangetree);
+ if (m_sto_txnid != TXNID_NONE) {
+ // this txnid better be the single txnid on this locktree,
+ // or else we are in big trouble (meaning the logic is broken)
+ invariant(m_sto_txnid == txnid);
+ invariant(m_rangetree->is_empty());
+ sto_end();
+ released = true;
+ }
+ lkr.release();
+ }
+ return released;
+}
+
+// release all of the locks for a txnid whose endpoints are pairs
+// in the given range buffer.
+void locktree::release_locks(TXNID txnid, const range_buffer *ranges,
+ bool all_trx_locks_hint) {
+ // try the single txn optimization. if it worked, then all of the
+ // locks are already released, otherwise we need to do it here.
+ bool released;
+ if (all_trx_locks_hint) {
+ // This will release all of the locks the transaction is holding
+ released = sto_try_release(txnid);
+ } else {
+ /*
+ psergey: we are asked to release *Some* of the locks the transaction
+ is holding.
+ We could try doing that without leaving the STO mode, but right now,
+ the easiest way is to exit the STO mode and let the non-STO code path
+ handle it.
+ */
+ if (toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE) {
+ // check the bit again with a prepared locked keyrange,
+ // which protects the optimization bits and rangetree data
+ concurrent_tree::locked_keyrange lkr;
+ lkr.prepare(m_rangetree);
+ if (m_sto_txnid != TXNID_NONE) {
+ sto_end_early(&lkr);
+ }
+ lkr.release();
+ }
+ released = false;
+ }
+ if (!released) {
+ range_buffer::iterator iter(ranges);
+ range_buffer::iterator::record rec;
+ while (iter.current(&rec)) {
+ const DBT *left_key = rec.get_left_key();
+ const DBT *right_key = rec.get_right_key();
+ // All ranges in the locktree must have left endpoints <= right endpoints.
+ // Range comparisons rely on this fact, so we make a paranoid invariant
+ // here.
+ paranoid_invariant(m_cmp(left_key, right_key) <= 0);
+ remove_overlapping_locks_for_txnid(txnid, left_key, right_key);
+ iter.next();
+ }
+ // Increase the sto score slightly. Eventually it will hit
+ // the threshold and we'll try the optimization again. This
+ // is how a previously multithreaded system transitions into
+ // a single threaded system that benefits from the optimization.
+ if (toku_unsafe_fetch(m_sto_score) < STO_SCORE_THRESHOLD) {
+ toku_sync_fetch_and_add(&m_sto_score, 1);
+ }
+ }
+}
+
+// iterate over a locked keyrange and extract copies of the first N
+// row locks, storing each one into the given array of size N,
+// then removing each extracted lock from the locked keyrange.
+static int extract_first_n_row_locks(concurrent_tree::locked_keyrange *lkr,
+ locktree_manager *mgr, row_lock *row_locks,
+ int num_to_extract) {
+ struct extract_fn_obj {
+ int num_extracted;
+ int num_to_extract;
+ row_lock *row_locks;
+ bool fn(const keyrange &range, TXNID txnid, bool is_shared,
+ TxnidVector *owners) {
+ if (num_extracted < num_to_extract) {
+ row_lock lock;
+ lock.range.create_copy(range);
+ lock.txnid = txnid;
+ lock.is_shared = is_shared;
+ // deep-copy the set of owners:
+ if (owners)
+ lock.owners = new TxnidVector(*owners);
+ else
+ lock.owners = nullptr;
+ row_locks[num_extracted++] = lock;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ } extract_fn;
+
+ extract_fn.row_locks = row_locks;
+ extract_fn.num_to_extract = num_to_extract;
+ extract_fn.num_extracted = 0;
+ lkr->iterate(&extract_fn);
+
+ // now that the ranges have been copied out, complete
+ // the extraction by removing the ranges from the tree.
+ // use remove_row_lock_from_tree() so we properly track the
+ // amount of memory and number of locks freed.
+ int num_extracted = extract_fn.num_extracted;
+ invariant(num_extracted <= num_to_extract);
+ for (int i = 0; i < num_extracted; i++) {
+ remove_row_lock_from_tree(lkr, row_locks[i], TXNID_ANY, mgr);
+ }
+
+ return num_extracted;
+}
+
+// Store each newly escalated lock in a range buffer for appropriate txnid.
+// We'll rebuild the locktree by iterating over these ranges, and then we
+// can pass back each txnid/buffer pair individually through a callback
+// to notify higher layers that locks have changed.
+struct txnid_range_buffer {
+ TXNID txnid;
+ range_buffer buffer;
+
+ static int find_by_txnid(struct txnid_range_buffer *const &other_buffer,
+ const TXNID &txnid) {
+ if (txnid < other_buffer->txnid) {
+ return -1;
+ } else if (other_buffer->txnid == txnid) {
+ return 0;
+ } else {
+ return 1;
+ }
+ }
+};
+
+// escalate the locks in the locktree by merging adjacent
+// locks that have the same txnid into one larger lock.
+//
+// if there's only one txnid in the locktree then this
+// approach works well. if there are many txnids and each
+// has locks in a random/alternating order, then this does
+// not work so well.
+void locktree::escalate(lt_escalate_cb after_escalate_callback,
+ void *after_escalate_callback_extra) {
+ omt<struct txnid_range_buffer *, struct txnid_range_buffer *> range_buffers;
+ range_buffers.create();
+
+ // prepare and acquire a locked keyrange on the entire locktree
+ concurrent_tree::locked_keyrange lkr;
+ keyrange infinite_range = keyrange::get_infinite_range();
+ lkr.prepare(m_rangetree);
+ lkr.acquire(infinite_range);
+
+ // if we're in the single txnid optimization, simply call it off.
+ // if you have to run escalation, you probably don't care about
+ // the optimization anyway, and this makes things easier.
+ if (m_sto_txnid != TXNID_NONE) {
+ // We are already accounting for this escalation time and
+ // count, so don't do it for sto_end_early too.
+ sto_end_early_no_accounting(&lkr);
+ }
+
+ // extract and remove batches of row locks from the locktree
+ int num_extracted;
+ const int num_row_locks_per_batch = 128;
+ row_lock *XCALLOC_N(num_row_locks_per_batch, extracted_buf);
+
+ // we always remove the "first" n because we are removing n
+ // each time we do an extraction. so this loops until its empty.
+ while ((num_extracted = extract_first_n_row_locks(
+ &lkr, m_mgr, extracted_buf, num_row_locks_per_batch)) > 0) {
+ int current_index = 0;
+ while (current_index < num_extracted) {
+ // every batch of extracted locks is in range-sorted order. search
+ // through them and merge adjacent locks with the same txnid into
+ // one dominating lock and save it to a set of escalated locks.
+ //
+ // first, find the index of the next row lock that
+ // - belongs to a different txnid, or
+ // - belongs to several txnids, or
+ // - is a shared lock (we could potentially merge those but
+ // currently we don't), or
+ // - is across a lock escalation barrier.
+ int next_txnid_index = current_index + 1;
+
+ while (next_txnid_index < num_extracted &&
+ (extracted_buf[current_index].txnid ==
+ extracted_buf[next_txnid_index].txnid) &&
+ !extracted_buf[next_txnid_index].is_shared &&
+ !extracted_buf[next_txnid_index].owners &&
+ !m_escalation_barrier(
+ extracted_buf[current_index].range.get_right_key(),
+ extracted_buf[next_txnid_index].range.get_left_key(),
+ m_escalation_barrier_arg)) {
+ next_txnid_index++;
+ }
+
+ // Create an escalated range for the current txnid that dominates
+ // each range between the current indext and the next txnid's index.
+ // const TXNID current_txnid = extracted_buf[current_index].txnid;
+ const DBT *escalated_left_key =
+ extracted_buf[current_index].range.get_left_key();
+ const DBT *escalated_right_key =
+ extracted_buf[next_txnid_index - 1].range.get_right_key();
+
+ // Try to find a range buffer for the current txnid. Create one if it
+ // doesn't exist. Then, append the new escalated range to the buffer. (If
+ // a lock is shared by multiple txnids, append it each of txnid's lists)
+ TxnidVector *owners_ptr;
+ TxnidVector singleton_owner;
+ if (extracted_buf[current_index].owners)
+ owners_ptr = extracted_buf[current_index].owners;
+ else {
+ singleton_owner.insert(extracted_buf[current_index].txnid);
+ owners_ptr = &singleton_owner;
+ }
+
+ for (auto cur_txnid : *owners_ptr) {
+ uint32_t idx;
+ struct txnid_range_buffer *existing_range_buffer;
+ int r =
+ range_buffers.find_zero<TXNID, txnid_range_buffer::find_by_txnid>(
+ cur_txnid, &existing_range_buffer, &idx);
+ if (r == DB_NOTFOUND) {
+ struct txnid_range_buffer *XMALLOC(new_range_buffer);
+ new_range_buffer->txnid = cur_txnid;
+ new_range_buffer->buffer.create();
+ new_range_buffer->buffer.append(
+ escalated_left_key, escalated_right_key,
+ !extracted_buf[current_index].is_shared);
+ range_buffers.insert_at(new_range_buffer, idx);
+ } else {
+ invariant_zero(r);
+ invariant(existing_range_buffer->txnid == cur_txnid);
+ existing_range_buffer->buffer.append(
+ escalated_left_key, escalated_right_key,
+ !extracted_buf[current_index].is_shared);
+ }
+ }
+
+ current_index = next_txnid_index;
+ }
+
+ // destroy the ranges copied during the extraction
+ for (int i = 0; i < num_extracted; i++) {
+ delete extracted_buf[i].owners;
+ extracted_buf[i].range.destroy();
+ }
+ }
+ toku_free(extracted_buf);
+
+ // Rebuild the locktree from each range in each range buffer,
+ // then notify higher layers that the txnid's locks have changed.
+ //
+ // (shared locks: if a lock was initially shared between transactions TRX1,
+ // TRX2, etc, we will now try to acquire it acting on behalf on TRX1, on
+ // TRX2, etc. This will succeed and an identical shared lock will be
+ // constructed)
+
+ invariant(m_rangetree->is_empty());
+ const uint32_t num_range_buffers = range_buffers.size();
+ for (uint32_t i = 0; i < num_range_buffers; i++) {
+ struct txnid_range_buffer *current_range_buffer;
+ int r = range_buffers.fetch(i, &current_range_buffer);
+ invariant_zero(r);
+ if (r == EINVAL) // Shouldn't happen, avoid compiler warning
+ continue;
+
+ const TXNID current_txnid = current_range_buffer->txnid;
+ range_buffer::iterator iter(&current_range_buffer->buffer);
+ range_buffer::iterator::record rec;
+ while (iter.current(&rec)) {
+ keyrange range;
+ range.create(rec.get_left_key(), rec.get_right_key());
+ row_lock lock = {.range = range,
+ .txnid = current_txnid,
+ .is_shared = !rec.get_exclusive_flag(),
+ .owners = nullptr};
+ insert_row_lock_into_tree(&lkr, lock, m_mgr);
+ iter.next();
+ }
+
+ // Notify higher layers that locks have changed for the current txnid
+ if (after_escalate_callback) {
+ after_escalate_callback(current_txnid, this, current_range_buffer->buffer,
+ after_escalate_callback_extra);
+ }
+ current_range_buffer->buffer.destroy();
+ }
+
+ while (range_buffers.size() > 0) {
+ struct txnid_range_buffer *buffer;
+ int r = range_buffers.fetch(0, &buffer);
+ invariant_zero(r);
+ r = range_buffers.delete_at(0);
+ invariant_zero(r);
+ toku_free(buffer);
+ }
+ range_buffers.destroy();
+
+ lkr.release();
+}
+
+void *locktree::get_userdata(void) const { return m_userdata; }
+
+void locktree::set_userdata(void *userdata) { m_userdata = userdata; }
+
+struct lt_lock_request_info *locktree::get_lock_request_info(void) {
+ return &m_lock_request_info;
+}
+
+void locktree::set_comparator(const comparator &cmp) { m_cmp.inherit(cmp); }
+
+locktree_manager *locktree::get_manager(void) const { return m_mgr; }
+
+int locktree::compare(const locktree *lt) const {
+ if (m_dict_id.dictid < lt->m_dict_id.dictid) {
+ return -1;
+ } else if (m_dict_id.dictid == lt->m_dict_id.dictid) {
+ return 0;
+ } else {
+ return 1;
+ }
+}
+
+DICTIONARY_ID locktree::get_dict_id() const { return m_dict_id; }
+
+} /* namespace toku */
+#endif // OS_WIN
+#endif // ROCKSDB_LITE