author		Daniel Baumann <daniel.baumann@progress-linux.org>	2024-04-21 11:54:28 +0000
committer	Daniel Baumann <daniel.baumann@progress-linux.org>	2024-04-21 11:54:28 +0000
commit		e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree		64f88b554b444a49f656b6c656111a145cbbaa28	/src/mds/journal.cc
parent		Initial commit. (diff)
download	ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
		ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip

Adding upstream version 18.2.2. (upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/mds/journal.cc')
-rw-r--r--	src/mds/journal.cc	3336
1 file changed, 3336 insertions, 0 deletions
diff --git a/src/mds/journal.cc b/src/mds/journal.cc
new file mode 100644
index 000000000..4b9e73b5a
--- /dev/null
+++ b/src/mds/journal.cc
@@ -0,0 +1,3336 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "common/config.h"
+#include "osdc/Journaler.h"
+#include "events/ESubtreeMap.h"
+#include "events/ESession.h"
+#include "events/ESessions.h"
+
+#include "events/EMetaBlob.h"
+#include "events/EResetJournal.h"
+#include "events/ENoOp.h"
+
+#include "events/EUpdate.h"
+#include "events/EPeerUpdate.h"
+#include "events/EOpen.h"
+#include "events/ECommitted.h"
+#include "events/EPurged.h"
+
+#include "events/EExport.h"
+#include "events/EImportStart.h"
+#include "events/EImportFinish.h"
+#include "events/EFragment.h"
+
+#include "events/ETableClient.h"
+#include "events/ETableServer.h"
+
+#include "include/stringify.h"
+
+#include "LogSegment.h"
+
+#include "MDSRank.h"
+#include "MDLog.h"
+#include "MDCache.h"
+#include "Server.h"
+#include "Migrator.h"
+#include "Mutation.h"
+
+#include "InoTable.h"
+#include "MDSTableClient.h"
+#include "MDSTableServer.h"
+
+#include "Locker.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mds
+#undef dout_prefix
+#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".journal "
+
+using std::list;
+using std::map;
+using std::ostream;
+using std::pair;
+using std::set;
+using std::string;
+using std::vector;
+
+// -----------------------
+// LogSegment
+
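+// Completion context for a batch of backtrace writes: once the gathered
+// object operations finish, notify each inode that its backtrace was
+// stored, then complete the caller's context.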
+struct BatchStoredBacktrace : public MDSIOContext {
+ MDSContext *fin;
+ std::vector<CInodeCommitOperations> ops_vec;
+
+ BatchStoredBacktrace(MDSRank *m, MDSContext *f,
+ std::vector<CInodeCommitOperations>&& ops) :
+ MDSIOContext(m), fin(f), ops_vec(std::move(ops)) {}
+ void finish(int r) override {
+ for (auto& op : ops_vec) {
+ op.in->_stored_backtrace(r, op.version, nullptr);
+ }
+ fin->complete(r);
+ }
+ void print(ostream& out) const override {
+ out << "batch backtrace_store";
+ }
+};
+
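+// Finisher-thread context that submits the queued backtrace commit ops
+// for a batch of inodes, gathering them all behind a single
+// BatchStoredBacktrace completion.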
+struct BatchCommitBacktrace : public Context {
+ MDSRank *mds;
+ MDSContext *fin;
+ std::vector<CInodeCommitOperations> ops_vec;
+
+ BatchCommitBacktrace(MDSRank *m, MDSContext *f,
+ std::vector<CInodeCommitOperations>&& ops) :
+ mds(m), fin(f), ops_vec(std::move(ops)) {}
+ void finish(int r) override {
+ C_GatherBuilder gather(g_ceph_context);
+
+ for (auto &op : ops_vec) {
+ op.in->_commit_ops(r, gather, op.ops_vec, op.bt);
+ op.ops_vec.clear();
+ op.bt.clear();
+ }
+ ceph_assert(gather.has_subs());
+ gather.set_finisher(new C_OnFinisher(
+ new BatchStoredBacktrace(mds, fin, std::move(ops_vec)),
+ mds->finisher));
+ gather.activate();
+ }
+};
+
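+// Try to expire this log segment: commit or flush everything the segment
+// still pins (dirty dirfrags/dentries/inodes, uncommitted leader/peer ops
+// and fragments, scatterlocks, snap inodes needing flush, backtraces, the
+// inotable, sessionmap, and table client/server state).  Each outstanding
+// piece of work registers a sub-context on gather_bld; if nothing is
+// registered, the segment can be expired immediately.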
+void LogSegment::try_to_expire(MDSRank *mds, MDSGatherBuilder &gather_bld, int op_prio)
+{
+ set<CDir*> commit;
+
+ dout(6) << "LogSegment(" << seq << "/" << offset << ").try_to_expire" << dendl;
+
+ ceph_assert(g_conf()->mds_kill_journal_expire_at != 1);
+
+ // commit dirs
+ for (elist<CDir*>::iterator p = new_dirfrags.begin(); !p.end(); ++p) {
+ dout(20) << " new_dirfrag " << **p << dendl;
+ ceph_assert((*p)->is_auth());
+ commit.insert(*p);
+ }
+ for (elist<CDir*>::iterator p = dirty_dirfrags.begin(); !p.end(); ++p) {
+ dout(20) << " dirty_dirfrag " << **p << dendl;
+ ceph_assert((*p)->is_auth());
+ commit.insert(*p);
+ }
+ for (elist<CDentry*>::iterator p = dirty_dentries.begin(); !p.end(); ++p) {
+ dout(20) << " dirty_dentry " << **p << dendl;
+ ceph_assert((*p)->is_auth());
+ commit.insert((*p)->get_dir());
+ }
+ for (elist<CInode*>::iterator p = dirty_inodes.begin(); !p.end(); ++p) {
+ dout(20) << " dirty_inode " << **p << dendl;
+ ceph_assert((*p)->is_auth());
+ if ((*p)->is_base()) {
+ (*p)->store(gather_bld.new_sub());
+ } else
+ commit.insert((*p)->get_parent_dn()->get_dir());
+ }
+
+ if (!commit.empty()) {
+ for (set<CDir*>::iterator p = commit.begin();
+ p != commit.end();
+ ++p) {
+ CDir *dir = *p;
+ ceph_assert(dir->is_auth());
+ if (dir->can_auth_pin()) {
+ dout(15) << "try_to_expire committing " << *dir << dendl;
+ dir->commit(0, gather_bld.new_sub(), false, op_prio);
+ } else {
+ dout(15) << "try_to_expire waiting for unfreeze on " << *dir << dendl;
+ dir->add_waiter(CDir::WAIT_UNFREEZE, gather_bld.new_sub());
+ }
+ }
+ }
+
+ // leader ops with possibly uncommitted peers
+ for (set<metareqid_t>::iterator p = uncommitted_leaders.begin();
+ p != uncommitted_leaders.end();
+ ++p) {
+ dout(10) << "try_to_expire waiting for peers to ack commit on " << *p << dendl;
+ mds->mdcache->wait_for_uncommitted_leader(*p, gather_bld.new_sub());
+ }
+
+ // peer ops that haven't been committed
+ for (set<metareqid_t>::iterator p = uncommitted_peers.begin();
+ p != uncommitted_peers.end();
+ ++p) {
+ dout(10) << "try_to_expire waiting for leader to ack OP_FINISH on " << *p << dendl;
+ mds->mdcache->wait_for_uncommitted_peer(*p, gather_bld.new_sub());
+ }
+
+ // uncommitted fragments
+ for (set<dirfrag_t>::iterator p = uncommitted_fragments.begin();
+ p != uncommitted_fragments.end();
+ ++p) {
+ dout(10) << "try_to_expire waiting for uncommitted fragment " << *p << dendl;
+ mds->mdcache->wait_for_uncommitted_fragment(*p, gather_bld.new_sub());
+ }
+
+ // nudge scatterlocks
+ for (elist<CInode*>::iterator p = dirty_dirfrag_dir.begin(); !p.end(); ++p) {
+ CInode *in = *p;
+ dout(10) << "try_to_expire waiting for dirlock flush on " << *in << dendl;
+ mds->locker->scatter_nudge(&in->filelock, gather_bld.new_sub());
+ }
+ for (elist<CInode*>::iterator p = dirty_dirfrag_dirfragtree.begin(); !p.end(); ++p) {
+ CInode *in = *p;
+ dout(10) << "try_to_expire waiting for dirfragtreelock flush on " << *in << dendl;
+ mds->locker->scatter_nudge(&in->dirfragtreelock, gather_bld.new_sub());
+ }
+ for (elist<CInode*>::iterator p = dirty_dirfrag_nest.begin(); !p.end(); ++p) {
+ CInode *in = *p;
+ dout(10) << "try_to_expire waiting for nest flush on " << *in << dendl;
+ mds->locker->scatter_nudge(&in->nestlock, gather_bld.new_sub());
+ }
+
+ ceph_assert(g_conf()->mds_kill_journal_expire_at != 2);
+
+ // open files and snap inodes
+ if (!open_files.empty()) {
+ ceph_assert(!mds->mdlog->is_capped()); // hmm FIXME
+ EOpen *le = 0;
+ LogSegment *ls = mds->mdlog->get_current_segment();
+ ceph_assert(ls != this);
+ elist<CInode*>::iterator p = open_files.begin(member_offset(CInode, item_open_file));
+ while (!p.end()) {
+ CInode *in = *p;
+ ++p;
+ if (in->last != CEPH_NOSNAP && in->is_auth() && !in->client_snap_caps.empty()) {
+      // journal snap inodes that need flush. This simplifies MDS failover handling.
+ dout(20) << "try_to_expire requeueing snap needflush inode " << *in << dendl;
+ if (!le) {
+ le = new EOpen(mds->mdlog);
+ mds->mdlog->start_entry(le);
+ }
+ le->add_clean_inode(in);
+ ls->open_files.push_back(&in->item_open_file);
+ } else {
+ // open files are tracked by open file table, no need to journal them again
+ in->item_open_file.remove_myself();
+ }
+ }
+ if (le) {
+ mds->mdlog->submit_entry(le);
+ mds->mdlog->wait_for_safe(gather_bld.new_sub());
+ dout(10) << "try_to_expire waiting for open files to rejournal" << dendl;
+ }
+ }
+
+ ceph_assert(g_conf()->mds_kill_journal_expire_at != 3);
+
+ size_t count = 0;
+ for (elist<CInode*>::iterator it = dirty_parent_inodes.begin(); !it.end(); ++it)
+ count++;
+
+ std::vector<CInodeCommitOperations> ops_vec;
+ ops_vec.reserve(count);
+ // backtraces to be stored/updated
+ for (elist<CInode*>::iterator p = dirty_parent_inodes.begin(); !p.end(); ++p) {
+ CInode *in = *p;
+ ceph_assert(in->is_auth());
+ if (in->can_auth_pin()) {
+ dout(15) << "try_to_expire waiting for storing backtrace on " << *in << dendl;
+ ops_vec.resize(ops_vec.size() + 1);
+ in->store_backtrace(ops_vec.back(), op_prio);
+ } else {
+ dout(15) << "try_to_expire waiting for unfreeze on " << *in << dendl;
+ in->add_waiter(CInode::WAIT_UNFREEZE, gather_bld.new_sub());
+ }
+ }
+ if (!ops_vec.empty())
+ mds->finisher->queue(new BatchCommitBacktrace(mds, gather_bld.new_sub(), std::move(ops_vec)));
+
+ ceph_assert(g_conf()->mds_kill_journal_expire_at != 4);
+
+ // idalloc
+ if (inotablev > mds->inotable->get_committed_version()) {
+ dout(10) << "try_to_expire saving inotable table, need " << inotablev
+ << ", committed is " << mds->inotable->get_committed_version()
+ << " (" << mds->inotable->get_committing_version() << ")"
+ << dendl;
+ mds->inotable->save(gather_bld.new_sub(), inotablev);
+ }
+
+ // sessionmap
+ if (sessionmapv > mds->sessionmap.get_committed()) {
+ dout(10) << "try_to_expire saving sessionmap, need " << sessionmapv
+ << ", committed is " << mds->sessionmap.get_committed()
+ << " (" << mds->sessionmap.get_committing() << ")"
+ << dendl;
+ mds->sessionmap.save(gather_bld.new_sub(), sessionmapv);
+ }
+
+ // updates to sessions for completed_requests
+ mds->sessionmap.save_if_dirty(touched_sessions, &gather_bld);
+ touched_sessions.clear();
+
+ // pending commit atids
+ for (map<int, ceph::unordered_set<version_t> >::iterator p = pending_commit_tids.begin();
+ p != pending_commit_tids.end();
+ ++p) {
+ MDSTableClient *client = mds->get_table_client(p->first);
+ ceph_assert(client);
+ for (ceph::unordered_set<version_t>::iterator q = p->second.begin();
+ q != p->second.end();
+ ++q) {
+ dout(10) << "try_to_expire " << get_mdstable_name(p->first) << " transaction " << *q
+ << " pending commit (not yet acked), waiting" << dendl;
+ ceph_assert(!client->has_committed(*q));
+ client->wait_for_ack(*q, gather_bld.new_sub());
+ }
+ }
+
+ // table servers
+ for (map<int, version_t>::iterator p = tablev.begin();
+ p != tablev.end();
+ ++p) {
+ MDSTableServer *server = mds->get_table_server(p->first);
+ ceph_assert(server);
+ if (p->second > server->get_committed_version()) {
+ dout(10) << "try_to_expire waiting for " << get_mdstable_name(p->first)
+ << " to save, need " << p->second << dendl;
+ server->save(gather_bld.new_sub());
+ }
+ }
+
+ // truncating
+ for (set<CInode*>::iterator p = truncating_inodes.begin();
+ p != truncating_inodes.end();
+ ++p) {
+ dout(10) << "try_to_expire waiting for truncate of " << **p << dendl;
+ (*p)->add_waiter(CInode::WAIT_TRUNC, gather_bld.new_sub());
+ }
+ // purge inodes
+ dout(10) << "try_to_expire waiting for purge of " << purging_inodes << dendl;
+ if (purging_inodes.size())
+ set_purged_cb(gather_bld.new_sub());
+
+ if (gather_bld.has_subs()) {
+ dout(6) << "LogSegment(" << seq << "/" << offset << ").try_to_expire waiting" << dendl;
+ mds->mdlog->flush();
+ } else {
+ ceph_assert(g_conf()->mds_kill_journal_expire_at != 5);
+ dout(6) << "LogSegment(" << seq << "/" << offset << ").try_to_expire success" << dendl;
+ }
+}
+
+// -----------------------
+// EMetaBlob
+
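+// Journal the chain of parent dentries needed to place 'dir' during
+// replay.  In TO_AUTH_SUBTREE_ROOT mode the walk stops at the nearest
+// unambiguously-auth subtree root; dentries above an inode already
+// journaled in this segment accumulate on a 'maybe' list, which is
+// discarded only if such a root is reached.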
+void EMetaBlob::add_dir_context(CDir *dir, int mode)
+{
+ MDSRank *mds = dir->mdcache->mds;
+
+ list<CDentry*> parents;
+
+ // it may be okay not to include the maybe items, if
+ // - we journaled the maybe child inode in this segment
+ // - that subtree turns out to be unambiguously auth
+ list<CDentry*> maybe;
+ bool maybenot = false;
+
+ while (true) {
+ // already have this dir? (we must always add in order)
+ if (lump_map.count(dir->dirfrag())) {
+ dout(20) << "EMetaBlob::add_dir_context(" << dir << ") have lump " << dir->dirfrag() << dendl;
+ break;
+ }
+
+ // stop at root/stray
+ CInode *diri = dir->get_inode();
+ CDentry *parent = diri->get_projected_parent_dn();
+
+ if (mode == TO_AUTH_SUBTREE_ROOT) {
+ // subtree root?
+ if (dir->is_subtree_root()) {
+ // match logic in MDCache::create_subtree_map()
+ if (dir->get_dir_auth().first == mds->get_nodeid()) {
+ mds_authority_t parent_auth = parent ? parent->authority() : CDIR_AUTH_UNDEF;
+ if (parent_auth.first == dir->get_dir_auth().first) {
+ if (parent_auth.second == CDIR_AUTH_UNKNOWN &&
+ !dir->is_ambiguous_dir_auth() &&
+ !dir->state_test(CDir::STATE_EXPORTBOUND) &&
+ !dir->state_test(CDir::STATE_AUXSUBTREE) &&
+ !diri->state_test(CInode::STATE_AMBIGUOUSAUTH)) {
+ dout(0) << "EMetaBlob::add_dir_context unexpected subtree " << *dir << dendl;
+ ceph_abort();
+ }
+ dout(20) << "EMetaBlob::add_dir_context(" << dir << ") ambiguous or transient subtree " << dendl;
+ } else {
+ // it's an auth subtree, we don't need maybe (if any), and we're done.
+ dout(20) << "EMetaBlob::add_dir_context(" << dir << ") reached unambig auth subtree, don't need " << maybe
+ << " at " << *dir << dendl;
+ maybe.clear();
+ break;
+ }
+ } else {
+ dout(20) << "EMetaBlob::add_dir_context(" << dir << ") reached ambig or !auth subtree, need " << maybe
+ << " at " << *dir << dendl;
+ // we need the maybe list after all!
+ parents.splice(parents.begin(), maybe);
+ maybenot = false;
+ }
+ }
+
+ // was the inode journaled in this blob?
+ if (event_seq && diri->last_journaled == event_seq) {
+ dout(20) << "EMetaBlob::add_dir_context(" << dir << ") already have diri this blob " << *diri << dendl;
+ break;
+ }
+
+ // have we journaled this inode since the last subtree map?
+ if (!maybenot && last_subtree_map && diri->last_journaled >= last_subtree_map) {
+ dout(20) << "EMetaBlob::add_dir_context(" << dir << ") already have diri in this segment ("
+ << diri->last_journaled << " >= " << last_subtree_map << "), setting maybenot flag "
+ << *diri << dendl;
+ maybenot = true;
+ }
+ }
+
+ if (!parent)
+ break;
+
+ if (maybenot) {
+ dout(25) << "EMetaBlob::add_dir_context(" << dir << ") maybe " << *parent << dendl;
+ maybe.push_front(parent);
+ } else {
+ dout(25) << "EMetaBlob::add_dir_context(" << dir << ") definitely " << *parent << dendl;
+ parents.push_front(parent);
+ }
+
+ dir = parent->get_dir();
+ }
+
+ parents.splice(parents.begin(), maybe);
+
+ dout(20) << "EMetaBlob::add_dir_context final: " << parents << dendl;
+ for (const auto& dentry : parents) {
+ ceph_assert(dentry->get_projected_linkage()->is_primary());
+ add_dentry(dentry, false);
+ }
+}
+
+void EMetaBlob::update_segment(LogSegment *ls)
+{
+ // dirty inode mtimes
+ // -> handled directly by Server.cc, replay()
+
+ // alloc table update?
+ if (inotablev)
+ ls->inotablev = inotablev;
+ if (sessionmapv)
+ ls->sessionmapv = sessionmapv;
+
+ // truncated inodes
+ // -> handled directly by Server.cc
+
+ // client requests
+ // note the newest request per client
+ //if (!client_reqs.empty())
+ // ls->last_client_tid[client_reqs.rbegin()->client] = client_reqs.rbegin()->tid);
+}
+
+// EMetaBlob::fullbit
+
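+// fullbit: a journaled dentry with its embedded (primary) inode.  Note the
+// encoding quirk: for directories snapbl is encoded alongside dirfragtree,
+// for everything else it follows old_inodes; alternate_name was added in v9.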
+void EMetaBlob::fullbit::encode(bufferlist& bl, uint64_t features) const {
+ ENCODE_START(9, 5, bl);
+ encode(dn, bl);
+ encode(dnfirst, bl);
+ encode(dnlast, bl);
+ encode(dnv, bl);
+ encode(*inode, bl, features);
+ if (xattrs)
+ encode(*xattrs, bl);
+ else
+ encode((__u32)0, bl);
+
+ if (inode->is_symlink())
+ encode(symlink, bl);
+ if (inode->is_dir()) {
+ encode(dirfragtree, bl);
+ encode(snapbl, bl);
+ }
+ encode(state, bl);
+ if (!old_inodes || old_inodes->empty()) {
+ encode(false, bl);
+ } else {
+ encode(true, bl);
+ encode(*old_inodes, bl, features);
+ }
+ if (!inode->is_dir())
+ encode(snapbl, bl);
+ encode(oldest_snap, bl);
+ encode(alternate_name, bl);
+ ENCODE_FINISH(bl);
+}
+
+void EMetaBlob::fullbit::decode(bufferlist::const_iterator &bl) {
+ DECODE_START(9, bl);
+ decode(dn, bl);
+ decode(dnfirst, bl);
+ decode(dnlast, bl);
+ decode(dnv, bl);
+ {
+ auto _inode = CInode::allocate_inode();
+ decode(*_inode, bl);
+ inode = std::move(_inode);
+ }
+ {
+ CInode::mempool_xattr_map tmp;
+ decode_noshare(tmp, bl);
+ if (!tmp.empty())
+ xattrs = CInode::allocate_xattr_map(std::move(tmp));
+ }
+ if (inode->is_symlink())
+ decode(symlink, bl);
+ if (inode->is_dir()) {
+ decode(dirfragtree, bl);
+ decode(snapbl, bl);
+ }
+ decode(state, bl);
+ bool old_inodes_present;
+ decode(old_inodes_present, bl);
+ if (old_inodes_present) {
+ auto _old_inodes = CInode::allocate_old_inode_map();
+ decode(*_old_inodes, bl);
+ old_inodes = std::move(_old_inodes);
+ }
+ if (!inode->is_dir()) {
+ decode(snapbl, bl);
+ }
+ decode(oldest_snap, bl);
+ if (struct_v >= 9) {
+ decode(alternate_name, bl);
+ }
+ DECODE_FINISH(bl);
+}
+
+void EMetaBlob::fullbit::dump(Formatter *f) const
+{
+ f->dump_string("dentry", dn);
+ f->dump_stream("snapid.first") << dnfirst;
+ f->dump_stream("snapid.last") << dnlast;
+ f->dump_int("dentry version", dnv);
+ f->open_object_section("inode");
+ inode->dump(f);
+ f->close_section(); // inode
+ f->open_object_section("xattrs");
+ if (xattrs) {
+ for (const auto &p : *xattrs) {
+ std::string s(p.second.c_str(), p.second.length());
+ f->dump_string(p.first.c_str(), s);
+ }
+ }
+ f->close_section(); // xattrs
+ if (inode->is_symlink()) {
+ f->dump_string("symlink", symlink);
+ }
+ if (inode->is_dir()) {
+ f->dump_stream("frag tree") << dirfragtree;
+ f->dump_string("has_snapbl", snapbl.length() ? "true" : "false");
+ if (inode->has_layout()) {
+ f->open_object_section("file layout policy");
+ // FIXME
+ f->dump_string("layout", "the layout exists");
+ f->close_section(); // file layout policy
+ }
+ }
+ f->dump_string("state", state_string());
+ if (old_inodes && !old_inodes->empty()) {
+ f->open_array_section("old inodes");
+ for (const auto &p : *old_inodes) {
+ f->open_object_section("inode");
+ f->dump_int("snapid", p.first);
+ p.second.dump(f);
+ f->close_section(); // inode
+ }
+ f->close_section(); // old inodes
+ }
+ f->dump_string("alternate_name", alternate_name);
+}
+
+void EMetaBlob::fullbit::generate_test_instances(std::list<EMetaBlob::fullbit*>& ls)
+{
+ auto _inode = CInode::allocate_inode();
+ fragtree_t fragtree;
+ auto _xattrs = CInode::allocate_xattr_map();
+ bufferlist empty_snapbl;
+ fullbit *sample = new fullbit("/testdn", "", 0, 0, 0,
+ _inode, fragtree, _xattrs, "", 0, empty_snapbl,
+ false, NULL);
+ ls.push_back(sample);
+}
+
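+// Apply this fullbit's journaled state (inode, xattrs, dirfragtree or
+// symlink target, old_inodes, snap blob) to a cached inode during replay.
+// A file inode replayed with an invalid layout marks the rank damaged
+// rather than admitting corrupt metadata into a live cache.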
+void EMetaBlob::fullbit::update_inode(MDSRank *mds, CInode *in)
+{
+ in->reset_inode(std::move(inode));
+ in->reset_xattrs(std::move(xattrs));
+ if (in->is_dir()) {
+ if (is_export_ephemeral_random()) {
+ dout(15) << "random ephemeral pin on " << *in << dendl;
+ in->set_ephemeral_pin(false, true);
+ }
+ in->maybe_export_pin();
+ if (!(in->dirfragtree == dirfragtree)) {
+ dout(10) << "EMetaBlob::fullbit::update_inode dft " << in->dirfragtree << " -> "
+ << dirfragtree << " on " << *in << dendl;
+ in->dirfragtree = std::move(dirfragtree);
+ in->force_dirfrags();
+ if (in->get_num_dirfrags() && in->authority() == CDIR_AUTH_UNDEF) {
+ auto&& ls = in->get_nested_dirfrags();
+ for (const auto& dir : ls) {
+ if (dir->get_num_any() == 0 &&
+ mds->mdcache->can_trim_non_auth_dirfrag(dir)) {
+ dout(10) << " closing empty non-auth dirfrag " << *dir << dendl;
+ in->close_dirfrag(dir->get_frag());
+ }
+ }
+ }
+ }
+ } else if (in->is_symlink()) {
+ in->symlink = symlink;
+ }
+ in->reset_old_inodes(std::move(old_inodes));
+ if (in->is_any_old_inodes()) {
+ snapid_t min_first = in->get_old_inodes()->rbegin()->first + 1;
+ if (min_first > in->first)
+ in->first = min_first;
+ }
+
+  /*
+   * we can do this before linking the inode because the split_at
+   * would be a no-op: we have no children (namely open snaprealms)
+   * to divvy up
+   */
+ in->oldest_snap = oldest_snap;
+ in->decode_snap_blob(snapbl);
+
+ /*
+ * In case there was anything malformed in the journal that we are
+ * replaying, do sanity checks on the inodes we're replaying and
+ * go damaged instead of letting any trash into a live cache
+ */
+ if (in->is_file()) {
+ // Files must have valid layouts with a pool set
+ if (in->get_inode()->layout.pool_id == -1 ||
+ !in->get_inode()->layout.is_valid()) {
+ dout(0) << "EMetaBlob.replay invalid layout on ino " << *in
+ << ": " << in->get_inode()->layout << dendl;
+ CachedStackStringStream css;
+ *css << "Invalid layout for inode " << in->ino() << " in journal";
+ mds->clog->error() << css->strv();
+ mds->damaged();
+ ceph_abort(); // Should be unreachable because damaged() calls respawn()
+ }
+ }
+}
+
+// EMetaBlob::remotebit
+
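+// remotebit: a journaled dentry that refers to its inode by number and
+// d_type only (a remote link), rather than embedding the inode itself.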
+void EMetaBlob::remotebit::encode(bufferlist& bl) const
+{
+ ENCODE_START(3, 2, bl);
+ encode(dn, bl);
+ encode(dnfirst, bl);
+ encode(dnlast, bl);
+ encode(dnv, bl);
+ encode(ino, bl);
+ encode(d_type, bl);
+ encode(dirty, bl);
+ encode(alternate_name, bl);
+ ENCODE_FINISH(bl);
+}
+
+void EMetaBlob::remotebit::decode(bufferlist::const_iterator &bl)
+{
+ DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
+ decode(dn, bl);
+ decode(dnfirst, bl);
+ decode(dnlast, bl);
+ decode(dnv, bl);
+ decode(ino, bl);
+ decode(d_type, bl);
+ decode(dirty, bl);
+ if (struct_v >= 3)
+ decode(alternate_name, bl);
+ DECODE_FINISH(bl);
+}
+
+void EMetaBlob::remotebit::dump(Formatter *f) const
+{
+ f->dump_string("dentry", dn);
+ f->dump_int("snapid.first", dnfirst);
+ f->dump_int("snapid.last", dnlast);
+ f->dump_int("dentry version", dnv);
+ f->dump_int("inodeno", ino);
+  uint32_t type = DTTOIF(d_type) & S_IFMT; // convert d_type to an S_IFMT file type
+ string type_string;
+ switch(type) {
+ case S_IFREG:
+ type_string = "file"; break;
+ case S_IFLNK:
+ type_string = "symlink"; break;
+ case S_IFDIR:
+ type_string = "directory"; break;
+ case S_IFIFO:
+ type_string = "fifo"; break;
+ case S_IFCHR:
+ type_string = "chr"; break;
+ case S_IFBLK:
+ type_string = "blk"; break;
+ case S_IFSOCK:
+ type_string = "sock"; break;
+ default:
+    assert(0 == "unknown d_type!");
+ }
+ f->dump_string("d_type", type_string);
+ f->dump_string("dirty", dirty ? "true" : "false");
+ f->dump_string("alternate_name", alternate_name);
+}
+
+void EMetaBlob::remotebit::
+generate_test_instances(std::list<EMetaBlob::remotebit*>& ls)
+{
+ remotebit *remote = new remotebit("/test/dn", "", 0, 10, 15, 1, IFTODT(S_IFREG), false);
+ ls.push_back(remote);
+ remote = new remotebit("/test/dn2", "foo", 0, 10, 15, 1, IFTODT(S_IFREG), false);
+ ls.push_back(remote);
+}
+
+// EMetaBlob::nullbit
+
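+// nullbit: a journaled dentry with no linkage, recording that the name
+// was unlinked (or not yet linked) at this point in the journal.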
+void EMetaBlob::nullbit::encode(bufferlist& bl) const
+{
+ ENCODE_START(2, 2, bl);
+ encode(dn, bl);
+ encode(dnfirst, bl);
+ encode(dnlast, bl);
+ encode(dnv, bl);
+ encode(dirty, bl);
+ ENCODE_FINISH(bl);
+}
+
+void EMetaBlob::nullbit::decode(bufferlist::const_iterator &bl)
+{
+ DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
+ decode(dn, bl);
+ decode(dnfirst, bl);
+ decode(dnlast, bl);
+ decode(dnv, bl);
+ decode(dirty, bl);
+ DECODE_FINISH(bl);
+}
+
+void EMetaBlob::nullbit::dump(Formatter *f) const
+{
+ f->dump_string("dentry", dn);
+ f->dump_int("snapid.first", dnfirst);
+ f->dump_int("snapid.last", dnlast);
+ f->dump_int("dentry version", dnv);
+ f->dump_string("dirty", dirty ? "true" : "false");
+}
+
+void EMetaBlob::nullbit::generate_test_instances(std::list<nullbit*>& ls)
+{
+ nullbit *sample = new nullbit("/test/dentry", 0, 10, 15, false);
+ nullbit *sample2 = new nullbit("/test/dirty", 10, 20, 25, true);
+ ls.push_back(sample);
+ ls.push_back(sample2);
+}
+
+// EMetaBlob::dirlump
+
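+// dirlump: the journaled state of one dirfrag: its fnode, state bits, and
+// the full/remote/null dentry bits beneath it.  The bits stay encoded in
+// dnbl until _decode_bits() is called.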
+void EMetaBlob::dirlump::encode(bufferlist& bl, uint64_t features) const
+{
+ ENCODE_START(2, 2, bl);
+ encode(*fnode, bl);
+ encode(state, bl);
+ encode(nfull, bl);
+ encode(nremote, bl);
+ encode(nnull, bl);
+ _encode_bits(features);
+ encode(dnbl, bl);
+ ENCODE_FINISH(bl);
+}
+
+void EMetaBlob::dirlump::decode(bufferlist::const_iterator &bl)
+{
+ DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl)
+ {
+ auto _fnode = CDir::allocate_fnode();
+ decode(*_fnode, bl);
+ fnode = std::move(_fnode);
+ }
+ decode(state, bl);
+ decode(nfull, bl);
+ decode(nremote, bl);
+ decode(nnull, bl);
+ decode(dnbl, bl);
+ dn_decoded = false; // don't decode bits unless we need them.
+ DECODE_FINISH(bl);
+}
+
+void EMetaBlob::dirlump::dump(Formatter *f) const
+{
+ if (!dn_decoded) {
+ dirlump *me = const_cast<dirlump*>(this);
+ me->_decode_bits();
+ }
+ f->open_object_section("fnode");
+ fnode->dump(f);
+ f->close_section(); // fnode
+ f->dump_string("state", state_string());
+ f->dump_int("nfull", nfull);
+ f->dump_int("nremote", nremote);
+ f->dump_int("nnull", nnull);
+
+ f->open_array_section("full bits");
+ for (const auto& iter : dfull) {
+ f->open_object_section("fullbit");
+ iter.dump(f);
+ f->close_section(); // fullbit
+ }
+ f->close_section(); // full bits
+ f->open_array_section("remote bits");
+ for (const auto& iter : dremote) {
+ f->open_object_section("remotebit");
+ iter.dump(f);
+ f->close_section(); // remotebit
+ }
+ f->close_section(); // remote bits
+ f->open_array_section("null bits");
+ for (const auto& iter : dnull) {
+ f->open_object_section("null bit");
+ iter.dump(f);
+ f->close_section(); // null bit
+ }
+ f->close_section(); // null bits
+}
+
+void EMetaBlob::dirlump::generate_test_instances(std::list<dirlump*>& ls)
+{
+ auto dl = new dirlump();
+ dl->fnode = CDir::allocate_fnode();
+ ls.push_back(dl);
+}
+
+/**
+ * EMetaBlob proper
+ */
+void EMetaBlob::encode(bufferlist& bl, uint64_t features) const
+{
+ ENCODE_START(8, 5, bl);
+ encode(lump_order, bl);
+ encode(lump_map, bl, features);
+ encode(roots, bl, features);
+ encode(table_tids, bl);
+ encode(opened_ino, bl);
+ encode(allocated_ino, bl);
+ encode(used_preallocated_ino, bl);
+ encode(preallocated_inos, bl);
+ encode(client_name, bl);
+ encode(inotablev, bl);
+ encode(sessionmapv, bl);
+ encode(truncate_start, bl);
+ encode(truncate_finish, bl);
+ encode(destroyed_inodes, bl);
+ encode(client_reqs, bl);
+ encode(renamed_dirino, bl);
+ encode(renamed_dir_frags, bl);
+ {
+    // dummy values, to keep MDSRanks decoding the v6 format happy
+ int64_t i = -1;
+ bool b = false;
+ encode(i, bl);
+ encode(b, bl);
+ }
+ encode(client_flushes, bl);
+ ENCODE_FINISH(bl);
+}
+void EMetaBlob::decode(bufferlist::const_iterator &bl)
+{
+ DECODE_START_LEGACY_COMPAT_LEN(8, 5, 5, bl);
+ decode(lump_order, bl);
+ decode(lump_map, bl);
+ if (struct_v >= 4) {
+ decode(roots, bl);
+ } else {
+ bufferlist rootbl;
+ decode(rootbl, bl);
+ if (rootbl.length()) {
+ auto p = rootbl.cbegin();
+ roots.emplace_back(p);
+ }
+ }
+ decode(table_tids, bl);
+ decode(opened_ino, bl);
+ decode(allocated_ino, bl);
+ decode(used_preallocated_ino, bl);
+ decode(preallocated_inos, bl);
+ decode(client_name, bl);
+ decode(inotablev, bl);
+ decode(sessionmapv, bl);
+ decode(truncate_start, bl);
+ decode(truncate_finish, bl);
+ decode(destroyed_inodes, bl);
+ if (struct_v >= 2) {
+ decode(client_reqs, bl);
+ } else {
+ list<metareqid_t> r;
+ decode(r, bl);
+ while (!r.empty()) {
+ client_reqs.push_back(pair<metareqid_t,uint64_t>(r.front(), 0));
+ r.pop_front();
+ }
+ }
+ if (struct_v >= 3) {
+ decode(renamed_dirino, bl);
+ decode(renamed_dir_frags, bl);
+ }
+ if (struct_v >= 6) {
+ // ignore
+ int64_t i;
+ bool b;
+ decode(i, bl);
+ decode(b, bl);
+ }
+ if (struct_v >= 8) {
+ decode(client_flushes, bl);
+ }
+ DECODE_FINISH(bl);
+}
+
+
+/**
+ * Get all inodes touched by this metablob. Includes the 'bits' within
+ * dirlumps, and the inodes of the dirs themselves.
+ */
+void EMetaBlob::get_inodes(
+ std::set<inodeno_t> &inodes) const
+{
+ // For all dirlumps in this metablob
+ for (std::map<dirfrag_t, dirlump>::const_iterator i = lump_map.begin(); i != lump_map.end(); ++i) {
+ // Record inode of dirlump
+ inodeno_t const dir_ino = i->first.ino;
+ inodes.insert(dir_ino);
+
+ // Decode dirlump bits
+ dirlump const &dl = i->second;
+ dl._decode_bits();
+
+ // Record inodes of fullbits
+ for (const auto& iter : dl.get_dfull()) {
+ inodes.insert(iter.inode->ino);
+ }
+
+ // Record inodes of remotebits
+ for (const auto& iter : dl.get_dremote()) {
+ inodes.insert(iter.ino);
+ }
+ }
+}
+
+
+/**
+ * Get a map of dirfrag to set of dentries in that dirfrag which are
+ * touched in this operation.
+ */
+void EMetaBlob::get_dentries(std::map<dirfrag_t, std::set<std::string> > &dentries) const
+{
+ for (std::map<dirfrag_t, dirlump>::const_iterator i = lump_map.begin(); i != lump_map.end(); ++i) {
+ dirlump const &dl = i->second;
+ dirfrag_t const &df = i->first;
+
+ // Get all bits
+ dl._decode_bits();
+
+ // For all bits, store dentry
+ for (const auto& iter : dl.get_dfull()) {
+ dentries[df].insert(iter.dn);
+ }
+ for (const auto& iter : dl.get_dremote()) {
+ dentries[df].insert(iter.dn);
+ }
+ for (const auto& iter : dl.get_dnull()) {
+ dentries[df].insert(iter.dn);
+ }
+ }
+}
+
+
+
+/**
+ * Calculate all paths that we can infer are touched by this metablob. Only uses
+ * information local to this metablob so it may only be the path within the
+ * subtree.
+ */
+void EMetaBlob::get_paths(
+ std::vector<std::string> &paths) const
+{
+ // Each dentry has a 'location' which is a 2-tuple of parent inode and dentry name
+ typedef std::pair<inodeno_t, std::string> Location;
+
+ // Whenever we see a dentry within a dirlump, we remember it as a child of
+ // the dirlump's inode
+ std::map<inodeno_t, std::vector<std::string> > children;
+
+ // Whenever we see a location for an inode, remember it: this allows us to
+ // build a path given an inode
+ std::map<inodeno_t, Location> ino_locations;
+
+ // Special case: operations on root inode populate roots but not dirlumps
+ if (lump_map.empty() && !roots.empty()) {
+ paths.push_back("/");
+ return;
+ }
+
+ // First pass
+ // ==========
+ // Build a tiny local metadata cache for the path structure in this metablob
+ for (std::map<dirfrag_t, dirlump>::const_iterator i = lump_map.begin(); i != lump_map.end(); ++i) {
+ inodeno_t const dir_ino = i->first.ino;
+ dirlump const &dl = i->second;
+ dl._decode_bits();
+
+ for (const auto& iter : dl.get_dfull()) {
+ std::string_view dentry = iter.dn;
+ children[dir_ino].emplace_back(dentry);
+ ino_locations[iter.inode->ino] = Location(dir_ino, dentry);
+ }
+
+ for (const auto& iter : dl.get_dremote()) {
+ std::string_view dentry = iter.dn;
+ children[dir_ino].emplace_back(dentry);
+ }
+
+ for (const auto& iter : dl.get_dnull()) {
+ std::string_view dentry = iter.dn;
+ children[dir_ino].emplace_back(dentry);
+ }
+ }
+
+ std::vector<Location> leaf_locations;
+
+ // Second pass
+ // ===========
+ // Output paths for all childless nodes in the metablob
+ for (std::map<dirfrag_t, dirlump>::const_iterator i = lump_map.begin(); i != lump_map.end(); ++i) {
+ inodeno_t const dir_ino = i->first.ino;
+ dirlump const &dl = i->second;
+ dl._decode_bits();
+
+ for (const auto& iter : dl.get_dfull()) {
+ std::string_view dentry = iter.dn;
+ if (children.find(iter.inode->ino) == children.end()) {
+ leaf_locations.push_back(Location(dir_ino, dentry));
+ }
+ }
+
+ for (const auto& iter : dl.get_dremote()) {
+ std::string_view dentry = iter.dn;
+ leaf_locations.push_back(Location(dir_ino, dentry));
+ }
+
+ for (const auto& iter : dl.get_dnull()) {
+ std::string_view dentry = iter.dn;
+ leaf_locations.push_back(Location(dir_ino, dentry));
+ }
+ }
+
+ // For all the leaf locations identified, generate paths
+ for (std::vector<Location>::iterator i = leaf_locations.begin(); i != leaf_locations.end(); ++i) {
+ Location const &loc = *i;
+ std::string path = loc.second;
+ inodeno_t ino = loc.first;
+ std::map<inodeno_t, Location>::iterator iter = ino_locations.find(ino);
+ while(iter != ino_locations.end()) {
+ Location const &loc = iter->second;
+ if (!path.empty()) {
+ path = loc.second + "/" + path;
+ } else {
+ path = loc.second + path;
+ }
+ iter = ino_locations.find(loc.first);
+ }
+
+ paths.push_back(path);
+ }
+}
+
+
+void EMetaBlob::dump(Formatter *f) const
+{
+ f->open_array_section("lumps");
+ for (const auto& d : lump_order) {
+ f->open_object_section("lump");
+ f->open_object_section("dirfrag");
+ f->dump_stream("dirfrag") << d;
+ f->close_section(); // dirfrag
+ f->open_object_section("dirlump");
+ lump_map.at(d).dump(f);
+ f->close_section(); // dirlump
+ f->close_section(); // lump
+ }
+ f->close_section(); // lumps
+
+ f->open_array_section("roots");
+ for (const auto& iter : roots) {
+ f->open_object_section("root");
+ iter.dump(f);
+ f->close_section(); // root
+ }
+ f->close_section(); // roots
+
+ f->open_array_section("tableclient tranactions");
+ for (const auto& p : table_tids) {
+ f->open_object_section("transaction");
+ f->dump_int("tid", p.first);
+ f->dump_int("version", p.second);
+ f->close_section(); // transaction
+ }
+ f->close_section(); // tableclient transactions
+
+ f->dump_int("renamed directory inodeno", renamed_dirino);
+
+ f->open_array_section("renamed directory fragments");
+ for (const auto& p : renamed_dir_frags) {
+ f->dump_int("frag", p);
+ }
+ f->close_section(); // renamed directory fragments
+
+ f->dump_int("inotable version", inotablev);
+ f->dump_int("SessionMap version", sessionmapv);
+ f->dump_int("allocated ino", allocated_ino);
+
+ f->dump_stream("preallocated inos") << preallocated_inos;
+ f->dump_int("used preallocated ino", used_preallocated_ino);
+
+ f->open_object_section("client name");
+ client_name.dump(f);
+ f->close_section(); // client name
+
+ f->open_array_section("inodes starting a truncate");
+ for(const auto& ino : truncate_start) {
+ f->dump_int("inodeno", ino);
+ }
+ f->close_section(); // truncate inodes
+ f->open_array_section("inodes finishing a truncated");
+ for(const auto& p : truncate_finish) {
+ f->open_object_section("inode+segment");
+ f->dump_int("inodeno", p.first);
+ f->dump_int("truncate starting segment", p.second);
+ f->close_section(); // truncated inode
+ }
+ f->close_section(); // truncate finish inodes
+
+ f->open_array_section("destroyed inodes");
+ for(vector<inodeno_t>::const_iterator i = destroyed_inodes.begin();
+ i != destroyed_inodes.end(); ++i) {
+ f->dump_int("inodeno", *i);
+ }
+ f->close_section(); // destroyed inodes
+
+ f->open_array_section("client requests");
+ for(const auto& p : client_reqs) {
+ f->open_object_section("Client request");
+ f->dump_stream("request ID") << p.first;
+ f->dump_int("oldest request on client", p.second);
+ f->close_section(); // request
+ }
+ f->close_section(); // client requests
+}
+
+void EMetaBlob::generate_test_instances(std::list<EMetaBlob*>& ls)
+{
+ ls.push_back(new EMetaBlob());
+}
+
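+// Replay this metablob into the cache: rebuild each journaled dirfrag (in
+// lump order) with its full/remote/null dentries, repair subtree auth
+// around any journaled rename, then roll the inotable, sessionmap,
+// truncate, and client-request state forward to the journaled versions.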
+void EMetaBlob::replay(MDSRank *mds, LogSegment *logseg, int type, MDPeerUpdate *peerup)
+{
+ dout(10) << "EMetaBlob.replay " << lump_map.size() << " dirlumps by " << client_name << dendl;
+
+ ceph_assert(logseg);
+
+ ceph_assert(g_conf()->mds_kill_journal_replay_at != 1);
+
+ for (auto& p : roots) {
+ CInode *in = mds->mdcache->get_inode(p.inode->ino);
+    bool isnew = !in;
+ if (!in)
+ in = new CInode(mds->mdcache, false, 2, CEPH_NOSNAP);
+ p.update_inode(mds, in);
+
+ if (isnew)
+ mds->mdcache->add_inode(in);
+ if (p.is_dirty()) in->_mark_dirty(logseg);
+ dout(10) << "EMetaBlob.replay " << (isnew ? " added root ":" updated root ") << *in << dendl;
+ }
+
+ CInode *renamed_diri = 0;
+ CDir *olddir = 0;
+ if (renamed_dirino) {
+ renamed_diri = mds->mdcache->get_inode(renamed_dirino);
+ if (renamed_diri)
+ dout(10) << "EMetaBlob.replay renamed inode is " << *renamed_diri << dendl;
+ else
+ dout(10) << "EMetaBlob.replay don't have renamed ino " << renamed_dirino << dendl;
+
+ int nnull = 0;
+ for (const auto& lp : lump_order) {
+ dirlump &lump = lump_map[lp];
+ if (lump.nnull) {
+ dout(10) << "EMetaBlob.replay found null dentry in dir " << lp << dendl;
+ nnull += lump.nnull;
+ }
+ }
+ ceph_assert(nnull <= 1);
+ }
+
+ // keep track of any inodes we unlink and don't relink elsewhere
+ map<CInode*, CDir*> unlinked;
+ set<CInode*> linked;
+
+ // walk through my dirs (in order!)
+ int count = 0;
+ for (const auto& lp : lump_order) {
+ dout(10) << "EMetaBlob.replay dir " << lp << dendl;
+ dirlump &lump = lump_map[lp];
+
+ // the dir
+ CDir *dir = mds->mdcache->get_force_dirfrag(lp, true);
+ if (!dir) {
+ // hmm. do i have the inode?
+ CInode *diri = mds->mdcache->get_inode((lp).ino);
+ if (!diri) {
+ if (MDS_INO_IS_MDSDIR(lp.ino)) {
+ ceph_assert(MDS_INO_MDSDIR(mds->get_nodeid()) != lp.ino);
+ diri = mds->mdcache->create_system_inode(lp.ino, S_IFDIR|0755);
+ diri->state_clear(CInode::STATE_AUTH);
+ dout(10) << "EMetaBlob.replay created base " << *diri << dendl;
+ } else {
+ dout(0) << "EMetaBlob.replay missing dir ino " << lp.ino << dendl;
+ mds->clog->error() << "failure replaying journal (EMetaBlob)";
+ mds->damaged();
+ ceph_abort(); // Should be unreachable because damaged() calls respawn()
+ }
+ }
+
+ // create the dirfrag
+ dir = diri->get_or_open_dirfrag(mds->mdcache, lp.frag);
+
+ if (MDS_INO_IS_BASE(lp.ino))
+ mds->mdcache->adjust_subtree_auth(dir, CDIR_AUTH_UNDEF);
+
+ dout(10) << "EMetaBlob.replay added dir " << *dir << dendl;
+ }
+ dir->reset_fnode(std::move(lump.fnode));
+ dir->update_projected_version();
+
+ if (lump.is_importing()) {
+ dir->state_set(CDir::STATE_AUTH);
+ dir->state_clear(CDir::STATE_COMPLETE);
+ }
+ if (lump.is_dirty()) {
+ dir->_mark_dirty(logseg);
+
+ if (!(dir->get_fnode()->rstat == dir->get_fnode()->accounted_rstat)) {
+ dout(10) << "EMetaBlob.replay dirty nestinfo on " << *dir << dendl;
+ mds->locker->mark_updated_scatterlock(&dir->inode->nestlock);
+ logseg->dirty_dirfrag_nest.push_back(&dir->inode->item_dirty_dirfrag_nest);
+ } else {
+ dout(10) << "EMetaBlob.replay clean nestinfo on " << *dir << dendl;
+ }
+ if (!(dir->get_fnode()->fragstat == dir->get_fnode()->accounted_fragstat)) {
+ dout(10) << "EMetaBlob.replay dirty fragstat on " << *dir << dendl;
+ mds->locker->mark_updated_scatterlock(&dir->inode->filelock);
+ logseg->dirty_dirfrag_dir.push_back(&dir->inode->item_dirty_dirfrag_dir);
+ } else {
+ dout(10) << "EMetaBlob.replay clean fragstat on " << *dir << dendl;
+ }
+ }
+ if (lump.is_dirty_dft()) {
+ dout(10) << "EMetaBlob.replay dirty dirfragtree on " << *dir << dendl;
+ dir->state_set(CDir::STATE_DIRTYDFT);
+ mds->locker->mark_updated_scatterlock(&dir->inode->dirfragtreelock);
+ logseg->dirty_dirfrag_dirfragtree.push_back(&dir->inode->item_dirty_dirfrag_dirfragtree);
+ }
+ if (lump.is_new())
+ dir->mark_new(logseg);
+ if (lump.is_complete())
+ dir->mark_complete();
+
+ dout(10) << "EMetaBlob.replay updated dir " << *dir << dendl;
+
+ // decode bits
+ lump._decode_bits();
+
+ // full dentry+inode pairs
+ for (auto& fb : lump._get_dfull()) {
+ CDentry *dn = dir->lookup_exact_snap(fb.dn, fb.dnlast);
+ if (!dn) {
+ dn = dir->add_null_dentry(fb.dn, fb.dnfirst, fb.dnlast);
+ dn->set_version(fb.dnv);
+ if (fb.is_dirty()) dn->_mark_dirty(logseg);
+ dout(10) << "EMetaBlob.replay added (full) " << *dn << dendl;
+ } else {
+ dn->set_version(fb.dnv);
+ if (fb.is_dirty()) dn->_mark_dirty(logseg);
+ dout(10) << "EMetaBlob.replay for [" << fb.dnfirst << "," << fb.dnlast << "] had " << *dn << dendl;
+ dn->first = fb.dnfirst;
+ ceph_assert(dn->last == fb.dnlast);
+ }
+ if (lump.is_importing())
+ dn->mark_auth();
+
+ CInode *in = mds->mdcache->get_inode(fb.inode->ino, fb.dnlast);
+ if (!in) {
+ in = new CInode(mds->mdcache, dn->is_auth(), fb.dnfirst, fb.dnlast);
+ fb.update_inode(mds, in);
+ mds->mdcache->add_inode(in);
+ if (!dn->get_linkage()->is_null()) {
+ if (dn->get_linkage()->is_primary()) {
+ unlinked[dn->get_linkage()->get_inode()] = dir;
+ CachedStackStringStream css;
+ *css << "EMetaBlob.replay FIXME had dentry linked to wrong inode " << *dn
+ << " " << *dn->get_linkage()->get_inode() << " should be " << in->ino();
+ dout(0) << css->strv() << dendl;
+ mds->clog->warn() << css->strv();
+ }
+ dir->unlink_inode(dn, false);
+ }
+ if (unlinked.count(in))
+ linked.insert(in);
+ dir->link_primary_inode(dn, in);
+ dout(10) << "EMetaBlob.replay added " << *in << dendl;
+ } else {
+ in->first = fb.dnfirst;
+ fb.update_inode(mds, in);
+ if (dn->get_linkage()->get_inode() != in && in->get_parent_dn()) {
+ dout(10) << "EMetaBlob.replay unlinking " << *in << dendl;
+ unlinked[in] = in->get_parent_dir();
+ in->get_parent_dir()->unlink_inode(in->get_parent_dn());
+ }
+ if (dn->get_linkage()->get_inode() != in) {
+ if (!dn->get_linkage()->is_null()) { // note: might be remote. as with stray reintegration.
+ if (dn->get_linkage()->is_primary()) {
+ unlinked[dn->get_linkage()->get_inode()] = dir;
+ CachedStackStringStream css;
+ *css << "EMetaBlob.replay FIXME had dentry linked to wrong inode " << *dn
+ << " " << *dn->get_linkage()->get_inode() << " should be " << in->ino();
+ dout(0) << css->strv() << dendl;
+ mds->clog->warn() << css->strv();
+ }
+ dir->unlink_inode(dn, false);
+ }
+ if (unlinked.count(in))
+ linked.insert(in);
+ dir->link_primary_inode(dn, in);
+ dout(10) << "EMetaBlob.replay linked " << *in << dendl;
+ } else {
+ dout(10) << "EMetaBlob.replay for [" << fb.dnfirst << "," << fb.dnlast << "] had " << *in << dendl;
+ }
+ ceph_assert(in->first == fb.dnfirst ||
+ (in->is_multiversion() && in->first > fb.dnfirst));
+ }
+ if (fb.is_dirty())
+ in->_mark_dirty(logseg);
+ if (fb.is_dirty_parent())
+ in->mark_dirty_parent(logseg, fb.is_dirty_pool());
+ if (fb.need_snapflush())
+ logseg->open_files.push_back(&in->item_open_file);
+ if (dn->is_auth())
+ in->state_set(CInode::STATE_AUTH);
+ else
+ in->state_clear(CInode::STATE_AUTH);
+ ceph_assert(g_conf()->mds_kill_journal_replay_at != 2);
+
+ {
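+	// failure-injection hook: with the configured probability,
+	// deliberately corrupt the replayed dentry's first snapid (testing only)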
+ auto do_corruption = mds->get_inject_journal_corrupt_dentry_first();
+ if (unlikely(do_corruption > 0.0)) {
+ auto r = ceph::util::generate_random_number(0.0, 1.0);
+ if (r < do_corruption) {
+ dout(0) << "corrupting dn: " << *dn << dendl;
+ dn->first = -10;
+ }
+ }
+ }
+
+ if (!(++count % mds->heartbeat_reset_grace()))
+ mds->heartbeat_reset();
+ }
+
+ // remote dentries
+ for (const auto& rb : lump.get_dremote()) {
+ CDentry *dn = dir->lookup_exact_snap(rb.dn, rb.dnlast);
+ if (!dn) {
+ dn = dir->add_remote_dentry(rb.dn, rb.ino, rb.d_type, mempool::mds_co::string(rb.alternate_name), rb.dnfirst, rb.dnlast);
+ dn->set_version(rb.dnv);
+ if (rb.dirty) dn->_mark_dirty(logseg);
+ dout(10) << "EMetaBlob.replay added " << *dn << dendl;
+ } else {
+ if (!dn->get_linkage()->is_null()) {
+ dout(10) << "EMetaBlob.replay unlinking " << *dn << dendl;
+ if (dn->get_linkage()->is_primary()) {
+ unlinked[dn->get_linkage()->get_inode()] = dir;
+ CachedStackStringStream css;
+ *css << "EMetaBlob.replay FIXME had dentry linked to wrong inode " << *dn
+ << " " << *dn->get_linkage()->get_inode() << " should be remote " << rb.ino;
+ dout(0) << css->strv() << dendl;
+ }
+ dir->unlink_inode(dn, false);
+ }
+ dn->set_alternate_name(mempool::mds_co::string(rb.alternate_name));
+ dir->link_remote_inode(dn, rb.ino, rb.d_type);
+ dn->set_version(rb.dnv);
+ if (rb.dirty) dn->_mark_dirty(logseg);
+ dout(10) << "EMetaBlob.replay for [" << rb.dnfirst << "," << rb.dnlast << "] had " << *dn << dendl;
+ dn->first = rb.dnfirst;
+ ceph_assert(dn->last == rb.dnlast);
+ }
+ if (lump.is_importing())
+ dn->mark_auth();
+
+ if (!(++count % mds->heartbeat_reset_grace()))
+ mds->heartbeat_reset();
+ }
+
+ // null dentries
+ for (const auto& nb : lump.get_dnull()) {
+ CDentry *dn = dir->lookup_exact_snap(nb.dn, nb.dnlast);
+ if (!dn) {
+ dn = dir->add_null_dentry(nb.dn, nb.dnfirst, nb.dnlast);
+ dn->set_version(nb.dnv);
+ if (nb.dirty) dn->_mark_dirty(logseg);
+ dout(10) << "EMetaBlob.replay added (nullbit) " << *dn << dendl;
+ } else {
+ dn->first = nb.dnfirst;
+ if (!dn->get_linkage()->is_null()) {
+ dout(10) << "EMetaBlob.replay unlinking " << *dn << dendl;
+ CInode *in = dn->get_linkage()->get_inode();
+	  // For a renamed inode, we may call CInode::force_dirfrag() later.
+	  // CInode::force_dirfrag() doesn't work well when the inode is
+	  // detached from the hierarchy.
+ if (!renamed_diri || renamed_diri != in) {
+ if (dn->get_linkage()->is_primary())
+ unlinked[in] = dir;
+ dir->unlink_inode(dn);
+ }
+ }
+ dn->set_version(nb.dnv);
+ if (nb.dirty) dn->_mark_dirty(logseg);
+ dout(10) << "EMetaBlob.replay had " << *dn << dendl;
+ ceph_assert(dn->last == nb.dnlast);
+ }
+ olddir = dir;
+ if (lump.is_importing())
+ dn->mark_auth();
+
+ // Make null dentries the first things we trim
+ dout(10) << "EMetaBlob.replay pushing to bottom of lru " << *dn << dendl;
+
+ if (!(++count % mds->heartbeat_reset_grace()))
+ mds->heartbeat_reset();
+ }
+ }
+
+ ceph_assert(g_conf()->mds_kill_journal_replay_at != 3);
+
+ if (renamed_dirino) {
+ if (renamed_diri) {
+ ceph_assert(unlinked.count(renamed_diri));
+ ceph_assert(linked.count(renamed_diri));
+ olddir = unlinked[renamed_diri];
+ } else {
+ // we imported a diri we haven't seen before
+ renamed_diri = mds->mdcache->get_inode(renamed_dirino);
+ ceph_assert(renamed_diri); // it was in the metablob
+ }
+
+ if (olddir) {
+ if (olddir->authority() != CDIR_AUTH_UNDEF &&
+ renamed_diri->authority() == CDIR_AUTH_UNDEF) {
+ ceph_assert(peerup); // auth to non-auth, must be peer prepare
+ frag_vec_t leaves;
+ renamed_diri->dirfragtree.get_leaves(leaves);
+ for (const auto& leaf : leaves) {
+ CDir *dir = renamed_diri->get_dirfrag(leaf);
+ ceph_assert(dir);
+ if (dir->get_dir_auth() == CDIR_AUTH_UNDEF)
+ // preserve subtree bound until peer commit
+ peerup->olddirs.insert(dir->inode);
+ else
+ dir->state_set(CDir::STATE_AUTH);
+
+ if (!(++count % mds->heartbeat_reset_grace()))
+ mds->heartbeat_reset();
+ }
+ }
+
+ mds->mdcache->adjust_subtree_after_rename(renamed_diri, olddir, false);
+
+ // see if we can discard the subtree we renamed out of
+ CDir *root = mds->mdcache->get_subtree_root(olddir);
+ if (root->get_dir_auth() == CDIR_AUTH_UNDEF) {
+ if (peerup) // preserve the old dir until peer commit
+ peerup->olddirs.insert(olddir->inode);
+ else
+ mds->mdcache->try_trim_non_auth_subtree(root);
+ }
+ }
+
+ // if we are the srci importer, we'll also have some dirfrags we have to open up...
+ if (renamed_diri->authority() != CDIR_AUTH_UNDEF) {
+ for (const auto& p : renamed_dir_frags) {
+ CDir *dir = renamed_diri->get_dirfrag(p);
+ if (dir) {
+ // we already had the inode before, and we already adjusted this subtree accordingly.
+ dout(10) << " already had+adjusted rename import bound " << *dir << dendl;
+ ceph_assert(olddir);
+ continue;
+ }
+ dir = renamed_diri->get_or_open_dirfrag(mds->mdcache, p);
+ dout(10) << " creating new rename import bound " << *dir << dendl;
+ dir->state_clear(CDir::STATE_AUTH);
+ mds->mdcache->adjust_subtree_auth(dir, CDIR_AUTH_UNDEF);
+
+ if (!(++count % mds->heartbeat_reset_grace()))
+ mds->heartbeat_reset();
+ }
+ }
+
+ // rename may overwrite an empty directory and move it into stray dir.
+ unlinked.erase(renamed_diri);
+ for (map<CInode*, CDir*>::iterator p = unlinked.begin(); p != unlinked.end(); ++p) {
+ if (!linked.count(p->first))
+ continue;
+ ceph_assert(p->first->is_dir());
+ mds->mdcache->adjust_subtree_after_rename(p->first, p->second, false);
+
+ if (!(++count % mds->heartbeat_reset_grace()))
+ mds->heartbeat_reset();
+ }
+ }
+
+ if (!unlinked.empty()) {
+ for (set<CInode*>::iterator p = linked.begin(); p != linked.end(); ++p)
+ unlinked.erase(*p);
+ dout(10) << " unlinked set contains " << unlinked << dendl;
+ for (map<CInode*, CDir*>::iterator p = unlinked.begin(); p != unlinked.end(); ++p) {
+ CInode *in = p->first;
+ if (peerup) { // preserve unlinked inodes until peer commit
+ peerup->unlinked.insert(in);
+ if (in->snaprealm)
+ in->snaprealm->adjust_parent();
+ } else
+ mds->mdcache->remove_inode_recursive(in);
+
+ if (!(++count % mds->heartbeat_reset_grace()))
+ mds->heartbeat_reset();
+ }
+ }
+
+ // table client transactions
+ for (const auto& p : table_tids) {
+ dout(10) << "EMetaBlob.replay noting " << get_mdstable_name(p.first)
+ << " transaction " << p.second << dendl;
+ MDSTableClient *client = mds->get_table_client(p.first);
+ if (client)
+ client->got_journaled_agree(p.second, logseg);
+
+ if (!(++count % mds->heartbeat_reset_grace()))
+ mds->heartbeat_reset();
+ }
+
+ // opened ino?
+ if (opened_ino) {
+ CInode *in = mds->mdcache->get_inode(opened_ino);
+ ceph_assert(in);
+ dout(10) << "EMetaBlob.replay noting opened inode " << *in << dendl;
+ logseg->open_files.push_back(&in->item_open_file);
+ }
+
+ bool skip_replaying_inotable = g_conf()->mds_inject_skip_replaying_inotable;
+
+ // allocated_inos
+ if (inotablev) {
+ if (mds->inotable->get_version() >= inotablev ||
+ unlikely(type == EVENT_UPDATE && skip_replaying_inotable)) {
+ dout(10) << "EMetaBlob.replay inotable tablev " << inotablev
+ << " <= table " << mds->inotable->get_version() << dendl;
+ if (allocated_ino)
+ mds->mdcache->insert_taken_inos(allocated_ino);
+ } else {
+ dout(10) << "EMetaBlob.replay inotable v " << inotablev
+ << " - 1 == table " << mds->inotable->get_version()
+ << " allocated+used " << allocated_ino
+ << " prealloc " << preallocated_inos
+ << dendl;
+ if (allocated_ino)
+ mds->inotable->replay_alloc_id(allocated_ino);
+ if (preallocated_inos.size())
+ mds->inotable->replay_alloc_ids(preallocated_inos);
+
+      // repair inotable updates in case the inotable wasn't persisted in time
+ if (inotablev > mds->inotable->get_version()) {
+ mds->clog->error() << "journal replay inotablev mismatch "
+ << mds->inotable->get_version() << " -> " << inotablev
+ << ", will force replay it.";
+ mds->inotable->force_replay_version(inotablev);
+ }
+
+ ceph_assert(inotablev == mds->inotable->get_version());
+ }
+ }
+ if (sessionmapv) {
+ if (mds->sessionmap.get_version() >= sessionmapv ||
+ unlikely(type == EVENT_UPDATE && skip_replaying_inotable)) {
+ dout(10) << "EMetaBlob.replay sessionmap v " << sessionmapv
+ << " <= table " << mds->sessionmap.get_version() << dendl;
+ if (used_preallocated_ino)
+ mds->mdcache->insert_taken_inos(used_preallocated_ino);
+ } else {
+ dout(10) << "EMetaBlob.replay sessionmap v " << sessionmapv
+ << ", table " << mds->sessionmap.get_version()
+ << " prealloc " << preallocated_inos
+ << " used " << used_preallocated_ino
+ << dendl;
+ Session *session = mds->sessionmap.get_session(client_name);
+ if (session) {
+ dout(20) << " (session prealloc " << session->info.prealloc_inos << ")" << dendl;
+ if (used_preallocated_ino) {
+ if (!session->info.prealloc_inos.empty()) {
+ inodeno_t ino = session->take_ino(used_preallocated_ino);
+ session->info.prealloc_inos.erase(ino);
+ ceph_assert(ino == used_preallocated_ino);
+ }
+ mds->sessionmap.replay_dirty_session(session);
+ }
+ if (!preallocated_inos.empty()) {
+ session->free_prealloc_inos.insert(preallocated_inos);
+ session->info.prealloc_inos.insert(preallocated_inos);
+ mds->sessionmap.replay_dirty_session(session);
+ }
+ } else {
+ dout(10) << "EMetaBlob.replay no session for " << client_name << dendl;
+ if (used_preallocated_ino)
+ mds->sessionmap.replay_advance_version();
+
+ if (!preallocated_inos.empty())
+ mds->sessionmap.replay_advance_version();
+ }
+
+      // repair sessionmap updates in case the sessionmap wasn't persisted in time
+ if (sessionmapv > mds->sessionmap.get_version()) {
+ mds->clog->error() << "EMetaBlob.replay sessionmapv mismatch "
+ << sessionmapv << " -> " << mds->sessionmap.get_version()
+ << ", will force replay it.";
+ if (g_conf()->mds_wipe_sessions) {
+ mds->sessionmap.wipe();
+ }
+ // force replay sessionmap version
+ mds->sessionmap.set_version(sessionmapv);
+ }
+ ceph_assert(sessionmapv == mds->sessionmap.get_version());
+ }
+ }
+
+ // truncating inodes
+ for (const auto& ino : truncate_start) {
+ CInode *in = mds->mdcache->get_inode(ino);
+ ceph_assert(in);
+ mds->mdcache->add_recovered_truncate(in, logseg);
+
+ if (!(++count % mds->heartbeat_reset_grace()))
+ mds->heartbeat_reset();
+ }
+ for (const auto& p : truncate_finish) {
+ LogSegment *ls = mds->mdlog->get_segment(p.second);
+ if (ls) {
+ CInode *in = mds->mdcache->get_inode(p.first);
+ ceph_assert(in);
+ mds->mdcache->remove_recovered_truncate(in, ls);
+ }
+
+ if (!(++count % mds->heartbeat_reset_grace()))
+ mds->heartbeat_reset();
+ }
+
+ // destroyed inodes
+ if (!destroyed_inodes.empty()) {
+ for (vector<inodeno_t>::iterator p = destroyed_inodes.begin();
+ p != destroyed_inodes.end();
+ ++p) {
+ CInode *in = mds->mdcache->get_inode(*p);
+ if (in) {
+ dout(10) << "EMetaBlob.replay destroyed " << *p << ", dropping " << *in << dendl;
+ CDentry *parent = in->get_parent_dn();
+ mds->mdcache->remove_inode(in);
+ if (parent) {
+ dout(10) << "EMetaBlob.replay unlinked from dentry " << *parent << dendl;
+ ceph_assert(parent->get_linkage()->is_null());
+ }
+ } else {
+ dout(10) << "EMetaBlob.replay destroyed " << *p << ", not in cache" << dendl;
+ }
+
+ if (!(++count % mds->heartbeat_reset_grace()))
+ mds->heartbeat_reset();
+ }
+ mds->mdcache->open_file_table.note_destroyed_inos(logseg->seq, destroyed_inodes);
+ }
+
+ // client requests
+ for (const auto& p : client_reqs) {
+ if (p.first.name.is_client()) {
+ dout(10) << "EMetaBlob.replay request " << p.first << " trim_to " << p.second << dendl;
+ inodeno_t created = allocated_ino ? allocated_ino : used_preallocated_ino;
+ // if we allocated an inode, there should be exactly one client request id.
+ ceph_assert(created == inodeno_t() || client_reqs.size() == 1);
+
+ Session *session = mds->sessionmap.get_session(p.first.name);
+ if (session) {
+ session->add_completed_request(p.first.tid, created);
+ if (p.second)
+ session->trim_completed_requests(p.second);
+ }
+ }
+
+ if (!(++count % mds->heartbeat_reset_grace()))
+ mds->heartbeat_reset();
+ }
+
+ // client flushes
+ for (const auto& p : client_flushes) {
+ if (p.first.name.is_client()) {
+ dout(10) << "EMetaBlob.replay flush " << p.first << " trim_to " << p.second << dendl;
+ Session *session = mds->sessionmap.get_session(p.first.name);
+ if (session) {
+ session->add_completed_flush(p.first.tid);
+ if (p.second)
+ session->trim_completed_flushes(p.second);
+ }
+ }
+
+ if (!(++count % mds->heartbeat_reset_grace()))
+ mds->heartbeat_reset();
+ }
+
+ // update segment
+ update_segment(logseg);
+
+ ceph_assert(g_conf()->mds_kill_journal_replay_at != 4);
+}
+
+// -----------------------
+// EPurged
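+// An EPurged event records inodes whose purge has completed: replay
+// removes them from the segment's purging set and releases their ids
+// back to the inotable.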
+void EPurged::update_segment()
+{
+ if (inos.size() && inotablev)
+ get_segment()->inotablev = inotablev;
+ return;
+}
+
+void EPurged::replay(MDSRank *mds)
+{
+ if (inos.size()) {
+ LogSegment *ls = mds->mdlog->get_segment(seq);
+ if (ls)
+ ls->purging_inodes.subtract(inos);
+
+ if (mds->inotable->get_version() >= inotablev) {
+ dout(10) << "EPurged.replay inotable " << mds->inotable->get_version()
+ << " >= " << inotablev << ", noop" << dendl;
+ } else {
+ dout(10) << "EPurged.replay inotable " << mds->inotable->get_version()
+ << " < " << inotablev << " " << dendl;
+ mds->inotable->replay_release_ids(inos);
+ ceph_assert(mds->inotable->get_version() == inotablev);
+ }
+ }
+ update_segment();
+}
+
+void EPurged::encode(bufferlist& bl, uint64_t features) const
+{
+ ENCODE_START(1, 1, bl);
+ encode(inos, bl);
+ encode(inotablev, bl);
+ encode(seq, bl);
+ ENCODE_FINISH(bl);
+}
+
+void EPurged::decode(bufferlist::const_iterator& bl)
+{
+ DECODE_START(1, bl);
+ decode(inos, bl);
+ decode(inotablev, bl);
+ decode(seq, bl);
+ DECODE_FINISH(bl);
+}
+
+void EPurged::dump(Formatter *f) const
+{
+ f->dump_stream("inos") << inos;
+ f->dump_int("inotable version", inotablev);
+ f->dump_int("segment seq", seq);
+}
+
+// -----------------------
+// ESession
+
+void ESession::update_segment()
+{
+ get_segment()->sessionmapv = cmapv;
+ if (inos_to_free.size() && inotablev)
+ get_segment()->inotablev = inotablev;
+}
+
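+// Replay a session open/close: advance the sessionmap to cmapv (adding,
+// resetting, or removing the session as appropriate), queue any
+// inos_to_purge on the segment, and release inos_to_free in the inotable.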
+void ESession::replay(MDSRank *mds)
+{
+ if (inos_to_purge.size())
+ get_segment()->purging_inodes.insert(inos_to_purge);
+
+ if (mds->sessionmap.get_version() >= cmapv) {
+ dout(10) << "ESession.replay sessionmap " << mds->sessionmap.get_version()
+ << " >= " << cmapv << ", noop" << dendl;
+ } else if (mds->sessionmap.get_version() + 1 == cmapv) {
+ dout(10) << "ESession.replay sessionmap " << mds->sessionmap.get_version()
+ << " < " << cmapv << " " << (open ? "open":"close") << " " << client_inst << dendl;
+ Session *session;
+ if (open) {
+ session = mds->sessionmap.get_or_add_session(client_inst);
+ mds->sessionmap.set_state(session, Session::STATE_OPEN);
+ session->set_client_metadata(client_metadata);
+ dout(10) << " opened session " << session->info.inst << dendl;
+ } else {
+ session = mds->sessionmap.get_session(client_inst.name);
+      if (session) { // there should always be a session, but there's a bug
+ if (session->get_connection() == NULL) {
+ dout(10) << " removed session " << session->info.inst << dendl;
+ mds->sessionmap.remove_session(session);
+ session = NULL;
+ } else {
+ session->clear(); // the client has reconnected; keep the Session, but reset
+ dout(10) << " reset session " << session->info.inst << " (they reconnected)" << dendl;
+ }
+ } else {
+ mds->clog->error() << "replayed stray Session close event for " << client_inst
+ << " from time " << stamp << ", ignoring";
+ }
+ }
+ if (session) {
+ mds->sessionmap.replay_dirty_session(session);
+ } else {
+ mds->sessionmap.replay_advance_version();
+ }
+ ceph_assert(mds->sessionmap.get_version() == cmapv);
+ } else {
+ mds->clog->error() << "ESession.replay sessionmap v " << cmapv
+ << " - 1 > table " << mds->sessionmap.get_version();
+ ceph_assert(g_conf()->mds_wipe_sessions);
+ mds->sessionmap.wipe();
+ mds->sessionmap.set_version(cmapv);
+ }
+
+ if (inos_to_free.size() && inotablev) {
+ if (mds->inotable->get_version() >= inotablev) {
+ dout(10) << "ESession.replay inotable " << mds->inotable->get_version()
+ << " >= " << inotablev << ", noop" << dendl;
+ } else {
+ dout(10) << "ESession.replay inotable " << mds->inotable->get_version()
+ << " < " << inotablev << " " << (open ? "add":"remove") << dendl;
+ ceph_assert(!open); // for now
+ mds->inotable->replay_release_ids(inos_to_free);
+ ceph_assert(mds->inotable->get_version() == inotablev);
+ }
+ }
+
+ update_segment();
+}
+
+void ESession::encode(bufferlist &bl, uint64_t features) const
+{
+ ENCODE_START(6, 5, bl);
+ encode(stamp, bl);
+ encode(client_inst, bl, features);
+ encode(open, bl);
+ encode(cmapv, bl);
+ encode(inos_to_free, bl);
+ encode(inotablev, bl);
+ encode(client_metadata, bl);
+ encode(inos_to_purge, bl);
+ ENCODE_FINISH(bl);
+}
+
+void ESession::decode(bufferlist::const_iterator &bl)
+{
+ DECODE_START_LEGACY_COMPAT_LEN(6, 3, 3, bl);
+ if (struct_v >= 2)
+ decode(stamp, bl);
+ decode(client_inst, bl);
+ decode(open, bl);
+ decode(cmapv, bl);
+ decode(inos_to_free, bl);
+ decode(inotablev, bl);
+ if (struct_v == 4) {
+ decode(client_metadata.kv_map, bl);
+ } else if (struct_v >= 5) {
+ decode(client_metadata, bl);
+ }
+ if (struct_v >= 6){
+ decode(inos_to_purge, bl);
+ }
+
+ DECODE_FINISH(bl);
+}
+
+void ESession::dump(Formatter *f) const
+{
+ f->dump_stream("client instance") << client_inst;
+ f->dump_string("open", open ? "true" : "false");
+ f->dump_int("client map version", cmapv);
+ f->dump_stream("inos_to_free") << inos_to_free;
+ f->dump_int("inotable version", inotablev);
+ f->open_object_section("client_metadata");
+ f->dump_stream("inos_to_purge") << inos_to_purge;
+ client_metadata.dump(f);
+ f->close_section(); // client_metadata
+}
+
+void ESession::generate_test_instances(std::list<ESession*>& ls)
+{
+ ls.push_back(new ESession);
+}
+
+// -----------------------
+// ESessions
+
+void ESessions::encode(bufferlist &bl, uint64_t features) const
+{
+ ENCODE_START(2, 1, bl);
+ encode(client_map, bl, features);
+ encode(cmapv, bl);
+ encode(stamp, bl);
+ encode(client_metadata_map, bl);
+ ENCODE_FINISH(bl);
+}
+
+void ESessions::decode_old(bufferlist::const_iterator &bl)
+{
+ using ceph::decode;
+ decode(client_map, bl);
+ decode(cmapv, bl);
+ if (!bl.end())
+ decode(stamp, bl);
+}
+
+void ESessions::decode_new(bufferlist::const_iterator &bl)
+{
+ DECODE_START(2, bl);
+ decode(client_map, bl);
+ decode(cmapv, bl);
+ decode(stamp, bl);
+ if (struct_v >= 2)
+ decode(client_metadata_map, bl);
+ DECODE_FINISH(bl);
+}
+
+void ESessions::dump(Formatter *f) const
+{
+ f->dump_int("client map version", cmapv);
+
+ f->open_array_section("client map");
+ for (map<client_t,entity_inst_t>::const_iterator i = client_map.begin();
+ i != client_map.end(); ++i) {
+ f->open_object_section("client");
+ f->dump_int("client id", i->first.v);
+ f->dump_stream("client entity") << i->second;
+ f->close_section(); // client
+ }
+ f->close_section(); // client map
+}
+
+void ESessions::generate_test_instances(std::list<ESessions*>& ls)
+{
+ ls.push_back(new ESessions());
+}
+
+void ESessions::update_segment()
+{
+ get_segment()->sessionmapv = cmapv;
+}
+
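+// Replay a bulk session-open event, bringing the sessionmap up to cmapv.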
+void ESessions::replay(MDSRank *mds)
+{
+ if (mds->sessionmap.get_version() >= cmapv) {
+ dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.get_version()
+ << " >= " << cmapv << ", noop" << dendl;
+ } else {
+ dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.get_version()
+ << " < " << cmapv << dendl;
+ mds->sessionmap.replay_open_sessions(cmapv, client_map, client_metadata_map);
+ }
+ update_segment();
+}
+
+
+// -----------------------
+// ETableServer
+
+void ETableServer::encode(bufferlist& bl, uint64_t features) const
+{
+ ENCODE_START(3, 3, bl);
+ encode(stamp, bl);
+ encode(table, bl);
+ encode(op, bl);
+ encode(reqid, bl);
+ encode(bymds, bl);
+ encode(mutation, bl);
+ encode(tid, bl);
+ encode(version, bl);
+ ENCODE_FINISH(bl);
+}
+
+void ETableServer::decode(bufferlist::const_iterator &bl)
+{
+ DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
+ if (struct_v >= 2)
+ decode(stamp, bl);
+ decode(table, bl);
+ decode(op, bl);
+ decode(reqid, bl);
+ decode(bymds, bl);
+ decode(mutation, bl);
+ decode(tid, bl);
+ decode(version, bl);
+ DECODE_FINISH(bl);
+}
+
+void ETableServer::dump(Formatter *f) const
+{
+ f->dump_int("table id", table);
+ f->dump_int("op", op);
+ f->dump_int("request id", reqid);
+ f->dump_int("by mds", bymds);
+ f->dump_int("tid", tid);
+ f->dump_int("version", version);
+}
+
+void ETableServer::generate_test_instances(std::list<ETableServer*>& ls)
+{
+ ls.push_back(new ETableServer());
+}
+
+
+void ETableServer::update_segment()
+{
+ get_segment()->tablev[table] = version;
+}
+
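+// Replay a table server event (prepare/commit/rollback/server_update),
+// re-applying the operation so the table catches up to the event version.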
+void ETableServer::replay(MDSRank *mds)
+{
+ MDSTableServer *server = mds->get_table_server(table);
+ if (!server)
+ return;
+
+ if (server->get_version() >= version) {
+ dout(10) << "ETableServer.replay " << get_mdstable_name(table)
+ << " " << get_mdstableserver_opname(op)
+ << " event " << version
+ << " <= table " << server->get_version() << dendl;
+ return;
+ }
+
+ dout(10) << " ETableServer.replay " << get_mdstable_name(table)
+ << " " << get_mdstableserver_opname(op)
+ << " event " << version << " - 1 == table " << server->get_version() << dendl;
+ ceph_assert(version-1 == server->get_version());
+
+ switch (op) {
+ case TABLESERVER_OP_PREPARE: {
+ server->_note_prepare(bymds, reqid, true);
+ bufferlist out;
+ server->_prepare(mutation, reqid, bymds, out);
+ mutation = std::move(out);
+ break;
+ }
+ case TABLESERVER_OP_COMMIT:
+ server->_commit(tid, ref_t<MMDSTableRequest>());
+ server->_note_commit(tid, true);
+ break;
+ case TABLESERVER_OP_ROLLBACK:
+ server->_rollback(tid);
+ server->_note_rollback(tid, true);
+ break;
+ case TABLESERVER_OP_SERVER_UPDATE:
+ server->_server_update(mutation);
+ server->_note_server_update(mutation, true);
+ break;
+ default:
+ mds->clog->error() << "invalid tableserver op in ETableServer";
+ mds->damaged();
+ ceph_abort(); // Should be unreachable because damaged() calls respawn()
+ }
+
+ ceph_assert(version == server->get_version());
+ update_segment();
+}
+
+
+// ---------------------
+// ETableClient
+
+void ETableClient::encode(bufferlist& bl, uint64_t features) const
+{
+ ENCODE_START(3, 3, bl);
+ encode(stamp, bl);
+ encode(table, bl);
+ encode(op, bl);
+ encode(tid, bl);
+ ENCODE_FINISH(bl);
+}
+
+void ETableClient::decode(bufferlist::const_iterator &bl)
+{
+ DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
+ if (struct_v >= 2)
+ decode(stamp, bl);
+ decode(table, bl);
+ decode(op, bl);
+ decode(tid, bl);
+ DECODE_FINISH(bl);
+}
+
+void ETableClient::dump(Formatter *f) const
+{
+ f->dump_int("table", table);
+ f->dump_int("op", op);
+ f->dump_int("tid", tid);
+}
+
+void ETableClient::generate_test_instances(std::list<ETableClient*>& ls)
+{
+ ls.push_back(new ETableClient());
+}
+
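+// Replay a journaled table server ack so the table client can finish
+// the transaction locally.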
+void ETableClient::replay(MDSRank *mds)
+{
+ dout(10) << " ETableClient.replay " << get_mdstable_name(table)
+ << " op " << get_mdstableserver_opname(op)
+ << " tid " << tid << dendl;
+
+ MDSTableClient *client = mds->get_table_client(table);
+ if (!client)
+ return;
+
+ ceph_assert(op == TABLESERVER_OP_ACK);
+ client->got_journaled_ack(tid);
+}
+
+
+// -----------------------
+// ESnap
+/*
+void ESnap::update_segment()
+{
+ get_segment()->tablev[TABLE_SNAP] = version;
+}
+
+void ESnap::replay(MDSRank *mds)
+{
+ if (mds->snaptable->get_version() >= version) {
+ dout(10) << "ESnap.replay event " << version
+ << " <= table " << mds->snaptable->get_version() << dendl;
+ return;
+ }
+
+ dout(10) << " ESnap.replay event " << version
+ << " - 1 == table " << mds->snaptable->get_version() << dendl;
+ ceph_assert(version-1 == mds->snaptable->get_version());
+
+ if (create) {
+ version_t v;
+ snapid_t s = mds->snaptable->create(snap.dirino, snap.name, snap.stamp, &v);
+ ceph_assert(s == snap.snapid);
+ } else {
+ mds->snaptable->remove(snap.snapid);
+ }
+
+ ceph_assert(version == mds->snaptable->get_version());
+}
+*/
+
+
+
+// -----------------------
+// EUpdate
+
+void EUpdate::encode(bufferlist &bl, uint64_t features) const
+{
+ ENCODE_START(4, 4, bl);
+ encode(stamp, bl);
+ encode(type, bl);
+ encode(metablob, bl, features);
+ encode(client_map, bl);
+ encode(cmapv, bl);
+ encode(reqid, bl);
+ encode(had_peers, bl);
+ ENCODE_FINISH(bl);
+}
+
+void EUpdate::decode(bufferlist::const_iterator &bl)
+{
+ DECODE_START_LEGACY_COMPAT_LEN(4, 4, 4, bl);
+ if (struct_v >= 2)
+ decode(stamp, bl);
+ decode(type, bl);
+ decode(metablob, bl);
+ decode(client_map, bl);
+ if (struct_v >= 3)
+ decode(cmapv, bl);
+ decode(reqid, bl);
+ decode(had_peers, bl);
+ DECODE_FINISH(bl);
+}
+
+void EUpdate::dump(Formatter *f) const
+{
+ f->open_object_section("metablob");
+ metablob.dump(f);
+ f->close_section(); // metablob
+
+ f->dump_string("type", type);
+ f->dump_int("client map length", client_map.length());
+ f->dump_int("client map version", cmapv);
+ f->dump_stream("reqid") << reqid;
+ f->dump_string("had peers", had_peers ? "true" : "false");
+}
+
+void EUpdate::generate_test_instances(std::list<EUpdate*>& ls)
+{
+ ls.push_back(new EUpdate());
+}
+
+
+void EUpdate::update_segment()
+{
+ auto&& segment = get_segment();
+ metablob.update_segment(segment);
+
+ if (client_map.length())
+ segment->sessionmapv = cmapv;
+
+ if (had_peers)
+ segment->uncommitted_leaders.insert(reqid);
+}
+
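+// Replay a metadata update: apply the metablob, re-register any
+// uncommitted leader op, and replay bundled client session opens.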
+void EUpdate::replay(MDSRank *mds)
+{
+ auto&& segment = get_segment();
+ dout(10) << "EUpdate::replay" << dendl;
+ metablob.replay(mds, segment, EVENT_UPDATE);
+
+ if (had_peers) {
+ dout(10) << "EUpdate.replay " << reqid << " had peers, expecting a matching ECommitted" << dendl;
+ segment->uncommitted_leaders.insert(reqid);
+ set<mds_rank_t> peers;
+ mds->mdcache->add_uncommitted_leader(reqid, segment, peers, true);
+ }
+
+ if (client_map.length()) {
+ if (mds->sessionmap.get_version() >= cmapv) {
+ dout(10) << "EUpdate.replay sessionmap v " << cmapv
+ << " <= table " << mds->sessionmap.get_version() << dendl;
+ } else {
+ dout(10) << "EUpdate.replay sessionmap " << mds->sessionmap.get_version()
+ << " < " << cmapv << dendl;
+ // open client sessions?
+ map<client_t,entity_inst_t> cm;
+ map<client_t,client_metadata_t> cmm;
+ auto blp = client_map.cbegin();
+ using ceph::decode;
+ decode(cm, blp);
+ if (!blp.end())
+ decode(cmm, blp);
+ mds->sessionmap.replay_open_sessions(cmapv, cm, cmm);
+ }
+ }
+ update_segment();
+}
+
+
+// ------------------------
+// EOpen
+
+void EOpen::encode(bufferlist &bl, uint64_t features) const {
+ ENCODE_START(4, 3, bl);
+ encode(stamp, bl);
+ encode(metablob, bl, features);
+ encode(inos, bl);
+ encode(snap_inos, bl);
+ ENCODE_FINISH(bl);
+}
+
+void EOpen::decode(bufferlist::const_iterator &bl) {
+ DECODE_START_LEGACY_COMPAT_LEN(4, 3, 3, bl); // v4 added snap_inos
+ if (struct_v >= 2)
+ decode(stamp, bl);
+ decode(metablob, bl);
+ decode(inos, bl);
+ if (struct_v >= 4)
+ decode(snap_inos, bl);
+ DECODE_FINISH(bl);
+}
+
+void EOpen::dump(Formatter *f) const
+{
+ f->open_object_section("metablob");
+ metablob.dump(f);
+ f->close_section(); // metablob
+ f->open_array_section("inos involved");
+ for (vector<inodeno_t>::const_iterator i = inos.begin();
+ i != inos.end(); ++i) {
+ f->dump_int("ino", *i);
+ }
+ f->close_section(); // inos
+}
+
+void EOpen::generate_test_instances(std::list<EOpen*>& ls)
+{
+ ls.push_back(new EOpen());
+ ls.push_back(new EOpen());
+ ls.back()->add_ino(0);
+}
+
+void EOpen::update_segment()
+{
+ // nothing to do; the open-file list is attached to the segment in replay()
+}
+
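+// Replay an EOpen: apply the metablob, then attach the listed inodes'
+// open-file items to this segment so they are rejournaled on trim.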
+void EOpen::replay(MDSRank *mds)
+{
+ dout(10) << "EOpen.replay " << dendl;
+ auto&& segment = get_segment();
+ metablob.replay(mds, segment, EVENT_OPEN);
+
+ // note which segment the inodes belong to, so we don't have to start rejournaling them
+ for (const auto &ino : inos) {
+ CInode *in = mds->mdcache->get_inode(ino);
+ if (!in) {
+ dout(0) << "EOpen.replay ino " << ino << " not in metablob" << dendl;
+ ceph_assert(in);
+ }
+ segment->open_files.push_back(&in->item_open_file);
+ }
+ for (const auto &vino : snap_inos) {
+ CInode *in = mds->mdcache->get_inode(vino);
+ if (!in) {
+ dout(0) << "EOpen.replay ino " << vino << " not in metablob" << dendl;
+ ceph_assert(in);
+ }
+ segment->open_files.push_back(&in->item_open_file);
+ }
+}
+
+
+// -----------------------
+// ECommitted
+
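+// Replay an ECommitted: the leader op is fully committed, so drop it
+// from uncommitted_leaders (if we saw the original EUpdate).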
+void ECommitted::replay(MDSRank *mds)
+{
+ if (mds->mdcache->uncommitted_leaders.count(reqid)) {
+ dout(10) << "ECommitted.replay " << reqid << dendl;
+ mds->mdcache->uncommitted_leaders[reqid].ls->uncommitted_leaders.erase(reqid);
+ mds->mdcache->uncommitted_leaders.erase(reqid);
+ } else {
+ dout(10) << "ECommitted.replay " << reqid << " -- didn't see original op" << dendl;
+ }
+}
+
+void ECommitted::encode(bufferlist& bl, uint64_t features) const
+{
+ ENCODE_START(3, 3, bl);
+ encode(stamp, bl);
+ encode(reqid, bl);
+ ENCODE_FINISH(bl);
+}
+
+void ECommitted::decode(bufferlist::const_iterator& bl)
+{
+ DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
+ if (struct_v >= 2)
+ decode(stamp, bl);
+ decode(reqid, bl);
+ DECODE_FINISH(bl);
+}
+
+void ECommitted::dump(Formatter *f) const {
+ f->dump_stream("stamp") << stamp;
+ f->dump_stream("reqid") << reqid;
+}
+
+void ECommitted::generate_test_instances(std::list<ECommitted*>& ls)
+{
+ ls.push_back(new ECommitted);
+ ls.push_back(new ECommitted);
+ ls.back()->stamp = utime_t(1, 2);
+ ls.back()->reqid = metareqid_t(entity_name_t::CLIENT(123), 456);
+}
+
+// -----------------------
+// EPeerUpdate
+
+void link_rollback::encode(bufferlist &bl) const
+{
+ ENCODE_START(3, 2, bl);
+ encode(reqid, bl);
+ encode(ino, bl);
+ encode(was_inc, bl);
+ encode(old_ctime, bl);
+ encode(old_dir_mtime, bl);
+ encode(old_dir_rctime, bl);
+ encode(snapbl, bl);
+ ENCODE_FINISH(bl);
+}
+
+void link_rollback::decode(bufferlist::const_iterator &bl)
+{
+ DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
+ decode(reqid, bl);
+ decode(ino, bl);
+ decode(was_inc, bl);
+ decode(old_ctime, bl);
+ decode(old_dir_mtime, bl);
+ decode(old_dir_rctime, bl);
+ if (struct_v >= 3)
+ decode(snapbl, bl);
+ DECODE_FINISH(bl);
+}
+
+void link_rollback::dump(Formatter *f) const
+{
+ f->dump_stream("metareqid") << reqid;
+ f->dump_int("ino", ino);
+ f->dump_string("was incremented", was_inc ? "true" : "false");
+ f->dump_stream("old_ctime") << old_ctime;
+ f->dump_stream("old_dir_mtime") << old_dir_mtime;
+ f->dump_stream("old_dir_rctime") << old_dir_rctime;
+}
+
+void link_rollback::generate_test_instances(std::list<link_rollback*>& ls)
+{
+ ls.push_back(new link_rollback());
+}
+
+void rmdir_rollback::encode(bufferlist& bl) const
+{
+ ENCODE_START(3, 2, bl);
+ encode(reqid, bl);
+ encode(src_dir, bl);
+ encode(src_dname, bl);
+ encode(dest_dir, bl);
+ encode(dest_dname, bl);
+ encode(snapbl, bl);
+ ENCODE_FINISH(bl);
+}
+
+void rmdir_rollback::decode(bufferlist::const_iterator& bl)
+{
+ DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
+ decode(reqid, bl);
+ decode(src_dir, bl);
+ decode(src_dname, bl);
+ decode(dest_dir, bl);
+ decode(dest_dname, bl);
+ if (struct_v >= 3)
+ decode(snapbl, bl);
+ DECODE_FINISH(bl);
+}
+
+void rmdir_rollback::dump(Formatter *f) const
+{
+ f->dump_stream("metareqid") << reqid;
+ f->dump_stream("source directory") << src_dir;
+ f->dump_string("source dname", src_dname);
+ f->dump_stream("destination directory") << dest_dir;
+ f->dump_string("destination dname", dest_dname);
+}
+
+void rmdir_rollback::generate_test_instances(std::list<rmdir_rollback*>& ls)
+{
+ ls.push_back(new rmdir_rollback());
+}
+
+void rename_rollback::drec::encode(bufferlist &bl) const
+{
+ ENCODE_START(2, 2, bl);
+ encode(dirfrag, bl);
+ encode(dirfrag_old_mtime, bl);
+ encode(dirfrag_old_rctime, bl);
+ encode(ino, bl);
+ encode(remote_ino, bl);
+ encode(dname, bl);
+ encode(remote_d_type, bl);
+ encode(old_ctime, bl);
+ ENCODE_FINISH(bl);
+}
+
+void rename_rollback::drec::decode(bufferlist::const_iterator &bl)
+{
+ DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
+ decode(dirfrag, bl);
+ decode(dirfrag_old_mtime, bl);
+ decode(dirfrag_old_rctime, bl);
+ decode(ino, bl);
+ decode(remote_ino, bl);
+ decode(dname, bl);
+ decode(remote_d_type, bl);
+ decode(old_ctime, bl);
+ DECODE_FINISH(bl);
+}
+
+void rename_rollback::drec::dump(Formatter *f) const
+{
+ f->dump_stream("directory fragment") << dirfrag;
+ f->dump_stream("directory old mtime") << dirfrag_old_mtime;
+ f->dump_stream("directory old rctime") << dirfrag_old_rctime;
+ f->dump_int("ino", ino);
+ f->dump_int("remote ino", remote_ino);
+ f->dump_string("dname", dname);
+ uint32_t type = DTTOIF(remote_d_type) & S_IFMT; // convert d_type to S_IFMT file-type bits
+ string type_string;
+ switch(type) {
+ case S_IFREG:
+ type_string = "file"; break;
+ case S_IFLNK:
+ type_string = "symlink"; break;
+ case S_IFDIR:
+ type_string = "directory"; break;
+ default:
+ type_string = "UNKNOWN-" + stringify((int)type); break;
+ }
+ f->dump_string("remote dtype", type_string);
+ f->dump_stream("old ctime") << old_ctime;
+}
+
+void rename_rollback::drec::generate_test_instances(std::list<drec*>& ls)
+{
+ ls.push_back(new drec());
+ ls.back()->remote_d_type = IFTODT(S_IFREG);
+}
+
+void rename_rollback::encode(bufferlist &bl) const
+{
+ ENCODE_START(3, 2, bl);
+ encode(reqid, bl);
+ encode(orig_src, bl);
+ encode(orig_dest, bl);
+ encode(stray, bl);
+ encode(ctime, bl);
+ encode(srci_snapbl, bl);
+ encode(desti_snapbl, bl);
+ ENCODE_FINISH(bl);
+}
+
+void rename_rollback::decode(bufferlist::const_iterator &bl)
+{
+ DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
+ decode(reqid, bl);
+ decode(orig_src, bl);
+ decode(orig_dest, bl);
+ decode(stray, bl);
+ decode(ctime, bl);
+ if (struct_v >= 3) {
+ decode(srci_snapbl, bl);
+ decode(desti_snapbl, bl);
+ }
+ DECODE_FINISH(bl);
+}
+
+void rename_rollback::dump(Formatter *f) const
+{
+ f->dump_stream("request id") << reqid;
+ f->open_object_section("original src drec");
+ orig_src.dump(f);
+ f->close_section(); // original src drec
+ f->open_object_section("original dest drec");
+ orig_dest.dump(f);
+ f->close_section(); // original dest drec
+ f->open_object_section("stray drec");
+ stray.dump(f);
+ f->close_section(); // stray drec
+ f->dump_stream("ctime") << ctime;
+}
+
+void rename_rollback::generate_test_instances(std::list<rename_rollback*>& ls)
+{
+ ls.push_back(new rename_rollback());
+ ls.back()->orig_src.remote_d_type = IFTODT(S_IFREG);
+ ls.back()->orig_dest.remote_d_type = IFTODT(S_IFREG);
+ ls.back()->stray.remote_d_type = IFTODT(S_IFREG);
+}
+
+void EPeerUpdate::encode(bufferlist &bl, uint64_t features) const
+{
+ ENCODE_START(3, 3, bl);
+ encode(stamp, bl);
+ encode(type, bl);
+ encode(reqid, bl);
+ encode(leader, bl);
+ encode(op, bl);
+ encode(origop, bl);
+ encode(commit, bl, features);
+ encode(rollback, bl);
+ ENCODE_FINISH(bl);
+}
+
+void EPeerUpdate::decode(bufferlist::const_iterator &bl)
+{
+ DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
+ if (struct_v >= 2)
+ decode(stamp, bl);
+ decode(type, bl);
+ decode(reqid, bl);
+ decode(leader, bl);
+ decode(op, bl);
+ decode(origop, bl);
+ decode(commit, bl);
+ decode(rollback, bl);
+ DECODE_FINISH(bl);
+}
+
+void EPeerUpdate::dump(Formatter *f) const
+{
+ f->open_object_section("metablob");
+ commit.dump(f);
+ f->close_section(); // metablob
+
+ f->dump_int("rollback length", rollback.length());
+ f->dump_string("type", type);
+ f->dump_stream("metareqid") << reqid;
+ f->dump_int("leader", leader);
+ f->dump_int("op", op);
+ f->dump_int("original op", origop);
+}
+
+void EPeerUpdate::generate_test_instances(std::list<EPeerUpdate*>& ls)
+{
+ ls.push_back(new EPeerUpdate());
+}
+
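+// Replay a peer-side update: PREPARE applies the commit blob and saves
+// rollback state; COMMIT and ROLLBACK resolve the uncommitted peer op.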
+void EPeerUpdate::replay(MDSRank *mds)
+{
+ MDPeerUpdate *su;
+ auto&& segment = get_segment();
+ switch (op) {
+ case EPeerUpdate::OP_PREPARE:
+ dout(10) << "EPeerUpdate.replay prepare " << reqid << " for mds." << leader
+ << ": applying commit, saving rollback info" << dendl;
+ su = new MDPeerUpdate(origop, rollback);
+ commit.replay(mds, segment, EVENT_PEERUPDATE, su);
+ mds->mdcache->add_uncommitted_peer(reqid, segment, leader, su);
+ break;
+
+ case EPeerUpdate::OP_COMMIT:
+ dout(10) << "EPeerUpdate.replay commit " << reqid << " for mds." << leader << dendl;
+ mds->mdcache->finish_uncommitted_peer(reqid, false);
+ break;
+
+ case EPeerUpdate::OP_ROLLBACK:
+ dout(10) << "EPeerUpdate.replay abort " << reqid << " for mds." << leader
+ << ": applying rollback commit blob" << dendl;
+ commit.replay(mds, segment, EVENT_PEERUPDATE);
+ mds->mdcache->finish_uncommitted_peer(reqid, false);
+ break;
+
+ default:
+ mds->clog->error() << "invalid op in EPeerUpdate";
+ mds->damaged();
+ ceph_abort(); // Should be unreachable because damaged() calls respawn()
+ }
+}
+
+
+// -----------------------
+// ESubtreeMap
+
+void ESubtreeMap::encode(bufferlist& bl, uint64_t features) const
+{
+ ENCODE_START(6, 5, bl);
+ encode(stamp, bl);
+ encode(metablob, bl, features);
+ encode(subtrees, bl);
+ encode(ambiguous_subtrees, bl);
+ encode(expire_pos, bl);
+ encode(event_seq, bl);
+ ENCODE_FINISH(bl);
+}
+
+void ESubtreeMap::decode(bufferlist::const_iterator &bl)
+{
+ DECODE_START_LEGACY_COMPAT_LEN(6, 5, 5, bl);
+ if (struct_v >= 2)
+ decode(stamp, bl);
+ decode(metablob, bl);
+ decode(subtrees, bl);
+ if (struct_v >= 4)
+ decode(ambiguous_subtrees, bl);
+ if (struct_v >= 3)
+ decode(expire_pos, bl);
+ if (struct_v >= 6)
+ decode(event_seq, bl);
+ DECODE_FINISH(bl);
+}
+
+void ESubtreeMap::dump(Formatter *f) const
+{
+ f->open_object_section("metablob");
+ metablob.dump(f);
+ f->close_section(); // metablob
+
+ f->open_array_section("subtrees");
+ for(map<dirfrag_t,vector<dirfrag_t> >::const_iterator i = subtrees.begin();
+ i != subtrees.end(); ++i) {
+ f->open_object_section("tree");
+ f->dump_stream("root dirfrag") << i->first;
+ for (vector<dirfrag_t>::const_iterator j = i->second.begin();
+ j != i->second.end(); ++j) {
+ f->dump_stream("bound dirfrag") << *j;
+ }
+ f->close_section(); // tree
+ }
+ f->close_section(); // subtrees
+
+ f->open_array_section("ambiguous subtrees");
+ for(set<dirfrag_t>::const_iterator i = ambiguous_subtrees.begin();
+ i != ambiguous_subtrees.end(); ++i) {
+ f->dump_stream("dirfrag") << *i;
+ }
+ f->close_section(); // ambiguous subtrees
+
+ f->dump_int("expire position", expire_pos);
+}
+
+void ESubtreeMap::generate_test_instances(std::list<ESubtreeMap*>& ls)
+{
+ ls.push_back(new ESubtreeMap());
+}
+
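+// Replay a subtree map. If the cache already holds subtrees, verify them
+// against the event (and complain via clog on mismatch); otherwise
+// rebuild the auth subtree spanning tree from the metablob.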
+void ESubtreeMap::replay(MDSRank *mds)
+{
+ if (expire_pos && expire_pos > mds->mdlog->journaler->get_expire_pos())
+ mds->mdlog->journaler->set_expire_pos(expire_pos);
+
+ // suck up the subtree map?
+ if (mds->mdcache->is_subtrees()) {
+ dout(10) << "ESubtreeMap.replay -- i already have import map; verifying" << dendl;
+ int errors = 0;
+
+ for (map<dirfrag_t, vector<dirfrag_t> >::iterator p = subtrees.begin();
+ p != subtrees.end();
+ ++p) {
+ CDir *dir = mds->mdcache->get_dirfrag(p->first);
+ if (!dir) {
+ mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
+ << " subtree root " << p->first << " not in cache";
+ ++errors;
+ continue;
+ }
+
+ if (!mds->mdcache->is_subtree(dir)) {
+ mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
+ << " subtree root " << p->first << " not a subtree in cache";
+ ++errors;
+ continue;
+ }
+ if (dir->get_dir_auth().first != mds->get_nodeid()) {
+ mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
+ << " subtree root " << p->first
+ << " is not mine in cache (it's " << dir->get_dir_auth() << ")";
+ ++errors;
+ continue;
+ }
+
+ for (vector<dirfrag_t>::iterator q = p->second.begin(); q != p->second.end(); ++q)
+ mds->mdcache->get_force_dirfrag(*q, true);
+
+ set<CDir*> bounds;
+ mds->mdcache->get_subtree_bounds(dir, bounds);
+ for (vector<dirfrag_t>::iterator q = p->second.begin(); q != p->second.end(); ++q) {
+ CDir *b = mds->mdcache->get_dirfrag(*q);
+ if (!b) {
+ mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
+ << " subtree " << p->first << " bound " << *q << " not in cache";
+ ++errors;
+ continue;
+ }
+ if (bounds.count(b) == 0) {
+ mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
+ << " subtree " << p->first << " bound " << *q << " not a bound in cache";
+ ++errors;
+ continue;
+ }
+ bounds.erase(b);
+ }
+ for (set<CDir*>::iterator q = bounds.begin(); q != bounds.end(); ++q) {
+ mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
+ << " subtree " << p->first << " has extra bound in cache " << (*q)->dirfrag();
+ ++errors;
+ }
+
+ if (ambiguous_subtrees.count(p->first)) {
+ if (!mds->mdcache->have_ambiguous_import(p->first)) {
+ mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
+ << " subtree " << p->first << " is ambiguous but is not in our cache";
+ ++errors;
+ }
+ } else {
+ if (mds->mdcache->have_ambiguous_import(p->first)) {
+ mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
+ << " subtree " << p->first << " is not ambiguous but is in our cache";
+ ++errors;
+ }
+ }
+ }
+
+ std::vector<CDir*> dirs;
+ mds->mdcache->get_subtrees(dirs);
+ for (const auto& dir : dirs) {
+ if (dir->get_dir_auth().first != mds->get_nodeid())
+ continue;
+ if (subtrees.count(dir->dirfrag()) == 0) {
+ mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
+ << " does not include cache subtree " << dir->dirfrag();
+ ++errors;
+ }
+ }
+
+ if (errors) {
+ dout(0) << "journal subtrees: " << subtrees << dendl;
+ dout(0) << "journal ambig_subtrees: " << ambiguous_subtrees << dendl;
+ mds->mdcache->show_subtrees();
+ ceph_assert(!g_conf()->mds_debug_subtrees || errors == 0);
+ }
+ return;
+ }
+
+ dout(10) << "ESubtreeMap.replay -- reconstructing (auth) subtree spanning tree" << dendl;
+
+ // first, stick the spanning tree in my cache
+ //metablob.print(*_dout);
+ metablob.replay(mds, get_segment(), EVENT_SUBTREEMAP);
+
+ // restore import/export maps
+ for (map<dirfrag_t, vector<dirfrag_t> >::iterator p = subtrees.begin();
+ p != subtrees.end();
+ ++p) {
+ CDir *dir = mds->mdcache->get_dirfrag(p->first);
+ ceph_assert(dir);
+ if (ambiguous_subtrees.count(p->first)) {
+ // ambiguous!
+ mds->mdcache->add_ambiguous_import(p->first, p->second);
+ mds->mdcache->adjust_bounded_subtree_auth(dir, p->second,
+ mds_authority_t(mds->get_nodeid(), mds->get_nodeid()));
+ } else {
+ // not ambiguous
+ mds->mdcache->adjust_bounded_subtree_auth(dir, p->second, mds->get_nodeid());
+ }
+ }
+
+ mds->mdcache->recalc_auth_bits(true);
+
+ mds->mdcache->show_subtrees();
+}
+
+
+
+// -----------------------
+// EFragment
+
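+// Replay a dirfrag split/merge (prepare/rollback/commit/finish),
+// adjusting or restoring the cached dirfrags to match.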
+void EFragment::replay(MDSRank *mds)
+{
+ dout(10) << "EFragment.replay " << op_name(op) << " " << ino << " " << basefrag << " by " << bits << dendl;
+
+ std::vector<CDir*> resultfrags;
+ MDSContext::vec waiters;
+
+ // in may be NULL if it wasn't in our cache yet. if it's a prepare
+ // it will be once we replay the metablob, but first we need to
+ // refragment anything we already have in the cache.
+ CInode *in = mds->mdcache->get_inode(ino);
+
+ auto&& segment = get_segment();
+ switch (op) {
+ case OP_PREPARE:
+ mds->mdcache->add_uncommitted_fragment(dirfrag_t(ino, basefrag), bits, orig_frags, segment, &rollback);
+
+ if (in)
+ mds->mdcache->adjust_dir_fragments(in, basefrag, bits, &resultfrags, waiters, true);
+ break;
+
+ case OP_ROLLBACK: {
+ frag_vec_t old_frags;
+ if (in) {
+ in->dirfragtree.get_leaves_under(basefrag, old_frags);
+ if (orig_frags.empty()) {
+ // old format EFragment
+ mds->mdcache->adjust_dir_fragments(in, basefrag, -bits, &resultfrags, waiters, true);
+ } else {
+ for (const auto& fg : orig_frags)
+ mds->mdcache->force_dir_fragment(in, fg);
+ }
+ }
+ mds->mdcache->rollback_uncommitted_fragment(dirfrag_t(ino, basefrag), std::move(old_frags));
+ break;
+ }
+
+ case OP_COMMIT:
+ case OP_FINISH:
+ mds->mdcache->finish_uncommitted_fragment(dirfrag_t(ino, basefrag), op);
+ break;
+
+ default:
+ ceph_abort();
+ }
+
+ metablob.replay(mds, segment, EVENT_FRAGMENT);
+ if (in && g_conf()->mds_debug_frag)
+ in->verify_dirfrags();
+}
+
+void EFragment::encode(bufferlist &bl, uint64_t features) const {
+ ENCODE_START(5, 4, bl);
+ encode(stamp, bl);
+ encode(op, bl);
+ encode(ino, bl);
+ encode(basefrag, bl);
+ encode(bits, bl);
+ encode(metablob, bl, features);
+ encode(orig_frags, bl);
+ encode(rollback, bl);
+ ENCODE_FINISH(bl);
+}
+
+void EFragment::decode(bufferlist::const_iterator &bl) {
+ DECODE_START_LEGACY_COMPAT_LEN(5, 4, 4, bl);
+ if (struct_v >= 2)
+ decode(stamp, bl);
+ if (struct_v >= 3)
+ decode(op, bl);
+ decode(ino, bl);
+ decode(basefrag, bl);
+ decode(bits, bl);
+ decode(metablob, bl);
+ if (struct_v >= 5) {
+ decode(orig_frags, bl);
+ decode(rollback, bl);
+ }
+ DECODE_FINISH(bl);
+}
+
+void EFragment::dump(Formatter *f) const
+{
+ /* the metablob is not currently included in this dump:
+ f->open_object_section("metablob");
+ metablob.dump(f);
+ f->close_section(); */
+ f->dump_string("op", op_name(op));
+ f->dump_stream("ino") << ino;
+ f->dump_stream("base frag") << basefrag;
+ f->dump_int("bits", bits);
+}
+
+void EFragment::generate_test_instances(std::list<EFragment*>& ls)
+{
+ ls.push_back(new EFragment);
+ ls.push_back(new EFragment);
+ ls.back()->op = OP_PREPARE;
+ ls.back()->ino = 1;
+ ls.back()->bits = 5;
+}
+
+void dirfrag_rollback::encode(bufferlist &bl) const
+{
+ ENCODE_START(1, 1, bl);
+ encode(*fnode, bl);
+ ENCODE_FINISH(bl);
+}
+
+void dirfrag_rollback::decode(bufferlist::const_iterator &bl)
+{
+ DECODE_START(1, bl);
+ {
+ auto _fnode = CDir::allocate_fnode();
+ decode(*_fnode, bl);
+ fnode = std::move(_fnode);
+ }
+ DECODE_FINISH(bl);
+}
+
+
+
+// =========================================================================
+
+// -----------------------
+// EExport
+
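+// Replay an export: apply the metablob, then give away authority over
+// the bounded subtree and trim the now non-auth subtree.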
+void EExport::replay(MDSRank *mds)
+{
+ dout(10) << "EExport.replay " << base << dendl;
+ auto&& segment = get_segment();
+ metablob.replay(mds, segment, EVENT_EXPORT);
+
+ CDir *dir = mds->mdcache->get_dirfrag(base);
+ ceph_assert(dir);
+
+ set<CDir*> realbounds;
+ for (set<dirfrag_t>::iterator p = bounds.begin();
+ p != bounds.end();
+ ++p) {
+ CDir *bd = mds->mdcache->get_dirfrag(*p);
+ ceph_assert(bd);
+ realbounds.insert(bd);
+ }
+
+ // adjust auth away
+ mds->mdcache->adjust_bounded_subtree_auth(dir, realbounds, CDIR_AUTH_UNDEF);
+
+ mds->mdcache->try_trim_non_auth_subtree(dir);
+}
+
+void EExport::encode(bufferlist& bl, uint64_t features) const
+{
+ ENCODE_START(4, 3, bl);
+ encode(stamp, bl);
+ encode(metablob, bl, features);
+ encode(base, bl);
+ encode(bounds, bl);
+ encode(target, bl);
+ ENCODE_FINISH(bl);
+}
+
+void EExport::decode(bufferlist::const_iterator &bl)
+{
+ DECODE_START_LEGACY_COMPAT_LEN(4, 3, 3, bl); // v4 added target
+ if (struct_v >= 2)
+ decode(stamp, bl);
+ decode(metablob, bl);
+ decode(base, bl);
+ decode(bounds, bl);
+ if (struct_v >= 4)
+ decode(target, bl);
+ DECODE_FINISH(bl);
+}
+
+void EExport::dump(Formatter *f) const
+{
+ f->dump_float("stamp", (double)stamp);
+ /* the metablob is not currently included in this dump:
+ f->open_object_section("metablob");
+ metablob.dump(f);
+ f->close_section(); */
+ f->dump_stream("base dirfrag") << base;
+ f->open_array_section("bounds dirfrags");
+ for (set<dirfrag_t>::const_iterator i = bounds.begin();
+ i != bounds.end(); ++i) {
+ f->dump_stream("dirfrag") << *i;
+ }
+ f->close_section(); // bounds dirfrags
+}
+
+void EExport::generate_test_instances(std::list<EExport*>& ls)
+{
+ EExport *sample = new EExport();
+ ls.push_back(sample);
+}
+
+
+// -----------------------
+// EImportStart
+
+void EImportStart::update_segment()
+{
+ get_segment()->sessionmapv = cmapv;
+}
+
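+// Replay the start of an import: record the subtree as an ambiguous
+// import, claim temporary authority over it, and replay any bundled
+// client session opens.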
+void EImportStart::replay(MDSRank *mds)
+{
+ dout(10) << "EImportStart.replay " << base << " bounds " << bounds << dendl;
+ //metablob.print(*_dout);
+ auto&& segment = get_segment();
+ metablob.replay(mds, segment, EVENT_IMPORTSTART);
+
+ // put in ambiguous import list
+ mds->mdcache->add_ambiguous_import(base, bounds);
+
+ // set auth partially to us so we don't trim it
+ CDir *dir = mds->mdcache->get_dirfrag(base);
+ ceph_assert(dir);
+
+ set<CDir*> realbounds;
+ for (vector<dirfrag_t>::iterator p = bounds.begin();
+ p != bounds.end();
+ ++p) {
+ CDir *bd = mds->mdcache->get_dirfrag(*p);
+ ceph_assert(bd);
+ if (!bd->is_subtree_root())
+ bd->state_clear(CDir::STATE_AUTH);
+ realbounds.insert(bd);
+ }
+
+ mds->mdcache->adjust_bounded_subtree_auth(dir, realbounds,
+ mds_authority_t(mds->get_nodeid(), mds->get_nodeid()));
+
+ // open client sessions?
+ if (mds->sessionmap.get_version() >= cmapv) {
+ dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.get_version()
+ << " >= " << cmapv << ", noop" << dendl;
+ } else {
+ dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.get_version()
+ << " < " << cmapv << dendl;
+ map<client_t,entity_inst_t> cm;
+ map<client_t,client_metadata_t> cmm;
+ auto blp = client_map.cbegin();
+ using ceph::decode;
+ decode(cm, blp);
+ if (!blp.end())
+ decode(cmm, blp);
+ mds->sessionmap.replay_open_sessions(cmapv, cm, cmm);
+ }
+ update_segment();
+}
+
+void EImportStart::encode(bufferlist &bl, uint64_t features) const {
+ ENCODE_START(4, 3, bl);
+ encode(stamp, bl);
+ encode(base, bl);
+ encode(metablob, bl, features);
+ encode(bounds, bl);
+ encode(cmapv, bl);
+ encode(client_map, bl);
+ encode(from, bl);
+ ENCODE_FINISH(bl);
+}
+
+void EImportStart::decode(bufferlist::const_iterator &bl) {
+ DECODE_START_LEGACY_COMPAT_LEN(4, 3, 3, bl); // v4 added from
+ if (struct_v >= 2)
+ decode(stamp, bl);
+ decode(base, bl);
+ decode(metablob, bl);
+ decode(bounds, bl);
+ decode(cmapv, bl);
+ decode(client_map, bl);
+ if (struct_v >= 4)
+ decode(from, bl);
+ DECODE_FINISH(bl);
+}
+
+void EImportStart::dump(Formatter *f) const
+{
+ f->dump_stream("base dirfrag") << base;
+ f->open_array_section("boundary dirfrags");
+ for (vector<dirfrag_t>::const_iterator iter = bounds.begin();
+ iter != bounds.end(); ++iter) {
+ f->dump_stream("frag") << *iter;
+ }
+ f->close_section();
+}
+
+void EImportStart::generate_test_instances(std::list<EImportStart*>& ls)
+{
+ ls.push_back(new EImportStart);
+}
+
+// -----------------------
+// EImportFinish
+
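+// Replay the end of an import: on success make the import permanent,
+// otherwise revert authority and trim the subtree.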
+void EImportFinish::replay(MDSRank *mds)
+{
+ if (mds->mdcache->have_ambiguous_import(base)) {
+ dout(10) << "EImportFinish.replay " << base << " success=" << success << dendl;
+ if (success) {
+ mds->mdcache->finish_ambiguous_import(base);
+ } else {
+ CDir *dir = mds->mdcache->get_dirfrag(base);
+ ceph_assert(dir);
+ vector<dirfrag_t> bounds;
+ mds->mdcache->get_ambiguous_import_bounds(base, bounds);
+ mds->mdcache->adjust_bounded_subtree_auth(dir, bounds, CDIR_AUTH_UNDEF);
+ mds->mdcache->cancel_ambiguous_import(dir);
+ mds->mdcache->try_trim_non_auth_subtree(dir);
+ }
+ } else {
+ // this shouldn't happen unless this is an old journal
+ dout(10) << "EImportFinish.replay " << base << " success=" << success
+ << " on subtree not marked as ambiguous"
+ << dendl;
+ mds->clog->error() << "failure replaying journal (EImportFinish)";
+ mds->damaged();
+ ceph_abort(); // Should be unreachable because damaged() calls respawn()
+ }
+}
+
+void EImportFinish::encode(bufferlist& bl, uint64_t features) const
+{
+ ENCODE_START(3, 3, bl);
+ encode(stamp, bl);
+ encode(base, bl);
+ encode(success, bl);
+ ENCODE_FINISH(bl);
+}
+
+void EImportFinish::decode(bufferlist::const_iterator &bl)
+{
+ DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
+ if (struct_v >= 2)
+ decode(stamp, bl);
+ decode(base, bl);
+ decode(success, bl);
+ DECODE_FINISH(bl);
+}
+
+void EImportFinish::dump(Formatter *f) const
+{
+ f->dump_stream("base dirfrag") << base;
+ f->dump_string("success", success ? "true" : "false");
+}
+void EImportFinish::generate_test_instances(std::list<EImportFinish*>& ls)
+{
+ ls.push_back(new EImportFinish);
+ ls.push_back(new EImportFinish);
+ ls.back()->success = true;
+}
+
+
+// ------------------------
+// EResetJournal
+
+void EResetJournal::encode(bufferlist& bl, uint64_t features) const
+{
+ ENCODE_START(2, 2, bl);
+ encode(stamp, bl);
+ ENCODE_FINISH(bl);
+}
+
+void EResetJournal::decode(bufferlist::const_iterator &bl)
+{
+ DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
+ decode(stamp, bl);
+ DECODE_FINISH(bl);
+}
+
+void EResetJournal::dump(Formatter *f) const
+{
+ f->dump_stream("timestamp") << stamp;
+}
+
+void EResetJournal::generate_test_instances(std::list<EResetJournal*>& ls)
+{
+ ls.push_back(new EResetJournal());
+}
+
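+// Replay a journal reset: wipe the sessionmap and inotable, then reclaim
+// authority over the root (if we hold it) and our own mydir subtree.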
+void EResetJournal::replay(MDSRank *mds)
+{
+ dout(1) << "EResetJournal" << dendl;
+
+ mds->sessionmap.wipe();
+ mds->inotable->replay_reset();
+
+ if (mds->mdsmap->get_root() == mds->get_nodeid()) {
+ CDir *rootdir = mds->mdcache->get_root()->get_or_open_dirfrag(mds->mdcache, frag_t());
+ mds->mdcache->adjust_subtree_auth(rootdir, mds->get_nodeid());
+ }
+
+ CDir *mydir = mds->mdcache->get_myin()->get_or_open_dirfrag(mds->mdcache, frag_t());
+ mds->mdcache->adjust_subtree_auth(mydir, mds->get_nodeid());
+
+ mds->mdcache->recalc_auth_bits(true);
+
+ mds->mdcache->show_subtrees();
+}
+
+
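+// ENoOp is pure padding: encode pad_size followed by that many 0xff
+// bytes so the event occupies a predictable amount of journal space.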
+void ENoOp::encode(bufferlist &bl, uint64_t features) const
+{
+ ENCODE_START(2, 2, bl);
+ encode(pad_size, bl);
+ uint8_t const pad = 0xff;
+ for (unsigned int i = 0; i < pad_size; ++i) {
+ encode(pad, bl);
+ }
+ ENCODE_FINISH(bl);
+}
+
+
+void ENoOp::decode(bufferlist::const_iterator &bl)
+{
+ DECODE_START(2, bl);
+ decode(pad_size, bl);
+ if (bl.get_remaining() != pad_size) {
+ // This is spiritually an assertion, but expressed in a way that lets
+ // journal debug tools catch it and recognise a malformed entry.
+ throw buffer::end_of_buffer();
+ } else {
+ bl += pad_size;
+ }
+ DECODE_FINISH(bl);
+}
+
+
+void ENoOp::replay(MDSRank *mds)
+{
+ dout(4) << "ENoOp::replay, " << pad_size << " bytes skipped in journal" << dendl;
+}
+
+/**
+ * If re-formatting an old journal that used absolute log position
+ * references as segment sequence numbers, use this function to update
+ * it.
+ *
+ * @param mds
+ * MDSRank instance, just used for logging
+ * @param old_to_new
+ * Map of old journal segment sequence numbers to new journal segment sequence numbers
+ *
+ * @return
+ * True if the event was modified.
+ */
+bool EMetaBlob::rewrite_truncate_finish(MDSRank const *mds,
+ std::map<LogSegment::seq_t, LogSegment::seq_t> const &old_to_new)
+{
+ bool modified = false;
+ map<inodeno_t, LogSegment::seq_t> new_trunc_finish;
+ for (const auto& p : truncate_finish) {
+ auto q = old_to_new.find(p.second);
+ if (q != old_to_new.end()) {
+ dout(20) << __func__ << " applying segment seq mapping "
+ << p.second << " -> " << q->second << dendl;
+ new_trunc_finish.emplace(p.first, q->second);
+ modified = true;
+ } else {
+ dout(20) << __func__ << " no segment seq mapping found for "
+ << p.second << dendl;
+ new_trunc_finish.insert(p);
+ }
+ }
+ truncate_finish.swap(new_trunc_finish);
+
+ return modified;
+}