summaryrefslogtreecommitdiffstats
path: root/src/crimson/os/seastore/journal.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/os/seastore/journal.cc')
-rw-r--r--src/crimson/os/seastore/journal.cc756
1 files changed, 756 insertions, 0 deletions
diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc
new file mode 100644
index 000000000..39875fb56
--- /dev/null
+++ b/src/crimson/os/seastore/journal.cc
@@ -0,0 +1,756 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <iostream>
+
+#include <boost/iterator/counting_iterator.hpp>
+
+#include "crimson/os/seastore/journal.h"
+
+#include "include/intarith.h"
+#include "crimson/os/seastore/segment_manager.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_filestore);
+ }
+}
+
+namespace crimson::os::seastore {
+
+std::ostream &operator<<(std::ostream &out, const segment_header_t &header)
+{
+ return out << "segment_header_t("
+ << "segment_seq=" << header.journal_segment_seq
+ << ", physical_segment_id=" << header.physical_segment_id
+ << ", journal_tail=" << header.journal_tail
+ << ", segment_nonce=" << header.segment_nonce
+ << ")";
+}
+
+segment_nonce_t generate_nonce(
+ segment_seq_t seq,
+ const seastore_meta_t &meta)
+{
+ return ceph_crc32c(
+ seq,
+ reinterpret_cast<const unsigned char *>(meta.seastore_id.bytes()),
+ sizeof(meta.seastore_id.uuid));
+}
+
+Journal::Journal(SegmentManager &segment_manager)
+ : block_size(segment_manager.get_block_size()),
+ max_record_length(
+ segment_manager.get_segment_size() -
+ p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
+ size_t(block_size))),
+ segment_manager(segment_manager) {}
+
+
+Journal::initialize_segment_ertr::future<segment_seq_t>
+Journal::initialize_segment(Segment &segment)
+{
+ auto new_tail = segment_provider->get_journal_tail_target();
+ logger().debug(
+ "initialize_segment {} journal_tail_target {}",
+ segment.get_segment_id(),
+ new_tail);
+ // write out header
+ ceph_assert(segment.get_write_ptr() == 0);
+ bufferlist bl;
+
+ segment_seq_t seq = next_journal_segment_seq++;
+ current_segment_nonce = generate_nonce(
+ seq, segment_manager.get_meta());
+ auto header = segment_header_t{
+ seq,
+ segment.get_segment_id(),
+ segment_provider->get_journal_tail_target(),
+ current_segment_nonce};
+ encode(header, bl);
+
+ bufferptr bp(
+ ceph::buffer::create_page_aligned(
+ segment_manager.get_block_size()));
+ bp.zero();
+ auto iter = bl.cbegin();
+ iter.copy(bl.length(), bp.c_str());
+ bl.clear();
+ bl.append(bp);
+
+ written_to = segment_manager.get_block_size();
+ committed_to = 0;
+ return segment.write(0, bl).safe_then(
+ [=] {
+ segment_provider->update_journal_tail_committed(new_tail);
+ return seq;
+ },
+ initialize_segment_ertr::pass_further{},
+ crimson::ct_error::assert_all{ "TODO" });
+}
+
+ceph::bufferlist Journal::encode_record(
+ record_size_t rsize,
+ record_t &&record)
+{
+ bufferlist data_bl;
+ for (auto &i: record.extents) {
+ data_bl.append(i.bl);
+ }
+
+ bufferlist bl;
+ record_header_t header{
+ rsize.mdlength,
+ rsize.dlength,
+ (uint32_t)record.deltas.size(),
+ (uint32_t)record.extents.size(),
+ current_segment_nonce,
+ committed_to,
+ data_bl.crc32c(-1)
+ };
+ encode(header, bl);
+
+ auto metadata_crc_filler = bl.append_hole(sizeof(uint32_t));
+
+ for (const auto &i: record.extents) {
+ encode(extent_info_t(i), bl);
+ }
+ for (const auto &i: record.deltas) {
+ encode(i, bl);
+ }
+ if (bl.length() % block_size != 0) {
+ bl.append_zero(
+ block_size - (bl.length() % block_size));
+ }
+ ceph_assert(bl.length() == rsize.mdlength);
+
+
+ auto bliter = bl.cbegin();
+ auto metadata_crc = bliter.crc32c(
+ ceph::encoded_sizeof_bounded<record_header_t>(),
+ -1);
+ bliter += sizeof(checksum_t); /* crc hole again */
+ metadata_crc = bliter.crc32c(
+ bliter.get_remaining(),
+ metadata_crc);
+ ceph_le32 metadata_crc_le;
+ metadata_crc_le = metadata_crc;
+ metadata_crc_filler.copy_in(
+ sizeof(checksum_t),
+ reinterpret_cast<const char *>(&metadata_crc_le));
+
+ bl.claim_append(data_bl);
+ ceph_assert(bl.length() == (rsize.dlength + rsize.mdlength));
+
+ return bl;
+}
+
+bool Journal::validate_metadata(const bufferlist &bl)
+{
+ auto bliter = bl.cbegin();
+ auto test_crc = bliter.crc32c(
+ ceph::encoded_sizeof_bounded<record_header_t>(),
+ -1);
+ ceph_le32 recorded_crc_le;
+ ::decode(recorded_crc_le, bliter);
+ uint32_t recorded_crc = recorded_crc_le;
+ test_crc = bliter.crc32c(
+ bliter.get_remaining(),
+ test_crc);
+ return test_crc == recorded_crc;
+}
+
+Journal::read_validate_data_ret Journal::read_validate_data(
+ paddr_t record_base,
+ const record_header_t &header)
+{
+ return segment_manager.read(
+ record_base.add_offset(header.mdlength),
+ header.dlength
+ ).safe_then([=, &header](auto bptr) {
+ bufferlist bl;
+ bl.append(bptr);
+ return bl.crc32c(-1) == header.data_crc;
+ });
+}
+
+Journal::write_record_ret Journal::write_record(
+ record_size_t rsize,
+ record_t &&record)
+{
+ ceph::bufferlist to_write = encode_record(
+ rsize, std::move(record));
+ auto target = written_to;
+ assert((to_write.length() % block_size) == 0);
+ written_to += to_write.length();
+ logger().debug(
+ "write_record, mdlength {}, dlength {}, target {}",
+ rsize.mdlength,
+ rsize.dlength,
+ target);
+ return current_journal_segment->write(target, to_write).handle_error(
+ write_record_ertr::pass_further{},
+ crimson::ct_error::assert_all{ "TODO" }).safe_then([this, target] {
+ committed_to = target;
+ return write_record_ret(
+ write_record_ertr::ready_future_marker{},
+ paddr_t{
+ current_journal_segment->get_segment_id(),
+ target});
+ });
+}
+
+Journal::record_size_t Journal::get_encoded_record_length(
+ const record_t &record) const {
+ extent_len_t metadata =
+ (extent_len_t)ceph::encoded_sizeof_bounded<record_header_t>();
+ metadata += sizeof(checksum_t) /* crc */;
+ metadata += record.extents.size() *
+ ceph::encoded_sizeof_bounded<extent_info_t>();
+ extent_len_t data = 0;
+ for (const auto &i: record.deltas) {
+ metadata += ceph::encoded_sizeof(i);
+ }
+ for (const auto &i: record.extents) {
+ data += i.bl.length();
+ }
+ metadata = p2roundup(metadata, block_size);
+ return record_size_t{metadata, data};
+}
+
+bool Journal::needs_roll(segment_off_t length) const
+{
+ return length + written_to >
+ current_journal_segment->get_write_capacity();
+}
+
+Journal::roll_journal_segment_ertr::future<segment_seq_t>
+Journal::roll_journal_segment()
+{
+ auto old_segment_id = current_journal_segment ?
+ current_journal_segment->get_segment_id() :
+ NULL_SEG_ID;
+
+ return (current_journal_segment ?
+ current_journal_segment->close() :
+ Segment::close_ertr::now()).safe_then([this] {
+ return segment_provider->get_segment();
+ }).safe_then([this](auto segment) {
+ return segment_manager.open(segment);
+ }).safe_then([this](auto sref) {
+ current_journal_segment = sref;
+ written_to = 0;
+ return initialize_segment(*current_journal_segment);
+ }).safe_then([=](auto seq) {
+ if (old_segment_id != NULL_SEG_ID) {
+ segment_provider->close_segment(old_segment_id);
+ }
+ segment_provider->set_journal_segment(
+ current_journal_segment->get_segment_id(),
+ seq);
+ return seq;
+ }).handle_error(
+ roll_journal_segment_ertr::pass_further{},
+ crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); })
+ );
+}
+
+Journal::read_segment_header_ret
+Journal::read_segment_header(segment_id_t segment)
+{
+ return segment_manager.read(paddr_t{segment, 0}, block_size
+ ).handle_error(
+ read_segment_header_ertr::pass_further{},
+ crimson::ct_error::assert_all{}
+ ).safe_then([=](bufferptr bptr) -> read_segment_header_ret {
+ logger().debug("segment {} bptr size {}", segment, bptr.length());
+
+ segment_header_t header;
+ bufferlist bl;
+ bl.push_back(bptr);
+
+ logger().debug(
+ "Journal::read_segment_header: segment {} block crc {}",
+ segment,
+ bl.begin().crc32c(block_size, 0));
+
+ auto bp = bl.cbegin();
+ try {
+ decode(header, bp);
+ } catch (ceph::buffer::error &e) {
+ logger().debug(
+ "Journal::read_segment_header: segment {} unable to decode "
+ "header, skipping",
+ segment);
+ return crimson::ct_error::enodata::make();
+ }
+ logger().debug(
+ "Journal::read_segment_header: segment {} header {}",
+ segment,
+ header);
+ return read_segment_header_ret(
+ read_segment_header_ertr::ready_future_marker{},
+ header);
+ });
+}
+
+Journal::open_for_write_ret Journal::open_for_write()
+{
+ return roll_journal_segment().safe_then([this](auto seq) {
+ return open_for_write_ret(
+ open_for_write_ertr::ready_future_marker{},
+ journal_seq_t{
+ seq,
+ paddr_t{
+ current_journal_segment->get_segment_id(),
+ static_cast<segment_off_t>(block_size)}
+ });
+ });
+}
+
+Journal::find_replay_segments_fut Journal::find_replay_segments()
+{
+ return seastar::do_with(
+ std::vector<std::pair<segment_id_t, segment_header_t>>(),
+ [this](auto &&segments) mutable {
+ return crimson::do_for_each(
+ boost::make_counting_iterator(segment_id_t{0}),
+ boost::make_counting_iterator(segment_manager.get_num_segments()),
+ [this, &segments](auto i) {
+ return read_segment_header(i
+ ).safe_then([this, &segments, i](auto header) mutable {
+ if (generate_nonce(
+ header.journal_segment_seq,
+ segment_manager.get_meta()) != header.segment_nonce) {
+ logger().debug(
+ "find_replay_segments: nonce mismatch segment {} header {}",
+ i,
+ header);
+ assert(0 == "impossible");
+ return find_replay_segments_ertr::now();
+ }
+
+ segments.emplace_back(i, std::move(header));
+ return find_replay_segments_ertr::now();
+ }).handle_error(
+ crimson::ct_error::enoent::handle([i](auto) {
+ logger().debug(
+ "find_replay_segments: segment {} not available for read",
+ i);
+ return find_replay_segments_ertr::now();
+ }),
+ crimson::ct_error::enodata::handle([i](auto) {
+ logger().debug(
+ "find_replay_segments: segment {} header undecodable",
+ i);
+ return find_replay_segments_ertr::now();
+ }),
+ find_replay_segments_ertr::pass_further{},
+ crimson::ct_error::assert_all{}
+ );
+ }).safe_then([this, &segments]() mutable -> find_replay_segments_fut {
+ logger().debug(
+ "find_replay_segments: have {} segments",
+ segments.size());
+ if (segments.empty()) {
+ return crimson::ct_error::input_output_error::make();
+ }
+ std::sort(
+ segments.begin(),
+ segments.end(),
+ [](const auto &lt, const auto &rt) {
+ return lt.second.journal_segment_seq <
+ rt.second.journal_segment_seq;
+ });
+
+ next_journal_segment_seq =
+ segments.rbegin()->second.journal_segment_seq + 1;
+ std::for_each(
+ segments.begin(),
+ segments.end(),
+ [this](auto &seg) {
+ segment_provider->init_mark_segment_closed(
+ seg.first,
+ seg.second.journal_segment_seq);
+ });
+
+ auto journal_tail = segments.rbegin()->second.journal_tail;
+ segment_provider->update_journal_tail_committed(journal_tail);
+ auto replay_from = journal_tail.offset;
+ logger().debug(
+ "Journal::find_replay_segments: journal_tail={}",
+ journal_tail);
+ auto from = segments.begin();
+ if (replay_from != P_ADDR_NULL) {
+ from = std::find_if(
+ segments.begin(),
+ segments.end(),
+ [&replay_from](const auto &seg) -> bool {
+ return seg.first == replay_from.segment;
+ });
+ if (from->second.journal_segment_seq != journal_tail.segment_seq) {
+ logger().error(
+ "find_replay_segments: journal_tail {} does not match {}",
+ journal_tail,
+ from->second);
+ assert(0 == "invalid");
+ }
+ } else {
+ replay_from = paddr_t{from->first, (segment_off_t)block_size};
+ }
+ auto ret = replay_segments_t(segments.end() - from);
+ std::transform(
+ from, segments.end(), ret.begin(),
+ [this](const auto &p) {
+ auto ret = journal_seq_t{
+ p.second.journal_segment_seq,
+ paddr_t{p.first, (segment_off_t)block_size}};
+ logger().debug(
+ "Journal::find_replay_segments: replaying from {}",
+ ret);
+ return std::make_pair(ret, p.second);
+ });
+ ret[0].first.offset = replay_from;
+ return find_replay_segments_fut(
+ find_replay_segments_ertr::ready_future_marker{},
+ std::move(ret));
+ });
+ });
+}
+
+Journal::read_validate_record_metadata_ret Journal::read_validate_record_metadata(
+ paddr_t start,
+ segment_nonce_t nonce)
+{
+ if (start.offset + block_size > (int64_t)segment_manager.get_segment_size()) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ return segment_manager.read(start, block_size
+ ).safe_then(
+ [=](bufferptr bptr) mutable
+ -> read_validate_record_metadata_ret {
+ logger().debug("read_validate_record_metadata: reading {}", start);
+ bufferlist bl;
+ bl.append(bptr);
+ auto bp = bl.cbegin();
+ record_header_t header;
+ try {
+ decode(header, bp);
+ } catch (ceph::buffer::error &e) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ if (header.segment_nonce != nonce) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ if (header.mdlength > block_size) {
+ if (start.offset + header.mdlength >
+ (int64_t)segment_manager.get_segment_size()) {
+ return crimson::ct_error::input_output_error::make();
+ }
+ return segment_manager.read(
+ {start.segment, start.offset + (segment_off_t)block_size},
+ header.mdlength - block_size).safe_then(
+ [header=std::move(header), bl=std::move(bl)](
+ auto &&bptail) mutable {
+ bl.push_back(bptail);
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::make_pair(std::move(header), std::move(bl)));
+ });
+ } else {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::make_pair(std::move(header), std::move(bl))
+ );
+ }
+ }).safe_then([=](auto p) {
+ if (p && validate_metadata(p->second)) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::move(*p)
+ );
+ } else {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ });
+}
+
+std::optional<std::vector<delta_info_t>> Journal::try_decode_deltas(
+ record_header_t header,
+ const bufferlist &bl)
+{
+ auto bliter = bl.cbegin();
+ bliter += ceph::encoded_sizeof_bounded<record_header_t>();
+ bliter += sizeof(checksum_t) /* crc */;
+ bliter += header.extents * ceph::encoded_sizeof_bounded<extent_info_t>();
+ logger().debug("{}: decoding {} deltas", __func__, header.deltas);
+ std::vector<delta_info_t> deltas(header.deltas);
+ for (auto &&i : deltas) {
+ try {
+ decode(i, bliter);
+ } catch (ceph::buffer::error &e) {
+ return std::nullopt;
+ }
+ }
+ return deltas;
+}
+
+std::optional<std::vector<extent_info_t>> Journal::try_decode_extent_infos(
+ record_header_t header,
+ const bufferlist &bl)
+{
+ auto bliter = bl.cbegin();
+ bliter += ceph::encoded_sizeof_bounded<record_header_t>();
+ bliter += sizeof(checksum_t) /* crc */;
+ logger().debug("{}: decoding {} extents", __func__, header.extents);
+ std::vector<extent_info_t> extent_infos(header.extents);
+ for (auto &&i : extent_infos) {
+ try {
+ decode(i, bliter);
+ } catch (ceph::buffer::error &e) {
+ return std::nullopt;
+ }
+ }
+ return extent_infos;
+}
+
+Journal::replay_ertr::future<>
+Journal::replay_segment(
+ journal_seq_t seq,
+ segment_header_t header,
+ delta_handler_t &handler)
+{
+ logger().debug("replay_segment: starting at {}", seq);
+ return seastar::do_with(
+ scan_valid_records_cursor(seq.offset),
+ found_record_handler_t(
+ [=, &handler](paddr_t base,
+ const record_header_t &header,
+ const bufferlist &mdbuf) {
+ auto deltas = try_decode_deltas(
+ header,
+ mdbuf);
+ if (!deltas) {
+ // This should be impossible, we did check the crc on the mdbuf
+ logger().error(
+ "Journal::replay_segment unable to decode deltas for record {}",
+ base);
+ assert(deltas);
+ }
+
+ return seastar::do_with(
+ std::move(*deltas),
+ [=](auto &deltas) {
+ return crimson::do_for_each(
+ deltas,
+ [=](auto &delta) {
+ /* The journal may validly contain deltas for extents in
+ * since released segments. We can detect those cases by
+ * checking whether the segment in question currently has a
+ * sequence number > the current journal segment seq. We can
+ * safetly skip these deltas because the extent must already
+ * have been rewritten.
+ *
+ * Note, this comparison exploits the fact that
+ * SEGMENT_SEQ_NULL is a large number.
+ */
+ if (delta.paddr != P_ADDR_NULL &&
+ (segment_provider->get_seq(delta.paddr.segment) >
+ seq.segment_seq)) {
+ return replay_ertr::now();
+ } else {
+ return handler(
+ journal_seq_t{seq.segment_seq, base},
+ base.add_offset(header.mdlength),
+ delta);
+ }
+ });
+ });
+ }),
+ [=](auto &cursor, auto &dhandler) {
+ return scan_valid_records(
+ cursor,
+ header.segment_nonce,
+ std::numeric_limits<size_t>::max(),
+ dhandler).safe_then([](auto){});
+ });
+}
+
+Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler)
+{
+ return seastar::do_with(
+ std::move(delta_handler), replay_segments_t(),
+ [this](auto &handler, auto &segments) mutable -> replay_ret {
+ return find_replay_segments().safe_then(
+ [this, &handler, &segments](auto replay_segs) mutable {
+ logger().debug("replay: found {} segments", replay_segs.size());
+ segments = std::move(replay_segs);
+ return crimson::do_for_each(segments, [this, &handler](auto i) mutable {
+ return replay_segment(i.first, i.second, handler);
+ });
+ });
+ });
+}
+
+Journal::scan_extents_ret Journal::scan_extents(
+ scan_extents_cursor &cursor,
+ extent_len_t bytes_to_read)
+{
+ auto ret = std::make_unique<scan_extents_ret_bare>();
+ auto &retref = *ret;
+ return read_segment_header(cursor.get_offset().segment
+ ).handle_error(
+ scan_extents_ertr::pass_further{},
+ crimson::ct_error::assert_all{}
+ ).safe_then([&](auto segment_header) {
+ auto segment_nonce = segment_header.segment_nonce;
+ return seastar::do_with(
+ found_record_handler_t(
+ [&](
+ paddr_t base,
+ const record_header_t &header,
+ const bufferlist &mdbuf) mutable {
+
+ auto infos = try_decode_extent_infos(
+ header,
+ mdbuf);
+ if (!infos) {
+ // This should be impossible, we did check the crc on the mdbuf
+ logger().error(
+ "Journal::scan_extents unable to decode extents for record {}",
+ base);
+ assert(infos);
+ }
+
+ paddr_t extent_offset = base.add_offset(header.mdlength);
+ for (const auto &i : *infos) {
+ retref.emplace_back(extent_offset, i);
+ extent_offset.offset += i.len;
+ }
+ return scan_extents_ertr::now();
+ }),
+ [=, &cursor](auto &dhandler) {
+ return scan_valid_records(
+ cursor,
+ segment_nonce,
+ std::numeric_limits<size_t>::max(),
+ dhandler).safe_then([](auto){});
+ });
+ }).safe_then([ret=std::move(ret)] {
+ return std::move(*ret);
+ });
+}
+
+Journal::scan_valid_records_ret Journal::scan_valid_records(
+ scan_valid_records_cursor &cursor,
+ segment_nonce_t nonce,
+ size_t budget,
+ found_record_handler_t &handler)
+{
+ if (cursor.offset.offset == 0) {
+ cursor.offset.offset = block_size;
+ }
+ auto retref = std::make_unique<size_t>(0);
+ auto budget_used = *retref;
+ return crimson::do_until(
+ [=, &cursor, &budget_used, &handler]() mutable
+ -> scan_valid_records_ertr::future<bool> {
+ return [=, &handler, &cursor, &budget_used] {
+ if (!cursor.last_valid_header_found) {
+ return read_validate_record_metadata(cursor.offset, nonce
+ ).safe_then([=, &cursor](auto md) {
+ logger().debug(
+ "Journal::scan_valid_records: read complete {}",
+ cursor.offset);
+ if (!md) {
+ logger().debug(
+ "Journal::scan_valid_records: found invalid header at {}, presumably at end",
+ cursor.offset);
+ cursor.last_valid_header_found = true;
+ return scan_valid_records_ertr::now();
+ } else {
+ logger().debug(
+ "Journal::scan_valid_records: valid record read at {}",
+ cursor.offset);
+ cursor.last_committed = paddr_t{
+ cursor.offset.segment,
+ md->first.committed_to};
+ cursor.pending_records.emplace_back(
+ cursor.offset,
+ md->first,
+ md->second);
+ cursor.offset.offset +=
+ md->first.dlength + md->first.mdlength;
+ return scan_valid_records_ertr::now();
+ }
+ }).safe_then([=, &cursor, &budget_used, &handler] {
+ return crimson::do_until(
+ [=, &budget_used, &cursor, &handler] {
+ logger().debug(
+ "Journal::scan_valid_records: valid record read, processing queue");
+ if (cursor.pending_records.empty()) {
+ /* This is only possible if the segment is empty.
+ * A record's last_commited must be prior to its own
+ * location since it itself cannot yet have been committed
+ * at its own time of submission. Thus, the most recently
+ * read record must always fall after cursor.last_committed */
+ return scan_valid_records_ertr::make_ready_future<bool>(true);
+ }
+ auto &next = cursor.pending_records.front();
+ if (next.offset > cursor.last_committed) {
+ return scan_valid_records_ertr::make_ready_future<bool>(true);
+ }
+ budget_used +=
+ next.header.dlength + next.header.mdlength;
+ return handler(
+ next.offset,
+ next.header,
+ next.mdbuffer
+ ).safe_then([&cursor] {
+ cursor.pending_records.pop_front();
+ return scan_valid_records_ertr::make_ready_future<bool>(false);
+ });
+ });
+ });
+ } else {
+ assert(!cursor.pending_records.empty());
+ auto &next = cursor.pending_records.front();
+ return read_validate_data(next.offset, next.header
+ ).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) {
+ if (!valid) {
+ cursor.pending_records.clear();
+ return scan_valid_records_ertr::now();
+ }
+ budget_used +=
+ next.header.dlength + next.header.mdlength;
+ return handler(
+ next.offset,
+ next.header,
+ next.mdbuffer
+ ).safe_then([&cursor] {
+ cursor.pending_records.pop_front();
+ return scan_valid_records_ertr::now();
+ });
+ });
+ }
+ }().safe_then([=, &budget_used, &cursor] {
+ return scan_valid_records_ertr::make_ready_future<bool>(
+ cursor.is_complete() || budget_used >= budget);
+ });
+ }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
+ return scan_valid_records_ret(
+ scan_valid_records_ertr::ready_future_marker{},
+ std::move(*retref));
+ });
+}
+
+
+}