path: root/storage/tokudb/PerconaFT/ft/ft-flusher.cc
Diffstat (limited to 'storage/tokudb/PerconaFT/ft/ft-flusher.cc')
-rw-r--r--  storage/tokudb/PerconaFT/ft/ft-flusher.cc  1929
1 file changed, 1929 insertions, 0 deletions
diff --git a/storage/tokudb/PerconaFT/ft/ft-flusher.cc b/storage/tokudb/PerconaFT/ft/ft-flusher.cc
new file mode 100644
index 00000000..bbb2a170
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/ft-flusher.cc
@@ -0,0 +1,1929 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#include <my_global.h>
+#include "ft/ft.h"
+#include "ft/ft-cachetable-wrappers.h"
+#include "ft/ft-internal.h"
+#include "ft/ft-flusher.h"
+#include "ft/ft-flusher-internal.h"
+#include "ft/node.h"
+#include "ft/serialize/block_table.h"
+#include "ft/serialize/ft_node-serialize.h"
+#include "portability/toku_assert.h"
+#include "portability/toku_atomic.h"
+#include "util/status.h"
+#include "util/context.h"
+
+
+void toku_ft_flusher_get_status(FT_FLUSHER_STATUS status) {
+ fl_status.init();
+ *status = fl_status;
+}
+
+//
+// For test purposes only.
+// These callbacks are never used in production code, only as a way
+// to test the system (for example, by causing crashes at predictable times).
+//
+static void (*flusher_thread_callback)(int, void*) = NULL;
+static void *flusher_thread_callback_extra = NULL;
+
+void toku_flusher_thread_set_callback(void (*callback_f)(int, void*),
+ void* extra) {
+ flusher_thread_callback = callback_f;
+ flusher_thread_callback_extra = extra;
+}
+
+static void call_flusher_thread_callback(int flt_state) {
+ if (flusher_thread_callback) {
+ flusher_thread_callback(flt_state, flusher_thread_callback_extra);
+ }
+}
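+// Illustrative (hypothetical) test usage of the hook above: a test registers a
+// callback and keys off the flt_flush_* state it is handed, for example:
+//     static void my_cb(int state, void *extra) {
+//         if (state == flt_flush_before_split) { /* crash or record the event */ }
+//     }
+//     toku_flusher_thread_set_callback(my_cb, NULL);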
+
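+// A child's "weight" below is its buffered bytes plus the work already done on
+// its behalf (BP_WORKDONE). Illustrative example: with buffers of {4KB, 16KB, 2KB}
+// and workdone of {0, 0, 32KB}, child 2 wins with weight 34KB even though
+// child 1 has the largest buffer.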
+static int
+find_heaviest_child(FTNODE node)
+{
+ int max_child = 0;
+ uint64_t max_weight = toku_bnc_nbytesinbuf(BNC(node, 0)) + BP_WORKDONE(node, 0);
+
+ invariant(node->n_children > 0);
+ for (int i = 1; i < node->n_children; i++) {
+ uint64_t bytes_in_buf = toku_bnc_nbytesinbuf(BNC(node, i));
+ uint64_t workdone = BP_WORKDONE(node, i);
+ if (workdone > 0) {
+ invariant(bytes_in_buf > 0);
+ }
+ uint64_t this_weight = bytes_in_buf + workdone;
+ if (max_weight < this_weight) {
+ max_child = i;
+ max_weight = this_weight;
+ }
+ }
+ return max_child;
+}
+
+static void
+update_flush_status(FTNODE child, int cascades) {
+ FL_STATUS_VAL(FT_FLUSHER_FLUSH_TOTAL)++;
+ if (cascades > 0) {
+ FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES)++;
+ switch (cascades) {
+ case 1:
+ FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_1)++; break;
+ case 2:
+ FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_2)++; break;
+ case 3:
+ FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_3)++; break;
+ case 4:
+ FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_4)++; break;
+ case 5:
+ FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_5)++; break;
+ default:
+ FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_GT_5)++; break;
+ }
+ }
+ bool flush_needs_io = false;
+ for (int i = 0; !flush_needs_io && i < child->n_children; ++i) {
+ if (BP_STATE(child, i) == PT_ON_DISK) {
+ flush_needs_io = true;
+ }
+ }
+ if (flush_needs_io) {
+ FL_STATUS_VAL(FT_FLUSHER_FLUSH_NEEDED_IO)++;
+ } else {
+ FL_STATUS_VAL(FT_FLUSHER_FLUSH_IN_MEMORY)++;
+ }
+}
+
+static void
+maybe_destroy_child_blbs(FTNODE node, FTNODE child, FT ft)
+{
+ // If the node is already fully in memory, as in upgrade, we don't
+ // need to destroy the basement nodes because they are all equally
+ // up to date.
+ if (child->n_children > 1 &&
+ child->height == 0 &&
+ !child->dirty()) {
+ for (int i = 0; i < child->n_children; ++i) {
+ if (BP_STATE(child, i) == PT_AVAIL &&
+ node->max_msn_applied_to_node_on_disk.msn < BLB_MAX_MSN_APPLIED(child, i).msn)
+ {
+ toku_evict_bn_from_memory(child, i, ft);
+ }
+ }
+ }
+}
+
+static void
+ft_merge_child(
+ FT ft,
+ FTNODE node,
+ int childnum_to_merge,
+ bool *did_react,
+ struct flusher_advice *fa);
+
+static int
+pick_heaviest_child(FT UU(ft),
+ FTNODE parent,
+ void* UU(extra))
+{
+ int childnum = find_heaviest_child(parent);
+ paranoid_invariant(toku_bnc_n_entries(BNC(parent, childnum))>0);
+ return childnum;
+}
+
+bool
+dont_destroy_basement_nodes(void* UU(extra))
+{
+ return false;
+}
+
+static bool
+do_destroy_basement_nodes(void* UU(extra))
+{
+ return true;
+}
+
+bool
+always_recursively_flush(FTNODE UU(child), void* UU(extra))
+{
+ return true;
+}
+
+bool
+never_recursively_flush(FTNODE UU(child), void* UU(extra))
+{
+ return false;
+}
+
+/**
+ * Flusher thread ("normal" flushing) implementation.
+ */
+struct flush_status_update_extra {
+ int cascades;
+ uint32_t nodesize;
+};
+
+static bool
+recurse_if_child_is_gorged(FTNODE child, void* extra)
+{
+ struct flush_status_update_extra *fste = (flush_status_update_extra *)extra;
+ return toku_ftnode_nonleaf_is_gorged(child, fste->nodesize);
+}
+
+int
+default_pick_child_after_split(FT UU(ft),
+ FTNODE UU(parent),
+ int UU(childnuma),
+ int UU(childnumb),
+ void* UU(extra))
+{
+ return -1;
+}
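+// Returning a negative childnum from pick_child_after_split means "no
+// preference"; ft_split_child then falls back to should_recursively_flush()
+// to decide whether to keep flushing into either half of the split.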
+
+void
+default_merge_child(struct flusher_advice *fa,
+ FT ft,
+ FTNODE parent,
+ int childnum,
+ FTNODE child,
+ void* UU(extra))
+{
+    //
+    // There is probably a way to pass FTNODE child
+    // into ft_merge_child, but for simplicity, for now,
+    // we just unpin child and let ft_merge_child pin it again
+    //
+    toku_unpin_ftnode(ft, child);
+    //
+    // it is the responsibility of ft_merge_child to unlock the parent
+    //
+ bool did_react;
+ ft_merge_child(ft, parent, childnum, &did_react, fa);
+}
+
+void
+flusher_advice_init(
+ struct flusher_advice *fa,
+ FA_PICK_CHILD pick_child,
+ FA_SHOULD_DESTROY_BN should_destroy_basement_nodes,
+ FA_SHOULD_RECURSIVELY_FLUSH should_recursively_flush,
+ FA_MAYBE_MERGE_CHILD maybe_merge_child,
+ FA_UPDATE_STATUS update_status,
+ FA_PICK_CHILD_AFTER_SPLIT pick_child_after_split,
+ void* extra
+ )
+{
+ fa->pick_child = pick_child;
+ fa->should_destroy_basement_nodes = should_destroy_basement_nodes;
+ fa->should_recursively_flush = should_recursively_flush;
+ fa->maybe_merge_child = maybe_merge_child;
+ fa->update_status = update_status;
+ fa->pick_child_after_split = pick_child_after_split;
+ fa->extra = extra;
+}
+
+static void
+flt_update_status(FTNODE child,
+ int UU(dirtied),
+ void* extra)
+{
+ struct flush_status_update_extra *fste = (struct flush_status_update_extra *) extra;
+ update_flush_status(child, fste->cascades);
+ // If `toku_ft_flush_some_child` decides to recurse after this, we'll need
+    // cascades to increase. If not, it doesn't matter.
+ fste->cascades++;
+}
+
+static void
+flt_flusher_advice_init(struct flusher_advice *fa, struct flush_status_update_extra *fste, uint32_t nodesize)
+{
+ fste->cascades = 0;
+ fste->nodesize = nodesize;
+ flusher_advice_init(fa,
+ pick_heaviest_child,
+ dont_destroy_basement_nodes,
+ recurse_if_child_is_gorged,
+ default_merge_child,
+ flt_update_status,
+ default_pick_child_after_split,
+ fste);
+}
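+// Illustrative usage (this mirrors flush_node_fun below): the background
+// flusher thread stacks the advice and its extra before flushing:
+//     struct flusher_advice fa;
+//     struct flush_status_update_extra fste;
+//     flt_flusher_advice_init(&fa, &fste, ft->h->nodesize);
+//     toku_ft_flush_some_child(ft, node, &fa);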
+
+struct ctm_extra {
+ bool is_last_child;
+ DBT target_key;
+};
+
+static int
+ctm_pick_child(FT ft,
+ FTNODE parent,
+ void* extra)
+{
+ struct ctm_extra* ctme = (struct ctm_extra *) extra;
+ int childnum;
+ if (parent->height == 1 && ctme->is_last_child) {
+ childnum = parent->n_children - 1;
+ } else {
+ childnum = toku_ftnode_which_child(parent, &ctme->target_key, ft->cmp);
+ }
+ return childnum;
+}
+
+static void
+ctm_update_status(
+ FTNODE UU(child),
+ int dirtied,
+ void* UU(extra)
+ )
+{
+ FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_DIRTIED_FOR_LEAF_MERGE) += dirtied;
+}
+
+static void
+ctm_maybe_merge_child(struct flusher_advice *fa,
+ FT ft,
+ FTNODE parent,
+ int childnum,
+ FTNODE child,
+ void *extra)
+{
+ if (child->height == 0) {
+ (void) toku_sync_fetch_and_add(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_COMPLETED), 1);
+ }
+ default_merge_child(fa, ft, parent, childnum, child, extra);
+}
+
+static void
+ct_maybe_merge_child(struct flusher_advice *fa,
+ FT ft,
+ FTNODE parent,
+ int childnum,
+ FTNODE child,
+ void* extra)
+{
+ if (child->height > 0) {
+ default_merge_child(fa, ft, parent, childnum, child, extra);
+ }
+ else {
+ struct ctm_extra ctme;
+ paranoid_invariant(parent->n_children > 1);
+ int pivot_to_save;
+ //
+        // we have two cases: one where childnum is the last child, so the
+        // pivot we save is the one to its left rather than the pivot of the
+        // child we wish to descend into, and another where it is not the
+        // last child, so the pivot to its right is sufficient for
+        // identifying the leaf to be merged
+ //
+ if (childnum == (parent->n_children - 1)) {
+ ctme.is_last_child = true;
+ pivot_to_save = childnum - 1;
+ }
+ else {
+ ctme.is_last_child = false;
+ pivot_to_save = childnum;
+ }
+ toku_clone_dbt(&ctme.target_key, parent->pivotkeys.get_pivot(pivot_to_save));
+
+ // at this point, ctme is properly setup, now we can do the merge
+ struct flusher_advice new_fa;
+ flusher_advice_init(
+ &new_fa,
+ ctm_pick_child,
+ dont_destroy_basement_nodes,
+ always_recursively_flush,
+ ctm_maybe_merge_child,
+ ctm_update_status,
+ default_pick_child_after_split,
+ &ctme);
+
+ toku_unpin_ftnode(ft, parent);
+ toku_unpin_ftnode(ft, child);
+
+ FTNODE root_node = NULL;
+ {
+ uint32_t fullhash;
+ CACHEKEY root;
+ toku_calculate_root_offset_pointer(ft, &root, &fullhash);
+ ftnode_fetch_extra bfe;
+ bfe.create_for_full_read(ft);
+ toku_pin_ftnode(ft, root, fullhash, &bfe, PL_WRITE_EXPENSIVE, &root_node, true);
+ toku_ftnode_assert_fully_in_memory(root_node);
+ }
+
+ (void) toku_sync_fetch_and_add(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_STARTED), 1);
+ (void) toku_sync_fetch_and_add(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);
+
+ toku_ft_flush_some_child(ft, root_node, &new_fa);
+
+ (void) toku_sync_fetch_and_sub(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);
+
+ toku_destroy_dbt(&ctme.target_key);
+ }
+}
+
+static void
+ct_update_status(FTNODE child,
+ int dirtied,
+ void* extra)
+{
+ struct flush_status_update_extra* fste = (struct flush_status_update_extra *) extra;
+ update_flush_status(child, fste->cascades);
+ FL_STATUS_VAL(FT_FLUSHER_CLEANER_NODES_DIRTIED) += dirtied;
+ // Incrementing this in case `toku_ft_flush_some_child` decides to recurse.
+ fste->cascades++;
+}
+
+static void
+ct_flusher_advice_init(struct flusher_advice *fa, struct flush_status_update_extra* fste, uint32_t nodesize)
+{
+ fste->cascades = 0;
+ fste->nodesize = nodesize;
+ flusher_advice_init(fa,
+ pick_heaviest_child,
+ do_destroy_basement_nodes,
+ recurse_if_child_is_gorged,
+ ct_maybe_merge_child,
+ ct_update_status,
+ default_pick_child_after_split,
+ fste);
+}
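+// Illustrative usage (this mirrors toku_ftnode_cleaner_callback below): the
+// cleaner thread stacks the advice and its extra before flushing:
+//     struct flusher_advice fa;
+//     struct flush_status_update_extra fste;
+//     ct_flusher_advice_init(&fa, &fste, ft->h->nodesize);
+//     toku_ft_flush_some_child(ft, node, &fa);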
+
+//
+// This returns true if the node MAY be reactive,
+// false if we are absolutely sure that it is NOT reactive.
+// The reason for inaccuracy is that the node may be
+// a leaf node that is not entirely in memory. If so, then
+// we cannot be sure if the node is reactive.
+//
+static bool ft_ftnode_may_be_reactive(FT ft, FTNODE node)
+{
+ if (node->height == 0) {
+ return true;
+ } else {
+ return toku_ftnode_get_nonleaf_reactivity(node, ft->h->fanout) != RE_STABLE;
+ }
+}
+
+/* NODE is a node with a child.
+ * The child at CHILDNUM was split into two nodes, childa and childb. childa is the same as the original child; childb is a new child.
+ * We must slide things around and move things from the old table to the new tables.
+ * Requires: the CHILDNUMth buffer of node is empty.
+ * We don't push anything down to children. We split the node, and things land wherever they land.
+ * We must delete the old buffer (but the old child is already deleted).
+ * On return, the new children and node STAY PINNED.
+ */
+static void
+handle_split_of_child(
+ FT ft,
+ FTNODE node,
+ int childnum,
+ FTNODE childa,
+ FTNODE childb,
+ DBT *splitk /* the data in the childsplitk is alloc'd and is consumed by this call. */
+ )
+{
+ paranoid_invariant(node->height>0);
+ paranoid_invariant(0 <= childnum);
+ paranoid_invariant(childnum < node->n_children);
+ toku_ftnode_assert_fully_in_memory(node);
+ toku_ftnode_assert_fully_in_memory(childa);
+ toku_ftnode_assert_fully_in_memory(childb);
+ NONLEAF_CHILDINFO old_bnc = BNC(node, childnum);
+ paranoid_invariant(toku_bnc_nbytesinbuf(old_bnc)==0);
+ WHEN_NOT_GCOV(
+ if (toku_ft_debug_mode) {
+ printf("%s:%d Child %d splitting on %s\n", __FILE__, __LINE__, childnum, (char*)splitk->data);
+ printf("%s:%d oldsplitkeys:", __FILE__, __LINE__);
+ for(int i = 0; i < node->n_children - 1; i++) printf(" %s", (char *) node->pivotkeys.get_pivot(i).data);
+ printf("\n");
+ }
+ )
+
+ node->set_dirty();
+
+ XREALLOC_N(node->n_children+1, node->bp);
+ // Slide the children over.
+    // suppose n_children is 10 and childnum is 5, meaning child 5 of node just got split
+ // this moves node->bp[6] through node->bp[9] over to
+ // node->bp[7] through node->bp[10]
+ for (int cnum=node->n_children; cnum>childnum+1; cnum--) {
+ node->bp[cnum] = node->bp[cnum-1];
+ }
+ memset(&node->bp[childnum+1],0,sizeof(node->bp[0]));
+ node->n_children++;
+
+ paranoid_invariant(BP_BLOCKNUM(node, childnum).b==childa->blocknum.b); // use the same child
+
+ // We never set the rightmost blocknum to be the root.
+ // Instead, we wait for the root to split and let promotion initialize the rightmost
+ // blocknum to be the first non-root leaf node on the right extreme to receive an insert.
+ BLOCKNUM rightmost_blocknum = toku_unsafe_fetch(&ft->rightmost_blocknum);
+ invariant(ft->h->root_blocknum.b != rightmost_blocknum.b);
+ if (childa->blocknum.b == rightmost_blocknum.b) {
+ // The rightmost leaf (a) split into (a) and (b). We want (b) to swap pair values
+ // with (a), now that it is the new rightmost leaf. This keeps the rightmost blocknum
+ // constant, the same the way we keep the root blocknum constant.
+ toku_ftnode_swap_pair_values(childa, childb);
+ BP_BLOCKNUM(node, childnum) = childa->blocknum;
+ }
+
+ BP_BLOCKNUM(node, childnum+1) = childb->blocknum;
+ BP_WORKDONE(node, childnum+1) = 0;
+ BP_STATE(node,childnum+1) = PT_AVAIL;
+
+ NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
+ for (unsigned int i = 0; i < (sizeof new_bnc->flow) / (sizeof new_bnc->flow[0]); ++i) {
+ // just split the flows in half for now, can't guess much better
+ // at the moment
+ new_bnc->flow[i] = old_bnc->flow[i] / 2;
+ old_bnc->flow[i] = (old_bnc->flow[i] + 1) / 2;
+ }
+ set_BNC(node, childnum+1, new_bnc);
+
+    // Insert the new split key, sliding the other keys over
+ node->pivotkeys.insert_at(splitk, childnum);
+
+ WHEN_NOT_GCOV(
+ if (toku_ft_debug_mode) {
+ printf("%s:%d splitkeys:", __FILE__, __LINE__);
+ for (int i = 0; i < node->n_children - 2; i++) printf(" %s", (char *) node->pivotkeys.get_pivot(i).data);
+ printf("\n");
+ }
+ )
+
+ /* Keep pushing to the children, but not if the children would require a pushdown */
+ toku_ftnode_assert_fully_in_memory(node);
+ toku_ftnode_assert_fully_in_memory(childa);
+ toku_ftnode_assert_fully_in_memory(childb);
+
+ VERIFY_NODE(t, node);
+ VERIFY_NODE(t, childa);
+ VERIFY_NODE(t, childb);
+}
+
+static void
+verify_all_in_mempool(FTNODE UU() node)
+{
+#ifdef TOKU_DEBUG_PARANOID
+ if (node->height==0) {
+ for (int i = 0; i < node->n_children; i++) {
+ invariant(BP_STATE(node,i) == PT_AVAIL);
+ BLB_DATA(node, i)->verify_mempool();
+ }
+ }
+#endif
+}
+
+static uint64_t
+ftleaf_disk_size(FTNODE node)
+// Effect: get the total disk size of this leaf node's leafentries
+{
+ paranoid_invariant(node->height == 0);
+ toku_ftnode_assert_fully_in_memory(node);
+ uint64_t retval = 0;
+ for (int i = 0; i < node->n_children; i++) {
+ retval += BLB_DATA(node, i)->get_disk_size();
+ }
+ return retval;
+}
+
+static void
+ftleaf_get_split_loc(
+ FTNODE node,
+ enum split_mode split_mode,
+ int *num_left_bns, // which basement within leaf
+ int *num_left_les // which key within basement
+ )
+// Effect: Find the location within a leaf node where we want to perform a split
+// num_left_bns is how many basement nodes (OMTs) should end up on the left.
+// num_left_les is how many leafentries in the OMT of the last left basement node should be on the left side of the split.
+{
+ switch (split_mode) {
+ case SPLIT_LEFT_HEAVY: {
+ *num_left_bns = node->n_children;
+ *num_left_les = BLB_DATA(node, *num_left_bns - 1)->num_klpairs();
+ if (*num_left_les == 0) {
+ *num_left_bns = node->n_children - 1;
+ *num_left_les = BLB_DATA(node, *num_left_bns - 1)->num_klpairs();
+ }
+ goto exit;
+ }
+ case SPLIT_RIGHT_HEAVY: {
+ *num_left_bns = 1;
+ *num_left_les = BLB_DATA(node, 0)->num_klpairs() ? 1 : 0;
+ goto exit;
+ }
+ case SPLIT_EVENLY: {
+ paranoid_invariant(node->height == 0);
+ // TODO: (Zardosht) see if we can/should make this faster, we iterate over the rows twice
+ uint64_t sumlesizes = ftleaf_disk_size(node);
+ uint32_t size_so_far = 0;
+ for (int i = 0; i < node->n_children; i++) {
+ bn_data* bd = BLB_DATA(node, i);
+ uint32_t n_leafentries = bd->num_klpairs();
+ for (uint32_t j=0; j < n_leafentries; j++) {
+ size_t size_this_le;
+ int rr = bd->fetch_klpair_disksize(j, &size_this_le);
+ invariant_zero(rr);
+ size_so_far += size_this_le;
+ if (size_so_far >= sumlesizes/2) {
+ *num_left_bns = i + 1;
+ *num_left_les = j + 1;
+ if (*num_left_bns == node->n_children &&
+ (unsigned int) *num_left_les == n_leafentries) {
+ // need to correct for when we're splitting after the
+ // last element, that makes no sense
+ if (*num_left_les > 1) {
+ (*num_left_les)--;
+ } else if (*num_left_bns > 1) {
+ (*num_left_bns)--;
+ *num_left_les = BLB_DATA(node, *num_left_bns - 1)->num_klpairs();
+ } else {
+ // we are trying to split a leaf with only one
+ // leafentry in it
+ abort();
+ }
+ }
+ goto exit;
+ }
+ }
+ }
+ }
+ }
+ abort();
+exit:
+ return;
+}
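+// Illustrative example (assuming equal-sized leafentries): a leaf with two
+// basement nodes of 100 klpairs each reaches half of its total disk size at
+// the 100th pair of basement 0, so SPLIT_EVENLY yields num_left_bns = 1 and
+// num_left_les = 100, i.e. a split exactly on the basement boundary.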
+
+static void
+move_leafentries(
+ BASEMENTNODE dest_bn,
+ BASEMENTNODE src_bn,
+ uint32_t lbi, //lower bound inclusive
+ uint32_t ube //upper bound exclusive
+ )
+// Effect: move leafentries in the range [lbi, ube) from src_bn to the newly created dest_bn
+{
+ invariant(ube == src_bn->data_buffer.num_klpairs());
+ src_bn->data_buffer.split_klpairs(&dest_bn->data_buffer, lbi);
+}
+
+static void ftnode_finalize_split(FTNODE node, FTNODE B, MSN max_msn_applied_to_node) {
+// Effect: Finalizes a split by updating some bits and dirtying both nodes
+ toku_ftnode_assert_fully_in_memory(node);
+ toku_ftnode_assert_fully_in_memory(B);
+ verify_all_in_mempool(node);
+ verify_all_in_mempool(B);
+
+ node->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
+ B->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
+
+ // The new node in the split inherits the oldest known reference xid
+ B->oldest_referenced_xid_known = node->oldest_referenced_xid_known;
+
+ node->set_dirty();
+ B->set_dirty();
+}
+
+void
+ftleaf_split(
+ FT ft,
+ FTNODE node,
+ FTNODE *nodea,
+ FTNODE *nodeb,
+ DBT *splitk,
+ bool create_new_node,
+ enum split_mode split_mode,
+ uint32_t num_dependent_nodes,
+ FTNODE* dependent_nodes)
+// Effect: Split a leaf node.
+// Argument "node" is node to be split.
+// Upon return:
+// nodea and nodeb point to new nodes that result from split of "node"
+// nodea is the left node that results from the split
+// splitk is the right-most key of nodea
+{
+
+ paranoid_invariant(node->height == 0);
+ FL_STATUS_VAL(FT_FLUSHER_SPLIT_LEAF)++;
+ if (node->n_children) {
+ // First move all the accumulated stat64info deltas into the first basement.
+ // After the split, either both nodes or neither node will be included in the next checkpoint.
+ // The accumulated stats in the dictionary will be correct in either case.
+ // By moving all the deltas into one (arbitrary) basement, we avoid the need to maintain
+ // correct information for a basement that is divided between two leafnodes (i.e. when split is
+ // not on a basement boundary).
+ STAT64INFO_S delta_for_leafnode = toku_get_and_clear_basement_stats(node);
+ BASEMENTNODE bn = BLB(node,0);
+ bn->stat64_delta = delta_for_leafnode;
+ }
+
+
+ FTNODE B = nullptr;
+ uint32_t fullhash;
+ BLOCKNUM name;
+
+ if (create_new_node) {
+ // put value in cachetable and do checkpointing
+ // of dependent nodes
+ //
+        // We do this here, before evaluating num_left_bns
+        // and num_left_les, because this operation
+        // may write the dependent nodes to disk.
+        // While doing so, we may rebalance the leaf node
+        // we are splitting, thereby invalidating the
+        // values of num_left_bns and num_left_les.
+        // So, we must call this before evaluating
+        // those two values
+ cachetable_put_empty_node_with_dep_nodes(
+ ft,
+ num_dependent_nodes,
+ dependent_nodes,
+ &name,
+ &fullhash,
+ &B
+ );
+ // GCC 4.8 seems to get confused and think B is maybe uninitialized at link time.
+ // TODO(leif): figure out why it thinks this and actually fix it.
+ invariant_notnull(B);
+ }
+
+
+ paranoid_invariant(node->height==0);
+ toku_ftnode_assert_fully_in_memory(node);
+ verify_all_in_mempool(node);
+ MSN max_msn_applied_to_node = node->max_msn_applied_to_node_on_disk;
+
+ // variables that say where we will do the split.
+ // After the split, there will be num_left_bns basement nodes in the left node,
+ // and the last basement node in the left node will have num_left_les leafentries.
+ int num_left_bns;
+ int num_left_les;
+ ftleaf_get_split_loc(node, split_mode, &num_left_bns, &num_left_les);
+ {
+ // did we split right on the boundary between basement nodes?
+ const bool split_on_boundary = (num_left_les == 0) || (num_left_les == (int) BLB_DATA(node, num_left_bns - 1)->num_klpairs());
+        // Now we know where we are going to break it
+        // the two nodes will have a total of n_children+1 basement nodes
+        // and n_children-1 pivots
+        // the left node, node, will have num_left_bns basement nodes
+        // the right node, B, will have n_children-num_left_bns+1 basement nodes
+        // the pivots of node will be the first num_left_bns-1 pivots that originally exist
+        // the pivots of B will be the last (n_children - num_left_bns) pivots that originally exist
+
+ // Note: The basements will not be rebalanced. Only the mempool of the basement that is split
+ // (if split_on_boundary is false) will be affected. All other mempools will remain intact. ???
+
+ //set up the basement nodes in the new node
+ int num_children_in_node = num_left_bns;
+ // In the SPLIT_RIGHT_HEAVY case, we need to add 1 back because
+ // while it's not on the boundary, we do need node->n_children
+ // children in B.
+ int num_children_in_b = node->n_children - num_left_bns + (!split_on_boundary ? 1 : 0);
+ if (num_children_in_b == 0) {
+ // for uneven split, make sure we have at least 1 bn
+ paranoid_invariant(split_mode == SPLIT_LEFT_HEAVY);
+ num_children_in_b = 1;
+ }
+ paranoid_invariant(num_children_in_node > 0);
+ if (create_new_node) {
+ toku_initialize_empty_ftnode(
+ B,
+ name,
+ 0,
+ num_children_in_b,
+ ft->h->layout_version,
+ ft->h->flags);
+ B->fullhash = fullhash;
+ }
+ else {
+ B = *nodeb;
+ REALLOC_N(num_children_in_b, B->bp);
+ B->n_children = num_children_in_b;
+ for (int i = 0; i < num_children_in_b; i++) {
+ BP_BLOCKNUM(B,i).b = 0;
+ BP_STATE(B,i) = PT_AVAIL;
+ BP_WORKDONE(B,i) = 0;
+ set_BLB(B, i, toku_create_empty_bn());
+ }
+ }
+
+ // now move all the data
+
+ int curr_src_bn_index = num_left_bns - 1;
+ int curr_dest_bn_index = 0;
+
+ // handle the move of a subset of data in last_bn_on_left from node to B
+ if (!split_on_boundary) {
+ BP_STATE(B,curr_dest_bn_index) = PT_AVAIL;
+ destroy_basement_node(BLB(B, curr_dest_bn_index)); // Destroy B's empty OMT, so I can rebuild it from an array
+ set_BNULL(B, curr_dest_bn_index);
+ set_BLB(B, curr_dest_bn_index, toku_create_empty_bn_no_buffer());
+ move_leafentries(BLB(B, curr_dest_bn_index),
+ BLB(node, curr_src_bn_index),
+ num_left_les, // first row to be moved to B
+ BLB_DATA(node, curr_src_bn_index)->num_klpairs() // number of rows in basement to be split
+ );
+ BLB_MAX_MSN_APPLIED(B, curr_dest_bn_index) = BLB_MAX_MSN_APPLIED(node, curr_src_bn_index);
+ curr_dest_bn_index++;
+ }
+ curr_src_bn_index++;
+
+ paranoid_invariant(B->n_children >= curr_dest_bn_index);
+ paranoid_invariant(node->n_children >= curr_src_bn_index);
+
+ // move the rest of the basement nodes
+ for ( ; curr_src_bn_index < node->n_children; curr_src_bn_index++, curr_dest_bn_index++) {
+ destroy_basement_node(BLB(B, curr_dest_bn_index));
+ set_BNULL(B, curr_dest_bn_index);
+ B->bp[curr_dest_bn_index] = node->bp[curr_src_bn_index];
+ }
+ if (curr_dest_bn_index < B->n_children) {
+ // B already has an empty basement node here.
+ BP_STATE(B, curr_dest_bn_index) = PT_AVAIL;
+ }
+
+ //
+ // now handle the pivots
+ //
+
+ // the child index in the original node that corresponds to the
+ // first node in the right node of the split
+ int split_idx = num_left_bns - (split_on_boundary ? 0 : 1);
+ node->pivotkeys.split_at(split_idx, &B->pivotkeys);
+ if (split_on_boundary && num_left_bns < node->n_children && splitk) {
+ toku_copyref_dbt(splitk, node->pivotkeys.get_pivot(num_left_bns - 1));
+ } else if (splitk) {
+ bn_data* bd = BLB_DATA(node, num_left_bns - 1);
+ uint32_t keylen;
+ void *key;
+ int rr = bd->fetch_key_and_len(bd->num_klpairs() - 1, &keylen, &key);
+ invariant_zero(rr);
+ toku_memdup_dbt(splitk, key, keylen);
+ }
+
+ node->n_children = num_children_in_node;
+ REALLOC_N(num_children_in_node, node->bp);
+ }
+
+ ftnode_finalize_split(node, B, max_msn_applied_to_node);
+ *nodea = node;
+ *nodeb = B;
+} // end of ftleaf_split()
+
+void
+ft_nonleaf_split(
+ FT ft,
+ FTNODE node,
+ FTNODE *nodea,
+ FTNODE *nodeb,
+ DBT *splitk,
+ uint32_t num_dependent_nodes,
+ FTNODE* dependent_nodes)
+{
+ //VERIFY_NODE(t,node);
+ FL_STATUS_VAL(FT_FLUSHER_SPLIT_NONLEAF)++;
+ toku_ftnode_assert_fully_in_memory(node);
+ int old_n_children = node->n_children;
+ int n_children_in_a = old_n_children/2;
+ int n_children_in_b = old_n_children-n_children_in_a;
+ MSN max_msn_applied_to_node = node->max_msn_applied_to_node_on_disk;
+ FTNODE B;
+ paranoid_invariant(node->height>0);
+    paranoid_invariant(node->n_children>=2); // Otherwise, how do we split? We need at least two children to split.
+ create_new_ftnode_with_dep_nodes(ft, &B, node->height, n_children_in_b, num_dependent_nodes, dependent_nodes);
+ {
+ /* The first n_children_in_a go into node a.
+ * That means that the first n_children_in_a-1 keys go into node a.
+ * The splitter key is key number n_children_in_a */
+ for (int i = n_children_in_a; i<old_n_children; i++) {
+ int targchild = i-n_children_in_a;
+ // TODO: Figure out better way to handle this
+ // the problem is that create_new_ftnode_with_dep_nodes for B creates
+ // all the data structures, whereas we really don't want it to fill
+ // in anything for the bp's.
+ // Now we have to go free what it just created so we can
+ // slide the bp over
+ destroy_nonleaf_childinfo(BNC(B, targchild));
+ // now move the bp over
+ B->bp[targchild] = node->bp[i];
+ memset(&node->bp[i], 0, sizeof(node->bp[0]));
+ }
+
+ // the split key for our parent is the rightmost pivot key in node
+ node->pivotkeys.split_at(n_children_in_a, &B->pivotkeys);
+ toku_clone_dbt(splitk, node->pivotkeys.get_pivot(n_children_in_a - 1));
+ node->pivotkeys.delete_at(n_children_in_a - 1);
+
+ node->n_children = n_children_in_a;
+ REALLOC_N(node->n_children, node->bp);
+ }
+
+ ftnode_finalize_split(node, B, max_msn_applied_to_node);
+ *nodea = node;
+ *nodeb = B;
+}
+
+//
+// The responsibility of ft_split_child is to take locked FTNODEs node and child
+// and do the following:
+// - split child,
+// - fix node,
+// - release lock on node
+// - possibly flush one of the new children created by the split, otherwise unlock both children
+//
+static void
+ft_split_child(
+ FT ft,
+ FTNODE node,
+ int childnum,
+ FTNODE child,
+ enum split_mode split_mode,
+ struct flusher_advice *fa)
+{
+ paranoid_invariant(node->height>0);
+ paranoid_invariant(toku_bnc_nbytesinbuf(BNC(node, childnum))==0); // require that the buffer for this child is empty
+ FTNODE nodea, nodeb;
+ DBT splitk;
+
+ // for test
+ call_flusher_thread_callback(flt_flush_before_split);
+
+ FTNODE dep_nodes[2];
+ dep_nodes[0] = node;
+ dep_nodes[1] = child;
+ if (child->height==0) {
+ ftleaf_split(ft, child, &nodea, &nodeb, &splitk, true, split_mode, 2, dep_nodes);
+ } else {
+ ft_nonleaf_split(ft, child, &nodea, &nodeb, &splitk, 2, dep_nodes);
+ }
+ // printf("%s:%d child did split\n", __FILE__, __LINE__);
+ handle_split_of_child (ft, node, childnum, nodea, nodeb, &splitk);
+
+ // for test
+ call_flusher_thread_callback(flt_flush_during_split);
+
+ // at this point, the split is complete
+ // now we need to unlock node,
+ // and possibly continue
+ // flushing one of the children
+ int picked_child = fa->pick_child_after_split(ft, node, childnum, childnum + 1, fa->extra);
+ toku_unpin_ftnode(ft, node);
+ if (picked_child == childnum ||
+ (picked_child < 0 && nodea->height > 0 && fa->should_recursively_flush(nodea, fa->extra))) {
+ toku_unpin_ftnode(ft, nodeb);
+ toku_ft_flush_some_child(ft, nodea, fa);
+ }
+ else if (picked_child == childnum + 1 ||
+ (picked_child < 0 && nodeb->height > 0 && fa->should_recursively_flush(nodeb, fa->extra))) {
+ toku_unpin_ftnode(ft, nodea);
+ toku_ft_flush_some_child(ft, nodeb, fa);
+ }
+ else {
+ toku_unpin_ftnode(ft, nodea);
+ toku_unpin_ftnode(ft, nodeb);
+ }
+
+ toku_destroy_dbt(&splitk);
+}
+
+static void bring_node_fully_into_memory(FTNODE node, FT ft) {
+ if (!toku_ftnode_fully_in_memory(node)) {
+ ftnode_fetch_extra bfe;
+ bfe.create_for_full_read(ft);
+ toku_cachetable_pf_pinned_pair(
+ node,
+ toku_ftnode_pf_callback,
+ &bfe,
+ ft->cf,
+ node->blocknum,
+ toku_cachetable_hash(ft->cf, node->blocknum)
+ );
+ }
+}
+
+static void
+flush_this_child(
+ FT ft,
+ FTNODE node,
+ FTNODE child,
+ int childnum,
+ struct flusher_advice *fa)
+// Effect: Push everything in the CHILDNUMth buffer of node down into the child.
+{
+ update_flush_status(child, 0);
+ toku_ftnode_assert_fully_in_memory(node);
+ if (fa->should_destroy_basement_nodes(fa)) {
+ maybe_destroy_child_blbs(node, child, ft);
+ }
+ bring_node_fully_into_memory(child, ft);
+ toku_ftnode_assert_fully_in_memory(child);
+ paranoid_invariant(node->height>0);
+ paranoid_invariant(child->blocknum.b!=0);
+ // VERIFY_NODE does not work off client thread as of now
+ //VERIFY_NODE(t, child);
+ node->set_dirty();
+ child->set_dirty();
+
+ BP_WORKDONE(node, childnum) = 0; // this buffer is drained, no work has been done by its contents
+ NONLEAF_CHILDINFO bnc = BNC(node, childnum);
+ set_BNC(node, childnum, toku_create_empty_nl());
+
+ // now we have a bnc to flush to the child. pass down the parent's
+ // oldest known referenced xid as we flush down to the child.
+ toku_bnc_flush_to_child(ft, bnc, child, node->oldest_referenced_xid_known);
+ destroy_nonleaf_childinfo(bnc);
+}
+
+static void
+merge_leaf_nodes(FTNODE a, FTNODE b)
+{
+ FL_STATUS_VAL(FT_FLUSHER_MERGE_LEAF)++;
+ toku_ftnode_assert_fully_in_memory(a);
+ toku_ftnode_assert_fully_in_memory(b);
+ paranoid_invariant(a->height == 0);
+ paranoid_invariant(b->height == 0);
+ paranoid_invariant(a->n_children > 0);
+ paranoid_invariant(b->n_children > 0);
+
+ // Mark nodes as dirty before moving basements from b to a.
+ // This way, whatever deltas are accumulated in the basements are
+ // applied to the in_memory_stats in the header if they have not already
+ // been (if nodes are clean).
+ // TODO(leif): this is no longer the way in_memory_stats is
+ // maintained. verify that it's ok to move this just before the unpin
+ // and then do that.
+ a->set_dirty();
+ b->set_dirty();
+
+ bn_data* a_last_bd = BLB_DATA(a, a->n_children-1);
+ // this bool states if the last basement node in a has any items or not
+ // If it does, then it stays in the merge. If it does not, the last basement node
+ // of a gets eliminated because we do not have a pivot to store for it (because it has no elements)
+ const bool a_has_tail = a_last_bd->num_klpairs() > 0;
+
+ int num_children = a->n_children + b->n_children;
+ if (!a_has_tail) {
+ int lastchild = a->n_children - 1;
+ BASEMENTNODE bn = BLB(a, lastchild);
+
+ // verify that last basement in a is empty, then destroy mempool
+ size_t used_space = a_last_bd->get_disk_size();
+ invariant_zero(used_space);
+ destroy_basement_node(bn);
+ set_BNULL(a, lastchild);
+ num_children--;
+ if (lastchild < a->pivotkeys.num_pivots()) {
+ a->pivotkeys.delete_at(lastchild);
+ }
+ } else {
+ // fill in pivot for what used to be max of node 'a', if it is needed
+ uint32_t keylen;
+ void *key;
+ int r = a_last_bd->fetch_key_and_len(a_last_bd->num_klpairs() - 1, &keylen, &key);
+ invariant_zero(r);
+ DBT pivotkey;
+ toku_fill_dbt(&pivotkey, key, keylen);
+ a->pivotkeys.replace_at(&pivotkey, a->n_children - 1);
+ }
+
+ // realloc basement nodes in `a'
+ REALLOC_N(num_children, a->bp);
+
+ // move each basement node from b to a
+ uint32_t offset = a_has_tail ? a->n_children : a->n_children - 1;
+ for (int i = 0; i < b->n_children; i++) {
+ a->bp[i + offset] = b->bp[i];
+ memset(&b->bp[i], 0, sizeof(b->bp[0]));
+ }
+
+ // append b's pivots to a's pivots
+ a->pivotkeys.append(b->pivotkeys);
+
+ // now that all the data has been moved from b to a, we can destroy the data in b
+ a->n_children = num_children;
+ b->pivotkeys.destroy();
+ b->n_children = 0;
+}
+
+static void balance_leaf_nodes(
+ FTNODE a,
+ FTNODE b,
+ DBT *splitk)
+// Effect:
+// If b is bigger then move stuff from b to a until b is the smaller.
+// If a is bigger then move stuff from a to b until a is the smaller.
+{
+ FL_STATUS_VAL(FT_FLUSHER_BALANCE_LEAF)++;
+ // first merge all the data into a
+ merge_leaf_nodes(a,b);
+ // now split them
+ // because we are not creating a new node, we can pass in no dependent nodes
+ ftleaf_split(NULL, a, &a, &b, splitk, false, SPLIT_EVENLY, 0, NULL);
+}
+
+static void
+maybe_merge_pinned_leaf_nodes(
+ FTNODE a,
+ FTNODE b,
+ const DBT *parent_splitk,
+ bool *did_merge,
+ bool *did_rebalance,
+ DBT *splitk,
+ uint32_t nodesize
+ )
+// Effect: Either merge a and b into one node (merge them into a) and set *did_merge = true.
+// (We do this if the resulting node is not fissible)
+// or distribute the leafentries evenly between a and b, and set *did_rebalance = true.
+// (If a and b are already evenly distributed, we may do nothing.)
+{
+ unsigned int sizea = toku_serialize_ftnode_size(a);
+ unsigned int sizeb = toku_serialize_ftnode_size(b);
+ uint32_t num_leafentries = toku_ftnode_leaf_num_entries(a) + toku_ftnode_leaf_num_entries(b);
+ if (num_leafentries > 1 && (sizea + sizeb)*4 > (nodesize*3)) {
+ // the combined size is more than 3/4 of a node, so don't merge them.
+ *did_merge = false;
+ if (sizea*4 > nodesize && sizeb*4 > nodesize) {
+ // no need to do anything if both are more than 1/4 of a node.
+ *did_rebalance = false;
+ toku_clone_dbt(splitk, *parent_splitk);
+ return;
+ }
+ // one is less than 1/4 of a node, and together they are more than 3/4 of a node.
+ *did_rebalance = true;
+ balance_leaf_nodes(a, b, splitk);
+ } else {
+ // we are merging them.
+ *did_merge = true;
+ *did_rebalance = false;
+ toku_init_dbt(splitk);
+ merge_leaf_nodes(a, b);
+ }
+}
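+// Worked example of the thresholds above, assuming nodesize = 4MB: if the two
+// leaves together exceed 3MB (3/4 of a node) they are not merged; they are
+// then rebalanced only if at least one of them is 1MB (1/4 of a node) or
+// smaller, otherwise they are left alone. At 3MB combined or less (or with at
+// most one leafentry between them), they are merged outright.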
+
+static void
+maybe_merge_pinned_nonleaf_nodes(
+ const DBT *parent_splitk,
+ FTNODE a,
+ FTNODE b,
+ bool *did_merge,
+ bool *did_rebalance,
+ DBT *splitk)
+{
+ toku_ftnode_assert_fully_in_memory(a);
+ toku_ftnode_assert_fully_in_memory(b);
+ invariant_notnull(parent_splitk->data);
+
+ int old_n_children = a->n_children;
+ int new_n_children = old_n_children + b->n_children;
+
+ XREALLOC_N(new_n_children, a->bp);
+ memcpy(a->bp + old_n_children, b->bp, b->n_children * sizeof(b->bp[0]));
+ memset(b->bp, 0, b->n_children * sizeof(b->bp[0]));
+
+ a->pivotkeys.insert_at(parent_splitk, old_n_children - 1);
+ a->pivotkeys.append(b->pivotkeys);
+ a->n_children = new_n_children;
+ b->n_children = 0;
+
+ a->set_dirty();
+ b->set_dirty();
+
+ *did_merge = true;
+ *did_rebalance = false;
+ toku_init_dbt(splitk);
+
+ FL_STATUS_VAL(FT_FLUSHER_MERGE_NONLEAF)++;
+}
+
+static void
+maybe_merge_pinned_nodes(
+ FTNODE parent,
+ const DBT *parent_splitk,
+ FTNODE a,
+ FTNODE b,
+ bool *did_merge,
+ bool *did_rebalance,
+ DBT *splitk,
+ uint32_t nodesize
+ )
+// Effect: either merge a and b into one node (merge them into a) and set *did_merge = true.
+// (We do this if the resulting node is not fissible)
+// or distribute a and b evenly and set *did_merge = false and *did_rebalance = true
+// (If a and b are already evenly distributed, we may do nothing.)
+// If we distribute:
+// For leaf nodes, we distribute the leafentries evenly.
+// For nonleaf nodes, we distribute the children evenly. That may leave one or both of the nodes overfull, but that's OK.
+// If we distribute, we set *splitk to a malloced pivot key.
+// Parameters:
+// parent The parent of the two nodes to be merged.
+// parent_splitk The pivot key between a and b. This is either free()'d or returned in *splitk.
+// a The first node to merge.
+// b The second node to merge.
+// did_merge (OUT): Did the two nodes actually get merged?
+// did_rebalance (OUT): Did the two nodes get rebalanced instead of merged?
+// splitk (OUT): If the two nodes did not get merged, the new pivot key between the two nodes.
+// nodesize The target node size.
+{
+ MSN msn_max;
+ paranoid_invariant(a->height == b->height);
+ toku_ftnode_assert_fully_in_memory(parent);
+ toku_ftnode_assert_fully_in_memory(a);
+ toku_ftnode_assert_fully_in_memory(b);
+ parent->set_dirty(); // just to make sure
+ {
+ MSN msna = a->max_msn_applied_to_node_on_disk;
+ MSN msnb = b->max_msn_applied_to_node_on_disk;
+ msn_max = (msna.msn > msnb.msn) ? msna : msnb;
+ }
+ if (a->height == 0) {
+ maybe_merge_pinned_leaf_nodes(a, b, parent_splitk, did_merge, did_rebalance, splitk, nodesize);
+ } else {
+ maybe_merge_pinned_nonleaf_nodes(parent_splitk, a, b, did_merge, did_rebalance, splitk);
+ }
+ if (*did_merge || *did_rebalance) {
+ // accurate for leaf nodes because all msgs above have been
+ // applied, accurate for non-leaf nodes because buffer immediately
+ // above each node has been flushed
+ a->max_msn_applied_to_node_on_disk = msn_max;
+ b->max_msn_applied_to_node_on_disk = msn_max;
+ }
+}
+
+static void merge_remove_key_callback(BLOCKNUM *bp, bool for_checkpoint, void *extra) {
+ FT ft = (FT) extra;
+ ft->blocktable.free_blocknum(bp, ft, for_checkpoint);
+}
+
+//
+// Takes as input a locked node and a childnum_to_merge
+// As output, two of node's children are merged or rebalanced, and node is unlocked
+//
+static void
+ft_merge_child(
+ FT ft,
+ FTNODE node,
+ int childnum_to_merge,
+ bool *did_react,
+ struct flusher_advice *fa)
+{
+ // this function should not be called
+ // if the child is not mergable
+ paranoid_invariant(node->n_children > 1);
+ toku_ftnode_assert_fully_in_memory(node);
+
+ int childnuma,childnumb;
+ if (childnum_to_merge > 0) {
+ childnuma = childnum_to_merge-1;
+ childnumb = childnum_to_merge;
+ } else {
+ childnuma = childnum_to_merge;
+ childnumb = childnum_to_merge+1;
+ }
+ paranoid_invariant(0 <= childnuma);
+ paranoid_invariant(childnuma+1 == childnumb);
+ paranoid_invariant(childnumb < node->n_children);
+
+ paranoid_invariant(node->height>0);
+
+ // We suspect that at least one of the children is fusible, but they might not be.
+ // for test
+ call_flusher_thread_callback(flt_flush_before_merge);
+
+ FTNODE childa, childb;
+ {
+ uint32_t childfullhash = compute_child_fullhash(ft->cf, node, childnuma);
+ ftnode_fetch_extra bfe;
+ bfe.create_for_full_read(ft);
+ toku_pin_ftnode_with_dep_nodes(ft, BP_BLOCKNUM(node, childnuma), childfullhash, &bfe, PL_WRITE_EXPENSIVE, 1, &node, &childa, true);
+ }
+ // for test
+ call_flusher_thread_callback(flt_flush_before_pin_second_node_for_merge);
+ {
+ FTNODE dep_nodes[2];
+ dep_nodes[0] = node;
+ dep_nodes[1] = childa;
+ uint32_t childfullhash = compute_child_fullhash(ft->cf, node, childnumb);
+ ftnode_fetch_extra bfe;
+ bfe.create_for_full_read(ft);
+ toku_pin_ftnode_with_dep_nodes(ft, BP_BLOCKNUM(node, childnumb), childfullhash, &bfe, PL_WRITE_EXPENSIVE, 2, dep_nodes, &childb, true);
+ }
+
+ if (toku_bnc_n_entries(BNC(node,childnuma))>0) {
+ flush_this_child(ft, node, childa, childnuma, fa);
+ }
+ if (toku_bnc_n_entries(BNC(node,childnumb))>0) {
+ flush_this_child(ft, node, childb, childnumb, fa);
+ }
+
+ // now we have both children pinned in main memory, and cachetable locked,
+ // so no checkpoints will occur.
+
+ bool did_merge, did_rebalance;
+ {
+ DBT splitk;
+ toku_init_dbt(&splitk);
+ const DBT old_split_key = node->pivotkeys.get_pivot(childnuma);
+ maybe_merge_pinned_nodes(node, &old_split_key, childa, childb, &did_merge, &did_rebalance, &splitk, ft->h->nodesize);
+ //toku_verify_estimates(t,childa);
+        // the tree did react if a merge (did_merge) or rebalance (new split key) occurred
+ *did_react = (bool)(did_merge || did_rebalance);
+
+ if (did_merge) {
+ invariant_null(splitk.data);
+ NONLEAF_CHILDINFO remaining_bnc = BNC(node, childnuma);
+ NONLEAF_CHILDINFO merged_bnc = BNC(node, childnumb);
+ for (unsigned int i = 0; i < (sizeof remaining_bnc->flow) / (sizeof remaining_bnc->flow[0]); ++i) {
+ remaining_bnc->flow[i] += merged_bnc->flow[i];
+ }
+ destroy_nonleaf_childinfo(merged_bnc);
+ set_BNULL(node, childnumb);
+ node->n_children--;
+ memmove(&node->bp[childnumb],
+ &node->bp[childnumb+1],
+ (node->n_children-childnumb)*sizeof(node->bp[0]));
+ REALLOC_N(node->n_children, node->bp);
+ node->pivotkeys.delete_at(childnuma);
+
+ // Handle a merge of the rightmost leaf node.
+ BLOCKNUM rightmost_blocknum = toku_unsafe_fetch(&ft->rightmost_blocknum);
+ if (did_merge && childb->blocknum.b == rightmost_blocknum.b) {
+ invariant(childb->blocknum.b != ft->h->root_blocknum.b);
+ toku_ftnode_swap_pair_values(childa, childb);
+ BP_BLOCKNUM(node, childnuma) = childa->blocknum;
+ }
+
+ paranoid_invariant(BP_BLOCKNUM(node, childnuma).b == childa->blocknum.b);
+ childa->set_dirty(); // just to make sure
+ childb->set_dirty(); // just to make sure
+ } else {
+ // flow will be inaccurate for a while, oh well. the children
+ // are leaves in this case so it's not a huge deal (we're
+ // pretty far down the tree)
+
+ // If we didn't merge the nodes, then we need the correct pivot.
+ invariant_notnull(splitk.data);
+ node->pivotkeys.replace_at(&splitk, childnuma);
+ node->set_dirty();
+ }
+ toku_destroy_dbt(&splitk);
+ }
+ //
+ // now we possibly flush the children
+ //
+ if (did_merge) {
+ // for test
+ call_flusher_thread_callback(flt_flush_before_unpin_remove);
+
+ // merge_remove_key_callback will free the blocknum
+ int rrb = toku_cachetable_unpin_and_remove(
+ ft->cf,
+ childb->ct_pair,
+ merge_remove_key_callback,
+ ft
+ );
+ assert_zero(rrb);
+
+ // for test
+ call_flusher_thread_callback(ft_flush_aflter_merge);
+
+ // unlock the parent
+ paranoid_invariant(node->dirty());
+ toku_unpin_ftnode(ft, node);
+ }
+ else {
+ // for test
+ call_flusher_thread_callback(ft_flush_aflter_rebalance);
+
+ // unlock the parent
+ paranoid_invariant(node->dirty());
+ toku_unpin_ftnode(ft, node);
+ toku_unpin_ftnode(ft, childb);
+ }
+ if (childa->height > 0 && fa->should_recursively_flush(childa, fa->extra)) {
+ toku_ft_flush_some_child(ft, childa, fa);
+ }
+ else {
+ toku_unpin_ftnode(ft, childa);
+ }
+}
+
+void toku_ft_flush_some_child(FT ft, FTNODE parent, struct flusher_advice *fa)
+// Effect: This function does the following:
+// - Pick a child of parent (the heaviest child),
+// - flush from parent to child,
+// - possibly split/merge child.
+// - if child is gorged, recursively proceed with child
+// Note that parent is already locked
+// Upon exit of this function, parent is unlocked and no new
+// new nodes (such as a child) remain locked
+{
+ int dirtied = 0;
+ NONLEAF_CHILDINFO bnc = NULL;
+ paranoid_invariant(parent->height>0);
+ toku_ftnode_assert_fully_in_memory(parent);
+ TXNID parent_oldest_referenced_xid_known = parent->oldest_referenced_xid_known;
+
+ // pick the child we want to flush to
+ int childnum = fa->pick_child(ft, parent, fa->extra);
+
+ // for test
+ call_flusher_thread_callback(flt_flush_before_child_pin);
+
+ // get the child into memory
+ BLOCKNUM targetchild = BP_BLOCKNUM(parent, childnum);
+ ft->blocktable.verify_blocknum_allocated(targetchild);
+ uint32_t childfullhash = compute_child_fullhash(ft->cf, parent, childnum);
+ FTNODE child;
+ ftnode_fetch_extra bfe;
+ // Note that we don't read the entire node into memory yet.
+    // The idea is to do the minimum work before releasing the parent lock
+ bfe.create_for_min_read(ft);
+ toku_pin_ftnode_with_dep_nodes(ft, targetchild, childfullhash, &bfe, PL_WRITE_EXPENSIVE, 1, &parent, &child, true);
+
+ // for test
+ call_flusher_thread_callback(ft_flush_aflter_child_pin);
+
+ if (fa->should_destroy_basement_nodes(fa)) {
+ maybe_destroy_child_blbs(parent, child, ft);
+ }
+
+    // Note that at this point, we don't have the entire child in memory.
+ // Let's do a quick check to see if the child may be reactive
+ // If the child cannot be reactive, then we can safely unlock
+ // the parent before finishing reading in the entire child node.
+ bool may_child_be_reactive = ft_ftnode_may_be_reactive(ft, child);
+
+ paranoid_invariant(child->blocknum.b!=0);
+
+ // only do the following work if there is a flush to perform
+ if (toku_bnc_n_entries(BNC(parent, childnum)) > 0 || parent->height == 1) {
+ if (!parent->dirty()) {
+ dirtied++;
+ parent->set_dirty();
+ }
+ // detach buffer
+ BP_WORKDONE(parent, childnum) = 0; // this buffer is drained, no work has been done by its contents
+ bnc = BNC(parent, childnum);
+ NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
+ memcpy(new_bnc->flow, bnc->flow, sizeof bnc->flow);
+ set_BNC(parent, childnum, new_bnc);
+ }
+
+ //
+ // at this point, the buffer has been detached from the parent
+ // and a new empty buffer has been placed in its stead
+ // so, if we are absolutely sure that the child is not
+ // reactive, we can unpin the parent
+ //
+ if (!may_child_be_reactive) {
+ toku_unpin_ftnode(ft, parent);
+ parent = NULL;
+ }
+
+ //
+ // now, if necessary, read/decompress the rest of child into memory,
+ // so that we can proceed and apply the flush
+ //
+ bring_node_fully_into_memory(child, ft);
+
+ // It is possible after reading in the entire child,
+ // that we now know that the child is not reactive
+ // if so, we can unpin parent right now
+ // we won't be splitting/merging child
+ // and we have already replaced the bnc
+    // for the parent with a fresh one
+ enum reactivity child_re = toku_ftnode_get_reactivity(ft, child);
+ if (parent && child_re == RE_STABLE) {
+ toku_unpin_ftnode(ft, parent);
+ parent = NULL;
+ }
+
+ // from above, we know at this point that either the bnc
+ // is detached from the parent (which may be unpinned),
+ // and we have to apply the flush, or there was no data
+ // in the buffer to flush, and as a result, flushing is not necessary
+ // and bnc is NULL
+ if (bnc != NULL) {
+ if (!child->dirty()) {
+ dirtied++;
+ child->set_dirty();
+ }
+ // do the actual flush
+ toku_bnc_flush_to_child(
+ ft,
+ bnc,
+ child,
+ parent_oldest_referenced_xid_known
+ );
+ destroy_nonleaf_childinfo(bnc);
+ }
+
+ fa->update_status(child, dirtied, fa->extra);
+ // let's get the reactivity of the child again,
+ // it is possible that the flush got rid of some values
+    // and now the child is no longer reactive
+ child_re = toku_ftnode_get_reactivity(ft, child);
+ // if the parent has been unpinned above, then
+ // this is our only option, even if the child is not stable
+ // if the child is not stable, we'll handle it the next
+ // time we need to flush to the child
+ if (!parent ||
+ child_re == RE_STABLE ||
+ (child_re == RE_FUSIBLE && parent->n_children == 1)
+ )
+ {
+ if (parent) {
+ toku_unpin_ftnode(ft, parent);
+ parent = NULL;
+ }
+ //
+ // it is the responsibility of toku_ft_flush_some_child to unpin child
+ //
+ if (child->height > 0 && fa->should_recursively_flush(child, fa->extra)) {
+ toku_ft_flush_some_child(ft, child, fa);
+ }
+ else {
+ toku_unpin_ftnode(ft, child);
+ }
+ }
+ else if (child_re == RE_FISSIBLE) {
+ //
+        // it is the responsibility of `ft_split_child` to unlock the
+        // parent and child nodes as it sees fit
+ //
+ paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent
+ ft_split_child(ft, parent, childnum, child, SPLIT_EVENLY, fa);
+ }
+ else if (child_re == RE_FUSIBLE) {
+ //
+        // it is the responsibility of `maybe_merge_child` to unlock the
+        // parent and child nodes as it sees fit
+ //
+ paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent
+ fa->maybe_merge_child(fa, ft, parent, childnum, child, fa->extra);
+ }
+ else {
+ abort();
+ }
+}
+
+void toku_bnc_flush_to_child(FT ft, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID parent_oldest_referenced_xid_known) {
+ paranoid_invariant(bnc);
+
+ TOKULOGGER logger = toku_cachefile_logger(ft->cf);
+ TXN_MANAGER txn_manager = logger != nullptr ? toku_logger_get_txn_manager(logger) : nullptr;
+ TXNID oldest_referenced_xid_for_simple_gc = TXNID_NONE;
+
+ txn_manager_state txn_state_for_gc(txn_manager);
+ bool do_garbage_collection = child->height == 0 && txn_manager != nullptr;
+ if (do_garbage_collection) {
+ txn_state_for_gc.init();
+ oldest_referenced_xid_for_simple_gc = toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager);
+ }
+ txn_gc_info gc_info(&txn_state_for_gc,
+ oldest_referenced_xid_for_simple_gc,
+ child->oldest_referenced_xid_known,
+ true);
+ struct flush_msg_fn {
+ FT ft;
+ FTNODE child;
+ NONLEAF_CHILDINFO bnc;
+ txn_gc_info *gc_info;
+
+ STAT64INFO_S stats_delta;
+ int64_t logical_rows_delta = 0;
+ size_t remaining_memsize = bnc->msg_buffer.buffer_size_in_use();
+
+ flush_msg_fn(FT t, FTNODE n, NONLEAF_CHILDINFO nl, txn_gc_info *g) :
+ ft(t), child(n), bnc(nl), gc_info(g), remaining_memsize(bnc->msg_buffer.buffer_size_in_use()) {
+ stats_delta = { 0, 0 };
+ }
+ int operator()(const ft_msg &msg, bool is_fresh) {
+ size_t flow_deltas[] = { 0, 0 };
+ size_t memsize_in_buffer = message_buffer::msg_memsize_in_buffer(msg);
+ if (remaining_memsize <= bnc->flow[0]) {
+ // this message is in the current checkpoint's worth of
+ // the end of the message buffer
+ flow_deltas[0] = memsize_in_buffer;
+ } else if (remaining_memsize <= bnc->flow[0] + bnc->flow[1]) {
+ // this message is in the last checkpoint's worth of the
+ // end of the message buffer
+ flow_deltas[1] = memsize_in_buffer;
+ }
+ toku_ftnode_put_msg(
+ ft->cmp,
+ ft->update_fun,
+ child,
+ -1,
+ msg,
+ is_fresh,
+ gc_info,
+ flow_deltas,
+ &stats_delta,
+ &logical_rows_delta);
+ remaining_memsize -= memsize_in_buffer;
+ return 0;
+ }
+ } flush_fn(ft, child, bnc, &gc_info);
+ bnc->msg_buffer.iterate(flush_fn);
+
+ child->oldest_referenced_xid_known = parent_oldest_referenced_xid_known;
+
+ invariant(flush_fn.remaining_memsize == 0);
+ if (flush_fn.stats_delta.numbytes || flush_fn.stats_delta.numrows) {
+ toku_ft_update_stats(&ft->in_memory_stats, flush_fn.stats_delta);
+ }
+ toku_ft_adjust_logical_row_count(ft, flush_fn.logical_rows_delta);
+ if (do_garbage_collection) {
+ size_t buffsize = bnc->msg_buffer.buffer_size_in_use();
+ // may be misleading if there's a broadcast message in there
+ toku_ft_status_note_msg_bytes_out(buffsize);
+ }
+}
+
+static void
+update_cleaner_status(
+ FTNODE node,
+ int childnum)
+{
+ FL_STATUS_VAL(FT_FLUSHER_CLEANER_TOTAL_NODES)++;
+ if (node->height == 1) {
+ FL_STATUS_VAL(FT_FLUSHER_CLEANER_H1_NODES)++;
+ } else {
+ FL_STATUS_VAL(FT_FLUSHER_CLEANER_HGT1_NODES)++;
+ }
+
+ unsigned int nbytesinbuf = toku_bnc_nbytesinbuf(BNC(node, childnum));
+ if (nbytesinbuf == 0) {
+ FL_STATUS_VAL(FT_FLUSHER_CLEANER_EMPTY_NODES)++;
+ } else {
+ if (nbytesinbuf > FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_SIZE)) {
+ FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_SIZE) = nbytesinbuf;
+ }
+ if (nbytesinbuf < FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_SIZE)) {
+ FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_SIZE) = nbytesinbuf;
+ }
+ FL_STATUS_VAL(FT_FLUSHER_CLEANER_TOTAL_BUFFER_SIZE) += nbytesinbuf;
+
+ uint64_t workdone = BP_WORKDONE(node, childnum);
+ if (workdone > FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_WORKDONE)) {
+ FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_WORKDONE) = workdone;
+ }
+ if (workdone < FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_WORKDONE)) {
+ FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_WORKDONE) = workdone;
+ }
+ FL_STATUS_VAL(FT_FLUSHER_CLEANER_TOTAL_BUFFER_WORKDONE) += workdone;
+ }
+}
+
+static void
+dummy_update_status(
+ FTNODE UU(child),
+ int UU(dirtied),
+ void* UU(extra)
+ )
+{
+}
+
+static int
+dummy_pick_heaviest_child(FT UU(h),
+ FTNODE UU(parent),
+ void* UU(extra))
+{
+ abort();
+ return -1;
+}
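+// The dummy above aborts on purpose: toku_ft_split_child and toku_ft_merge_child
+// below already know which childnum they operate on, so the advice's pick_child
+// hook must never be consulted on those paths.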
+
+void toku_ft_split_child(
+ FT ft,
+ FTNODE node,
+ int childnum,
+ FTNODE child,
+ enum split_mode split_mode
+ )
+{
+ struct flusher_advice fa;
+ flusher_advice_init(
+ &fa,
+ dummy_pick_heaviest_child,
+ dont_destroy_basement_nodes,
+ never_recursively_flush,
+ default_merge_child,
+ dummy_update_status,
+ default_pick_child_after_split,
+ NULL
+ );
+ ft_split_child(
+ ft,
+ node,
+ childnum, // childnum to split
+ child,
+ split_mode,
+ &fa
+ );
+}
+
+void toku_ft_merge_child(
+ FT ft,
+ FTNODE node,
+ int childnum
+ )
+{
+ struct flusher_advice fa;
+ flusher_advice_init(
+ &fa,
+ dummy_pick_heaviest_child,
+ dont_destroy_basement_nodes,
+ never_recursively_flush,
+ default_merge_child,
+ dummy_update_status,
+ default_pick_child_after_split,
+ NULL
+ );
+ bool did_react;
+ ft_merge_child(
+ ft,
+ node,
+ childnum, // childnum to merge
+ &did_react,
+ &fa
+ );
+}
+
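+// Cleaner-thread entry point: the cachetable hands us an already pinned
+// nonleaf node; we pick its heaviest child buffer and, if that buffer is
+// non-empty, flush it using the cleaner-thread advice (ct_flusher_advice_init).
+// Either path ends with the node unpinned.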
+int
+toku_ftnode_cleaner_callback(
+ void *ftnode_pv,
+ BLOCKNUM blocknum,
+ uint32_t fullhash,
+ void *extraargs)
+{
+ FTNODE node = (FTNODE) ftnode_pv;
+ invariant(node->blocknum.b == blocknum.b);
+ invariant(node->fullhash == fullhash);
+ invariant(node->height > 0); // we should never pick a leaf node (for now at least)
+ FT ft = (FT) extraargs;
+ bring_node_fully_into_memory(node, ft);
+ int childnum = find_heaviest_child(node);
+ update_cleaner_status(node, childnum);
+
+ // Either toku_ft_flush_some_child will unlock the node, or we do it here.
+ if (toku_bnc_nbytesinbuf(BNC(node, childnum)) > 0) {
+ struct flusher_advice fa;
+ struct flush_status_update_extra fste;
+ ct_flusher_advice_init(&fa, &fste, ft->h->nodesize);
+ toku_ft_flush_some_child(ft, node, &fa);
+ } else {
+ toku_unpin_ftnode(ft, node);
+ }
+ return 0;
+}
+
+struct flusher_extra {
+ FT ft;
+ FTNODE node;
+ NONLEAF_CHILDINFO bnc;
+ TXNID parent_oldest_referenced_xid_known;
+};
+
+//
+// This is the function that gets called by a
+// background thread. Its purpose is to complete
+// a flush, and possibly do a split/merge.
+//
+static void flush_node_fun(void *fe_v)
+{
+ toku::context flush_ctx(CTX_FLUSH);
+ struct flusher_extra* fe = (struct flusher_extra *) fe_v;
+ // The node that has been placed on the background
+ // thread may not be fully in memory. Some message
+ // buffers may be compressed. Before performing
+ // any operations, we must first make sure
+ // the node is fully in memory
+ //
+ // If we have a bnc, that means fe->node is a child, and we've already
+ // destroyed its basement nodes if necessary, so we now need to either
+ // read them back in, or just do the regular partial fetch. If we
+ // don't, that means fe->node is a parent, so we need to do this anyway.
+ bring_node_fully_into_memory(fe->node,fe->ft);
+ fe->node->set_dirty();
+
+ struct flusher_advice fa;
+ struct flush_status_update_extra fste;
+ flt_flusher_advice_init(&fa, &fste, fe->ft->h->nodesize);
+
+ if (fe->bnc) {
+ // In this case, we have a bnc to flush to a node
+
+ // for test purposes
+ call_flusher_thread_callback(flt_flush_before_applying_inbox);
+
+ toku_bnc_flush_to_child(
+ fe->ft,
+ fe->bnc,
+ fe->node,
+ fe->parent_oldest_referenced_xid_known
+ );
+ destroy_nonleaf_childinfo(fe->bnc);
+
+        // after the flush has completed, check to see if the node needs flushing.
+        // If so, call toku_ft_flush_some_child on the node (because this flush intends to
+        // pass a meaningful oldest referenced xid for simple garbage collection), and it is the
+        // responsibility of that flush to unlock the node. Otherwise, we unlock it here.
+ if (fe->node->height > 0 && toku_ftnode_nonleaf_is_gorged(fe->node, fe->ft->h->nodesize)) {
+ toku_ft_flush_some_child(fe->ft, fe->node, &fa);
+ }
+ else {
+ toku_unpin_ftnode(fe->ft,fe->node);
+ }
+ }
+ else {
+ // In this case, we were just passed a node with no
+ // bnc, which means we are tasked with flushing some
+ // buffer in the node.
+ // It is the responsibility of flush some child to unlock the node
+ toku_ft_flush_some_child(fe->ft, fe->node, &fa);
+ }
+ remove_background_job_from_cf(fe->ft->cf);
+ toku_free(fe);
+}
+
+static void
+place_node_and_bnc_on_background_thread(
+ FT ft,
+ FTNODE node,
+ NONLEAF_CHILDINFO bnc,
+ TXNID parent_oldest_referenced_xid_known)
+{
+ struct flusher_extra *XMALLOC(fe);
+ fe->ft = ft;
+ fe->node = node;
+ fe->bnc = bnc;
+ fe->parent_oldest_referenced_xid_known = parent_oldest_referenced_xid_known;
+ cachefile_kibbutz_enq(ft->cf, flush_node_fun, fe);
+}
+
+//
+// This takes as input a gorged, locked, non-leaf node named parent
+// and sets up a flush to be done in the background.
+// The flush is setup like this:
+// - We call maybe_get_and_pin_clean on the child we want to flush to in order to try to lock the child
+// - if we successfully pin the child, and the child does not need to be split or merged
+// then we detach the buffer, place the child and buffer onto a background thread, and
+// have the flush complete in the background, and unlock the parent. The child will be
+// unlocked on the background thread
+// - if any of the above does not happen (child cannot be locked,
+// child needs to be split/merged), then we place the parent on the background thread.
+// The parent will be unlocked on the background thread
+//
+void toku_ft_flush_node_on_background_thread(FT ft, FTNODE parent)
+{
+ toku::context flush_ctx(CTX_FLUSH);
+ TXNID parent_oldest_referenced_xid_known = parent->oldest_referenced_xid_known;
+ //
+ // first let's see if we can detach buffer on client thread
+ // and pick the child we want to flush to
+ //
+ int childnum = find_heaviest_child(parent);
+ paranoid_invariant(toku_bnc_n_entries(BNC(parent, childnum))>0);
+ //
+ // see if we can pin the child
+ //
+ FTNODE child;
+ uint32_t childfullhash = compute_child_fullhash(ft->cf, parent, childnum);
+ int r = toku_maybe_pin_ftnode_clean(ft, BP_BLOCKNUM(parent, childnum), childfullhash, PL_WRITE_EXPENSIVE, &child);
+ if (r != 0) {
+ // In this case, we could not lock the child, so just place the parent on the background thread
+ // In the callback, we will use toku_ft_flush_some_child, which checks to
+ // see if we should blow away the old basement nodes.
+ place_node_and_bnc_on_background_thread(ft, parent, NULL, parent_oldest_referenced_xid_known);
+ }
+ else {
+ //
+ // successfully locked child
+ //
+ bool may_child_be_reactive = ft_ftnode_may_be_reactive(ft, child);
+ if (!may_child_be_reactive) {
+ // We're going to unpin the parent, so before we do, we must
+ // check to see if we need to blow away the basement nodes to
+ // keep the MSN invariants intact.
+ maybe_destroy_child_blbs(parent, child, ft);
+
+ //
+ // can detach buffer and unpin root here
+ //
+ parent->set_dirty();
+ BP_WORKDONE(parent, childnum) = 0; // this buffer is drained, no work has been done by its contents
+ NONLEAF_CHILDINFO bnc = BNC(parent, childnum);
+ NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
+ memcpy(new_bnc->flow, bnc->flow, sizeof bnc->flow);
+ set_BNC(parent, childnum, new_bnc);
+
+ //
+ // at this point, the buffer has been detached from the parent
+ // and a new empty buffer has been placed in its stead
+ // so, because we know for sure the child is not
+ // reactive, we can unpin the parent
+ //
+ place_node_and_bnc_on_background_thread(ft, child, bnc, parent_oldest_referenced_xid_known);
+ toku_unpin_ftnode(ft, parent);
+ }
+ else {
+ // because the child may be reactive, we need to
+ // put parent on background thread.
+ // As a result, we unlock the child here.
+ toku_unpin_ftnode(ft, child);
+ // Again, we'll have the parent on the background thread, so
+ // we don't need to destroy the basement nodes yet.
+ place_node_and_bnc_on_background_thread(ft, parent, NULL, parent_oldest_referenced_xid_known);
+ }
+ }
+}
+
+#include <toku_race_tools.h>
+void __attribute__((__constructor__)) toku_ft_flusher_helgrind_ignore(void);
+void
+toku_ft_flusher_helgrind_ignore(void) {
+ TOKU_VALGRIND_HG_DISABLE_CHECKING(&fl_status, sizeof fl_status);
+}