diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/mds/OpenFileTable.cc | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/mds/OpenFileTable.cc')
-rw-r--r-- | src/mds/OpenFileTable.cc | 1228 |
1 files changed, 1228 insertions, 0 deletions
diff --git a/src/mds/OpenFileTable.cc b/src/mds/OpenFileTable.cc new file mode 100644 index 000000000..687ddc3e9 --- /dev/null +++ b/src/mds/OpenFileTable.cc @@ -0,0 +1,1228 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "acconfig.h" +#include "mds/CInode.h" +#include "mds/CDir.h" +#include "mds/MDSRank.h" +#include "mds/MDCache.h" +#include "osdc/Objecter.h" +#include "OpenFileTable.h" + +#include "common/config.h" +#include "common/errno.h" + +enum { + l_oft_first = 1000000, + l_oft_omap_total_objs, + l_oft_omap_total_kv_pairs, + l_oft_omap_total_updates, + l_oft_omap_total_removes, + l_oft_last +}; + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mds +#undef dout_prefix +#define dout_prefix _prefix(_dout, mds) +static ostream& _prefix(std::ostream *_dout, MDSRank *mds) { + return *_dout << "mds." << mds->get_nodeid() << ".openfiles "; +} + +OpenFileTable::OpenFileTable(MDSRank *m) : mds(m) { + PerfCountersBuilder b(mds->cct, "oft", l_oft_first, l_oft_last); + + b.add_u64(l_oft_omap_total_objs, "omap_total_objs"); + b.add_u64(l_oft_omap_total_kv_pairs, "omap_total_kv_pairs"); + b.add_u64(l_oft_omap_total_updates, "omap_total_updates"); + b.add_u64(l_oft_omap_total_removes, "omap_total_removes"); + logger.reset(b.create_perf_counters()); + mds->cct->get_perfcounters_collection()->add(logger.get()); + logger->set(l_oft_omap_total_objs, 0); + logger->set(l_oft_omap_total_kv_pairs, 0); + logger->set(l_oft_omap_total_updates, 0); + logger->set(l_oft_omap_total_removes, 0); +} + +OpenFileTable::~OpenFileTable() { + if (logger) { + mds->cct->get_perfcounters_collection()->remove(logger.get()); + } +} + +void OpenFileTable::get_ref(CInode *in, frag_t fg) +{ + do { + auto p = anchor_map.find(in->ino()); + if (!in->is_dir()) { + ceph_assert(fg == -1U); + ceph_assert(p == anchor_map.end()); + } + + if (p != anchor_map.end()) { + ceph_assert(in->state_test(CInode::STATE_TRACKEDBYOFT)); + ceph_assert(p->second.nref > 0); + p->second.nref++; + + if (fg != -1U) { + auto ret = p->second.frags.insert(fg); + ceph_assert(ret.second); + dirty_items.emplace(in->ino(), (int)DIRTY_UNDEF); + } + break; + } + + CDentry *dn = in->get_parent_dn(); + CInode *pin = dn ? dn->get_dir()->get_inode() : nullptr; + + auto ret = anchor_map.emplace(std::piecewise_construct, std::forward_as_tuple(in->ino()), + std::forward_as_tuple(in->ino(), (pin ? pin->ino() : inodeno_t(0)), + (dn ? dn->get_name() : string()), in->d_type(), 1)); + ceph_assert(ret.second == true); + in->state_set(CInode::STATE_TRACKEDBYOFT); + + if (fg != -1U) + ret.first->second.frags.insert(fg); + + auto ret1 = dirty_items.emplace(in->ino(), (int)DIRTY_NEW); + if (!ret1.second) { + int omap_idx = ret1.first->second; + ceph_assert(omap_idx >= 0); + ret.first->second.omap_idx = omap_idx; + } + + in = pin; + fg = -1U; + } while (in); +} + +void OpenFileTable::put_ref(CInode *in, frag_t fg) +{ + do { + ceph_assert(in->state_test(CInode::STATE_TRACKEDBYOFT)); + auto p = anchor_map.find(in->ino()); + ceph_assert(p != anchor_map.end()); + ceph_assert(p->second.nref > 0); + + if (!in->is_dir()) { + ceph_assert(fg == -1U); + ceph_assert(p->second.nref == 1); + } + + if (p->second.nref > 1) { + p->second.nref--; + if (fg != -1U) { + auto ret = p->second.frags.erase(fg); + ceph_assert(ret); + dirty_items.emplace(in->ino(), (int)DIRTY_UNDEF); + } + break; + } + + CDentry *dn = in->get_parent_dn(); + CInode *pin = dn ? dn->get_dir()->get_inode() : nullptr; + if (dn) { + ceph_assert(p->second.dirino == pin->ino()); + ceph_assert(p->second.d_name == dn->get_name()); + } else { + ceph_assert(p->second.dirino == inodeno_t(0)); + ceph_assert(p->second.d_name == ""); + } + + if (fg != -1U) { + ceph_assert(p->second.frags.size() == 1); + ceph_assert(*p->second.frags.begin() == fg); + } + + int omap_idx = p->second.omap_idx; + anchor_map.erase(p); + in->state_clear(CInode::STATE_TRACKEDBYOFT); + + auto ret = dirty_items.emplace(in->ino(), omap_idx); + if (!ret.second) { + if (ret.first->second == DIRTY_NEW) { + ceph_assert(omap_idx < 0); + dirty_items.erase(ret.first); + } else { + ceph_assert(omap_idx >= 0); + ret.first->second = omap_idx; + } + } + + in = pin; + fg = -1U; + } while (in); +} + +void OpenFileTable::add_inode(CInode *in) +{ + dout(10) << __func__ << " " << *in << dendl; + get_ref(in); +} + +void OpenFileTable::remove_inode(CInode *in) +{ + dout(10) << __func__ << " " << *in << dendl; + put_ref(in); +} + +void OpenFileTable::add_dirfrag(CDir *dir) +{ + dout(10) << __func__ << " " << *dir << dendl; + ceph_assert(!dir->state_test(CDir::STATE_TRACKEDBYOFT)); + dir->state_set(CDir::STATE_TRACKEDBYOFT); + get_ref(dir->get_inode(), dir->get_frag()); +} + +void OpenFileTable::remove_dirfrag(CDir *dir) +{ + dout(10) << __func__ << " " << *dir << dendl; + ceph_assert(dir->state_test(CDir::STATE_TRACKEDBYOFT)); + dir->state_clear(CDir::STATE_TRACKEDBYOFT); + put_ref(dir->get_inode(), dir->get_frag()); +} + +void OpenFileTable::notify_link(CInode *in) +{ + dout(10) << __func__ << " " << *in << dendl; + auto p = anchor_map.find(in->ino()); + ceph_assert(p != anchor_map.end()); + ceph_assert(p->second.nref > 0); + ceph_assert(p->second.dirino == inodeno_t(0)); + ceph_assert(p->second.d_name == ""); + + CDentry *dn = in->get_parent_dn(); + CInode *pin = dn->get_dir()->get_inode(); + + p->second.dirino = pin->ino(); + p->second.d_name = dn->get_name(); + dirty_items.emplace(in->ino(), (int)DIRTY_UNDEF); + + get_ref(pin); +} + +void OpenFileTable::notify_unlink(CInode *in) +{ + dout(10) << __func__ << " " << *in << dendl; + auto p = anchor_map.find(in->ino()); + ceph_assert(p != anchor_map.end()); + ceph_assert(p->second.nref > 0); + + CDentry *dn = in->get_parent_dn(); + CInode *pin = dn->get_dir()->get_inode(); + ceph_assert(p->second.dirino == pin->ino()); + ceph_assert(p->second.d_name == dn->get_name()); + + p->second.dirino = inodeno_t(0); + p->second.d_name = ""; + dirty_items.emplace(in->ino(), (int)DIRTY_UNDEF); + + put_ref(pin); +} + +object_t OpenFileTable::get_object_name(unsigned idx) const +{ + char s[30]; + snprintf(s, sizeof(s), "mds%d_openfiles.%x", int(mds->get_nodeid()), idx); + return object_t(s); +} + +void OpenFileTable::_encode_header(bufferlist &bl, int j_state) +{ + std::string_view magic = CEPH_FS_ONDISK_MAGIC; + encode(magic, bl); + ENCODE_START(1, 1, bl); + encode(omap_version, bl); + encode(omap_num_objs, bl); + encode((__u8)j_state, bl); + ENCODE_FINISH(bl); +} + +class C_IO_OFT_Save : public MDSIOContextBase { +protected: + OpenFileTable *oft; + uint64_t log_seq; + MDSContext *fin; + MDSRank *get_mds() override { return oft->mds; } +public: + C_IO_OFT_Save(OpenFileTable *t, uint64_t s, MDSContext *c) : + oft(t), log_seq(s), fin(c) {} + void finish(int r) { + oft->_commit_finish(r, log_seq, fin); + } + void print(ostream& out) const override { + out << "openfiles_save"; + } +}; + +void OpenFileTable::_commit_finish(int r, uint64_t log_seq, MDSContext *fin) +{ + dout(10) << __func__ << " log_seq " << log_seq << " committed_log_seq " << committed_log_seq + << " committing_log_seq " << committing_log_seq << dendl; + if (r < 0) { + mds->handle_write_error(r); + return; + } + + ceph_assert(log_seq == committing_log_seq); + ceph_assert(log_seq >= committed_log_seq); + committed_log_seq = log_seq; + num_pending_commit--; + + if (fin) + fin->complete(r); +} + +class C_IO_OFT_Journal : public MDSIOContextBase { +protected: + OpenFileTable *oft; + uint64_t log_seq; + MDSContext *fin; + std::map<unsigned, std::vector<ObjectOperation> > ops_map; + MDSRank *get_mds() override { return oft->mds; } +public: + C_IO_OFT_Journal(OpenFileTable *t, uint64_t s, MDSContext *c, + std::map<unsigned, std::vector<ObjectOperation> >& ops) : + oft(t), log_seq(s), fin(c) { + ops_map.swap(ops); + } + void finish(int r) { + oft->_journal_finish(r, log_seq, fin, ops_map); + } + void print(ostream& out) const override { + out << "openfiles_journal"; + } +}; + +void OpenFileTable::_journal_finish(int r, uint64_t log_seq, MDSContext *c, + std::map<unsigned, std::vector<ObjectOperation> >& ops_map) +{ + dout(10) << __func__ << " log_seq " << log_seq << dendl; + if (r < 0) { + mds->handle_write_error(r); + return; + } + + C_GatherBuilder gather(g_ceph_context, + new C_OnFinisher(new C_IO_OFT_Save(this, log_seq, c), + mds->finisher)); + SnapContext snapc; + object_locator_t oloc(mds->get_metadata_pool()); + for (auto& [idx, vops] : ops_map) { + object_t oid = get_object_name(idx); + for (auto& op : vops) { + mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(), + 0, gather.new_sub()); + } + } + gather.activate(); + + journal_state = JOURNAL_NONE; + return; +} + +void OpenFileTable::commit(MDSContext *c, uint64_t log_seq, int op_prio) +{ + dout(10) << __func__ << " log_seq " << log_seq << " committing_log_seq:" + << committing_log_seq << dendl; + + ceph_assert(num_pending_commit == 0); + num_pending_commit++; + ceph_assert(log_seq >= committing_log_seq); + committing_log_seq = log_seq; + + omap_version++; + + C_GatherBuilder gather(g_ceph_context); + + SnapContext snapc; + object_locator_t oloc(mds->get_metadata_pool()); + + const unsigned max_write_size = mds->mdcache->max_dir_commit_size; + + struct omap_update_ctl { + unsigned write_size = 0; + unsigned journal_idx = 0; + bool clear = false; + std::map<string, bufferlist> to_update, journaled_update; + std::set<string> to_remove, journaled_remove; + }; + std::vector<omap_update_ctl> omap_updates(omap_num_objs); + + using ceph::encode; + auto journal_func = [&](unsigned idx) { + auto& ctl = omap_updates.at(idx); + + ObjectOperation op; + op.priority = op_prio; + + if (ctl.clear) { + ctl.clear = false; + op.omap_clear(); + op.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK); + } + + if (ctl.journal_idx == 0) { + if (journal_state == JOURNAL_NONE) + journal_state = JOURNAL_START; + else + ceph_assert(journal_state == JOURNAL_START); + + bufferlist header; + _encode_header(header, journal_state); + op.omap_set_header(header); + } + + bufferlist bl; + encode(omap_version, bl); + encode(ctl.to_update, bl); + encode(ctl.to_remove, bl); + + char key[32]; + snprintf(key, sizeof(key), "_journal.%x", ctl.journal_idx++); + std::map<string, bufferlist> tmp_map; + tmp_map[key].swap(bl); + op.omap_set(tmp_map); + + object_t oid = get_object_name(idx); + mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(), 0, + gather.new_sub()); + +#ifdef HAVE_STDLIB_MAP_SPLICING + ctl.journaled_update.merge(ctl.to_update); + ctl.journaled_remove.merge(ctl.to_remove); +#else + ctl.journaled_update.insert(make_move_iterator(begin(ctl.to_update)), + make_move_iterator(end(ctl.to_update))); + ctl.journaled_remove.insert(make_move_iterator(begin(ctl.to_remove)), + make_move_iterator(end(ctl.to_remove))); +#endif + ctl.to_update.clear(); + ctl.to_remove.clear(); + }; + + std::map<unsigned, std::vector<ObjectOperation> > ops_map; + + auto create_op_func = [&](unsigned idx, bool update_header) { + auto& ctl = omap_updates.at(idx); + + auto& op_vec = ops_map[idx]; + op_vec.resize(op_vec.size() + 1); + ObjectOperation& op = op_vec.back(); + op.priority = op_prio; + + if (ctl.clear) { + ctl.clear = false; + op.omap_clear(); + op.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK); + } + + if (update_header) { + bufferlist header; + _encode_header(header, journal_state); + op.omap_set_header(header); + } + + if (!ctl.to_update.empty()) { + op.omap_set(ctl.to_update); + ctl.to_update.clear(); + } + if (!ctl.to_remove.empty()) { + op.omap_rm_keys(ctl.to_remove); + ctl.to_remove.clear(); + } + }; + + auto submit_ops_func = [&]() { + gather.set_finisher(new C_OnFinisher(new C_IO_OFT_Save(this, log_seq, c), + mds->finisher)); + for (auto& [idx, vops] : ops_map) { + object_t oid = get_object_name(idx); + for (auto& op : vops) { + mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(), + 0, gather.new_sub()); + } + } + gather.activate(); + }; + + bool first_commit = !loaded_anchor_map.empty(); + + unsigned first_free_idx = 0; + unsigned old_num_objs = omap_num_objs; + if (omap_num_objs == 0) { + omap_num_objs = 1; + omap_num_items.resize(omap_num_objs); + omap_updates.resize(omap_num_objs); + omap_updates.back().clear = true; + } + + for (auto& [ino, state] : dirty_items) { + auto p = anchor_map.find(ino); + + if (first_commit) { + auto q = loaded_anchor_map.find(ino); + if (q != loaded_anchor_map.end()) { + ceph_assert(p != anchor_map.end()); + p->second.omap_idx = q->second.omap_idx; + bool same = (p->second == q->second); + loaded_anchor_map.erase(q); + if (same) + continue; + } + } + + char key[32]; + int len = snprintf(key, sizeof(key), "%llx", (unsigned long long)ino.val); + + int omap_idx; + if (p != anchor_map.end()) { + omap_idx = p->second.omap_idx; + if (omap_idx < 0) { + ceph_assert(state == DIRTY_NEW); + // find omap object to store the key + for (unsigned i = first_free_idx; i < omap_num_objs; i++) { + if (omap_num_items[i] < MAX_ITEMS_PER_OBJ) { + omap_idx = i; + break; + } + } + if (omap_idx < 0) { + ++omap_num_objs; + ceph_assert(omap_num_objs <= MAX_OBJECTS); + omap_num_items.resize(omap_num_objs); + omap_updates.resize(omap_num_objs); + omap_updates.back().clear = true; + omap_idx = omap_num_objs - 1; + } + first_free_idx = omap_idx; + + p->second.omap_idx = omap_idx; + ++omap_num_items[omap_idx]; + } + } else { + omap_idx = state; + unsigned& count = omap_num_items.at(omap_idx); + ceph_assert(count > 0); + --count; + if ((unsigned)omap_idx < first_free_idx && count < MAX_ITEMS_PER_OBJ) + first_free_idx = omap_idx; + } + auto& ctl = omap_updates.at(omap_idx); + if (ctl.write_size >= max_write_size) { + journal_func(omap_idx); + ctl.write_size = 0; + } + if (p != anchor_map.end()) { + bufferlist bl; + encode(p->second, bl); + encode((__u32)0, bl); // frags set was encoded here + + ctl.write_size += bl.length() + len + 2 * sizeof(__u32); + ctl.to_update[key].swap(bl); + } else { + ctl.write_size += len + sizeof(__u32); + ctl.to_remove.emplace(key); + } + } + + dirty_items.clear(); + + if (first_commit) { + for (auto& [ino, anchor] : loaded_anchor_map) { + char key[32]; + int len = snprintf(key, sizeof(key), "%llx", (unsigned long long)ino.val); + + int omap_idx = anchor.omap_idx; + unsigned& count = omap_num_items.at(omap_idx); + ceph_assert(count > 0); + --count; + + auto& ctl = omap_updates.at(omap_idx); + if (ctl.write_size >= max_write_size) { + journal_func(omap_idx); + ctl.write_size = 0; + } + ctl.write_size += len + sizeof(__u32); + ctl.to_remove.emplace(key); + } + loaded_anchor_map.clear(); + } + + size_t total_items = 0; + { + unsigned used_objs = 1; + std::vector<unsigned> objs_to_write; + bool journaled = false; + for (unsigned i = 0; i < omap_num_objs; i++) { + total_items += omap_num_items[i]; + if (omap_updates[i].journal_idx) + journaled = true; + else if (omap_updates[i].write_size) + objs_to_write.push_back(i); + + if (omap_num_items[i] > 0) + used_objs = i + 1; + } + ceph_assert(total_items == anchor_map.size()); + // adjust omap object count + if (used_objs < omap_num_objs) { + omap_num_objs = used_objs; + omap_num_items.resize(omap_num_objs); + } + // skip journal if only one osd request is required and object count + // does not change. + if (!journaled && old_num_objs == omap_num_objs && + objs_to_write.size() <= 1) { + ceph_assert(journal_state == JOURNAL_NONE); + ceph_assert(!gather.has_subs()); + + unsigned omap_idx = objs_to_write.empty() ? 0 : objs_to_write.front(); + create_op_func(omap_idx, true); + submit_ops_func(); + return; + } + } + + for (unsigned omap_idx = 0; omap_idx < omap_updates.size(); omap_idx++) { + auto& ctl = omap_updates[omap_idx]; + if (ctl.write_size > 0) { + journal_func(omap_idx); + ctl.write_size = 0; + } + } + + if (journal_state == JOURNAL_START) { + ceph_assert(gather.has_subs()); + journal_state = JOURNAL_FINISH; + } else { + // only object count changes + ceph_assert(journal_state == JOURNAL_NONE); + ceph_assert(!gather.has_subs()); + } + + uint64_t total_updates = 0; + uint64_t total_removes = 0; + + for (unsigned omap_idx = 0; omap_idx < omap_updates.size(); omap_idx++) { + auto& ctl = omap_updates[omap_idx]; + ceph_assert(ctl.to_update.empty() && ctl.to_remove.empty()); + if (ctl.journal_idx == 0) + ceph_assert(ctl.journaled_update.empty() && ctl.journaled_remove.empty()); + + bool first = true; + for (auto& it : ctl.journaled_update) { + if (ctl.write_size >= max_write_size) { + create_op_func(omap_idx, first); + ctl.write_size = 0; + first = false; + } + ctl.write_size += it.first.length() + it.second.length() + 2 * sizeof(__u32); + ctl.to_update[it.first].swap(it.second); + total_updates++; + } + + for (auto& key : ctl.journaled_remove) { + if (ctl.write_size >= max_write_size) { + create_op_func(omap_idx, first); + ctl.write_size = 0; + first = false; + } + + ctl.write_size += key.length() + sizeof(__u32); + ctl.to_remove.emplace(key); + total_removes++; + } + + for (unsigned i = 0; i < ctl.journal_idx; ++i) { + char key[32]; + snprintf(key, sizeof(key), "_journal.%x", i); + ctl.to_remove.emplace(key); + } + + // update first object's omap header if object count changes + if (ctl.clear || + ctl.journal_idx > 0 || + (omap_idx == 0 && old_num_objs != omap_num_objs)) + create_op_func(omap_idx, first); + } + + ceph_assert(!ops_map.empty()); + if (journal_state == JOURNAL_FINISH) { + gather.set_finisher(new C_OnFinisher(new C_IO_OFT_Journal(this, log_seq, c, ops_map), + mds->finisher)); + gather.activate(); + } else { + submit_ops_func(); + } + logger->set(l_oft_omap_total_objs, omap_num_objs); + logger->set(l_oft_omap_total_kv_pairs, total_items); + logger->inc(l_oft_omap_total_updates, total_updates); + logger->inc(l_oft_omap_total_removes, total_removes); +} + +class C_IO_OFT_Load : public MDSIOContextBase { +protected: + OpenFileTable *oft; + MDSRank *get_mds() override { return oft->mds; } + +public: + int header_r = 0; //< Return value from OMAP header read + int values_r = 0; //< Return value from OMAP value read + bufferlist header_bl; + std::map<std::string, bufferlist> values; + unsigned index; + bool first; + bool more = false; + + C_IO_OFT_Load(OpenFileTable *t, unsigned i, bool f) : + oft(t), index(i), first(f) {} + void finish(int r) override { + oft->_load_finish(r, header_r, values_r, index, first, more, header_bl, values); + } + void print(ostream& out) const override { + out << "openfiles_load"; + } +}; + +class C_IO_OFT_Recover : public MDSIOContextBase { +protected: + OpenFileTable *oft; + MDSRank *get_mds() override { return oft->mds; } +public: + C_IO_OFT_Recover(OpenFileTable *t) : oft(t) {} + void finish(int r) override { + oft->_recover_finish(r); + } + void print(ostream& out) const override { + out << "openfiles_recover"; + } +}; + +void OpenFileTable::_recover_finish(int r) +{ + if (r < 0) { + derr << __func__ << " got " << cpp_strerror(r) << dendl; + _reset_states(); + } else { + dout(10) << __func__ << ": load complete" << dendl; + } + + journal_state = JOURNAL_NONE; + load_done = true; + finish_contexts(g_ceph_context, waiting_for_load); + waiting_for_load.clear(); +} + +void OpenFileTable::_read_omap_values(const std::string& key, unsigned idx, + bool first) +{ + object_t oid = get_object_name(idx); + dout(10) << __func__ << ": load from '" << oid << ":" << key << "'" << dendl; + object_locator_t oloc(mds->get_metadata_pool()); + C_IO_OFT_Load *c = new C_IO_OFT_Load(this, idx, first); + ObjectOperation op; + if (first) + op.omap_get_header(&c->header_bl, &c->header_r); + op.omap_get_vals(key, "", uint64_t(-1), + &c->values, &c->more, &c->values_r); + mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, nullptr, 0, + new C_OnFinisher(c, mds->finisher)); +} + +void OpenFileTable::_load_finish(int op_r, int header_r, int values_r, + unsigned idx, bool first, bool more, + bufferlist &header_bl, + std::map<std::string, bufferlist> &values) +{ + using ceph::decode; + int err = -CEPHFS_EINVAL; + + auto decode_func = [this](unsigned idx, inodeno_t ino, bufferlist &bl) { + auto p = bl.cbegin(); + + size_t count = loaded_anchor_map.size(); + auto it = loaded_anchor_map.emplace_hint(loaded_anchor_map.end(), + std::piecewise_construct, + std::make_tuple(ino), + std::make_tuple()); + RecoveredAnchor& anchor = it->second; + decode(anchor, p); + frag_vec_t frags; // unused + decode(frags, p); + ceph_assert(ino == anchor.ino); + anchor.omap_idx = idx; + anchor.auth = MDS_RANK_NONE; + + + if (loaded_anchor_map.size() > count) + ++omap_num_items[idx]; + }; + + if (op_r < 0) { + derr << __func__ << " got " << cpp_strerror(op_r) << dendl; + err = op_r; + goto out; + } + + try { + if (first) { + auto p = header_bl.cbegin(); + + string magic; + version_t version; + unsigned num_objs; + __u8 jstate; + + if (header_bl.length() == 13) { + // obsolete format. + decode(version, p); + decode(num_objs, p); + decode(jstate, p); + } else { + decode(magic, p); + if (magic != CEPH_FS_ONDISK_MAGIC) { + CachedStackStringStream css; + *css << "invalid magic '" << magic << "'"; + throw buffer::malformed_input(css->str()); + } + + DECODE_START(1, p); + decode(version, p); + decode(num_objs, p); + decode(jstate, p); + DECODE_FINISH(p); + } + + if (num_objs > MAX_OBJECTS) { + CachedStackStringStream css; + *css << "invalid object count '" << num_objs << "'"; + throw buffer::malformed_input(css->str()); + } + if (jstate > JOURNAL_FINISH) { + CachedStackStringStream css; + *css << "invalid journal state '" << jstate << "'"; + throw buffer::malformed_input(css->str()); + } + + if (version > omap_version) { + omap_version = version; + omap_num_objs = num_objs; + omap_num_items.resize(omap_num_objs); + journal_state = jstate; + } else if (version == omap_version) { + ceph_assert(omap_num_objs == num_objs); + if (jstate > journal_state) + journal_state = jstate; + } + } + + for (auto& it : values) { + if (it.first.compare(0, 9, "_journal.") == 0) { + if (idx >= loaded_journals.size()) + loaded_journals.resize(idx + 1); + + if (journal_state == JOURNAL_FINISH) { + loaded_journals[idx][it.first].swap(it.second); + } else { // incomplete journal + loaded_journals[idx][it.first].length(); + } + continue; + } + + inodeno_t ino; + sscanf(it.first.c_str(), "%llx", (unsigned long long*)&ino.val); + decode_func(idx, ino, it.second); + } + } catch (buffer::error &e) { + derr << __func__ << ": corrupted header/values: " << e.what() << dendl; + goto out; + } + + if (more || idx + 1 < omap_num_objs) { + // Issue another read if we're not at the end of the omap + std::string last_key; + if (more) + last_key = values.rbegin()->first; + else + idx++; + + _read_omap_values(last_key, idx, !more); + return; + } + + // replay journal + if (loaded_journals.size() > 0) { + dout(10) << __func__ << ": recover journal" << dendl; + + C_GatherBuilder gather(g_ceph_context, + new C_OnFinisher(new C_IO_OFT_Recover(this), + mds->finisher)); + object_locator_t oloc(mds->get_metadata_pool()); + SnapContext snapc; + + for (unsigned omap_idx = 0; omap_idx < loaded_journals.size(); omap_idx++) { + auto& loaded_journal = loaded_journals[omap_idx]; + + std::vector<ObjectOperation> op_vec; + try { + for (auto& it : loaded_journal) { + if (journal_state != JOURNAL_FINISH) + continue; + auto p = it.second.cbegin(); + version_t version; + std::map<string, bufferlist> to_update; + std::set<string> to_remove; + decode(version, p); + if (version != omap_version) + continue; + decode(to_update, p); + decode(to_remove, p); + it.second.clear(); + + for (auto& q : to_update) { + inodeno_t ino; + sscanf(q.first.c_str(), "%llx", (unsigned long long*)&ino.val); + decode_func(omap_idx, ino, q.second); + } + for (auto& q : to_remove) { + inodeno_t ino; + sscanf(q.c_str(), "%llx",(unsigned long long*)&ino.val); + ceph_assert(ino.val > 0); + if (loaded_anchor_map.erase(ino)) { + unsigned& count = omap_num_items[omap_idx]; + ceph_assert(count > 0); + --count; + } + } + + op_vec.resize(op_vec.size() + 1); + ObjectOperation& op = op_vec.back(); + op.priority = CEPH_MSG_PRIO_HIGH; + if (!to_update.empty()) + op.omap_set(to_update); + if (!to_remove.empty()) + op.omap_rm_keys(to_remove); + } + } catch (buffer::error &e) { + derr << __func__ << ": corrupted journal: " << e.what() << dendl; + goto out; + } + + op_vec.resize(op_vec.size() + 1); + ObjectOperation& op = op_vec.back(); + { + bufferlist header; + if (journal_state == JOURNAL_FINISH) + _encode_header(header, JOURNAL_FINISH); + else + _encode_header(header, JOURNAL_NONE); + op.omap_set_header(header); + } + { + // remove journal + std::set<string> to_remove; + for (auto &it : loaded_journal) + to_remove.emplace(it.first); + op.omap_rm_keys(to_remove); + } + loaded_journal.clear(); + + object_t oid = get_object_name(omap_idx); + for (auto& op : op_vec) { + mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(), + 0, gather.new_sub()); + } + } + gather.activate(); + return; + } + + journal_state = JOURNAL_NONE; + err = 0; + dout(10) << __func__ << ": load complete" << dendl; +out: + + if (err < 0) + _reset_states(); + + load_done = true; + finish_contexts(g_ceph_context, waiting_for_load); + waiting_for_load.clear(); +} + +void OpenFileTable::load(MDSContext *onload) +{ + dout(10) << __func__ << dendl; + ceph_assert(!load_done); + if (onload) + waiting_for_load.push_back(onload); + + _read_omap_values("", 0, true); +} + +void OpenFileTable::_get_ancestors(const Anchor& parent, + vector<inode_backpointer_t>& ancestors, + mds_rank_t& auth_hint) +{ + inodeno_t dirino = parent.dirino; + std::string_view d_name = parent.d_name; + + bool first = true; + ancestors.clear(); + while (true) { + ancestors.push_back(inode_backpointer_t(dirino, string{d_name}, 0)); + + auto p = loaded_anchor_map.find(dirino); + if (p == loaded_anchor_map.end()) + break; + + if (first) + auth_hint = p->second.auth; + + dirino = p->second.dirino; + d_name = p->second.d_name; + if (dirino == inodeno_t(0)) + break; + + first = false; + } +} + +class C_OFT_OpenInoFinish: public MDSContext { + OpenFileTable *oft; + inodeno_t ino; + MDSRank *get_mds() override { return oft->mds; } +public: + C_OFT_OpenInoFinish(OpenFileTable *t, inodeno_t i) : oft(t), ino(i) {} + void finish(int r) override { + oft->_open_ino_finish(ino, r); + } +}; + +void OpenFileTable::_open_ino_finish(inodeno_t ino, int r) +{ + if (prefetch_state == DIR_INODES && r >= 0 && ino != inodeno_t(0)) { + auto p = loaded_anchor_map.find(ino); + ceph_assert(p != loaded_anchor_map.end()); + p->second.auth = mds_rank_t(r); + } + + if (r != mds->get_nodeid()) + mds->mdcache->rejoin_prefetch_ino_finish(ino, r); + + num_opening_inodes--; + if (num_opening_inodes == 0) { + if (prefetch_state == DIR_INODES) { + if (g_conf().get_val<bool>("mds_oft_prefetch_dirfrags")) { + prefetch_state = DIRFRAGS; + _prefetch_dirfrags(); + } else { + prefetch_state = FILE_INODES; + _prefetch_inodes(); + } + } else if (prefetch_state == FILE_INODES) { + prefetch_state = DONE; + logseg_destroyed_inos.clear(); + destroyed_inos_set.clear(); + finish_contexts(g_ceph_context, waiting_for_prefetch); + waiting_for_prefetch.clear(); + } else { + ceph_abort(); + } + } +} + +void OpenFileTable::_prefetch_dirfrags() +{ + dout(10) << __func__ << dendl; + ceph_assert(prefetch_state == DIRFRAGS); + + MDCache *mdcache = mds->mdcache; + std::vector<CDir*> fetch_queue; + + for (auto& [ino, anchor] : loaded_anchor_map) { + if (anchor.frags.empty()) + continue; + CInode *diri = mdcache->get_inode(ino); + if (!diri) + continue; + + if (!diri->is_dir()) { + dout(10) << " " << *diri << " is not dir" << dendl; + continue; + } + + if (diri->state_test(CInode::STATE_REJOINUNDEF)) + continue; + + for (auto& fg: anchor.frags) { + CDir *dir = diri->get_dirfrag(fg); + if (dir) { + if (dir->is_auth() && !dir->is_complete()) + fetch_queue.push_back(dir); + } else { + frag_vec_t leaves; + diri->dirfragtree.get_leaves_under(fg, leaves); + for (auto& leaf : leaves) { + if (diri->is_auth()) { + dir = diri->get_or_open_dirfrag(mdcache, leaf); + } else { + dir = diri->get_dirfrag(leaf); + } + if (dir && dir->is_auth() && !dir->is_complete()) + fetch_queue.push_back(dir); + } + } + } + } + + MDSGatherBuilder gather(g_ceph_context); + int num_opening_dirfrags = 0; + for (const auto& dir : fetch_queue) { + if (dir->state_test(CDir::STATE_REJOINUNDEF)) + ceph_assert(dir->get_inode()->dirfragtree.is_leaf(dir->get_frag())); + dir->fetch(gather.new_sub()); + + if (!(++num_opening_dirfrags % mds->heartbeat_reset_grace())) + mds->heartbeat_reset(); + } + + auto finish_func = [this](int r) { + prefetch_state = FILE_INODES; + _prefetch_inodes(); + }; + if (gather.has_subs()) { + gather.set_finisher( + new MDSInternalContextWrapper(mds, + new LambdaContext(std::move(finish_func)))); + gather.activate(); + } else { + finish_func(0); + } +} + +void OpenFileTable::_prefetch_inodes() +{ + dout(10) << __func__ << " state " << prefetch_state << dendl; + ceph_assert(!num_opening_inodes); + num_opening_inodes = 1; + + int64_t pool; + if (prefetch_state == DIR_INODES) + pool = mds->get_metadata_pool(); + else if (prefetch_state == FILE_INODES) + pool = mds->mdsmap->get_first_data_pool(); + else + ceph_abort(); + + MDCache *mdcache = mds->mdcache; + + if (destroyed_inos_set.empty()) { + for (auto& it : logseg_destroyed_inos) + destroyed_inos_set.insert(it.second.begin(), it.second.end()); + } + + for (auto& [ino, anchor] : loaded_anchor_map) { + if (destroyed_inos_set.count(ino)) + continue; + if (anchor.d_type == DT_DIR) { + if (prefetch_state != DIR_INODES) + continue; + if (MDS_INO_IS_MDSDIR(ino)) { + anchor.auth = MDS_INO_MDSDIR_OWNER(ino); + continue; + } + if (MDS_INO_IS_STRAY(ino)) { + anchor.auth = MDS_INO_STRAY_OWNER(ino); + continue; + } + } else { + if (prefetch_state != FILE_INODES) + continue; + // load all file inodes for MDCache::identify_files_to_recover() + } + CInode *in = mdcache->get_inode(ino); + if (in) + continue; + + num_opening_inodes++; + + auto fin = new C_OFT_OpenInoFinish(this, ino); + if (anchor.dirino != inodeno_t(0)) { + vector<inode_backpointer_t> ancestors; + mds_rank_t auth_hint = MDS_RANK_NONE; + _get_ancestors(anchor, ancestors, auth_hint); + mdcache->open_ino(ino, pool, fin, false, false, &ancestors, auth_hint); + } else { + mdcache->open_ino(ino, pool, fin, false); + } + + if (!(num_opening_inodes % mds->heartbeat_reset_grace())) + mds->heartbeat_reset(); + } + + _open_ino_finish(inodeno_t(0), 0); +} + +bool OpenFileTable::prefetch_inodes() +{ + dout(10) << __func__ << dendl; + ceph_assert(!prefetch_state); + prefetch_state = DIR_INODES; + + if (!load_done) { + wait_for_load( + new MDSInternalContextWrapper(mds, + new LambdaContext([this](int r) { + _prefetch_inodes(); + }) + ) + ); + return true; + } + + _prefetch_inodes(); + return !is_prefetched(); +} + +bool OpenFileTable::should_log_open(CInode *in) +{ + if (in->state_test(CInode::STATE_TRACKEDBYOFT)) { + // inode just journaled + if (in->last_journaled >= committing_log_seq) + return false; + // item not dirty. it means the item has already been saved + auto p = dirty_items.find(in->ino()); + if (p == dirty_items.end()) + return false; + } + return true; +} + +void OpenFileTable::note_destroyed_inos(uint64_t seq, const vector<inodeno_t>& inos) +{ + auto& vec = logseg_destroyed_inos[seq]; + vec.insert(vec.end(), inos.begin(), inos.end()); +} + +void OpenFileTable::trim_destroyed_inos(uint64_t seq) +{ + auto p = logseg_destroyed_inos.begin(); + while (p != logseg_destroyed_inos.end()) { + if (p->first >= seq) + break; + logseg_destroyed_inos.erase(p++); + } +} |