author    | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000
commit    | e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree      | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/osd/PGLog.cc
parent    | Initial commit. (diff)
Adding upstream version 18.2.2.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat
-rw-r--r-- | src/osd/PGLog.cc | 1290
1 file changed, 1290 insertions, 0 deletions
diff --git a/src/osd/PGLog.cc b/src/osd/PGLog.cc new file mode 100644 index 000000000..116036693 --- /dev/null +++ b/src/osd/PGLog.cc @@ -0,0 +1,1290 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com> + * + * Author: Loic Dachary <loic@dachary.org> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "PGLog.h" +#include "include/unordered_map.h" +#include "common/ceph_context.h" + +using std::make_pair; +using std::map; +using std::ostream; +using std::set; +using std::string; + +using ceph::bufferlist; +using ceph::decode; +using ceph::encode; + +#define dout_context cct +#define dout_subsys ceph_subsys_osd +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) + +static ostream& _prefix(std::ostream *_dout, const PGLog *pglog) +{ + return pglog->gen_prefix(*_dout); +} + +//////////////////// PGLog::IndexedLog //////////////////// + +void PGLog::IndexedLog::split_out_child( + pg_t child_pgid, + unsigned split_bits, + PGLog::IndexedLog *target) +{ + unindex(); + *target = IndexedLog(pg_log_t::split_out_child(child_pgid, split_bits)); + index(); + target->index(); + reset_rollback_info_trimmed_to_riter(); +} + +void PGLog::IndexedLog::trim( + CephContext* cct, + eversion_t s, + set<eversion_t> *trimmed, + set<string>* trimmed_dups, + eversion_t *write_from_dups) +{ + lgeneric_subdout(cct, osd, 10) << "IndexedLog::trim s=" << s << dendl; + ceph_assert(s <= can_rollback_to); + if (complete_to != log.end()) + lgeneric_subdout(cct, osd, 20) << " complete_to " << complete_to->version << dendl; + + auto earliest_dup_version = + log.rbegin()->version.version < cct->_conf->osd_pg_log_dups_tracked + ? 
0u + : log.rbegin()->version.version - cct->_conf->osd_pg_log_dups_tracked + 1; + + lgeneric_subdout(cct, osd, 20) << "earliest_dup_version = " << earliest_dup_version << dendl; + while (!log.empty()) { + const pg_log_entry_t &e = *log.begin(); + if (e.version > s) + break; + lgeneric_subdout(cct, osd, 20) << "trim " << e << dendl; + if (trimmed) + trimmed->emplace(e.version); + + unindex(e); // remove from index, + + // add to dup list + if (e.version.version >= earliest_dup_version) { + if (write_from_dups != nullptr && *write_from_dups > e.version) { + lgeneric_subdout(cct, osd, 20) << "updating write_from_dups from " << *write_from_dups << " to " << e.version << dendl; + *write_from_dups = e.version; + } + dups.push_back(pg_log_dup_t(e)); + index(dups.back()); + uint32_t idx = 0; + for (const auto& extra : e.extra_reqids) { + int return_code = e.return_code; + if (return_code >= 0) { + auto it = e.extra_reqid_return_codes.find(idx); + if (it != e.extra_reqid_return_codes.end()) { + return_code = it->second; + // FIXME: we aren't setting op_returns for these extra_reqids + } + } + ++idx; + + // note: extras have the same version as outer op + dups.push_back(pg_log_dup_t(e.version, extra.second, + extra.first, return_code)); + index(dups.back()); + } + } + + bool reset_complete_to = false; + // we are trimming past complete_to, so reset complete_to + if (complete_to != log.end() && e.version >= complete_to->version) + reset_complete_to = true; + if (rollback_info_trimmed_to_riter == log.rend() || + e.version == rollback_info_trimmed_to_riter->version) { + log.pop_front(); + rollback_info_trimmed_to_riter = log.rend(); + } else { + log.pop_front(); + } + + // reset complete_to to the beginning of the log + if (reset_complete_to) { + complete_to = log.begin(); + if (complete_to != log.end()) { + lgeneric_subdout(cct, osd, 20) << " moving complete_to to " + << log.begin()->version << dendl; + } else { + lgeneric_subdout(cct, osd, 20) << " log is now empty" << dendl; + } + } + } + + // we can hit an inflated `dups` b/c of https://tracker.ceph.com/issues/53729 + // the idea is to slowly trim them over a prolonged period of time and mix + // omap deletes with writes (if we're here, a new log entry got added) to + // neither: 1) blow size of single Transaction nor 2) generate-n-accumulate + // large amount of tombstones in BlueStore's RocksDB. + // if trimming immediately is a must, then the ceph-objectstore-tool is + // the way to go. + const size_t max_dups = cct->_conf->osd_pg_log_dups_tracked; + for (size_t max_dups_to_trim = cct->_conf->osd_pg_log_trim_max; + max_dups_to_trim > 0 && dups.size() > max_dups; + max_dups_to_trim--) { + const auto& e = *dups.begin(); + lgeneric_subdout(cct, osd, 20) << "trim dup " << e << dendl; + if (trimmed_dups) + trimmed_dups->insert(e.get_key_name()); + unindex(e); + dups.pop_front(); + } + + // raise tail? + if (tail < s) + tail = s; + lgeneric_subdout(cct, osd, 20) << "IndexedLog::trim after trim" + << " dups.size()=" << dups.size() + << " tail=" << tail + << " s=" << s << dendl; +} + +ostream& PGLog::IndexedLog::print(ostream& out) const +{ + out << *this << std::endl; + for (auto p = log.begin(); p != log.end(); ++p) { + out << *p << " " << + (logged_object(p->soid) ? 
"indexed" : "NOT INDEXED") << + std::endl; + ceph_assert(!p->reqid_is_indexed() || logged_req(p->reqid)); + } + + for (auto p = dups.begin(); p != dups.end(); ++p) { + out << *p << std::endl; + } + + return out; +} + +//////////////////// PGLog //////////////////// + +void PGLog::reset_backfill() +{ + missing.clear(); +} + +void PGLog::clear() { + missing.clear(); + log.clear(); + log_keys_debug.clear(); + undirty(); +} + +void PGLog::clear_info_log( + spg_t pgid, + ObjectStore::Transaction *t) { + coll_t coll(pgid); + t->remove(coll, pgid.make_pgmeta_oid()); +} + +void PGLog::trim( + eversion_t trim_to, + pg_info_t &info, + bool transaction_applied, + bool async) +{ + dout(10) << __func__ << " proposed trim_to = " << trim_to << dendl; + // trim? + if (trim_to > log.tail) { + dout(10) << __func__ << " missing = " << missing.num_missing() << dendl; + // Don't assert for async_recovery_targets or backfill_targets + // or whenever there are missing items + if (transaction_applied && !async && (missing.num_missing() == 0)) + ceph_assert(trim_to <= info.last_complete); + + dout(10) << "trim " << log << " to " << trim_to << dendl; + log.trim(cct, trim_to, &trimmed, &trimmed_dups, &write_from_dups); + info.log_tail = log.tail; + if (log.complete_to != log.log.end()) + dout(10) << " after trim complete_to " << log.complete_to->version << dendl; + } +} + +void PGLog::proc_replica_log( + pg_info_t &oinfo, + const pg_log_t &olog, + pg_missing_t& omissing, + pg_shard_t from) const +{ + dout(10) << "proc_replica_log for osd." << from << ": " + << oinfo << " " << olog << " " << omissing << dendl; + + if (olog.head < log.tail) { + dout(10) << __func__ << ": osd." << from << " does not overlap, not looking " + << "for divergent objects" << dendl; + return; + } + if (olog.head == log.head) { + dout(10) << __func__ << ": osd." << from << " same log head, not looking " + << "for divergent objects" << dendl; + return; + } + + /* + basically what we're doing here is rewinding the remote log, + dropping divergent entries, until we find something that matches + our master log. we then reset last_update to reflect the new + point up to which missing is accurate. + + later, in activate(), missing will get wound forward again and + we will send the peer enough log to arrive at the same state. + */ + + for (auto i = omissing.get_items().begin(); + i != omissing.get_items().end(); + ++i) { + dout(20) << " before missing " << i->first << " need " << i->second.need + << " have " << i->second.have << dendl; + } + + auto first_non_divergent = log.log.rbegin(); + while (1) { + if (first_non_divergent == log.log.rend()) + break; + if (first_non_divergent->version <= olog.head) { + dout(20) << "merge_log point (usually last shared) is " + << *first_non_divergent << dendl; + break; + } + ++first_non_divergent; + } + + /* Because olog.head >= log.tail, we know that both pgs must at least have + * the event represented by log.tail. Similarly, because log.head >= olog.tail, + * we know that the event represented by olog.tail must be common to both logs. + * Furthermore, the event represented by a log tail was necessarily trimmed, + * thus neither olog.tail nor log.tail can be divergent. It's + * possible that olog/log contain no actual events between olog.head and + * max(log.tail, olog.tail), however, since they might have been split out. + * Thus, if we cannot find an event e such that + * log.tail <= e.version <= log.head, the last_update must actually be + * max(log.tail, olog.tail). 
+ */ + eversion_t limit = std::max(olog.tail, log.tail); + eversion_t lu = + (first_non_divergent == log.log.rend() || + first_non_divergent->version < limit) ? + limit : + first_non_divergent->version; + + // we merge and adjust the replica's log, rollback the rollbackable divergent entry, + // remove the unrollbackable divergent entry and mark the according object as missing. + // the rollback boundary must choose crt of the olog which going to be merged. + // The replica log's(olog) crt will not be modified, so it could get passed + // to _merge_divergent_entries() directly. + IndexedLog folog(olog); + auto divergent = folog.rewind_from_head(lu); + _merge_divergent_entries( + folog, + divergent, + oinfo, + olog.get_can_rollback_to(), + omissing, + 0, + this); + + if (lu < oinfo.last_update) { + dout(10) << " peer osd." << from << " last_update now " << lu << dendl; + oinfo.last_update = lu; + } + + if (omissing.have_missing()) { + eversion_t first_missing = + omissing.get_items().at(omissing.get_rmissing().begin()->second).need; + oinfo.last_complete = eversion_t(); + for (auto i = olog.log.begin(); i != olog.log.end(); ++i) { + if (i->version < first_missing) + oinfo.last_complete = i->version; + else + break; + } + } else { + oinfo.last_complete = oinfo.last_update; + } +} // proc_replica_log + +/** + * rewind divergent entries at the head of the log + * + * This rewinds entries off the head of our log that are divergent. + * This is used by replicas during activation. + * + * @param newhead new head to rewind to + */ +void PGLog::rewind_divergent_log(eversion_t newhead, + pg_info_t &info, LogEntryHandler *rollbacker, + bool &dirty_info, bool &dirty_big_info) +{ + dout(10) << "rewind_divergent_log truncate divergent future " << + newhead << dendl; + + // We need to preserve the original crt before it gets updated in rewind_from_head(). + // Later, in merge_object_divergent_entries(), we use it to check whether we can rollback + // a divergent entry or not. + eversion_t original_crt = log.get_can_rollback_to(); + dout(20) << __func__ << " original_crt = " << original_crt << dendl; + if (info.last_complete > newhead) + info.last_complete = newhead; + + auto divergent = log.rewind_from_head(newhead); + if (!divergent.empty()) { + mark_dirty_from(divergent.front().version); + } + for (auto &&entry: divergent) { + dout(10) << "rewind_divergent_log future divergent " << entry << dendl; + } + info.last_update = newhead; + + _merge_divergent_entries( + log, + divergent, + info, + original_crt, + missing, + rollbacker, + this); + + dirty_info = true; + dirty_big_info = true; +} + +void PGLog::merge_log(pg_info_t &oinfo, pg_log_t&& olog, pg_shard_t fromosd, + pg_info_t &info, LogEntryHandler *rollbacker, + bool &dirty_info, bool &dirty_big_info) +{ + dout(10) << "merge_log " << olog << " from osd." << fromosd + << " into " << log << dendl; + + // Check preconditions + + // If our log is empty, the incoming log needs to have not been trimmed. + ceph_assert(!log.null() || olog.tail == eversion_t()); + // The logs must overlap. + ceph_assert(log.head >= olog.tail && olog.head >= log.tail); + + for (auto i = missing.get_items().begin(); + i != missing.get_items().end(); + ++i) { + dout(20) << "pg_missing_t sobject: " << i->first << dendl; + } + + bool changed = false; + + // extend on tail? + // this is just filling in history. it does not affect our + // missing set, as that should already be consistent with our + // current log. 
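+ // Illustration with hypothetical epoch'version values: if our log spans
+ // (4'10, 4'20] and the incoming olog spans (4'5, 4'25], the entries in
+ // (4'5, 4'10] are indexed and spliced onto our tail below; nothing in that
+ // range can contradict a missing set that is consistent up to log.tail.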
+ eversion_t orig_tail = log.tail; + if (olog.tail < log.tail) { + dout(10) << "merge_log extending tail to " << olog.tail << dendl; + auto from = olog.log.begin(); + auto to = from; + eversion_t last; + for (; to != olog.log.end(); ++to) { + if (to->version > log.tail) + break; + log.index(*to); + dout(15) << *to << dendl; + last = to->version; + } + mark_dirty_to(last); + + // splice into our log. + log.log.splice(log.log.begin(), + std::move(olog.log), from, to); + + info.log_tail = log.tail = olog.tail; + changed = true; + } + + if (oinfo.stats.reported_seq < info.stats.reported_seq || // make sure reported always increases + oinfo.stats.reported_epoch < info.stats.reported_epoch) { + oinfo.stats.reported_seq = info.stats.reported_seq; + oinfo.stats.reported_epoch = info.stats.reported_epoch; + } + if (info.last_backfill.is_max()) + info.stats = oinfo.stats; + info.hit_set = oinfo.hit_set; + + // do we have divergent entries to throw out? + if (olog.head < log.head) { + rewind_divergent_log(olog.head, info, rollbacker, dirty_info, dirty_big_info); + changed = true; + } + + // extend on head? + if (olog.head > log.head) { + dout(10) << "merge_log extending head to " << olog.head << dendl; + + // find start point in olog + auto to = olog.log.end(); + auto from = olog.log.end(); + eversion_t lower_bound = std::max(olog.tail, orig_tail); + while (1) { + if (from == olog.log.begin()) + break; + --from; + dout(20) << " ? " << *from << dendl; + if (from->version <= log.head) { + lower_bound = std::max(lower_bound, from->version); + ++from; + break; + } + } + dout(20) << "merge_log cut point (usually last shared) is " + << lower_bound << dendl; + mark_dirty_from(lower_bound); + + // We need to preserve the original crt before it gets updated in rewind_from_head(). + // Later, in merge_object_divergent_entries(), we use it to check whether we can rollback + // a divergent entry or not. 
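+ // (Why preserving it matters: rewind_from_head() clamps can_rollback_to down
+ // to the new head, so every divergent entry -- all newer than the new head --
+ // would look rollbackable afterwards; the pre-rewind value is the boundary
+ // that actually decides rollback vs. marking the object missing.)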
+ eversion_t original_crt = log.get_can_rollback_to(); + dout(20) << __func__ << " original_crt = " << original_crt << dendl; + auto divergent = log.rewind_from_head(lower_bound); + // move aside divergent items + for (auto &&oe: divergent) { + dout(10) << "merge_log divergent " << oe << dendl; + } + log.roll_forward_to(log.head, rollbacker); + + mempool::osd_pglog::list<pg_log_entry_t> new_entries; + new_entries.splice(new_entries.end(), olog.log, from, to); + append_log_entries_update_missing( + info.last_backfill, + new_entries, + false, + &log, + missing, + rollbacker, + this); + + _merge_divergent_entries( + log, + divergent, + info, + original_crt, + missing, + rollbacker, + this); + + info.last_update = log.head = olog.head; + + // We cannot rollback into the new log entries + log.skip_can_rollback_to_to_head(); + + info.last_user_version = oinfo.last_user_version; + info.purged_snaps = oinfo.purged_snaps; + // update num_missing too + // we might have appended some more missing objects above + info.stats.stats.sum.num_objects_missing = missing.num_missing(); + + changed = true; + } + + // now handle dups + if (merge_log_dups(olog)) { + changed = true; + } + + dout(10) << "merge_log result " << log << " " << missing << + " changed=" << changed << dendl; + + if (changed) { + dirty_info = true; + dirty_big_info = true; + } +} + + +// returns true if any changes were made to log.dups +bool PGLog::merge_log_dups(const pg_log_t& olog) { + dout(5) << __func__ + << " log.dups.size()=" << log.dups.size() + << "olog.dups.size()=" << olog.dups.size() << dendl; + bool changed = false; + + if (!olog.dups.empty()) { + if (log.dups.empty()) { + dout(10) << "merge_log copying olog dups to log " << + olog.dups.front().version << " to " << + olog.dups.back().version << dendl; + changed = true; + dirty_from_dups = eversion_t(); + dirty_to_dups = eversion_t::max(); + // since our log.dups is empty just copy them + for (const auto& i : olog.dups) { + log.dups.push_back(i); + log.index(log.dups.back()); + } + } else { + // since our log.dups is not empty try to extend on each end + + if (olog.dups.back().version > log.dups.back().version) { + // extend the dups's tail (i.e., newer dups) + dout(10) << "merge_log extending dups tail to " << + olog.dups.back().version << dendl; + changed = true; + + auto log_tail_version = log.dups.back().version; + + auto insert_cursor = log.dups.end(); + eversion_t last_shared = eversion_t::max(); + for (auto i = olog.dups.crbegin(); i != olog.dups.crend(); ++i) { + if (i->version <= log_tail_version) break; + log.dups.insert(insert_cursor, *i); + last_shared = i->version; + + auto prev = insert_cursor; + --prev; + // be sure to pass reference of copy in log.dups + log.index(*prev); + + --insert_cursor; // make sure we insert in reverse order + } + mark_dirty_from_dups(last_shared); + } + + if (olog.dups.front().version < log.dups.front().version) { + // extend the dups's head (i.e., older dups) + dout(10) << "merge_log extending dups head to " << + olog.dups.front().version << dendl; + changed = true; + + eversion_t last; + auto insert_cursor = log.dups.begin(); + for (auto i = olog.dups.cbegin(); i != olog.dups.cend(); ++i) { + if (i->version >= insert_cursor->version) break; + log.dups.insert(insert_cursor, *i); + last = i->version; + auto prev = insert_cursor; + --prev; + // be sure to pass address of copy in log.dups + log.index(*prev); + } + mark_dirty_to_dups(last); + } + } + } + + // remove any dup entries that overlap with pglog + if (!log.dups.empty() && 
log.dups.back().version > log.tail) { + dout(10) << "merge_log removed dups overlapping log entries (" << + log.tail << "," << log.dups.back().version << "]" << dendl; + changed = true; + + while (!log.dups.empty() && log.dups.back().version > log.tail) { + log.unindex(log.dups.back()); + mark_dirty_from_dups(log.dups.back().version); + log.dups.pop_back(); + } + } + + dout(5) << "end of " << __func__ << " changed=" << changed + << " log.dups.size()=" << log.dups.size() + << " olog.dups.size()=" << olog.dups.size() << dendl; + + return changed; +} + +void PGLog::check() { + if (!pg_log_debug) + return; + if (log.log.size() != log_keys_debug.size()) { + derr << "log.log.size() != log_keys_debug.size()" << dendl; + derr << "actual log:" << dendl; + for (auto i = log.log.begin(); i != log.log.end(); ++i) { + derr << " " << *i << dendl; + } + derr << "log_keys_debug:" << dendl; + for (auto i = log_keys_debug.begin(); + i != log_keys_debug.end(); + ++i) { + derr << " " << *i << dendl; + } + } + ceph_assert(log.log.size() == log_keys_debug.size()); + for (auto i = log.log.begin(); i != log.log.end(); ++i) { + ceph_assert(log_keys_debug.count(i->get_key_name())); + } +} + +// non-static +void PGLog::write_log_and_missing( + ObjectStore::Transaction& t, + map<string,bufferlist> *km, + const coll_t& coll, + const ghobject_t &log_oid, + bool require_rollback) +{ + if (needs_write()) { + dout(6) << "write_log_and_missing with: " + << "dirty_to: " << dirty_to + << ", dirty_from: " << dirty_from + << ", writeout_from: " << writeout_from + << ", trimmed: " << trimmed + << ", trimmed_dups: " << trimmed_dups + << ", clear_divergent_priors: " << clear_divergent_priors + << dendl; + _write_log_and_missing( + t, km, log, coll, log_oid, + dirty_to, + dirty_from, + writeout_from, + std::move(trimmed), + std::move(trimmed_dups), + missing, + !touched_log, + require_rollback, + clear_divergent_priors, + dirty_to_dups, + dirty_from_dups, + write_from_dups, + &may_include_deletes_in_missing_dirty, + (pg_log_debug ? 
&log_keys_debug : nullptr), + this); + undirty(); + } else { + dout(10) << "log is not dirty" << dendl; + } +} + +// static +void PGLog::write_log_and_missing_wo_missing( + ObjectStore::Transaction& t, + map<string,bufferlist> *km, + pg_log_t &log, + const coll_t& coll, const ghobject_t &log_oid, + map<eversion_t, hobject_t> &divergent_priors, + bool require_rollback, + const DoutPrefixProvider *dpp + ) +{ + _write_log_and_missing_wo_missing( + t, km, log, coll, log_oid, + divergent_priors, eversion_t::max(), eversion_t(), eversion_t(), + true, true, require_rollback, + eversion_t::max(), eversion_t(), eversion_t(), nullptr, dpp); +} + +// static +void PGLog::write_log_and_missing( + ObjectStore::Transaction& t, + map<string,bufferlist> *km, + pg_log_t &log, + const coll_t& coll, + const ghobject_t &log_oid, + const pg_missing_tracker_t &missing, + bool require_rollback, + bool *may_include_deletes_in_missing_dirty, + const DoutPrefixProvider *dpp) +{ + _write_log_and_missing( + t, km, log, coll, log_oid, + eversion_t::max(), + eversion_t(), + eversion_t(), + set<eversion_t>(), + set<string>(), + missing, + true, require_rollback, false, + eversion_t::max(), + eversion_t(), + eversion_t(), + may_include_deletes_in_missing_dirty, nullptr, dpp); +} + +// static +void PGLog::_write_log_and_missing_wo_missing( + ObjectStore::Transaction& t, + map<string,bufferlist> *km, + pg_log_t &log, + const coll_t& coll, const ghobject_t &log_oid, + map<eversion_t, hobject_t> &divergent_priors, + eversion_t dirty_to, + eversion_t dirty_from, + eversion_t writeout_from, + bool dirty_divergent_priors, + bool touch_log, + bool require_rollback, + eversion_t dirty_to_dups, + eversion_t dirty_from_dups, + eversion_t write_from_dups, + set<string> *log_keys_debug, + const DoutPrefixProvider *dpp + ) +{ + ldpp_dout(dpp, 10) << "_write_log_and_missing_wo_missing, clearing up to " << dirty_to + << " dirty_to_dups=" << dirty_to_dups + << " dirty_from_dups=" << dirty_from_dups + << " write_from_dups=" << write_from_dups << dendl; + if (touch_log) + t.touch(coll, log_oid); + if (dirty_to != eversion_t()) { + t.omap_rmkeyrange( + coll, log_oid, + eversion_t().get_key_name(), dirty_to.get_key_name()); + clear_up_to(log_keys_debug, dirty_to.get_key_name()); + } + if (dirty_to != eversion_t::max() && dirty_from != eversion_t::max()) { + // dout(10) << "write_log_and_missing, clearing from " << dirty_from << dendl; + t.omap_rmkeyrange( + coll, log_oid, + dirty_from.get_key_name(), eversion_t::max().get_key_name()); + clear_after(log_keys_debug, dirty_from.get_key_name()); + } + + for (auto p = log.log.begin(); + p != log.log.end() && p->version <= dirty_to; + ++p) { + bufferlist bl(sizeof(*p) * 2); + p->encode_with_checksum(bl); + (*km)[p->get_key_name()] = std::move(bl); + } + + for (auto p = log.log.rbegin(); + p != log.log.rend() && + (p->version >= dirty_from || p->version >= writeout_from) && + p->version >= dirty_to; + ++p) { + bufferlist bl(sizeof(*p) * 2); + p->encode_with_checksum(bl); + (*km)[p->get_key_name()] = std::move(bl); + } + + if (log_keys_debug) { + for (auto i = (*km).begin(); + i != (*km).end(); + ++i) { + if (i->first[0] == '_') + continue; + ceph_assert(!log_keys_debug->count(i->first)); + log_keys_debug->insert(i->first); + } + } + + // process dups after log_keys_debug is filled, so dups do not + // end up in that set + if (dirty_to_dups != eversion_t()) { + pg_log_dup_t min, dirty_to_dup; + dirty_to_dup.version = dirty_to_dups; + ldpp_dout(dpp, 10) << __func__ << " remove dups min=" << 
min.get_key_name() + << " to dirty_to_dup=" << dirty_to_dup.get_key_name() << dendl; + t.omap_rmkeyrange( + coll, log_oid, + min.get_key_name(), dirty_to_dup.get_key_name()); + } + if (dirty_to_dups != eversion_t::max() && dirty_from_dups != eversion_t::max()) { + pg_log_dup_t max, dirty_from_dup; + max.version = eversion_t::max(); + dirty_from_dup.version = dirty_from_dups; + ldpp_dout(dpp, 10) << __func__ << " remove dups dirty_from_dup=" + << dirty_from_dup.get_key_name() + << " to max=" << max.get_key_name() << dendl; + t.omap_rmkeyrange( + coll, log_oid, + dirty_from_dup.get_key_name(), max.get_key_name()); + } + + ldpp_dout(dpp, 10) << __func__ << " going to encode log.dups.size()=" + << log.dups.size() << dendl; + for (const auto& entry : log.dups) { + if (entry.version > dirty_to_dups) + break; + bufferlist bl; + encode(entry, bl); + (*km)[entry.get_key_name()] = std::move(bl); + } + ldpp_dout(dpp, 10) << __func__ << " 1st round encoded log.dups.size()=" + << log.dups.size() << dendl; + for (auto p = log.dups.rbegin(); + p != log.dups.rend() && + (p->version >= dirty_from_dups || p->version >= write_from_dups) && + p->version >= dirty_to_dups; + ++p) { + bufferlist bl; + encode(*p, bl); + (*km)[p->get_key_name()] = std::move(bl); + } + ldpp_dout(dpp, 10) << __func__ << " 2st round encoded log.dups.size()=" + << log.dups.size() << dendl; + + if (dirty_divergent_priors) { + ldpp_dout(dpp, 10) << "write_log_and_missing: writing divergent_priors" + << dendl; + encode(divergent_priors, (*km)["divergent_priors"]); + } + if (require_rollback) { + encode( + log.get_can_rollback_to(), + (*km)["can_rollback_to"]); + encode( + log.get_rollback_info_trimmed_to(), + (*km)["rollback_info_trimmed_to"]); + } + ldpp_dout(dpp, 10) << "end of " << __func__ << dendl; +} + +// static +void PGLog::_write_log_and_missing( + ObjectStore::Transaction& t, + map<string,bufferlist>* km, + pg_log_t &log, + const coll_t& coll, const ghobject_t &log_oid, + eversion_t dirty_to, + eversion_t dirty_from, + eversion_t writeout_from, + set<eversion_t> &&trimmed, + set<string> &&trimmed_dups, + const pg_missing_tracker_t &missing, + bool touch_log, + bool require_rollback, + bool clear_divergent_priors, + eversion_t dirty_to_dups, + eversion_t dirty_from_dups, + eversion_t write_from_dups, + bool *may_include_deletes_in_missing_dirty, // in/out param + set<string> *log_keys_debug, + const DoutPrefixProvider *dpp + ) { + ldpp_dout(dpp, 10) << __func__ << " clearing up to " << dirty_to + << " dirty_to_dups=" << dirty_to_dups + << " dirty_from_dups=" << dirty_from_dups + << " write_from_dups=" << write_from_dups + << " trimmed_dups.size()=" << trimmed_dups.size() << dendl; + set<string> to_remove; + to_remove.swap(trimmed_dups); + for (auto& t : trimmed) { + string key = t.get_key_name(); + if (log_keys_debug) { + auto it = log_keys_debug->find(key); + ceph_assert(it != log_keys_debug->end()); + log_keys_debug->erase(it); + } + to_remove.emplace(std::move(key)); + } + trimmed.clear(); + + if (touch_log) + t.touch(coll, log_oid); + if (dirty_to != eversion_t()) { + t.omap_rmkeyrange( + coll, log_oid, + eversion_t().get_key_name(), dirty_to.get_key_name()); + clear_up_to(log_keys_debug, dirty_to.get_key_name()); + } + if (dirty_to != eversion_t::max() && dirty_from != eversion_t::max()) { + ldpp_dout(dpp, 10) << "write_log_and_missing, clearing from " + << dirty_from << dendl; + t.omap_rmkeyrange( + coll, log_oid, + dirty_from.get_key_name(), eversion_t::max().get_key_name()); + clear_after(log_keys_debug, 
dirty_from.get_key_name()); + } + + for (auto p = log.log.begin(); + p != log.log.end() && p->version <= dirty_to; + ++p) { + bufferlist bl(sizeof(*p) * 2); + p->encode_with_checksum(bl); + (*km)[p->get_key_name()] = std::move(bl); + } + + for (auto p = log.log.rbegin(); + p != log.log.rend() && + (p->version >= dirty_from || p->version >= writeout_from) && + p->version >= dirty_to; + ++p) { + bufferlist bl(sizeof(*p) * 2); + p->encode_with_checksum(bl); + (*km)[p->get_key_name()] = std::move(bl); + } + + if (log_keys_debug) { + for (auto i = (*km).begin(); + i != (*km).end(); + ++i) { + if (i->first[0] == '_') + continue; + ceph_assert(!log_keys_debug->count(i->first)); + log_keys_debug->insert(i->first); + } + } + + // process dups after log_keys_debug is filled, so dups do not + // end up in that set + if (dirty_to_dups != eversion_t()) { + pg_log_dup_t min, dirty_to_dup; + dirty_to_dup.version = dirty_to_dups; + ldpp_dout(dpp, 10) << __func__ << " remove dups min=" << min.get_key_name() + << " to dirty_to_dup=" << dirty_to_dup.get_key_name() << dendl; + t.omap_rmkeyrange( + coll, log_oid, + min.get_key_name(), dirty_to_dup.get_key_name()); + } + if (dirty_to_dups != eversion_t::max() && dirty_from_dups != eversion_t::max()) { + pg_log_dup_t max, dirty_from_dup; + max.version = eversion_t::max(); + dirty_from_dup.version = dirty_from_dups; + ldpp_dout(dpp, 10) << __func__ << " remove dups dirty_from_dup=" + << dirty_from_dup.get_key_name() + << " to max=" << max.get_key_name() << dendl; + t.omap_rmkeyrange( + coll, log_oid, + dirty_from_dup.get_key_name(), max.get_key_name()); + } + + ldpp_dout(dpp, 10) << __func__ << " going to encode log.dups.size()=" + << log.dups.size() << dendl; + for (const auto& entry : log.dups) { + if (entry.version > dirty_to_dups) + break; + bufferlist bl; + encode(entry, bl); + (*km)[entry.get_key_name()] = std::move(bl); + } + ldpp_dout(dpp, 10) << __func__ << " 1st round encoded log.dups.size()=" + << log.dups.size() << dendl; + + for (auto p = log.dups.rbegin(); + p != log.dups.rend() && + (p->version >= dirty_from_dups || p->version >= write_from_dups) && + p->version >= dirty_to_dups; + ++p) { + bufferlist bl; + encode(*p, bl); + (*km)[p->get_key_name()] = std::move(bl); + } + ldpp_dout(dpp, 10) << __func__ << " 2st round encoded log.dups.size()=" + << log.dups.size() << dendl; + + if (clear_divergent_priors) { + ldpp_dout(dpp, 10) << "write_log_and_missing: writing divergent_priors" + << dendl; + to_remove.insert("divergent_priors"); + } + // since we encode individual missing items instead of a whole + // missing set, we need another key to store this bit of state + if (*may_include_deletes_in_missing_dirty) { + (*km)["may_include_deletes_in_missing"] = bufferlist(); + *may_include_deletes_in_missing_dirty = false; + } + missing.get_changed( + [&](const hobject_t &obj) { + string key = string("missing/") + obj.to_str(); + pg_missing_item item; + if (!missing.is_missing(obj, &item)) { + to_remove.insert(key); + } else { + encode(make_pair(obj, item), (*km)[key], CEPH_FEATUREMASK_SERVER_OCTOPUS); + } + }); + if (require_rollback) { + encode( + log.get_can_rollback_to(), + (*km)["can_rollback_to"]); + encode( + log.get_rollback_info_trimmed_to(), + (*km)["rollback_info_trimmed_to"]); + } + + if (!to_remove.empty()) + t.omap_rmkeys(coll, log_oid, to_remove); + ldpp_dout(dpp, 10) << "end of " << __func__ << dendl; +} + +void PGLog::rebuild_missing_set_with_deletes( + ObjectStore *store, + ObjectStore::CollectionHandle& ch, + const pg_info_t &info) +{ + // 
save entries not generated from the current log (e.g. added due + // to repair, EIO handling, or divergent_priors). + map<hobject_t, pg_missing_item> extra_missing; + for (const auto& p : missing.get_items()) { + if (!log.logged_object(p.first)) { + dout(20) << __func__ << " extra missing entry: " << p.first + << " " << p.second << dendl; + extra_missing[p.first] = p.second; + } + } + missing.clear(); + + // go through the log and add items that are not present or older + // versions on disk, just as if we were reading the log + metadata + // off disk originally + set<hobject_t> did; + for (auto i = log.log.rbegin(); + i != log.log.rend(); + ++i) { + if (i->version <= info.last_complete) + break; + if (i->soid > info.last_backfill || + i->is_error() || + did.find(i->soid) != did.end()) + continue; + did.insert(i->soid); + + bufferlist bv; + int r = store->getattr( + ch, + ghobject_t(i->soid, ghobject_t::NO_GEN, info.pgid.shard), + OI_ATTR, + bv); + dout(20) << __func__ << " check for log entry: " << *i << " = " << r << dendl; + + if (r >= 0) { + object_info_t oi(bv); + dout(20) << __func__ << " store version = " << oi.version << dendl; + if (oi.version < i->version) { + missing.add(i->soid, i->version, oi.version, i->is_delete()); + } + } else { + missing.add(i->soid, i->version, eversion_t(), i->is_delete()); + } + } + + for (const auto& p : extra_missing) { + missing.add(p.first, p.second.need, p.second.have, p.second.is_delete()); + } + + set_missing_may_contain_deletes(); +} + +#ifdef WITH_SEASTAR + +namespace { + struct FuturizedShardStoreLogReader { + crimson::os::FuturizedStore::Shard &store; + const pg_info_t &info; + PGLog::IndexedLog &log; + std::set<std::string>* log_keys_debug = NULL; + pg_missing_tracker_t &missing; + const DoutPrefixProvider *dpp; + + eversion_t on_disk_can_rollback_to; + eversion_t on_disk_rollback_info_trimmed_to; + + std::map<eversion_t, hobject_t> divergent_priors; + bool must_rebuild = false; + std::list<pg_log_entry_t> entries; + std::list<pg_log_dup_t> dups; + + std::optional<std::string> next; + + void process_entry(const auto& key, const auto& value) { + if (key[0] == '_') + return; + //Copy ceph::buffer::list before creating iterator + auto bl = value; + auto bp = bl.cbegin(); + if (key == "divergent_priors") { + decode(divergent_priors, bp); + ldpp_dout(dpp, 20) << "read_log_and_missing " << divergent_priors.size() + << " divergent_priors" << dendl; + ceph_assert("crimson shouldn't have had divergent_priors" == 0); + } else if (key == "can_rollback_to") { + decode(on_disk_can_rollback_to, bp); + } else if (key == "rollback_info_trimmed_to") { + decode(on_disk_rollback_info_trimmed_to, bp); + } else if (key == "may_include_deletes_in_missing") { + missing.may_include_deletes = true; + } else if (key.substr(0, 7) == std::string("missing")) { + hobject_t oid; + pg_missing_item item; + decode(oid, bp); + decode(item, bp); + if (item.is_delete()) { + ceph_assert(missing.may_include_deletes); + } + missing.add(oid, std::move(item)); + } else if (key.substr(0, 4) == std::string("dup_")) { + pg_log_dup_t dup; + decode(dup, bp); + if (!dups.empty()) { + ceph_assert(dups.back().version < dup.version); + } + dups.push_back(dup); + } else { + pg_log_entry_t e; + e.decode_with_checksum(bp); + ldpp_dout(dpp, 20) << "read_log_and_missing " << e << dendl; + if (!entries.empty()) { + pg_log_entry_t last_e(entries.back()); + ceph_assert(last_e.version.version < e.version.version); + ceph_assert(last_e.version.epoch <= e.version.epoch); + } + entries.push_back(e); + 
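+ // (Recap of the dispatch above: keys are routed by prefix -- "_" internal
+ // keys, the named metadata keys, "missing/<oid>", "dup_..." -- and anything
+ // else is decoded as a plain pg_log_entry_t, as in this final branch.)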
if (log_keys_debug) + log_keys_debug->insert(e.get_key_name()); + } + } + + seastar::future<> read(crimson::os::CollectionRef ch, + ghobject_t pgmeta_oid) { + // will get overridden if recorded + on_disk_can_rollback_to = info.last_update; + missing.may_include_deletes = false; + + return seastar::do_with( + std::move(ch), + std::move(pgmeta_oid), + std::make_optional<std::string>(), + [this](crimson::os::CollectionRef &ch, + ghobject_t &pgmeta_oid, + std::optional<std::string> &start) { + return seastar::repeat([this, &ch, &pgmeta_oid, &start]() { + return store.omap_get_values( + ch, pgmeta_oid, start + ).safe_then([this, &start](const auto& ret) { + const auto& [done, kvs] = ret; + for (const auto& [key, value] : kvs) { + process_entry(key, value); + start = key; + } + return seastar::make_ready_future<seastar::stop_iteration>( + done ? seastar::stop_iteration::yes : seastar::stop_iteration::no + ); + }, crimson::os::FuturizedStore::Shard::read_errorator::assert_all{}); + }).then([this] { + if (info.pgid.is_no_shard()) { + // replicated pool pg does not persist this key + assert(on_disk_rollback_info_trimmed_to == eversion_t()); + on_disk_rollback_info_trimmed_to = info.last_update; + } + log = PGLog::IndexedLog( + info.last_update, + info.log_tail, + on_disk_can_rollback_to, + on_disk_rollback_info_trimmed_to, + std::move(entries), + std::move(dups)); + }); + }); + } + }; +} + +seastar::future<> PGLog::read_log_and_missing_crimson( + crimson::os::FuturizedStore::Shard &store, + crimson::os::CollectionRef ch, + const pg_info_t &info, + IndexedLog &log, + std::set<std::string>* log_keys_debug, + pg_missing_tracker_t &missing, + ghobject_t pgmeta_oid, + const DoutPrefixProvider *dpp) +{ + ldpp_dout(dpp, 20) << "read_log_and_missing coll " + << ch->get_cid() + << " " << pgmeta_oid << dendl; + return seastar::do_with(FuturizedShardStoreLogReader{ + store, info, log, log_keys_debug, + missing, dpp}, + [ch, pgmeta_oid](FuturizedShardStoreLogReader& reader) { + return reader.read(ch, pgmeta_oid); + }); +} + +seastar::future<> PGLog::rebuild_missing_set_with_deletes_crimson( + crimson::os::FuturizedStore::Shard &store, + crimson::os::CollectionRef ch, + const pg_info_t &info) +{ + // save entries not generated from the current log (e.g. added due + // to repair, EIO handling, or divergent_priors). 
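+ // (Same algorithm as the non-seastar rebuild_missing_set_with_deletes above;
+ // the difference is that object_info_t reads go through get_attr's errorator,
+ // with ENOENT mapped to "object entirely missing" instead of a negative
+ // return code.)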
+ map<hobject_t, pg_missing_item> extra_missing; + for (const auto& p : missing.get_items()) { + if (!log.logged_object(p.first)) { + ldpp_dout(this, 20) << __func__ << " extra missing entry: " << p.first + << " " << p.second << dendl; + extra_missing[p.first] = p.second; + } + } + missing.clear(); + + // go through the log and add items that are not present or older + // versions on disk, just as if we were reading the log + metadata + // off disk originally + return seastar::do_with( + set<hobject_t>(), + log.log.rbegin(), + [this, &store, ch, &info](auto &did, auto &it) { + return seastar::repeat([this, &store, ch, &info, &it, &did] { + if (it == log.log.rend()) { + return seastar::make_ready_future<seastar::stop_iteration>( + seastar::stop_iteration::yes); + } + auto &log_entry = *it; + it++; + if (log_entry.version <= info.last_complete) + return seastar::make_ready_future<seastar::stop_iteration>( + seastar::stop_iteration::yes); + if (log_entry.soid > info.last_backfill || + log_entry.is_error() || + did.find(log_entry.soid) != did.end()) + return seastar::make_ready_future<seastar::stop_iteration>( + seastar::stop_iteration::no); + did.insert(log_entry.soid); + return store.get_attr( + ch, + ghobject_t(log_entry.soid, ghobject_t::NO_GEN, info.pgid.shard), + OI_ATTR + ).safe_then([this, &log_entry](auto bv) { + object_info_t oi(bv); + ldpp_dout(this, 20) + << "rebuild_missing_set_with_deletes_crimson found obj " + << log_entry.soid + << " version = " << oi.version << dendl; + if (oi.version < log_entry.version) { + ldpp_dout(this, 20) + << "rebuild_missing_set_with_deletes_crimson missing obj " + << log_entry.soid + << " for version = " << log_entry.version << dendl; + missing.add( + log_entry.soid, + log_entry.version, + oi.version, + log_entry.is_delete()); + } + }, + crimson::ct_error::enoent::handle([this, &log_entry] { + ldpp_dout(this, 20) + << "rebuild_missing_set_with_deletes_crimson missing object " + << log_entry.soid << dendl; + missing.add( + log_entry.soid, + log_entry.version, + eversion_t(), + log_entry.is_delete()); + }), + crimson::ct_error::enodata::handle([] { ceph_abort("unexpected enodata"); }) + ).then([] { + return seastar::stop_iteration::no; + }); + }); + }).then([this] { + set_missing_may_contain_deletes(); + }); +} +#endif |
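As a closing illustration of the dup-tracking window used by IndexedLog::trim() above, here is a minimal standalone sketch. The types and numbers are simplified stand-ins (plain uint64_t versions in a std::deque, not Ceph's eversion_t or mempool list), chosen only to show the same window arithmetic:

#include <cstdint>
#include <deque>
#include <iostream>

int main() {
  std::deque<uint64_t> log = {95, 96, 97, 98, 99, 100}; // entry versions, oldest first
  const uint64_t dups_tracked = 3;  // stand-in for osd_pg_log_dups_tracked
  const uint64_t trim_to = 98;      // stand-in for the trim point s

  // Same guard as the source: if the newest version is smaller than the
  // tracked window, every trimmed entry qualifies as a dup.
  const uint64_t head = log.back();
  const uint64_t earliest_dup_version =
      head < dups_tracked ? 0u : head - dups_tracked + 1;

  std::deque<uint64_t> dups;
  while (!log.empty() && log.front() <= trim_to) {
    const uint64_t v = log.front();
    log.pop_front();
    if (v >= earliest_dup_version)  // only the newest window becomes dup records
      dups.push_back(v);
  }

  // head=100, dups_tracked=3 -> earliest_dup_version=98: trimming to 98 drops
  // 95..97 outright and records only 98 as a dup.
  for (const uint64_t v : dups)
    std::cout << "dup " << v << '\n';
}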