// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- // vim: ts=8 sw=2 smarttab #include "node.h" #include #include #include #include "common/likely.h" #include "crimson/common/log.h" #include "node_extent_manager.h" #include "node_impl.h" #include "stages/node_stage_layout.h" namespace { seastar::logger& logger() { return crimson::get_logger(ceph_subsys_filestore); } } namespace crimson::os::seastore::onode { using node_ertr = Node::node_ertr; template using node_future = Node::node_future; /* * tree_cursor_t */ tree_cursor_t::tree_cursor_t(Ref node, const search_position_t& pos) : leaf_node{node}, position{pos} { assert(!is_end()); leaf_node->do_track_cursor(*this); } tree_cursor_t::tree_cursor_t( Ref node, const search_position_t& pos, const key_view_t& key, const onode_t* _p_value, layout_version_t v) : leaf_node{node}, position{pos} { assert(!is_end()); update_kv(key, _p_value, v); leaf_node->do_track_cursor(*this); } tree_cursor_t::tree_cursor_t(Ref node) : leaf_node{node}, position{search_position_t::end()} { assert(is_end()); assert(leaf_node->is_level_tail()); } tree_cursor_t::~tree_cursor_t() { if (!is_end()) { leaf_node->do_untrack_cursor(*this); } } const key_view_t& tree_cursor_t::get_key_view() const { ensure_kv(); return *key_view; } const onode_t* tree_cursor_t::get_p_value() const { ensure_kv(); return p_value; } template void tree_cursor_t::update_track( Ref node, const search_position_t& pos) { // the cursor must be already untracked // track the new node and new pos assert(!pos.is_end()); assert(!is_end()); leaf_node = node; position = pos; key_view.reset(); p_value = nullptr; leaf_node->do_track_cursor(*this); } template void tree_cursor_t::update_track(Ref, const search_position_t&); template void tree_cursor_t::update_track(Ref, const search_position_t&); void tree_cursor_t::update_kv( const key_view_t& key, const onode_t* _p_value, layout_version_t v) const { assert(!is_end()); assert(_p_value); assert(std::make_tuple(key, _p_value, v) == leaf_node->get_kv(position)); key_view = key; p_value = _p_value; node_version = v; } void tree_cursor_t::ensure_kv() const { assert(!is_end()); if (!p_value || node_version != leaf_node->get_layout_version()) { // NOTE: the leaf node is always present when we hold its reference. std::tie(key_view, p_value, node_version) = leaf_node->get_kv(position); } assert(p_value); } /* * Node */ Node::Node(NodeImplURef&& impl) : impl{std::move(impl)} {} Node::~Node() { // XXX: tolerate failure between allocate() and as_child() if (is_root()) { super->do_untrack_root(*this); } else { _parent_info->ptr->do_untrack_child(*this); } } level_t Node::level() const { return impl->level(); } node_future Node::lower_bound( context_t c, const key_hobj_t& key) { return seastar::do_with( MatchHistory(), [this, c, &key](auto& history) { return lower_bound_tracked(c, key, history); } ); } node_future, bool>> Node::insert( context_t c, const key_hobj_t& key, const onode_t& value) { return seastar::do_with( MatchHistory(), [this, c, &key, &value](auto& history) { return lower_bound_tracked(c, key, history ).safe_then([c, &key, &value, &history](auto result) { if (result.match() == MatchKindBS::EQ) { return node_ertr::make_ready_future, bool>>( std::make_pair(result.p_cursor, false)); } else { auto leaf_node = result.p_cursor->get_leaf_node(); return leaf_node->insert_value( c, key, value, result.p_cursor->get_position(), history, result.mstat ).safe_then([](auto p_cursor) { return node_ertr::make_ready_future, bool>>( std::make_pair(p_cursor, true)); }); } }); } ); } node_future Node::get_tree_stats(context_t c) { return seastar::do_with( tree_stats_t(), [this, c](auto& stats) { return do_get_tree_stats(c, stats).safe_then([&stats] { return stats; }); } ); } std::ostream& Node::dump(std::ostream& os) const { return impl->dump(os); } std::ostream& Node::dump_brief(std::ostream& os) const { return impl->dump_brief(os); } void Node::test_make_destructable( context_t c, NodeExtentMutable& mut, Super::URef&& _super) { impl->test_set_tail(mut); make_root(c, std::move(_super)); } node_future<> Node::mkfs(context_t c, RootNodeTracker& root_tracker) { return LeafNode::allocate_root(c, root_tracker ).safe_then([](auto ret) { /* FIXME: discard_result(); */ }); } node_future> Node::load_root(context_t c, RootNodeTracker& root_tracker) { return c.nm.get_super(c.t, root_tracker ).safe_then([c, &root_tracker](auto&& _super) { auto root_addr = _super->get_root_laddr(); assert(root_addr != L_ADDR_NULL); return Node::load(c, root_addr, true ).safe_then([c, _super = std::move(_super), &root_tracker](auto root) mutable { assert(root->impl->field_type() == field_type_t::N0); root->as_root(std::move(_super)); std::ignore = c; // as only used in an assert std::ignore = root_tracker; assert(root == root_tracker.get_root(c.t)); return node_ertr::make_ready_future>(root); }); }); } void Node::make_root(context_t c, Super::URef&& _super) { _super->write_root_laddr(c, impl->laddr()); as_root(std::move(_super)); } void Node::as_root(Super::URef&& _super) { assert(!super && !_parent_info); assert(_super->get_root_laddr() == impl->laddr()); assert(impl->is_level_tail()); super = std::move(_super); super->do_track_root(*this); } node_future<> Node::upgrade_root(context_t c) { assert(is_root()); assert(impl->is_level_tail()); assert(impl->field_type() == field_type_t::N0); super->do_untrack_root(*this); return InternalNode::allocate_root(c, impl->level(), impl->laddr(), std::move(super) ).safe_then([this](auto new_root) { as_child(search_position_t::end(), new_root); }); } template void Node::as_child(const search_position_t& pos, Ref parent_node) { assert(!super); _parent_info = parent_info_t{pos, parent_node}; parent_info().ptr->do_track_child(*this); } template void Node::as_child(const search_position_t&, Ref); template void Node::as_child(const search_position_t&, Ref); node_future<> Node::insert_parent(context_t c, Ref right_node) { assert(!is_root()); // TODO(cross-node string dedup) return parent_info().ptr->apply_child_split( c, parent_info().position, this, right_node); } node_future> Node::load( context_t c, laddr_t addr, bool expect_is_level_tail) { // NOTE: // *option1: all types of node have the same length; // option2: length is defined by node/field types; // option3: length is totally flexible; return c.nm.read_extent(c.t, addr, NODE_BLOCK_SIZE ).safe_then([expect_is_level_tail](auto extent) { auto [node_type, field_type] = extent->get_types(); if (node_type == node_type_t::LEAF) { auto impl = LeafNodeImpl::load(extent, field_type, expect_is_level_tail); return Ref(new LeafNode(impl.get(), std::move(impl))); } else if (node_type == node_type_t::INTERNAL) { auto impl = InternalNodeImpl::load(extent, field_type, expect_is_level_tail); return Ref(new InternalNode(impl.get(), std::move(impl))); } else { ceph_abort("impossible path"); } }); } /* * InternalNode */ InternalNode::InternalNode(InternalNodeImpl* impl, NodeImplURef&& impl_ref) : Node(std::move(impl_ref)), impl{impl} {} node_future<> InternalNode::apply_child_split( context_t c, const search_position_t& pos, Ref left_child, Ref right_child) { #ifndef NDEBUG if (pos.is_end()) { assert(impl->is_level_tail()); } #endif impl->prepare_mutate(c); auto left_key = left_child->impl->get_largest_key_view(); auto left_child_addr = left_child->impl->laddr(); auto left_child_addr_packed = laddr_packed_t{left_child_addr}; auto right_key = right_child->impl->get_largest_key_view(); auto right_child_addr = right_child->impl->laddr(); logger().debug("OTree::Internal::Insert: " "pos({}), left_child({}, {:#x}), right_child({}, {:#x}) ...", pos, left_key, left_child_addr, right_key, right_child_addr); // update pos => left_child to pos => right_child impl->replace_child_addr(pos, right_child_addr, left_child_addr); replace_track(pos, right_child, left_child); search_position_t insert_pos = pos; auto [insert_stage, insert_size] = impl->evaluate_insert( left_key, left_child_addr, insert_pos); auto free_size = impl->free_size(); if (free_size >= insert_size) { // insert [[maybe_unused]] auto p_value = impl->insert( left_key, left_child_addr_packed, insert_pos, insert_stage, insert_size); assert(impl->free_size() == free_size - insert_size); assert(insert_pos <= pos); assert(p_value->value == left_child_addr); track_insert(insert_pos, insert_stage, left_child, right_child); validate_tracked_children(); return node_ertr::now(); } // split and insert Ref this_ref = this; return (is_root() ? upgrade_root(c) : node_ertr::now() ).safe_then([this, c] { return InternalNode::allocate( c, impl->field_type(), impl->is_level_tail(), impl->level()); }).safe_then([this_ref, this, c, left_key, left_child, right_child, insert_pos, insert_stage=insert_stage, insert_size=insert_size](auto fresh_right) mutable { auto right_node = fresh_right.node; auto left_child_addr = left_child->impl->laddr(); auto left_child_addr_packed = laddr_packed_t{left_child_addr}; auto [split_pos, is_insert_left, p_value] = impl->split_insert( fresh_right.mut, *right_node->impl, left_key, left_child_addr_packed, insert_pos, insert_stage, insert_size); assert(p_value->value == left_child_addr); track_split(split_pos, right_node); if (is_insert_left) { track_insert(insert_pos, insert_stage, left_child); } else { right_node->track_insert(insert_pos, insert_stage, left_child); } validate_tracked_children(); right_node->validate_tracked_children(); // propagate index to parent return insert_parent(c, right_node); // TODO (optimize) // try to acquire space from siblings before split... see btrfs }); } node_future> InternalNode::allocate_root( context_t c, level_t old_root_level, laddr_t old_root_addr, Super::URef&& super) { return InternalNode::allocate(c, field_type_t::N0, true, old_root_level + 1 ).safe_then([c, old_root_addr, super = std::move(super)](auto fresh_node) mutable { auto root = fresh_node.node; auto p_value = root->impl->get_p_value(search_position_t::end()); fresh_node.mut.copy_in_absolute( const_cast(p_value), old_root_addr); root->make_root_from(c, std::move(super), old_root_addr); return root; }); } node_future> InternalNode::lookup_smallest(context_t c) { auto position = search_position_t::begin(); laddr_t child_addr = impl->get_p_value(position)->value; return get_or_track_child(c, position, child_addr ).safe_then([c](auto child) { return child->lookup_smallest(c); }); } node_future> InternalNode::lookup_largest(context_t c) { // NOTE: unlike LeafNode::lookup_largest(), this only works for the tail // internal node to return the tail child address. auto position = search_position_t::end(); laddr_t child_addr = impl->get_p_value(position)->value; return get_or_track_child(c, position, child_addr).safe_then([c](auto child) { return child->lookup_largest(c); }); } node_future InternalNode::lower_bound_tracked( context_t c, const key_hobj_t& key, MatchHistory& history) { auto result = impl->lower_bound(key, history); return get_or_track_child(c, result.position, result.p_value->value ).safe_then([c, &key, &history](auto child) { // XXX(multi-type): pass result.mstat to child return child->lower_bound_tracked(c, key, history); }); } node_future<> InternalNode::do_get_tree_stats( context_t c, tree_stats_t& stats) { auto nstats = impl->get_stats(); stats.size_persistent_internal += nstats.size_persistent; stats.size_filled_internal += nstats.size_filled; stats.size_logical_internal += nstats.size_logical; stats.size_overhead_internal += nstats.size_overhead; stats.size_value_internal += nstats.size_value; stats.num_kvs_internal += nstats.num_kvs; stats.num_nodes_internal += 1; Ref this_ref = this; return seastar::do_with( search_position_t(), [this, this_ref, c, &stats](auto& pos) { pos = search_position_t::begin(); return crimson::do_until( [this, this_ref, c, &stats, &pos]() -> node_future { auto child_addr = impl->get_p_value(pos)->value; return get_or_track_child(c, pos, child_addr ).safe_then([c, &stats](auto child) { return child->do_get_tree_stats(c, stats); }).safe_then([this, this_ref, &pos] { if (pos.is_end()) { return node_ertr::make_ready_future(true); } else { impl->next_position(pos); if (pos.is_end()) { if (impl->is_level_tail()) { return node_ertr::make_ready_future(false); } else { return node_ertr::make_ready_future(true); } } else { return node_ertr::make_ready_future(false); } } }); }); } ); } node_future<> InternalNode::test_clone_root( context_t c_other, RootNodeTracker& tracker_other) const { assert(is_root()); assert(impl->is_level_tail()); assert(impl->field_type() == field_type_t::N0); Ref this_ref = this; return InternalNode::allocate(c_other, field_type_t::N0, true, impl->level() ).safe_then([this, c_other, &tracker_other](auto fresh_other) { impl->test_copy_to(fresh_other.mut); auto cloned_root = fresh_other.node; return c_other.nm.get_super(c_other.t, tracker_other ).safe_then([c_other, cloned_root](auto&& super_other) { cloned_root->make_root_new(c_other, std::move(super_other)); return cloned_root; }); }).safe_then([this_ref, this, c_other](auto cloned_root) { // clone tracked children // In some unit tests, the children are stubbed out that they // don't exist in NodeExtentManager, and are only tracked in memory. return crimson::do_for_each( tracked_child_nodes.begin(), tracked_child_nodes.end(), [this_ref, c_other, cloned_root](auto& kv) { assert(kv.first == kv.second->parent_info().position); return kv.second->test_clone_non_root(c_other, cloned_root); } ); }); } node_future> InternalNode::get_or_track_child( context_t c, const search_position_t& position, laddr_t child_addr) { bool level_tail = position.is_end(); Ref child; auto found = tracked_child_nodes.find(position); Ref this_ref = this; return (found == tracked_child_nodes.end() ? (logger().trace("OTree::Internal: load child untracked at {:#x}, pos({}), level={}", child_addr, position, level() - 1), Node::load(c, child_addr, level_tail ).safe_then([this, position] (auto child) { child->as_child(position, this); return child; })) : (logger().trace("OTree::Internal: load child tracked at {:#x}, pos({}), level={}", child_addr, position, level() - 1), node_ertr::make_ready_future>(found->second)) ).safe_then([this_ref, this, position, child_addr] (auto child) { assert(child_addr == child->impl->laddr()); assert(position == child->parent_info().position); std::ignore = position; std::ignore = child_addr; validate_child(*child); return child; }); } void InternalNode::track_insert( const search_position_t& insert_pos, match_stage_t insert_stage, Ref insert_child, Ref nxt_child) { // update tracks auto pos_upper_bound = insert_pos; pos_upper_bound.index_by_stage(insert_stage) = INDEX_UPPER_BOUND; auto first = tracked_child_nodes.lower_bound(insert_pos); auto last = tracked_child_nodes.lower_bound(pos_upper_bound); std::vector nodes; std::for_each(first, last, [&nodes](auto& kv) { nodes.push_back(kv.second); }); tracked_child_nodes.erase(first, last); for (auto& node : nodes) { auto _pos = node->parent_info().position; assert(!_pos.is_end()); ++_pos.index_by_stage(insert_stage); node->as_child(_pos, this); } // track insert insert_child->as_child(insert_pos, this); #ifndef NDEBUG // validate left_child is before right_child if (nxt_child) { auto iter = tracked_child_nodes.find(insert_pos); ++iter; assert(iter->second == nxt_child); } #endif } void InternalNode::replace_track( const search_position_t& position, Ref new_child, Ref old_child) { assert(tracked_child_nodes[position] == old_child); tracked_child_nodes.erase(position); new_child->as_child(position, this); assert(tracked_child_nodes[position] == new_child); } void InternalNode::track_split( const search_position_t& split_pos, Ref right_node) { auto first = tracked_child_nodes.lower_bound(split_pos); auto iter = first; while (iter != tracked_child_nodes.end()) { search_position_t new_pos = iter->first; new_pos -= split_pos; iter->second->as_child(new_pos, right_node); ++iter; } tracked_child_nodes.erase(first, tracked_child_nodes.end()); } void InternalNode::validate_child(const Node& child) const { #ifndef NDEBUG assert(impl->level() - 1 == child.impl->level()); assert(this == child.parent_info().ptr); auto& child_pos = child.parent_info().position; assert(impl->get_p_value(child_pos)->value == child.impl->laddr()); if (child_pos.is_end()) { assert(impl->is_level_tail()); assert(child.impl->is_level_tail()); } else { assert(!child.impl->is_level_tail()); assert(impl->get_key_view(child_pos) == child.impl->get_largest_key_view()); } // XXX(multi-type) assert(impl->field_type() <= child.impl->field_type()); #endif } node_future InternalNode::allocate( context_t c, field_type_t field_type, bool is_level_tail, level_t level) { return InternalNodeImpl::allocate(c, field_type, is_level_tail, level ).safe_then([](auto&& fresh_impl) { auto node = Ref(new InternalNode( fresh_impl.impl.get(), std::move(fresh_impl.impl))); return fresh_node_t{node, fresh_impl.mut}; }); } /* * LeafNode */ LeafNode::LeafNode(LeafNodeImpl* impl, NodeImplURef&& impl_ref) : Node(std::move(impl_ref)), impl{impl} {} bool LeafNode::is_level_tail() const { return impl->is_level_tail(); } std::tuple LeafNode::get_kv( const search_position_t& pos) const { key_view_t key_view; auto p_value = impl->get_p_value(pos, &key_view); return {key_view, p_value, layout_version}; } node_future> LeafNode::lookup_smallest(context_t) { if (unlikely(impl->is_empty())) { assert(is_root()); return node_ertr::make_ready_future>( new tree_cursor_t(this)); } auto pos = search_position_t::begin(); key_view_t index_key; auto p_value = impl->get_p_value(pos, &index_key); return node_ertr::make_ready_future>( get_or_track_cursor(pos, index_key, p_value)); } node_future> LeafNode::lookup_largest(context_t) { if (unlikely(impl->is_empty())) { assert(is_root()); return node_ertr::make_ready_future>( new tree_cursor_t(this)); } search_position_t pos; const onode_t* p_value = nullptr; key_view_t index_key; impl->get_largest_slot(pos, index_key, &p_value); return node_ertr::make_ready_future>( get_or_track_cursor(pos, index_key, p_value)); } node_future LeafNode::lower_bound_tracked( context_t c, const key_hobj_t& key, MatchHistory& history) { key_view_t index_key; auto result = impl->lower_bound(key, history, &index_key); Ref cursor; if (result.position.is_end()) { assert(!result.p_value); cursor = new tree_cursor_t(this); } else { cursor = get_or_track_cursor(result.position, index_key, result.p_value); } return node_ertr::make_ready_future( search_result_t{cursor, result.mstat}); } node_future<> LeafNode::do_get_tree_stats(context_t, tree_stats_t& stats) { auto nstats = impl->get_stats(); stats.size_persistent_leaf += nstats.size_persistent; stats.size_filled_leaf += nstats.size_filled; stats.size_logical_leaf += nstats.size_logical; stats.size_overhead_leaf += nstats.size_overhead; stats.size_value_leaf += nstats.size_value; stats.num_kvs_leaf += nstats.num_kvs; stats.num_nodes_leaf += 1; return node_ertr::now(); } node_future<> LeafNode::test_clone_root( context_t c_other, RootNodeTracker& tracker_other) const { assert(is_root()); assert(impl->is_level_tail()); assert(impl->field_type() == field_type_t::N0); Ref this_ref = this; return LeafNode::allocate(c_other, field_type_t::N0, true ).safe_then([this, c_other, &tracker_other](auto fresh_other) { impl->test_copy_to(fresh_other.mut); auto cloned_root = fresh_other.node; return c_other.nm.get_super(c_other.t, tracker_other ).safe_then([c_other, cloned_root](auto&& super_other) { cloned_root->make_root_new(c_other, std::move(super_other)); }); }).safe_then([this_ref]{}); } node_future> LeafNode::insert_value( context_t c, const key_hobj_t& key, const onode_t& value, const search_position_t& pos, const MatchHistory& history, match_stat_t mstat) { #ifndef NDEBUG if (pos.is_end()) { assert(impl->is_level_tail()); } #endif logger().debug("OTree::Leaf::Insert: " "pos({}), {}, {}, {}, mstat({}) ...", pos, key, value, history, mstat); search_position_t insert_pos = pos; auto [insert_stage, insert_size] = impl->evaluate_insert( key, value, history, mstat, insert_pos); auto free_size = impl->free_size(); if (free_size >= insert_size) { // insert on_layout_change(); impl->prepare_mutate(c); auto p_value = impl->insert(key, value, insert_pos, insert_stage, insert_size); assert(impl->free_size() == free_size - insert_size); assert(insert_pos <= pos); assert(p_value->size == value.size); auto ret = track_insert(insert_pos, insert_stage, p_value); validate_tracked_cursors(); return node_ertr::make_ready_future>(ret); } // split and insert Ref this_ref = this; return (is_root() ? upgrade_root(c) : node_ertr::now() ).safe_then([this, c] { return LeafNode::allocate(c, impl->field_type(), impl->is_level_tail()); }).safe_then([this_ref, this, c, &key, &value, insert_pos, insert_stage=insert_stage, insert_size=insert_size](auto fresh_right) mutable { auto right_node = fresh_right.node; // no need to bump version for right node, as it is fresh on_layout_change(); impl->prepare_mutate(c); auto [split_pos, is_insert_left, p_value] = impl->split_insert( fresh_right.mut, *right_node->impl, key, value, insert_pos, insert_stage, insert_size); assert(p_value->size == value.size); track_split(split_pos, right_node); Ref ret; if (is_insert_left) { ret = track_insert(insert_pos, insert_stage, p_value); } else { ret = right_node->track_insert(insert_pos, insert_stage, p_value); } validate_tracked_cursors(); right_node->validate_tracked_cursors(); // propagate insert to parent return insert_parent(c, right_node).safe_then([ret] { return ret; }); // TODO (optimize) // try to acquire space from siblings before split... see btrfs }); } node_future> LeafNode::allocate_root( context_t c, RootNodeTracker& root_tracker) { return LeafNode::allocate(c, field_type_t::N0, true ).safe_then([c, &root_tracker](auto fresh_node) { auto root = fresh_node.node; return c.nm.get_super(c.t, root_tracker ).safe_then([c, root](auto&& super) { root->make_root_new(c, std::move(super)); return root; }); }); } Ref LeafNode::get_or_track_cursor( const search_position_t& position, const key_view_t& key, const onode_t* p_value) { assert(!position.is_end()); assert(p_value); Ref p_cursor; auto found = tracked_cursors.find(position); if (found == tracked_cursors.end()) { p_cursor = new tree_cursor_t(this, position, key, p_value, layout_version); } else { p_cursor = found->second; assert(p_cursor->get_leaf_node() == this); assert(p_cursor->get_position() == position); p_cursor->update_kv(key, p_value, layout_version); } return p_cursor; } void LeafNode::validate_cursor(tree_cursor_t& cursor) const { #ifndef NDEBUG assert(this == cursor.get_leaf_node().get()); assert(!cursor.is_end()); auto [key, val, ver] = get_kv(cursor.get_position()); assert(key == cursor.get_key_view()); assert(val == cursor.get_p_value()); #endif } Ref LeafNode::track_insert( const search_position_t& insert_pos, match_stage_t insert_stage, const onode_t* p_onode) { // update cursor position auto pos_upper_bound = insert_pos; pos_upper_bound.index_by_stage(insert_stage) = INDEX_UPPER_BOUND; auto first = tracked_cursors.lower_bound(insert_pos); auto last = tracked_cursors.lower_bound(pos_upper_bound); std::vector p_cursors; std::for_each(first, last, [&p_cursors](auto& kv) { p_cursors.push_back(kv.second); }); tracked_cursors.erase(first, last); for (auto& p_cursor : p_cursors) { search_position_t new_pos = p_cursor->get_position(); ++new_pos.index_by_stage(insert_stage); p_cursor->update_track(this, new_pos); } // track insert // TODO: getting key_view_t from stage::proceed_insert() and // stage::append_insert() has not supported yet return new tree_cursor_t(this, insert_pos); } void LeafNode::track_split( const search_position_t& split_pos, Ref right_node) { // update cursor ownership and position auto first = tracked_cursors.lower_bound(split_pos); auto iter = first; while (iter != tracked_cursors.end()) { search_position_t new_pos = iter->first; new_pos -= split_pos; iter->second->update_track(right_node, new_pos); ++iter; } tracked_cursors.erase(first, tracked_cursors.end()); } node_future LeafNode::allocate( context_t c, field_type_t field_type, bool is_level_tail) { return LeafNodeImpl::allocate(c, field_type, is_level_tail ).safe_then([](auto&& fresh_impl) { auto node = Ref(new LeafNode( fresh_impl.impl.get(), std::move(fresh_impl.impl))); return fresh_node_t{node, fresh_impl.mut}; }); } }