diff options
Diffstat (limited to 'src/mds/Locker.cc')
-rw-r--r-- | src/mds/Locker.cc | 5964 |
1 files changed, 5964 insertions, 0 deletions
diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc new file mode 100644 index 000000000..b515bcb30 --- /dev/null +++ b/src/mds/Locker.cc @@ -0,0 +1,5964 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include "CDir.h" +#include "CDentry.h" +#include "CInode.h" +#include "common/config.h" +#include "events/EOpen.h" +#include "events/EUpdate.h" +#include "Locker.h" +#include "MDBalancer.h" +#include "MDCache.h" +#include "MDLog.h" +#include "MDSRank.h" +#include "MDSMap.h" +#include "messages/MInodeFileCaps.h" +#include "messages/MMDSPeerRequest.h" +#include "Migrator.h" +#include "msg/Messenger.h" +#include "osdc/Objecter.h" + +#define dout_subsys ceph_subsys_mds +#undef dout_prefix +#define dout_context g_ceph_context +#define dout_prefix _prefix(_dout, mds) +static ostream& _prefix(std::ostream *_dout, MDSRank *mds) { + return *_dout << "mds." << mds->get_nodeid() << ".locker "; +} + + +class LockerContext : public MDSContext { +protected: + Locker *locker; + MDSRank *get_mds() override + { + return locker->mds; + } + +public: + explicit LockerContext(Locker *locker_) : locker(locker_) { + ceph_assert(locker != NULL); + } +}; + +class LockerLogContext : public MDSLogContextBase { +protected: + Locker *locker; + MDSRank *get_mds() override + { + return locker->mds; + } + +public: + explicit LockerLogContext(Locker *locker_) : locker(locker_) { + ceph_assert(locker != NULL); + } +}; + +Locker::Locker(MDSRank *m, MDCache *c) : + need_snapflush_inodes(member_offset(CInode, item_caps)), mds(m), mdcache(c) {} + + +void Locker::dispatch(const cref_t<Message> &m) +{ + + switch (m->get_type()) { + // inter-mds locking + case MSG_MDS_LOCK: + handle_lock(ref_cast<MLock>(m)); + break; + // inter-mds caps + case MSG_MDS_INODEFILECAPS: + handle_inode_file_caps(ref_cast<MInodeFileCaps>(m)); + break; + // client sync + case CEPH_MSG_CLIENT_CAPS: + handle_client_caps(ref_cast<MClientCaps>(m)); + break; + case CEPH_MSG_CLIENT_CAPRELEASE: + handle_client_cap_release(ref_cast<MClientCapRelease>(m)); + break; + case CEPH_MSG_CLIENT_LEASE: + handle_client_lease(ref_cast<MClientLease>(m)); + break; + default: + derr << "locker unknown message " << m->get_type() << dendl; + ceph_abort_msg("locker unknown message"); + } +} + +void Locker::tick() +{ + scatter_tick(); + caps_tick(); +} + +/* + * locks vs rejoin + * + * + * + */ + +void Locker::send_lock_message(SimpleLock *lock, int msg) +{ + for (const auto &it : lock->get_parent()->get_replicas()) { + if (mds->is_cluster_degraded() && + mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN) + continue; + auto m = make_message<MLock>(lock, msg, mds->get_nodeid()); + mds->send_message_mds(m, it.first); + } +} + +void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data) +{ + for (const auto &it : lock->get_parent()->get_replicas()) { + if (mds->is_cluster_degraded() && + mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN) + continue; + auto m = make_message<MLock>(lock, msg, mds->get_nodeid()); + m->set_data(data); + mds->send_message_mds(m, it.first); + } +} + +bool Locker::try_rdlock_snap_layout(CInode *in, MDRequestRef& mdr, + int n, bool want_layout) +{ + dout(10) << __func__ << " " << *mdr << " " << *in << dendl; + // rdlock ancestor snaps + inodeno_t root; + int depth = -1; + bool found_locked = false; + bool found_layout = false; + + if (want_layout) + ceph_assert(n == 0); + + client_t client = mdr->get_client(); + + CInode *t = in; + while (true) { + ++depth; + if (!found_locked && mdr->is_rdlocked(&t->snaplock)) + found_locked = true; + + if (!found_locked) { + if (!t->snaplock.can_rdlock(client)) { + t->snaplock.add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mdr)); + goto failed; + } + t->snaplock.get_rdlock(); + mdr->locks.emplace(&t->snaplock, MutationImpl::LockOp::RDLOCK); + dout(20) << " got rdlock on " << t->snaplock << " " << *t << dendl; + } + if (want_layout && !found_layout) { + if (!mdr->is_rdlocked(&t->policylock)) { + if (!t->policylock.can_rdlock(client)) { + t->policylock.add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mdr)); + goto failed; + } + t->policylock.get_rdlock(); + mdr->locks.emplace(&t->policylock, MutationImpl::LockOp::RDLOCK); + dout(20) << " got rdlock on " << t->policylock << " " << *t << dendl; + } + if (t->get_projected_inode()->has_layout()) { + mdr->dir_layout = t->get_projected_inode()->layout; + found_layout = true; + } + } + CDentry* pdn = t->get_projected_parent_dn(); + if (!pdn) { + root = t->ino(); + break; + } + t = pdn->get_dir()->get_inode(); + } + + mdr->dir_root[n] = root; + mdr->dir_depth[n] = depth; + return true; + +failed: + dout(10) << __func__ << " failed" << dendl; + + drop_locks(mdr.get(), nullptr); + mdr->drop_local_auth_pins(); + return false; +} + +struct MarkEventOnDestruct { + MDRequestRef& mdr; + std::string_view message; + bool mark_event; + MarkEventOnDestruct(MDRequestRef& _mdr, std::string_view _message) : + mdr(_mdr), + message(_message), + mark_event(true) {} + ~MarkEventOnDestruct() { + if (mark_event) + mdr->mark_event(message); + } +}; + +/* If this function returns false, the mdr has been placed + * on the appropriate wait list */ +bool Locker::acquire_locks(MDRequestRef& mdr, + MutationImpl::LockOpVec& lov, + CInode *auth_pin_freeze, + bool auth_pin_nonblocking) +{ + dout(10) << "acquire_locks " << *mdr << dendl; + + MarkEventOnDestruct marker(mdr, "failed to acquire_locks"); + + client_t client = mdr->get_client(); + + set<MDSCacheObject*> mustpin; // items to authpin + if (auth_pin_freeze) + mustpin.insert(auth_pin_freeze); + + // xlocks + for (size_t i = 0; i < lov.size(); ++i) { + auto& p = lov[i]; + SimpleLock *lock = p.lock; + MDSCacheObject *object = lock->get_parent(); + + if (p.is_xlock()) { + if ((lock->get_type() == CEPH_LOCK_ISNAP || + lock->get_type() == CEPH_LOCK_IPOLICY) && + mds->is_cluster_degraded() && + mdr->is_leader() && + !mdr->is_queued_for_replay()) { + // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout + // get processed in proper order. + bool wait = false; + if (object->is_auth()) { + if (!mdr->is_xlocked(lock)) { + set<mds_rank_t> ls; + object->list_replicas(ls); + for (auto m : ls) { + if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) { + wait = true; + break; + } + } + } + } else { + // if the lock is the latest locked one, it's possible that peer mds got the lock + // while there are recovering mds. + if (!mdr->is_xlocked(lock) || mdr->is_last_locked(lock)) + wait = true; + } + if (wait) { + dout(10) << " must xlock " << *lock << " " << *object + << ", waiting for cluster recovered" << dendl; + mds->locker->drop_locks(mdr.get(), NULL); + mdr->drop_local_auth_pins(); + mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr)); + return false; + } + } + + dout(20) << " must xlock " << *lock << " " << *object << dendl; + + mustpin.insert(object); + + // augment xlock with a versionlock? + if (lock->get_type() == CEPH_LOCK_DN) { + CDentry *dn = static_cast<CDentry*>(object); + if (!dn->is_auth()) + continue; + if (mdr->is_leader()) { + // leader. wrlock versionlock so we can pipeline dentry updates to journal. + lov.add_wrlock(&dn->versionlock, i + 1); + } else { + // peer. exclusively lock the dentry version (i.e. block other journal updates). + // this makes rollback safe. + lov.add_xlock(&dn->versionlock, i + 1); + } + } + if (lock->get_type() >= CEPH_LOCK_IFIRST && lock->get_type() != CEPH_LOCK_IVERSION) { + // inode version lock? + CInode *in = static_cast<CInode*>(object); + if (!in->is_auth()) + continue; + if (mdr->is_leader()) { + // leader. wrlock versionlock so we can pipeline inode updates to journal. + lov.add_wrlock(&in->versionlock, i + 1); + } else { + // peer. exclusively lock the inode version (i.e. block other journal updates). + // this makes rollback safe. + lov.add_xlock(&in->versionlock, i + 1); + } + } + } else if (p.is_wrlock()) { + dout(20) << " must wrlock " << *lock << " " << *object << dendl; + client_t _client = p.is_state_pin() ? lock->get_excl_client() : client; + if (object->is_auth()) { + mustpin.insert(object); + } else if (!object->is_auth() && + !lock->can_wrlock(_client) && // we might have to request a scatter + !mdr->is_peer()) { // if we are peer (remote_wrlock), the leader already authpinned + dout(15) << " will also auth_pin " << *object + << " in case we need to request a scatter" << dendl; + mustpin.insert(object); + } + } else if (p.is_remote_wrlock()) { + dout(20) << " must remote_wrlock on mds." << p.wrlock_target << " " + << *lock << " " << *object << dendl; + mustpin.insert(object); + } else if (p.is_rdlock()) { + + dout(20) << " must rdlock " << *lock << " " << *object << dendl; + if (object->is_auth()) { + mustpin.insert(object); + } else if (!object->is_auth() && + !lock->can_rdlock(client)) { // we might have to request an rdlock + dout(15) << " will also auth_pin " << *object + << " in case we need to request a rdlock" << dendl; + mustpin.insert(object); + } + } else { + ceph_assert(0 == "locker unknown lock operation"); + } + } + + lov.sort_and_merge(); + + // AUTH PINS + map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote; // mds -> (object set) + + // can i auth pin them all now? + marker.message = "failed to authpin local pins"; + for (const auto &p : mustpin) { + MDSCacheObject *object = p; + + dout(10) << " must authpin " << *object << dendl; + + if (mdr->is_auth_pinned(object)) { + if (object != (MDSCacheObject*)auth_pin_freeze) + continue; + if (mdr->more()->is_remote_frozen_authpin) { + if (mdr->more()->rename_inode == auth_pin_freeze) + continue; + // unfreeze auth pin for the wrong inode + mustpin_remote[mdr->more()->rename_inode->authority().first].size(); + } + } + + if (!object->is_auth()) { + if (mdr->lock_cache) { // debug + ceph_assert(mdr->lock_cache->opcode == CEPH_MDS_OP_UNLINK); + CDentry *dn = mdr->dn[0].back(); + ceph_assert(dn->get_projected_linkage()->is_remote()); + } + + if (object->is_ambiguous_auth()) { + // wait + dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl; + mdr->disable_lock_cache(); + drop_locks(mdr.get()); + mdr->drop_local_auth_pins(); + marker.message = "waiting for single auth, object is being migrated"; + object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr)); + return false; + } + mustpin_remote[object->authority().first].insert(object); + continue; + } + int err = 0; + if (!object->can_auth_pin(&err)) { + if (mdr->lock_cache) { + CDir *dir; + if (CInode *in = dynamic_cast<CInode*>(object)) { + ceph_assert(!in->is_frozen_inode() && !in->is_frozen_auth_pin()); + dir = in->get_projected_parent_dir(); + } else if (CDentry *dn = dynamic_cast<CDentry*>(object)) { + dir = dn->get_dir(); + } else { + ceph_assert(0 == "unknown type of lock parent"); + } + if (dir->get_inode() == mdr->lock_cache->get_dir_inode()) { + // forcibly auth pin if there is lock cache on parent dir + continue; + } + + { // debug + ceph_assert(mdr->lock_cache->opcode == CEPH_MDS_OP_UNLINK); + CDentry *dn = mdr->dn[0].back(); + ceph_assert(dn->get_projected_linkage()->is_remote()); + } + } + + // wait + mdr->disable_lock_cache(); + drop_locks(mdr.get()); + mdr->drop_local_auth_pins(); + if (auth_pin_nonblocking) { + dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl; + mdr->aborted = true; + return false; + } + if (err == MDSCacheObject::ERR_EXPORTING_TREE) { + marker.message = "failed to authpin, subtree is being exported"; + } else if (err == MDSCacheObject::ERR_FRAGMENTING_DIR) { + marker.message = "failed to authpin, dir is being fragmented"; + } else if (err == MDSCacheObject::ERR_EXPORTING_INODE) { + marker.message = "failed to authpin, inode is being exported"; + } + dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl; + object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr)); + + if (mdr->is_any_remote_auth_pin()) + notify_freeze_waiter(object); + + return false; + } + } + + // ok, grab local auth pins + for (const auto& p : mustpin) { + MDSCacheObject *object = p; + if (mdr->is_auth_pinned(object)) { + dout(10) << " already auth_pinned " << *object << dendl; + } else if (object->is_auth()) { + dout(10) << " auth_pinning " << *object << dendl; + mdr->auth_pin(object); + } + } + + // request remote auth_pins + if (!mustpin_remote.empty()) { + marker.message = "requesting remote authpins"; + for (const auto& p : mdr->object_states) { + if (p.second.remote_auth_pinned == MDS_RANK_NONE) + continue; + ceph_assert(p.second.remote_auth_pinned == p.first->authority().first); + auto q = mustpin_remote.find(p.second.remote_auth_pinned); + if (q != mustpin_remote.end()) + q->second.insert(p.first); + } + + for (auto& p : mustpin_remote) { + dout(10) << "requesting remote auth_pins from mds." << p.first << dendl; + + // wait for active auth + if (mds->is_cluster_degraded() && + !mds->mdsmap->is_clientreplay_or_active_or_stopping(p.first)) { + dout(10) << " mds." << p.first << " is not active" << dendl; + if (mdr->more()->waiting_on_peer.empty()) + mds->wait_for_active_peer(p.first, new C_MDS_RetryRequest(mdcache, mdr)); + return false; + } + + auto req = make_message<MMDSPeerRequest>(mdr->reqid, mdr->attempt, + MMDSPeerRequest::OP_AUTHPIN); + for (auto& o : p.second) { + dout(10) << " req remote auth_pin of " << *o << dendl; + MDSCacheObjectInfo info; + o->set_object_info(info); + req->get_authpins().push_back(info); + if (o == auth_pin_freeze) + o->set_object_info(req->get_authpin_freeze()); + mdr->pin(o); + } + if (auth_pin_nonblocking) + req->mark_nonblocking(); + else if (!mdr->locks.empty()) + req->mark_notify_blocking(); + + mds->send_message_mds(req, p.first); + + // put in waiting list + auto ret = mdr->more()->waiting_on_peer.insert(p.first); + ceph_assert(ret.second); + } + return false; + } + + // caps i'll need to issue + set<CInode*> issue_set; + bool result = false; + + // acquire locks. + // make sure they match currently acquired locks. + for (const auto& p : lov) { + auto lock = p.lock; + if (p.is_xlock()) { + if (mdr->is_xlocked(lock)) { + dout(10) << " already xlocked " << *lock << " " << *lock->get_parent() << dendl; + continue; + } + if (mdr->locking && lock != mdr->locking) + cancel_locking(mdr.get(), &issue_set); + if (!xlock_start(lock, mdr)) { + marker.message = "failed to xlock, waiting"; + goto out; + } + dout(10) << " got xlock on " << *lock << " " << *lock->get_parent() << dendl; + } else if (p.is_wrlock() || p.is_remote_wrlock()) { + auto it = mdr->locks.find(lock); + if (p.is_remote_wrlock()) { + if (it != mdr->locks.end() && it->is_remote_wrlock()) { + dout(10) << " already remote_wrlocked " << *lock << " " << *lock->get_parent() << dendl; + } else { + if (mdr->locking && lock != mdr->locking) + cancel_locking(mdr.get(), &issue_set); + marker.message = "waiting for remote wrlocks"; + remote_wrlock_start(lock, p.wrlock_target, mdr); + goto out; + } + } + if (p.is_wrlock()) { + if (it != mdr->locks.end() && it->is_wrlock()) { + dout(10) << " already wrlocked " << *lock << " " << *lock->get_parent() << dendl; + continue; + } + client_t _client = p.is_state_pin() ? lock->get_excl_client() : client; + if (p.is_remote_wrlock()) { + // nowait if we have already gotten remote wrlock + if (!wrlock_try(lock, mdr, _client)) { + marker.message = "failed to wrlock, dropping remote wrlock and waiting"; + // can't take the wrlock because the scatter lock is gathering. need to + // release the remote wrlock, so that the gathering process can finish. + ceph_assert(it != mdr->locks.end()); + remote_wrlock_finish(it, mdr.get()); + remote_wrlock_start(lock, p.wrlock_target, mdr); + goto out; + } + } else { + if (!wrlock_start(p, mdr)) { + ceph_assert(!p.is_remote_wrlock()); + marker.message = "failed to wrlock, waiting"; + goto out; + } + } + dout(10) << " got wrlock on " << *lock << " " << *lock->get_parent() << dendl; + } + } else { + if (mdr->is_rdlocked(lock)) { + dout(10) << " already rdlocked " << *lock << " " << *lock->get_parent() << dendl; + continue; + } + + ceph_assert(mdr->is_leader()); + if (lock->needs_recover()) { + if (mds->is_cluster_degraded()) { + if (!mdr->is_queued_for_replay()) { + // see comments in SimpleLock::set_state_rejoin() and + // ScatterLock::encode_state_for_rejoin() + drop_locks(mdr.get()); + mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr)); + dout(10) << " rejoin recovering " << *lock << " " << *lock->get_parent() + << ", waiting for cluster recovered" << dendl; + marker.message = "rejoin recovering lock, waiting for cluster recovered"; + return false; + } + } else { + lock->clear_need_recover(); + } + } + + if (!rdlock_start(lock, mdr)) { + marker.message = "failed to rdlock, waiting"; + goto out; + } + dout(10) << " got rdlock on " << *lock << " " << *lock->get_parent() << dendl; + } + } + + mdr->set_mds_stamp(ceph_clock_now()); + result = true; + marker.message = "acquired locks"; + + out: + issue_caps_set(issue_set); + return result; +} + +void Locker::notify_freeze_waiter(MDSCacheObject *o) +{ + CDir *dir = NULL; + if (CInode *in = dynamic_cast<CInode*>(o)) { + if (!in->is_root()) + dir = in->get_parent_dir(); + } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) { + dir = dn->get_dir(); + } else { + dir = dynamic_cast<CDir*>(o); + ceph_assert(dir); + } + if (dir) { + if (dir->is_freezing_dir()) + mdcache->fragment_freeze_inc_num_waiters(dir); + if (dir->is_freezing_tree()) { + while (!dir->is_freezing_tree_root()) + dir = dir->get_parent_dir(); + mdcache->migrator->export_freeze_inc_num_waiters(dir); + } + } +} + +void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry) +{ + for (const auto &p : mut->locks) { + if (!p.is_xlock()) + continue; + MDSCacheObject *obj = p.lock->get_parent(); + ceph_assert(obj->is_auth()); + if (skip_dentry && + (p.lock->get_type() == CEPH_LOCK_DN || p.lock->get_type() == CEPH_LOCK_DVERSION)) + continue; + dout(10) << "set_xlocks_done on " << *p.lock << " " << *obj << dendl; + p.lock->set_xlock_done(); + } +} + +void Locker::_drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue, + bool drop_rdlocks) +{ + set<mds_rank_t> peers; + + for (auto it = mut->locks.begin(); it != mut->locks.end(); ) { + SimpleLock *lock = it->lock; + MDSCacheObject *obj = lock->get_parent(); + + if (it->is_xlock()) { + if (obj->is_auth()) { + bool ni = false; + xlock_finish(it++, mut, &ni); + if (ni) + pneed_issue->insert(static_cast<CInode*>(obj)); + } else { + ceph_assert(lock->get_sm()->can_remote_xlock); + peers.insert(obj->authority().first); + lock->put_xlock(); + mut->locks.erase(it++); + } + } else if (it->is_wrlock() || it->is_remote_wrlock()) { + if (it->is_remote_wrlock()) { + peers.insert(it->wrlock_target); + it->clear_remote_wrlock(); + } + if (it->is_wrlock()) { + bool ni = false; + wrlock_finish(it++, mut, &ni); + if (ni) + pneed_issue->insert(static_cast<CInode*>(obj)); + } else { + mut->locks.erase(it++); + } + } else if (drop_rdlocks && it->is_rdlock()) { + bool ni = false; + rdlock_finish(it++, mut, &ni); + if (ni) + pneed_issue->insert(static_cast<CInode*>(obj)); + } else { + ++it; + } + } + + if (drop_rdlocks) { + if (mut->lock_cache) { + put_lock_cache(mut->lock_cache); + mut->lock_cache = nullptr; + } + } + + for (set<mds_rank_t>::iterator p = peers.begin(); p != peers.end(); ++p) { + if (!mds->is_cluster_degraded() || + mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) { + dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl; + auto peerreq = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, + MMDSPeerRequest::OP_DROPLOCKS); + mds->send_message_mds(peerreq, *p); + } + } +} + +void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue) +{ + SimpleLock *lock = mut->locking; + ceph_assert(lock); + dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl; + + if (lock->get_parent()->is_auth()) { + bool need_issue = false; + if (lock->get_state() == LOCK_PREXLOCK) { + _finish_xlock(lock, -1, &need_issue); + } else if (lock->get_state() == LOCK_LOCK_XLOCK) { + lock->set_state(LOCK_XLOCKDONE); + eval_gather(lock, true, &need_issue); + } + if (need_issue) + pneed_issue->insert(static_cast<CInode *>(lock->get_parent())); + } + mut->finish_locking(lock); +} + +void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue) +{ + // leftover locks + set<CInode*> my_need_issue; + if (!pneed_issue) + pneed_issue = &my_need_issue; + + if (mut->locking) + cancel_locking(mut, pneed_issue); + _drop_locks(mut, pneed_issue, true); + + if (pneed_issue == &my_need_issue) + issue_caps_set(*pneed_issue); + mut->locking_state = 0; +} + +void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue) +{ + set<CInode*> my_need_issue; + if (!pneed_issue) + pneed_issue = &my_need_issue; + + _drop_locks(mut, pneed_issue, false); + + if (pneed_issue == &my_need_issue) + issue_caps_set(*pneed_issue); +} + +void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut) +{ + set<CInode*> need_issue; + + for (auto it = mut->locks.begin(); it != mut->locks.end(); ) { + if (!it->is_rdlock()) { + ++it; + continue; + } + SimpleLock *lock = it->lock; + // make later mksnap/setlayout (at other mds) wait for this unsafe request + if (lock->get_type() == CEPH_LOCK_ISNAP || + lock->get_type() == CEPH_LOCK_IPOLICY) { + ++it; + continue; + } + bool ni = false; + rdlock_finish(it++, mut, &ni); + if (ni) + need_issue.insert(static_cast<CInode*>(lock->get_parent())); + } + + issue_caps_set(need_issue); +} + +void Locker::drop_locks_for_fragment_unfreeze(MutationImpl *mut) +{ + set<CInode*> need_issue; + + for (auto it = mut->locks.begin(); it != mut->locks.end(); ) { + SimpleLock *lock = it->lock; + if (lock->get_type() == CEPH_LOCK_IDFT) { + ++it; + continue; + } + bool ni = false; + wrlock_finish(it++, mut, &ni); + if (ni) + need_issue.insert(static_cast<CInode*>(lock->get_parent())); + } + issue_caps_set(need_issue); +} + +class C_MDL_DropCache : public LockerContext { + MDLockCache *lock_cache; +public: + C_MDL_DropCache(Locker *l, MDLockCache *lc) : + LockerContext(l), lock_cache(lc) { } + void finish(int r) override { + locker->drop_locks(lock_cache); + lock_cache->cleanup(); + delete lock_cache; + } +}; + +void Locker::put_lock_cache(MDLockCache* lock_cache) +{ + ceph_assert(lock_cache->ref > 0); + if (--lock_cache->ref > 0) + return; + + ceph_assert(lock_cache->invalidating); + + lock_cache->detach_locks(); + + CInode *diri = lock_cache->get_dir_inode(); + for (auto dir : lock_cache->auth_pinned_dirfrags) { + if (dir->get_inode() != diri) + continue; + dir->enable_frozen_inode(); + } + + mds->queue_waiter(new C_MDL_DropCache(this, lock_cache)); +} + +int Locker::get_cap_bit_for_lock_cache(int op) +{ + switch(op) { + case CEPH_MDS_OP_CREATE: + return CEPH_CAP_DIR_CREATE; + case CEPH_MDS_OP_UNLINK: + return CEPH_CAP_DIR_UNLINK; + default: + ceph_assert(0 == "unsupported operation"); + return 0; + } +} + +void Locker::invalidate_lock_cache(MDLockCache *lock_cache) +{ + ceph_assert(lock_cache->item_cap_lock_cache.is_on_list()); + if (lock_cache->invalidating) { + ceph_assert(!lock_cache->client_cap); + } else { + lock_cache->invalidating = true; + lock_cache->detach_dirfrags(); + } + + Capability *cap = lock_cache->client_cap; + if (cap) { + int cap_bit = get_cap_bit_for_lock_cache(lock_cache->opcode); + cap->clear_lock_cache_allowed(cap_bit); + if (cap->issued() & cap_bit) + issue_caps(lock_cache->get_dir_inode(), cap); + else + cap = nullptr; + } + + if (!cap) { + lock_cache->item_cap_lock_cache.remove_myself(); + put_lock_cache(lock_cache); + } +} + +void Locker::eval_lock_caches(Capability *cap) +{ + for (auto p = cap->lock_caches.begin(); !p.end(); ) { + MDLockCache *lock_cache = *p; + ++p; + if (!lock_cache->invalidating) + continue; + int cap_bit = get_cap_bit_for_lock_cache(lock_cache->opcode); + if (!(cap->issued() & cap_bit)) { + lock_cache->item_cap_lock_cache.remove_myself(); + put_lock_cache(lock_cache); + } + } +} + +// ask lock caches to release auth pins +void Locker::invalidate_lock_caches(CDir *dir) +{ + dout(10) << "invalidate_lock_caches on " << *dir << dendl; + auto &lock_caches = dir->lock_caches_with_auth_pins; + while (!lock_caches.empty()) { + invalidate_lock_cache(lock_caches.front()->parent); + } +} + +// ask lock caches to release locks +void Locker::invalidate_lock_caches(SimpleLock *lock) +{ + dout(10) << "invalidate_lock_caches " << *lock << " on " << *lock->get_parent() << dendl; + if (lock->is_cached()) { + auto&& lock_caches = lock->get_active_caches(); + for (auto& lc : lock_caches) + invalidate_lock_cache(lc); + } +} + +void Locker::create_lock_cache(MDRequestRef& mdr, CInode *diri, file_layout_t *dir_layout) +{ + if (mdr->lock_cache) + return; + + client_t client = mdr->get_client(); + int opcode = mdr->client_request->get_op(); + dout(10) << "create_lock_cache for client." << client << "/" << ceph_mds_op_name(opcode)<< " on " << *diri << dendl; + + if (!diri->is_auth()) { + dout(10) << " dir inode is not auth, noop" << dendl; + return; + } + + if (mdr->has_more() && !mdr->more()->peers.empty()) { + dout(10) << " there are peers requests for " << *mdr << ", noop" << dendl; + return; + } + + Capability *cap = diri->get_client_cap(client); + if (!cap) { + dout(10) << " there is no cap for client." << client << ", noop" << dendl; + return; + } + + for (auto p = cap->lock_caches.begin(); !p.end(); ++p) { + if ((*p)->opcode == opcode) { + dout(10) << " lock cache already exists for " << ceph_mds_op_name(opcode) << ", noop" << dendl; + return; + } + } + + set<MDSCacheObject*> ancestors; + for (CInode *in = diri; ; ) { + CDentry *pdn = in->get_projected_parent_dn(); + if (!pdn) + break; + // ancestors.insert(pdn); + in = pdn->get_dir()->get_inode(); + ancestors.insert(in); + } + + for (auto& p : mdr->object_states) { + if (p.first != diri && !ancestors.count(p.first)) + continue; + auto& stat = p.second; + if (stat.auth_pinned) { + if (!p.first->can_auth_pin()) { + dout(10) << " can't auth_pin(freezing?) lock parent " << *p.first << ", noop" << dendl; + return; + } + if (CInode *in = dynamic_cast<CInode*>(p.first); in->is_parent_projected()) { + CDir *dir = in->get_projected_parent_dir(); + if (!dir->can_auth_pin()) { + dout(10) << " can't auth_pin(!auth|freezing?) dirfrag " << *dir << ", noop" << dendl; + return; + } + } + } + } + + std::vector<CDir*> dfv; + dfv.reserve(diri->get_num_dirfrags()); + + diri->get_dirfrags(dfv); + for (auto dir : dfv) { + if (!dir->is_auth() || !dir->can_auth_pin()) { + dout(10) << " can't auth_pin(!auth|freezing?) dirfrag " << *dir << ", noop" << dendl; + return; + } + if (dir->is_any_freezing_or_frozen_inode()) { + dout(10) << " there is freezing/frozen inode in " << *dir << ", noop" << dendl; + return; + } + } + + for (auto& p : mdr->locks) { + MDSCacheObject *obj = p.lock->get_parent(); + if (obj != diri && !ancestors.count(obj)) + continue; + if (!p.lock->is_stable()) { + dout(10) << " unstable " << *p.lock << " on " << *obj << ", noop" << dendl; + return; + } + } + + auto lock_cache = new MDLockCache(cap, opcode); + if (dir_layout) + lock_cache->set_dir_layout(*dir_layout); + cap->set_lock_cache_allowed(get_cap_bit_for_lock_cache(opcode)); + + for (auto dir : dfv) { + // prevent subtree migration + lock_cache->auth_pin(dir); + // prevent frozen inode + dir->disable_frozen_inode(); + } + + for (auto& p : mdr->object_states) { + if (p.first != diri && !ancestors.count(p.first)) + continue; + auto& stat = p.second; + if (stat.auth_pinned) + lock_cache->auth_pin(p.first); + else + lock_cache->pin(p.first); + + if (CInode *in = dynamic_cast<CInode*>(p.first)) { + CDentry *pdn = in->get_projected_parent_dn(); + if (pdn) + dfv.push_back(pdn->get_dir()); + } else if (CDentry *dn = dynamic_cast<CDentry*>(p.first)) { + dfv.push_back(dn->get_dir()); + } else { + ceph_assert(0 == "unknown type of lock parent"); + } + } + lock_cache->attach_dirfrags(std::move(dfv)); + + for (auto it = mdr->locks.begin(); it != mdr->locks.end(); ) { + MDSCacheObject *obj = it->lock->get_parent(); + if (obj != diri && !ancestors.count(obj)) { + ++it; + continue; + } + unsigned lock_flag = 0; + if (it->is_wrlock()) { + // skip wrlocks that were added by MDCache::predirty_journal_parent() + if (obj == diri) + lock_flag = MutationImpl::LockOp::WRLOCK; + } else { + ceph_assert(it->is_rdlock()); + lock_flag = MutationImpl::LockOp::RDLOCK; + } + if (lock_flag) { + lock_cache->emplace_lock(it->lock, lock_flag); + mdr->locks.erase(it++); + } else { + ++it; + } + } + lock_cache->attach_locks(); + + lock_cache->ref++; + mdr->lock_cache = lock_cache; +} + +bool Locker::find_and_attach_lock_cache(MDRequestRef& mdr, CInode *diri) +{ + if (mdr->lock_cache) + return true; + + Capability *cap = diri->get_client_cap(mdr->get_client()); + if (!cap) + return false; + + int opcode = mdr->client_request->get_op(); + for (auto p = cap->lock_caches.begin(); !p.end(); ++p) { + MDLockCache *lock_cache = *p; + if (lock_cache->opcode == opcode) { + dout(10) << "found lock cache for " << ceph_mds_op_name(opcode) << " on " << *diri << dendl; + mdr->lock_cache = lock_cache; + mdr->lock_cache->ref++; + return true; + } + } + return false; +} + +// generics + +void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, MDSContext::vec *pfinishers) +{ + dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl; + ceph_assert(!lock->is_stable()); + + int next = lock->get_next_state(); + + CInode *in = 0; + bool caps = lock->get_cap_shift(); + if (lock->get_type() != CEPH_LOCK_DN) + in = static_cast<CInode *>(lock->get_parent()); + + bool need_issue = false; + + int loner_issued = 0, other_issued = 0, xlocker_issued = 0; + ceph_assert(!caps || in != NULL); + if (caps && in->is_head()) { + in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, + lock->get_cap_shift(), lock->get_cap_mask()); + dout(10) << " next state is " << lock->get_state_name(next) + << " issued/allows loner " << gcap_string(loner_issued) + << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next)) + << " xlocker " << gcap_string(xlocker_issued) + << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next)) + << " other " << gcap_string(other_issued) + << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next)) + << dendl; + + if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) || + (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) || + (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued))) + need_issue = true; + } + +#define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH))) + bool auth = lock->get_parent()->is_auth(); + if (!lock->is_gathering() && + (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) && + (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) && + (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) && + (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) && + !(lock->get_parent()->is_auth() && lock->is_flushing()) && // i.e. wait for scatter_writebehind! + (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 && + (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 && + (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) && + lock->get_state() != LOCK_SYNC_MIX2 && // these states need an explicit trigger from the auth mds + lock->get_state() != LOCK_MIX_SYNC2 + ) { + dout(7) << "eval_gather finished gather on " << *lock + << " on " << *lock->get_parent() << dendl; + + if (lock->get_sm() == &sm_filelock) { + ceph_assert(in); + if (in->state_test(CInode::STATE_RECOVERING)) { + dout(7) << "eval_gather finished gather, but still recovering" << dendl; + return; + } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) { + dout(7) << "eval_gather finished gather, but need to recover" << dendl; + mds->mdcache->queue_file_recover(in); + mds->mdcache->do_file_recover(); + return; + } + } + + if (!lock->get_parent()->is_auth()) { + // replica: tell auth + mds_rank_t auth = lock->get_parent()->authority().first; + + if (lock->get_parent()->is_rejoining() && + mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) { + dout(7) << "eval_gather finished gather, but still rejoining " + << *lock->get_parent() << dendl; + return; + } + + if (!mds->is_cluster_degraded() || + mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) { + switch (lock->get_state()) { + case LOCK_SYNC_LOCK: + mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), auth); + break; + + case LOCK_MIX_SYNC: + { + auto reply = make_message<MLock>(lock, LOCK_AC_SYNCACK, mds->get_nodeid()); + lock->encode_locked_state(reply->get_data()); + mds->send_message_mds(reply, auth); + next = LOCK_MIX_SYNC2; + (static_cast<ScatterLock *>(lock))->start_flush(); + } + break; + + case LOCK_MIX_SYNC2: + (static_cast<ScatterLock *>(lock))->finish_flush(); + (static_cast<ScatterLock *>(lock))->clear_flushed(); + + case LOCK_SYNC_MIX2: + // do nothing, we already acked + break; + + case LOCK_SYNC_MIX: + { + auto reply = make_message<MLock>(lock, LOCK_AC_MIXACK, mds->get_nodeid()); + mds->send_message_mds(reply, auth); + next = LOCK_SYNC_MIX2; + } + break; + + case LOCK_MIX_LOCK: + { + bufferlist data; + lock->encode_locked_state(data); + mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth); + (static_cast<ScatterLock *>(lock))->start_flush(); + // we'll get an AC_LOCKFLUSHED to complete + } + break; + + default: + ceph_abort(); + } + } + } else { + // auth + + // once the first (local) stage of mix->lock gather complete we can + // gather from replicas + if (lock->get_state() == LOCK_MIX_LOCK && + lock->get_parent()->is_replicated()) { + dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl; + send_lock_message(lock, LOCK_AC_LOCK); + lock->init_gather(); + lock->set_state(LOCK_MIX_LOCK2); + return; + } + + if (lock->is_dirty() && !lock->is_flushed()) { + scatter_writebehind(static_cast<ScatterLock *>(lock)); + return; + } + lock->clear_flushed(); + + switch (lock->get_state()) { + // to mixed + case LOCK_TSYN_MIX: + case LOCK_SYNC_MIX: + case LOCK_EXCL_MIX: + case LOCK_XSYN_MIX: + in->start_scatter(static_cast<ScatterLock *>(lock)); + if (lock->get_parent()->is_replicated()) { + bufferlist softdata; + lock->encode_locked_state(softdata); + send_lock_message(lock, LOCK_AC_MIX, softdata); + } + (static_cast<ScatterLock *>(lock))->clear_scatter_wanted(); + break; + + case LOCK_XLOCK: + case LOCK_XLOCKDONE: + if (next != LOCK_SYNC) + break; + // fall-thru + + // to sync + case LOCK_EXCL_SYNC: + case LOCK_LOCK_SYNC: + case LOCK_MIX_SYNC: + case LOCK_XSYN_SYNC: + if (lock->get_parent()->is_replicated()) { + bufferlist softdata; + lock->encode_locked_state(softdata); + send_lock_message(lock, LOCK_AC_SYNC, softdata); + } + break; + } + + } + + lock->set_state(next); + + if (lock->get_parent()->is_auth() && + lock->is_stable()) + lock->get_parent()->auth_unpin(lock); + + // drop loner before doing waiters + if (caps && + in->is_head() && + in->is_auth() && + in->get_wanted_loner() != in->get_loner()) { + dout(10) << " trying to drop loner" << dendl; + if (in->try_drop_loner()) { + dout(10) << " dropped loner" << dendl; + need_issue = true; + } + } + + if (pfinishers) + lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK, + *pfinishers); + else + lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK); + + if (caps && in->is_head()) + need_issue = true; + + if (lock->get_parent()->is_auth() && + lock->is_stable()) + try_eval(lock, &need_issue); + } + + if (need_issue) { + if (pneed_issue) + *pneed_issue = true; + else if (in->is_head()) + issue_caps(in); + } + +} + +bool Locker::eval(CInode *in, int mask, bool caps_imported) +{ + bool need_issue = caps_imported; + MDSContext::vec finishers; + + dout(10) << "eval " << mask << " " << *in << dendl; + + // choose loner? + if (in->is_auth() && in->is_head()) { + client_t orig_loner = in->get_loner(); + if (in->choose_ideal_loner()) { + dout(10) << "eval set loner: client." << orig_loner << " -> client." << in->get_loner() << dendl; + need_issue = true; + mask = -1; + } else if (in->get_wanted_loner() != in->get_loner()) { + dout(10) << "eval want loner: client." << in->get_wanted_loner() << " but failed to set it" << dendl; + mask = -1; + } + } + + retry: + if (mask & CEPH_LOCK_IFILE) + eval_any(&in->filelock, &need_issue, &finishers, caps_imported); + if (mask & CEPH_LOCK_IAUTH) + eval_any(&in->authlock, &need_issue, &finishers, caps_imported); + if (mask & CEPH_LOCK_ILINK) + eval_any(&in->linklock, &need_issue, &finishers, caps_imported); + if (mask & CEPH_LOCK_IXATTR) + eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported); + if (mask & CEPH_LOCK_INEST) + eval_any(&in->nestlock, &need_issue, &finishers, caps_imported); + if (mask & CEPH_LOCK_IFLOCK) + eval_any(&in->flocklock, &need_issue, &finishers, caps_imported); + if (mask & CEPH_LOCK_IPOLICY) + eval_any(&in->policylock, &need_issue, &finishers, caps_imported); + + // drop loner? + if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) { + if (in->try_drop_loner()) { + need_issue = true; + if (in->get_wanted_loner() >= 0) { + dout(10) << "eval end set loner to client." << in->get_loner() << dendl; + bool ok = in->try_set_loner(); + ceph_assert(ok); + mask = -1; + goto retry; + } + } + } + + finish_contexts(g_ceph_context, finishers); + + if (need_issue && in->is_head()) + issue_caps(in); + + dout(10) << "eval done" << dendl; + return need_issue; +} + +class C_Locker_Eval : public LockerContext { + MDSCacheObject *p; + int mask; +public: + C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) { + // We are used as an MDSCacheObject waiter, so should + // only be invoked by someone already holding the big lock. + ceph_assert(ceph_mutex_is_locked_by_me(locker->mds->mds_lock)); + p->get(MDSCacheObject::PIN_PTRWAITER); + } + void finish(int r) override { + locker->try_eval(p, mask); + p->put(MDSCacheObject::PIN_PTRWAITER); + } +}; + +void Locker::try_eval(MDSCacheObject *p, int mask) +{ + // unstable and ambiguous auth? + if (p->is_ambiguous_auth()) { + dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl; + p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask)); + return; + } + + if (p->is_auth() && p->is_frozen()) { + dout(7) << "try_eval frozen, waiting on " << *p << dendl; + p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask)); + return; + } + + if (mask & CEPH_LOCK_DN) { + ceph_assert(mask == CEPH_LOCK_DN); + bool need_issue = false; // ignore this, no caps on dentries + CDentry *dn = static_cast<CDentry *>(p); + eval_any(&dn->lock, &need_issue); + } else { + CInode *in = static_cast<CInode *>(p); + eval(in, mask); + } +} + +void Locker::try_eval(SimpleLock *lock, bool *pneed_issue) +{ + MDSCacheObject *p = lock->get_parent(); + + // unstable and ambiguous auth? + if (p->is_ambiguous_auth()) { + dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl; + p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type())); + return; + } + + if (!p->is_auth()) { + dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl; + return; + } + + if (p->is_frozen()) { + dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl; + p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type())); + return; + } + + /* + * We could have a situation like: + * + * - mds A authpins item on mds B + * - mds B starts to freeze tree containing item + * - mds A tries wrlock_start on A, sends REQSCATTER to B + * - mds B lock is unstable, sets scatter_wanted + * - mds B lock stabilizes, calls try_eval. + * + * We can defer while freezing without causing a deadlock. Honor + * scatter_wanted flag here. This will never get deferred by the + * checks above due to the auth_pin held by the leader. + */ + if (lock->is_scatterlock()) { + ScatterLock *slock = static_cast<ScatterLock *>(lock); + if (slock->get_scatter_wanted() && + slock->get_state() != LOCK_MIX) { + scatter_mix(slock, pneed_issue); + if (!lock->is_stable()) + return; + } else if (slock->get_unscatter_wanted() && + slock->get_state() != LOCK_LOCK) { + simple_lock(slock, pneed_issue); + if (!lock->is_stable()) { + return; + } + } + } + + if (lock->get_type() != CEPH_LOCK_DN && + lock->get_type() != CEPH_LOCK_ISNAP && + lock->get_type() != CEPH_LOCK_IPOLICY && + p->is_freezing()) { + dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl; + p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type())); + return; + } + + eval(lock, pneed_issue); +} + +void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set) +{ + bool need_issue = false; + MDSContext::vec finishers; + + // kick locks now + if (!in->filelock.is_stable()) + eval_gather(&in->filelock, false, &need_issue, &finishers); + if (!in->authlock.is_stable()) + eval_gather(&in->authlock, false, &need_issue, &finishers); + if (!in->linklock.is_stable()) + eval_gather(&in->linklock, false, &need_issue, &finishers); + if (!in->xattrlock.is_stable()) + eval_gather(&in->xattrlock, false, &need_issue, &finishers); + + if (need_issue && in->is_head()) { + if (issue_set) + issue_set->insert(in); + else + issue_caps(in); + } + + finish_contexts(g_ceph_context, finishers); +} + +void Locker::eval_scatter_gathers(CInode *in) +{ + bool need_issue = false; + MDSContext::vec finishers; + + dout(10) << "eval_scatter_gathers " << *in << dendl; + + // kick locks now + if (!in->filelock.is_stable()) + eval_gather(&in->filelock, false, &need_issue, &finishers); + if (!in->nestlock.is_stable()) + eval_gather(&in->nestlock, false, &need_issue, &finishers); + if (!in->dirfragtreelock.is_stable()) + eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers); + + if (need_issue && in->is_head()) + issue_caps(in); + + finish_contexts(g_ceph_context, finishers); +} + +void Locker::eval(SimpleLock *lock, bool *need_issue) +{ + switch (lock->get_type()) { + case CEPH_LOCK_IFILE: + return file_eval(static_cast<ScatterLock*>(lock), need_issue); + case CEPH_LOCK_IDFT: + case CEPH_LOCK_INEST: + return scatter_eval(static_cast<ScatterLock*>(lock), need_issue); + default: + return simple_eval(lock, need_issue); + } +} + + +// ------------------ +// rdlock + +bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon) +{ + // kick the lock + if (lock->is_stable()) { + if (lock->get_parent()->is_auth()) { + if (lock->get_sm() == &sm_scatterlock) { + // not until tempsync is fully implemented + //if (lock->get_parent()->is_replicated()) + //scatter_tempsync((ScatterLock*)lock); + //else + simple_sync(lock); + } else if (lock->get_sm() == &sm_filelock) { + CInode *in = static_cast<CInode*>(lock->get_parent()); + if (lock->get_state() == LOCK_EXCL && + in->get_target_loner() >= 0 && + !in->is_dir() && !as_anon) // as_anon => caller wants SYNC, not XSYN + file_xsyn(lock); + else + simple_sync(lock); + } else + simple_sync(lock); + return true; + } else { + // request rdlock state change from auth + mds_rank_t auth = lock->get_parent()->authority().first; + if (!mds->is_cluster_degraded() || + mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) { + dout(10) << "requesting rdlock from auth on " + << *lock << " on " << *lock->get_parent() << dendl; + mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth); + } + return false; + } + } + if (lock->get_type() == CEPH_LOCK_IFILE) { + CInode *in = static_cast<CInode *>(lock->get_parent()); + if (in->state_test(CInode::STATE_RECOVERING)) { + mds->mdcache->recovery_queue.prioritize(in); + } + } + + return false; +} + +bool Locker::rdlock_try(SimpleLock *lock, client_t client) +{ + dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl; + + // can read? grab ref. + if (lock->can_rdlock(client)) + return true; + + _rdlock_kick(lock, false); + + if (lock->can_rdlock(client)) + return true; + + return false; +} + +bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon) +{ + dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl; + + // client may be allowed to rdlock the same item it has xlocked. + // UNLESS someone passes in as_anon, or we're reading snapped version here. + if (mut->snapid != CEPH_NOSNAP) + as_anon = true; + client_t client = as_anon ? -1 : mut->get_client(); + + CInode *in = 0; + if (lock->get_type() != CEPH_LOCK_DN) + in = static_cast<CInode *>(lock->get_parent()); + + /* + if (!lock->get_parent()->is_auth() && + lock->fw_rdlock_to_auth()) { + mdcache->request_forward(mut, lock->get_parent()->authority().first); + return false; + } + */ + + while (1) { + // can read? grab ref. + if (lock->can_rdlock(client)) { + lock->get_rdlock(); + mut->emplace_lock(lock, MutationImpl::LockOp::RDLOCK); + return true; + } + + // hmm, wait a second. + if (in && !in->is_head() && in->is_auth() && + lock->get_state() == LOCK_SNAP_SYNC) { + // okay, we actually need to kick the head's lock to get ourselves synced up. + CInode *head = mdcache->get_inode(in->ino()); + ceph_assert(head); + SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE); + if (hlock->get_state() == LOCK_SYNC) + hlock = head->get_lock(lock->get_type()); + + if (hlock->get_state() != LOCK_SYNC) { + dout(10) << "rdlock_start trying head inode " << *head << dendl; + if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL ** + return false; + // oh, check our lock again then + } + } + + if (!_rdlock_kick(lock, as_anon)) + break; + } + + // wait! + int wait_on; + if (lock->get_parent()->is_auth() && lock->is_stable()) + wait_on = SimpleLock::WAIT_RD; + else + wait_on = SimpleLock::WAIT_STABLE; // REQRDLOCK is ignored if lock is unstable, so we need to retry. + dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl; + lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut)); + nudge_log(lock); + return false; +} + +void Locker::nudge_log(SimpleLock *lock) +{ + dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl; + if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush + mds->mdlog->flush(); +} + +void Locker::rdlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue) +{ + ceph_assert(it->is_rdlock()); + SimpleLock *lock = it->lock; + // drop ref + lock->put_rdlock(); + if (mut) + mut->locks.erase(it); + + dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl; + + // last one? + if (!lock->is_rdlocked()) { + if (!lock->is_stable()) + eval_gather(lock, false, pneed_issue); + else if (lock->get_parent()->is_auth()) + try_eval(lock, pneed_issue); + } +} + +bool Locker::rdlock_try_set(MutationImpl::LockOpVec& lov, MDRequestRef& mdr) +{ + dout(10) << __func__ << dendl; + for (const auto& p : lov) { + auto lock = p.lock; + ceph_assert(p.is_rdlock()); + if (!mdr->is_rdlocked(lock) && !rdlock_try(lock, mdr->get_client())) { + lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, + new C_MDS_RetryRequest(mdcache, mdr)); + goto failed; + } + lock->get_rdlock(); + mdr->emplace_lock(lock, MutationImpl::LockOp::RDLOCK); + dout(20) << " got rdlock on " << *lock << " " << *lock->get_parent() << dendl; + } + + return true; +failed: + dout(10) << __func__ << " failed" << dendl; + drop_locks(mdr.get(), nullptr); + mdr->drop_local_auth_pins(); + return false; +} + +bool Locker::rdlock_try_set(MutationImpl::LockOpVec& lov, MutationRef& mut) +{ + dout(10) << __func__ << dendl; + for (const auto& p : lov) { + auto lock = p.lock; + ceph_assert(p.is_rdlock()); + if (!lock->can_rdlock(mut->get_client())) + return false; + p.lock->get_rdlock(); + mut->emplace_lock(p.lock, MutationImpl::LockOp::RDLOCK); + } + return true; +} + +// ------------------ +// wrlock + +void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut) +{ + if (lock->get_type() == CEPH_LOCK_IVERSION || + lock->get_type() == CEPH_LOCK_DVERSION) + return local_wrlock_grab(static_cast<LocalLockC*>(lock), mut); + + dout(7) << "wrlock_force on " << *lock + << " on " << *lock->get_parent() << dendl; + lock->get_wrlock(true); + mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK); +} + +bool Locker::wrlock_try(SimpleLock *lock, const MutationRef& mut, client_t client) +{ + dout(10) << "wrlock_try " << *lock << " on " << *lock->get_parent() << dendl; + if (client == -1) + client = mut->get_client(); + + while (1) { + if (lock->can_wrlock(client)) { + lock->get_wrlock(); + auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK); + it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked + return true; + } + if (!lock->is_stable()) + break; + CInode *in = static_cast<CInode *>(lock->get_parent()); + if (!in->is_auth()) + break; + // caller may already has a log entry open. To avoid calling + // scatter_writebehind or start_scatter. don't change nest lock + // state if it has dirty scatterdata. + if (lock->is_dirty()) + break; + // To avoid calling scatter_writebehind or start_scatter. don't + // change nest lock state to MIX. + ScatterLock *slock = static_cast<ScatterLock*>(lock); + if (slock->get_scatter_wanted() || in->has_subtree_or_exporting_dirfrag()) + break; + + simple_lock(lock); + } + return false; +} + +bool Locker::wrlock_start(const MutationImpl::LockOp &op, MDRequestRef& mut) +{ + SimpleLock *lock = op.lock; + if (lock->get_type() == CEPH_LOCK_IVERSION || + lock->get_type() == CEPH_LOCK_DVERSION) + return local_wrlock_start(static_cast<LocalLockC*>(lock), mut); + + dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl; + + CInode *in = static_cast<CInode *>(lock->get_parent()); + client_t client = op.is_state_pin() ? lock->get_excl_client() : mut->get_client(); + bool want_scatter = lock->get_parent()->is_auth() && + (in->has_subtree_or_exporting_dirfrag() || + static_cast<ScatterLock*>(lock)->get_scatter_wanted()); + + while (1) { + // wrlock? + if (lock->can_wrlock(client) && + (!want_scatter || lock->get_state() == LOCK_MIX)) { + lock->get_wrlock(); + auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK); + it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked + return true; + } + + if (lock->get_type() == CEPH_LOCK_IFILE && + in->state_test(CInode::STATE_RECOVERING)) { + mds->mdcache->recovery_queue.prioritize(in); + } + + if (!lock->is_stable()) + break; + + if (in->is_auth()) { + if (want_scatter) + scatter_mix(static_cast<ScatterLock*>(lock)); + else + simple_lock(lock); + } else { + // replica. + // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case). + mds_rank_t auth = lock->get_parent()->authority().first; + if (!mds->is_cluster_degraded() || + mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) { + dout(10) << "requesting scatter from auth on " + << *lock << " on " << *lock->get_parent() << dendl; + mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth); + } + break; + } + } + + dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl; + lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut)); + nudge_log(lock); + + return false; +} + +void Locker::wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue) +{ + ceph_assert(it->is_wrlock()); + SimpleLock* lock = it->lock; + + if (lock->get_type() == CEPH_LOCK_IVERSION || + lock->get_type() == CEPH_LOCK_DVERSION) + return local_wrlock_finish(it, mut); + + dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl; + lock->put_wrlock(); + + if (it->is_remote_wrlock()) + it->clear_wrlock(); + else + mut->locks.erase(it); + + if (lock->is_wrlocked()) { + // Evaluate unstable lock after scatter_writebehind_finish(). Because + // eval_gather() does not change lock's state when lock is flushing. + if (!lock->is_stable() && lock->is_flushed() && + lock->get_parent()->is_auth()) + eval_gather(lock, false, pneed_issue); + } else { + if (!lock->is_stable()) + eval_gather(lock, false, pneed_issue); + else if (lock->get_parent()->is_auth()) + try_eval(lock, pneed_issue); + } +} + + +// remote wrlock + +void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut) +{ + dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl; + + // wait for active target + if (mds->is_cluster_degraded() && + !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) { + dout(7) << " mds." << target << " is not active" << dendl; + if (mut->more()->waiting_on_peer.empty()) + mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut)); + return; + } + + // send lock request + mut->start_locking(lock, target); + mut->more()->peers.insert(target); + auto r = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_WRLOCK); + r->set_lock_type(lock->get_type()); + lock->get_parent()->set_object_info(r->get_object_info()); + mds->send_message_mds(r, target); + + ceph_assert(mut->more()->waiting_on_peer.count(target) == 0); + mut->more()->waiting_on_peer.insert(target); +} + +void Locker::remote_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut) +{ + ceph_assert(it->is_remote_wrlock()); + SimpleLock *lock = it->lock; + mds_rank_t target = it->wrlock_target; + + if (it->is_wrlock()) + it->clear_remote_wrlock(); + else + mut->locks.erase(it); + + dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target + << " " << *lock->get_parent() << dendl; + if (!mds->is_cluster_degraded() || + mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) { + auto peerreq = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_UNWRLOCK); + peerreq->set_lock_type(lock->get_type()); + lock->get_parent()->set_object_info(peerreq->get_object_info()); + mds->send_message_mds(peerreq, target); + } +} + + +// ------------------ +// xlock + +bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut) +{ + if (lock->get_type() == CEPH_LOCK_IVERSION || + lock->get_type() == CEPH_LOCK_DVERSION) + return local_xlock_start(static_cast<LocalLockC*>(lock), mut); + + dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl; + client_t client = mut->get_client(); + + CInode *in = nullptr; + if (lock->get_cap_shift()) + in = static_cast<CInode *>(lock->get_parent()); + + // auth? + if (lock->get_parent()->is_auth()) { + // auth + while (1) { + if (mut->locking && // started xlock (not preempt other request) + lock->can_xlock(client) && + !(lock->get_state() == LOCK_LOCK_XLOCK && // client is not xlocker or + in && in->issued_caps_need_gather(lock))) { // xlocker does not hold shared cap + lock->set_state(LOCK_XLOCK); + lock->get_xlock(mut, client); + mut->emplace_lock(lock, MutationImpl::LockOp::XLOCK); + mut->finish_locking(lock); + return true; + } + + if (lock->get_type() == CEPH_LOCK_IFILE && + in->state_test(CInode::STATE_RECOVERING)) { + mds->mdcache->recovery_queue.prioritize(in); + } + + if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE || + lock->get_xlock_by_client() != client || + lock->is_waiter_for(SimpleLock::WAIT_STABLE))) + break; + + if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) { + mut->start_locking(lock); + simple_xlock(lock); + } else { + simple_lock(lock); + } + } + + lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut)); + nudge_log(lock); + return false; + } else { + // replica + ceph_assert(lock->get_sm()->can_remote_xlock); + ceph_assert(!mut->peer_request); + + // wait for single auth + if (lock->get_parent()->is_ambiguous_auth()) { + lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, + new C_MDS_RetryRequest(mdcache, mut)); + return false; + } + + // wait for active auth + mds_rank_t auth = lock->get_parent()->authority().first; + if (mds->is_cluster_degraded() && + !mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) { + dout(7) << " mds." << auth << " is not active" << dendl; + if (mut->more()->waiting_on_peer.empty()) + mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut)); + return false; + } + + // send lock request + mut->more()->peers.insert(auth); + mut->start_locking(lock, auth); + auto r = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_XLOCK); + r->set_lock_type(lock->get_type()); + lock->get_parent()->set_object_info(r->get_object_info()); + mds->send_message_mds(r, auth); + + ceph_assert(mut->more()->waiting_on_peer.count(auth) == 0); + mut->more()->waiting_on_peer.insert(auth); + + return false; + } +} + +void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue) +{ + ceph_assert(!lock->is_stable()); + if (lock->get_type() != CEPH_LOCK_DN && + lock->get_type() != CEPH_LOCK_ISNAP && + lock->get_type() != CEPH_LOCK_IPOLICY && + lock->get_num_rdlocks() == 0 && + lock->get_num_wrlocks() == 0 && + !lock->is_leased() && + lock->get_state() != LOCK_XLOCKSNAP) { + CInode *in = static_cast<CInode*>(lock->get_parent()); + client_t loner = in->get_target_loner(); + if (loner >= 0 && (xlocker < 0 || xlocker == loner)) { + lock->set_state(LOCK_EXCL); + lock->get_parent()->auth_unpin(lock); + lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD); + if (lock->get_cap_shift()) + *pneed_issue = true; + if (lock->get_parent()->is_auth() && + lock->is_stable()) + try_eval(lock, pneed_issue); + return; + } + } + // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK + eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue); +} + +void Locker::xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue) +{ + ceph_assert(it->is_xlock()); + SimpleLock *lock = it->lock; + + if (lock->get_type() == CEPH_LOCK_IVERSION || + lock->get_type() == CEPH_LOCK_DVERSION) + return local_xlock_finish(it, mut); + + dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl; + + client_t xlocker = lock->get_xlock_by_client(); + + // drop ref + lock->put_xlock(); + ceph_assert(mut); + mut->locks.erase(it); + + bool do_issue = false; + + // remote xlock? + if (!lock->get_parent()->is_auth()) { + ceph_assert(lock->get_sm()->can_remote_xlock); + + // tell auth + dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl; + mds_rank_t auth = lock->get_parent()->authority().first; + if (!mds->is_cluster_degraded() || + mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) { + auto peerreq = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_UNXLOCK); + peerreq->set_lock_type(lock->get_type()); + lock->get_parent()->set_object_info(peerreq->get_object_info()); + mds->send_message_mds(peerreq, auth); + } + // others waiting? + lock->finish_waiters(SimpleLock::WAIT_STABLE | + SimpleLock::WAIT_WR | + SimpleLock::WAIT_RD, 0); + } else { + if (lock->get_num_xlocks() == 0 && + lock->get_state() != LOCK_LOCK_XLOCK) { // no one is taking xlock + _finish_xlock(lock, xlocker, &do_issue); + } + } + + if (do_issue) { + CInode *in = static_cast<CInode*>(lock->get_parent()); + if (in->is_head()) { + if (pneed_issue) + *pneed_issue = true; + else + issue_caps(in); + } + } +} + +void Locker::xlock_export(const MutationImpl::lock_iterator& it, MutationImpl *mut) +{ + ceph_assert(it->is_xlock()); + SimpleLock *lock = it->lock; + dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl; + + lock->put_xlock(); + mut->locks.erase(it); + + MDSCacheObject *p = lock->get_parent(); + ceph_assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode) + + if (!lock->is_stable()) + lock->get_parent()->auth_unpin(lock); + + lock->set_state(LOCK_LOCK); +} + +void Locker::xlock_import(SimpleLock *lock) +{ + dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl; + lock->get_parent()->auth_pin(lock); +} + +void Locker::xlock_downgrade(SimpleLock *lock, MutationImpl *mut) +{ + dout(10) << "xlock_downgrade on " << *lock << " " << *lock->get_parent() << dendl; + auto it = mut->locks.find(lock); + if (it->is_rdlock()) + return; // already downgraded + + ceph_assert(lock->get_parent()->is_auth()); + ceph_assert(it != mut->locks.end()); + ceph_assert(it->is_xlock()); + + lock->set_xlock_done(); + lock->get_rdlock(); + xlock_finish(it, mut, nullptr); + mut->emplace_lock(lock, MutationImpl::LockOp::RDLOCK); +} + + +// file i/o ----------------------------------------- + +version_t Locker::issue_file_data_version(CInode *in) +{ + dout(7) << "issue_file_data_version on " << *in << dendl; + return in->get_inode()->file_data_version; +} + +class C_Locker_FileUpdate_finish : public LockerLogContext { + CInode *in; + MutationRef mut; + unsigned flags; + client_t client; + ref_t<MClientCaps> ack; +public: + C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m, unsigned f, + const ref_t<MClientCaps> &ack, client_t c=-1) + : LockerLogContext(l), in(i), mut(m), flags(f), client(c), ack(ack) { + in->get(CInode::PIN_PTRWAITER); + } + void finish(int r) override { + locker->file_update_finish(in, mut, flags, client, ack); + in->put(CInode::PIN_PTRWAITER); + } +}; + +enum { + UPDATE_SHAREMAX = 1, + UPDATE_NEEDSISSUE = 2, + UPDATE_SNAPFLUSH = 4, +}; + +void Locker::file_update_finish(CInode *in, MutationRef& mut, unsigned flags, + client_t client, const ref_t<MClientCaps> &ack) +{ + dout(10) << "file_update_finish on " << *in << dendl; + + mut->apply(); + + if (ack) { + Session *session = mds->get_session(client); + if (session && !session->is_closed()) { + // "oldest flush tid" > 0 means client uses unique TID for each flush + if (ack->get_oldest_flush_tid() > 0) + session->add_completed_flush(ack->get_client_tid()); + mds->send_message_client_counted(ack, session); + } else { + dout(10) << " no session for client." << client << " " << *ack << dendl; + } + } + + set<CInode*> need_issue; + drop_locks(mut.get(), &need_issue); + + if (in->is_head()) { + if ((flags & UPDATE_NEEDSISSUE) && need_issue.count(in) == 0) { + Capability *cap = in->get_client_cap(client); + if (cap && (cap->wanted() & ~cap->pending())) + issue_caps(in, cap); + } + + if ((flags & UPDATE_SHAREMAX) && in->is_auth() && + (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER))) + share_inode_max_size(in); + + } else if ((flags & UPDATE_SNAPFLUSH) && !in->client_snap_caps.empty()) { + dout(10) << " client_snap_caps " << in->client_snap_caps << dendl; + // check for snap writeback completion + in->client_snap_caps.erase(client); + if (in->client_snap_caps.empty()) { + for (int i = 0; i < num_cinode_locks; i++) { + SimpleLock *lock = in->get_lock(cinode_lock_info[i].lock); + ceph_assert(lock); + lock->put_wrlock(); + } + in->item_open_file.remove_myself(); + in->item_caps.remove_myself(); + eval_cap_gather(in, &need_issue); + } + } + issue_caps_set(need_issue); + + mds->balancer->hit_inode(in, META_POP_IWR); + + // auth unpin after issuing caps + mut->cleanup(); +} + +Capability* Locker::issue_new_caps(CInode *in, + int mode, + MDRequestRef& mdr, + SnapRealm *realm) +{ + dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl; + Session *session = mdr->session; + bool new_inode = (mdr->alloc_ino || mdr->used_prealloc_ino); + + // if replay or async, try to reconnect cap, and otherwise do nothing. + if (new_inode && mdr->client_request->is_queued_for_replay()) + return mds->mdcache->try_reconnect_cap(in, session); + + // my needs + ceph_assert(session->info.inst.name.is_client()); + client_t my_client = session->get_client(); + int my_want = ceph_caps_for_mode(mode); + + // register a capability + Capability *cap = in->get_client_cap(my_client); + if (!cap) { + // new cap + cap = in->add_client_cap(my_client, session, realm, new_inode); + cap->set_wanted(my_want); + cap->mark_new(); + } else { + // make sure it wants sufficient caps + if (my_want & ~cap->wanted()) { + // augment wanted caps for this client + cap->set_wanted(cap->wanted() | my_want); + } + } + cap->inc_suppress(); // suppress file cap messages (we'll bundle with the request reply) + + if (in->is_auth()) { + // [auth] twiddle mode? + eval(in, CEPH_CAP_LOCKS); + + int all_allowed = -1, loner_allowed = -1, xlocker_allowed = -1; + int allowed = get_allowed_caps(in, cap, all_allowed, loner_allowed, + xlocker_allowed); + + if (_need_flush_mdlog(in, my_want & ~allowed, true)) + mds->mdlog->flush(); + + } else { + // [replica] tell auth about any new caps wanted + request_inode_file_caps(in); + } + + // issue caps (pot. incl new one) + //issue_caps(in); // note: _eval above may have done this already... + + // re-issue whatever we can + //cap->issue(cap->pending()); + + cap->dec_suppress(); + + return cap; +} + +void Locker::issue_caps_set(set<CInode*>& inset) +{ + for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p) + issue_caps(*p); +} + +class C_Locker_RevokeStaleCap : public LockerContext { + CInode *in; + client_t client; +public: + C_Locker_RevokeStaleCap(Locker *l, CInode *i, client_t c) : + LockerContext(l), in(i), client(c) { + in->get(CInode::PIN_PTRWAITER); + } + void finish(int r) override { + locker->revoke_stale_cap(in, client); + in->put(CInode::PIN_PTRWAITER); + } +}; + +int Locker::get_allowed_caps(CInode *in, Capability *cap, + int &all_allowed, int &loner_allowed, + int &xlocker_allowed) +{ + client_t client = cap->get_client(); + + // allowed caps are determined by the lock mode. + if (all_allowed == -1) + all_allowed = in->get_caps_allowed_by_type(CAP_ANY); + if (loner_allowed == -1) + loner_allowed = in->get_caps_allowed_by_type(CAP_LONER); + if (xlocker_allowed == -1) + xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER); + + client_t loner = in->get_loner(); + if (loner >= 0) { + dout(7) << "get_allowed_caps loner client." << loner + << " allowed=" << ccap_string(loner_allowed) + << ", xlocker allowed=" << ccap_string(xlocker_allowed) + << ", others allowed=" << ccap_string(all_allowed) + << " on " << *in << dendl; + } else { + dout(7) << "get_allowed_caps allowed=" << ccap_string(all_allowed) + << ", xlocker allowed=" << ccap_string(xlocker_allowed) + << " on " << *in << dendl; + } + + // do not issue _new_ bits when size|mtime is projected + int allowed; + if (loner == client) + allowed = loner_allowed; + else + allowed = all_allowed; + + // add in any xlocker-only caps (for locks this client is the xlocker for) + allowed |= xlocker_allowed & in->get_xlocker_mask(client); + if (in->is_dir()) { + allowed &= ~CEPH_CAP_ANY_DIR_OPS; + if (allowed & CEPH_CAP_FILE_EXCL) + allowed |= cap->get_lock_cache_allowed(); + } + + if ((in->get_inode()->inline_data.version != CEPH_INLINE_NONE && + cap->is_noinline()) || + (!in->get_inode()->layout.pool_ns.empty() && + cap->is_nopoolns())) + allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR); + + return allowed; +} + +int Locker::issue_caps(CInode *in, Capability *only_cap) +{ + // count conflicts with + int nissued = 0; + int all_allowed = -1, loner_allowed = -1, xlocker_allowed = -1; + + ceph_assert(in->is_head()); + + // client caps + map<client_t, Capability>::iterator it; + if (only_cap) + it = in->client_caps.find(only_cap->get_client()); + else + it = in->client_caps.begin(); + for (; it != in->client_caps.end(); ++it) { + Capability *cap = &it->second; + int allowed = get_allowed_caps(in, cap, all_allowed, loner_allowed, + xlocker_allowed); + int pending = cap->pending(); + int wanted = cap->wanted(); + + dout(20) << " client." << it->first + << " pending " << ccap_string(pending) + << " allowed " << ccap_string(allowed) + << " wanted " << ccap_string(wanted) + << dendl; + + if (!(pending & ~allowed)) { + // skip if suppress or new, and not revocation + if (cap->is_new() || cap->is_suppress() || cap->is_stale()) { + dout(20) << " !revoke and new|suppressed|stale, skipping client." << it->first << dendl; + continue; + } + } else { + ceph_assert(!cap->is_new()); + if (cap->is_stale()) { + dout(20) << " revoke stale cap from client." << it->first << dendl; + ceph_assert(!cap->is_valid()); + cap->issue(allowed & pending, false); + mds->queue_waiter_front(new C_Locker_RevokeStaleCap(this, in, it->first)); + continue; + } + + if (!cap->is_valid() && (pending & ~CEPH_CAP_PIN)) { + // After stale->resume circle, client thinks it only has CEPH_CAP_PIN. + // mds needs to re-issue caps, then do revocation. + long seq = cap->issue(pending, true); + + dout(7) << " sending MClientCaps to client." << it->first + << " seq " << seq << " re-issue " << ccap_string(pending) << dendl; + + if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_grant); + + auto m = make_message<MClientCaps>(CEPH_CAP_OP_GRANT, in->ino(), + in->find_snaprealm()->inode->ino(), + cap->get_cap_id(), cap->get_last_seq(), + pending, wanted, 0, cap->get_mseq(), + mds->get_osd_epoch_barrier()); + in->encode_cap_message(m, cap); + + mds->send_message_client_counted(m, cap->get_session()); + } + } + + // notify clients about deleted inode, to make sure they release caps ASAP. + if (in->get_inode()->nlink == 0) + wanted |= CEPH_CAP_LINK_SHARED; + + // are there caps that the client _wants_ and can have, but aren't pending? + // or do we need to revoke? + if ((pending & ~allowed) || // need to revoke ~allowed caps. + ((wanted & allowed) & ~pending) || // missing wanted+allowed caps + !cap->is_valid()) { // after stale->resume circle + // issue + nissued++; + + // include caps that clients generally like, while we're at it. + int likes = in->get_caps_liked(); + int before = pending; + long seq; + if (pending & ~allowed) + seq = cap->issue((wanted|likes) & allowed & pending, true); // if revoking, don't issue anything new. + else + seq = cap->issue((wanted|likes) & allowed, true); + int after = cap->pending(); + + dout(7) << " sending MClientCaps to client." << it->first + << " seq " << seq << " new pending " << ccap_string(after) + << " was " << ccap_string(before) << dendl; + + int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT; + if (op == CEPH_CAP_OP_REVOKE) { + if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_revoke); + revoking_caps.push_back(&cap->item_revoking_caps); + revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps); + cap->set_last_revoke_stamp(ceph_clock_now()); + cap->reset_num_revoke_warnings(); + } else { + if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_grant); + } + + auto m = make_message<MClientCaps>(op, in->ino(), + in->find_snaprealm()->inode->ino(), + cap->get_cap_id(), cap->get_last_seq(), + after, wanted, 0, cap->get_mseq(), + mds->get_osd_epoch_barrier()); + in->encode_cap_message(m, cap); + + mds->send_message_client_counted(m, cap->get_session()); + } + + if (only_cap) + break; + } + + return nissued; +} + +void Locker::issue_truncate(CInode *in) +{ + dout(7) << "issue_truncate on " << *in << dendl; + + for (auto &p : in->client_caps) { + if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_trunc); + Capability *cap = &p.second; + auto m = make_message<MClientCaps>(CEPH_CAP_OP_TRUNC, + in->ino(), + in->find_snaprealm()->inode->ino(), + cap->get_cap_id(), cap->get_last_seq(), + cap->pending(), cap->wanted(), 0, + cap->get_mseq(), + mds->get_osd_epoch_barrier()); + in->encode_cap_message(m, cap); + mds->send_message_client_counted(m, p.first); + } + + // should we increase max_size? + if (in->is_auth() && in->is_file()) + check_inode_max_size(in); +} + + +void Locker::revoke_stale_cap(CInode *in, client_t client) +{ + dout(7) << __func__ << " client." << client << " on " << *in << dendl; + Capability *cap = in->get_client_cap(client); + if (!cap) + return; + + if (cap->revoking() & CEPH_CAP_ANY_WR) { + CachedStackStringStream css; + mds->evict_client(client.v, false, g_conf()->mds_session_blocklist_on_timeout, *css, nullptr); + return; + } + + cap->revoke(); + + if (in->is_auth() && in->get_inode()->client_ranges.count(cap->get_client())) + in->state_set(CInode::STATE_NEEDSRECOVER); + + if (in->state_test(CInode::STATE_EXPORTINGCAPS)) + return; + + if (!in->filelock.is_stable()) + eval_gather(&in->filelock); + if (!in->linklock.is_stable()) + eval_gather(&in->linklock); + if (!in->authlock.is_stable()) + eval_gather(&in->authlock); + if (!in->xattrlock.is_stable()) + eval_gather(&in->xattrlock); + + if (in->is_auth()) + try_eval(in, CEPH_CAP_LOCKS); + else + request_inode_file_caps(in); +} + +bool Locker::revoke_stale_caps(Session *session) +{ + dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl; + + // invalidate all caps + session->inc_cap_gen(); + + bool ret = true; + std::vector<CInode*> to_eval; + + for (auto p = session->caps.begin(); !p.end(); ) { + Capability *cap = *p; + ++p; + if (!cap->is_notable()) { + // the rest ones are not being revoked and don't have writeable range + // and don't want exclusive caps or want file read/write. They don't + // need recover, they don't affect eval_gather()/try_eval() + break; + } + + int revoking = cap->revoking(); + if (!revoking) + continue; + + if (revoking & CEPH_CAP_ANY_WR) { + ret = false; + break; + } + + int issued = cap->issued(); + CInode *in = cap->get_inode(); + dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl; + int revoked = cap->revoke(); + if (revoked & CEPH_CAP_ANY_DIR_OPS) + eval_lock_caches(cap); + + if (in->is_auth() && + in->get_inode()->client_ranges.count(cap->get_client())) + in->state_set(CInode::STATE_NEEDSRECOVER); + + // eval lock/inode may finish contexts, which may modify other cap's position + // in the session->caps. + to_eval.push_back(in); + } + + for (auto in : to_eval) { + if (in->state_test(CInode::STATE_EXPORTINGCAPS)) + continue; + + if (!in->filelock.is_stable()) + eval_gather(&in->filelock); + if (!in->linklock.is_stable()) + eval_gather(&in->linklock); + if (!in->authlock.is_stable()) + eval_gather(&in->authlock); + if (!in->xattrlock.is_stable()) + eval_gather(&in->xattrlock); + + if (in->is_auth()) + try_eval(in, CEPH_CAP_LOCKS); + else + request_inode_file_caps(in); + } + + return ret; +} + +void Locker::resume_stale_caps(Session *session) +{ + dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl; + + bool lazy = session->info.has_feature(CEPHFS_FEATURE_LAZY_CAP_WANTED); + for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ) { + Capability *cap = *p; + ++p; + if (lazy && !cap->is_notable()) + break; // see revoke_stale_caps() + + CInode *in = cap->get_inode(); + ceph_assert(in->is_head()); + dout(10) << " clearing stale flag on " << *in << dendl; + + if (in->state_test(CInode::STATE_EXPORTINGCAPS)) { + // if export succeeds, the cap will be removed. if export fails, + // we need to re-issue the cap if it's not stale. + in->state_set(CInode::STATE_EVALSTALECAPS); + continue; + } + + if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS)) + issue_caps(in, cap); + } +} + +void Locker::remove_stale_leases(Session *session) +{ + dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl; + xlist<ClientLease*>::iterator p = session->leases.begin(); + while (!p.end()) { + ClientLease *l = *p; + ++p; + CDentry *parent = static_cast<CDentry*>(l->parent); + dout(15) << " removing lease on " << *parent << dendl; + parent->remove_client_lease(l, this); + } +} + + +class C_MDL_RequestInodeFileCaps : public LockerContext { + CInode *in; +public: + C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) { + in->get(CInode::PIN_PTRWAITER); + } + void finish(int r) override { + if (!in->is_auth()) + locker->request_inode_file_caps(in); + in->put(CInode::PIN_PTRWAITER); + } +}; + +void Locker::request_inode_file_caps(CInode *in) +{ + ceph_assert(!in->is_auth()); + + int wanted = in->get_caps_wanted() & in->get_caps_allowed_ever() & ~CEPH_CAP_PIN; + if (wanted != in->replica_caps_wanted) { + // wait for single auth + if (in->is_ambiguous_auth()) { + in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, + new C_MDL_RequestInodeFileCaps(this, in)); + return; + } + + mds_rank_t auth = in->authority().first; + if (mds->is_cluster_degraded() && + mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) { + mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in)); + return; + } + + dout(7) << "request_inode_file_caps " << ccap_string(wanted) + << " was " << ccap_string(in->replica_caps_wanted) + << " on " << *in << " to mds." << auth << dendl; + + in->replica_caps_wanted = wanted; + + if (!mds->is_cluster_degraded() || + mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) + mds->send_message_mds(make_message<MInodeFileCaps>(in->ino(), in->replica_caps_wanted), auth); + } +} + +void Locker::handle_inode_file_caps(const cref_t<MInodeFileCaps> &m) +{ + // nobody should be talking to us during recovery. + if (mds->get_state() < MDSMap::STATE_CLIENTREPLAY) { + if (mds->get_want_state() >= MDSMap::STATE_CLIENTREPLAY) { + mds->wait_for_replay(new C_MDS_RetryMessage(mds, m)); + return; + } + ceph_abort_msg("got unexpected message during recovery"); + } + + // ok + CInode *in = mdcache->get_inode(m->get_ino()); + mds_rank_t from = mds_rank_t(m->get_source().num()); + + ceph_assert(in); + ceph_assert(in->is_auth()); + + dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl; + + if (mds->logger) mds->logger->inc(l_mdss_handle_inode_file_caps); + + in->set_mds_caps_wanted(from, m->get_caps()); + + try_eval(in, CEPH_CAP_LOCKS); +} + + +class C_MDL_CheckMaxSize : public LockerContext { + CInode *in; + uint64_t new_max_size; + uint64_t newsize; + utime_t mtime; + +public: + C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size, + uint64_t _newsize, utime_t _mtime) : + LockerContext(l), in(i), + new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime) + { + in->get(CInode::PIN_PTRWAITER); + } + void finish(int r) override { + if (in->is_auth()) + locker->check_inode_max_size(in, false, new_max_size, newsize, mtime); + in->put(CInode::PIN_PTRWAITER); + } +}; + +uint64_t Locker::calc_new_max_size(const CInode::inode_const_ptr &pi, uint64_t size) +{ + uint64_t new_max = (size + 1) << 1; + uint64_t max_inc = g_conf()->mds_client_writeable_range_max_inc_objs; + if (max_inc > 0) { + max_inc *= pi->layout.object_size; + new_max = std::min(new_max, size + max_inc); + } + return round_up_to(new_max, pi->get_layout_size_increment()); +} + +bool Locker::check_client_ranges(CInode *in, uint64_t size) +{ + const auto& latest = in->get_projected_inode(); + uint64_t ms; + if (latest->has_layout()) { + ms = calc_new_max_size(latest, size); + } else { + // Layout-less directories like ~mds0/, have zero size + ms = 0; + } + + auto it = latest->client_ranges.begin(); + for (auto &p : in->client_caps) { + if ((p.second.issued() | p.second.wanted()) & CEPH_CAP_ANY_FILE_WR) { + if (it == latest->client_ranges.end()) + return true; + if (it->first != p.first) + return true; + if (ms > it->second.range.last) + return true; + ++it; + } + } + return it != latest->client_ranges.end(); +} + +bool Locker::calc_new_client_ranges(CInode *in, uint64_t size, bool *max_increased) +{ + const auto& latest = in->get_projected_inode(); + uint64_t ms; + if (latest->has_layout()) { + ms = calc_new_max_size(latest, size); + } else { + // Layout-less directories like ~mds0/, have zero size + ms = 0; + } + + auto pi = in->_get_projected_inode(); + bool updated = false; + + // increase ranges as appropriate. + // shrink to 0 if no WR|BUFFER caps issued. + auto it = pi->client_ranges.begin(); + for (auto &p : in->client_caps) { + if ((p.second.issued() | p.second.wanted()) & CEPH_CAP_ANY_FILE_WR) { + while (it != pi->client_ranges.end() && it->first < p.first) { + it = pi->client_ranges.erase(it); + updated = true; + } + + if (it != pi->client_ranges.end() && it->first == p.first) { + if (ms > it->second.range.last) { + it->second.range.last = ms; + updated = true; + if (max_increased) + *max_increased = true; + } + } else { + it = pi->client_ranges.emplace_hint(it, std::piecewise_construct, + std::forward_as_tuple(p.first), + std::forward_as_tuple()); + it->second.range.last = ms; + it->second.follows = in->first - 1; + updated = true; + if (max_increased) + *max_increased = true; + } + p.second.mark_clientwriteable(); + ++it; + } else { + p.second.clear_clientwriteable(); + } + } + while (it != pi->client_ranges.end()) { + it = pi->client_ranges.erase(it); + updated = true; + } + if (updated) { + if (pi->client_ranges.empty()) + in->clear_clientwriteable(); + else + in->mark_clientwriteable(); + } + return updated; +} + +bool Locker::check_inode_max_size(CInode *in, bool force_wrlock, + uint64_t new_max_size, uint64_t new_size, + utime_t new_mtime) +{ + ceph_assert(in->is_auth()); + ceph_assert(in->is_file()); + + const auto& latest = in->get_projected_inode(); + uint64_t size = latest->size; + bool update_size = new_size > 0; + + if (update_size) { + new_size = size = std::max(size, new_size); + new_mtime = std::max(new_mtime, latest->mtime); + if (latest->size == new_size && latest->mtime == new_mtime) + update_size = false; + } + + bool new_ranges = check_client_ranges(in, std::max(new_max_size, size)); + if (!update_size && !new_ranges) { + dout(20) << "check_inode_max_size no-op on " << *in << dendl; + return false; + } + + dout(10) << "check_inode_max_size new_ranges " << new_ranges + << " update_size " << update_size + << " on " << *in << dendl; + + if (in->is_frozen()) { + dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl; + in->add_waiter(CInode::WAIT_UNFREEZE, + new C_MDL_CheckMaxSize(this, in, new_max_size, new_size, new_mtime)); + return false; + } else if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) { + // lock? + if (in->filelock.is_stable()) { + if (in->get_target_loner() >= 0) + file_excl(&in->filelock); + else + simple_lock(&in->filelock); + } + if (!in->filelock.can_wrlock(in->get_loner())) { + dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl; + in->filelock.add_waiter(SimpleLock::WAIT_STABLE, + new C_MDL_CheckMaxSize(this, in, new_max_size, new_size, new_mtime)); + return false; + } + } + + MutationRef mut(new MutationImpl()); + mut->ls = mds->mdlog->get_current_segment(); + + auto pi = in->project_inode(mut); + pi.inode->version = in->pre_dirty(); + + bool max_increased = false; + if (new_ranges && + calc_new_client_ranges(in, std::max(new_max_size, size), &max_increased)) { + dout(10) << "check_inode_max_size client_ranges " + << in->get_previous_projected_inode()->client_ranges + << " -> " << pi.inode->client_ranges << dendl; + } + + if (update_size) { + dout(10) << "check_inode_max_size size " << pi.inode->size << " -> " << new_size << dendl; + pi.inode->size = new_size; + pi.inode->rstat.rbytes = new_size; + dout(10) << "check_inode_max_size mtime " << pi.inode->mtime << " -> " << new_mtime << dendl; + pi.inode->mtime = new_mtime; + if (new_mtime > pi.inode->ctime) { + pi.inode->ctime = new_mtime; + if (new_mtime > pi.inode->rstat.rctime) + pi.inode->rstat.rctime = new_mtime; + } + } + + // use EOpen if the file is still open; otherwise, use EUpdate. + // this is just an optimization to push open files forward into + // newer log segments. + LogEvent *le; + EMetaBlob *metablob; + if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) { + EOpen *eo = new EOpen(mds->mdlog); + eo->add_ino(in->ino()); + metablob = &eo->metablob; + le = eo; + } else { + EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size"); + metablob = &eu->metablob; + le = eu; + } + mds->mdlog->start_entry(le); + + mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY); + // no cow, here! + CDentry *parent = in->get_projected_parent_dn(); + metablob->add_primary_dentry(parent, in, true); + mdcache->journal_dirty_inode(mut.get(), metablob, in); + + mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, + UPDATE_SHAREMAX, ref_t<MClientCaps>())); + wrlock_force(&in->filelock, mut); // wrlock for duration of journal + mut->auth_pin(in); + + // make max_size _increase_ timely + if (max_increased) + mds->mdlog->flush(); + + return true; +} + + +void Locker::share_inode_max_size(CInode *in, Capability *only_cap) +{ + /* + * only share if currently issued a WR cap. if client doesn't have it, + * file_max doesn't matter, and the client will get it if/when they get + * the cap later. + */ + dout(10) << "share_inode_max_size on " << *in << dendl; + map<client_t, Capability>::iterator it; + if (only_cap) + it = in->client_caps.find(only_cap->get_client()); + else + it = in->client_caps.begin(); + for (; it != in->client_caps.end(); ++it) { + const client_t client = it->first; + Capability *cap = &it->second; + if (cap->is_suppress()) + continue; + if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) { + dout(10) << "share_inode_max_size with client." << client << dendl; + if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_grant); + cap->inc_last_seq(); + auto m = make_message<MClientCaps>(CEPH_CAP_OP_GRANT, + in->ino(), + in->find_snaprealm()->inode->ino(), + cap->get_cap_id(), + cap->get_last_seq(), + cap->pending(), + cap->wanted(), 0, + cap->get_mseq(), + mds->get_osd_epoch_barrier()); + in->encode_cap_message(m, cap); + mds->send_message_client_counted(m, client); + } + if (only_cap) + break; + } +} + +bool Locker::_need_flush_mdlog(CInode *in, int wanted, bool lock_state_any) +{ + /* flush log if caps are wanted by client but corresponding lock is unstable and locked by + * pending mutations. */ + if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) && + (lock_state_any ? in->filelock.is_locked() : in->filelock.is_unstable_and_locked())) || + ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) && + (lock_state_any ? in->authlock.is_locked() : in->authlock.is_unstable_and_locked())) || + ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) && + (lock_state_any ? in->linklock.is_locked() : in->linklock.is_unstable_and_locked())) || + ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) && + (lock_state_any ? in->xattrlock.is_locked() : in->xattrlock.is_unstable_and_locked()))) + return true; + return false; +} + +void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq) +{ + if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) { + dout(10) << " wanted " << ccap_string(cap->wanted()) + << " -> " << ccap_string(wanted) << dendl; + cap->set_wanted(wanted); + } else if (wanted & ~cap->wanted()) { + dout(10) << " wanted " << ccap_string(cap->wanted()) + << " -> " << ccap_string(wanted) + << " (added caps even though we had seq mismatch!)" << dendl; + cap->set_wanted(wanted | cap->wanted()); + } else { + dout(10) << " NOT changing wanted " << ccap_string(cap->wanted()) + << " -> " << ccap_string(wanted) + << " (issue_seq " << issue_seq << " != last_issue " + << cap->get_last_issue() << ")" << dendl; + return; + } + + CInode *cur = cap->get_inode(); + if (!cur->is_auth()) { + request_inode_file_caps(cur); + return; + } + + if (cap->wanted()) { + if (cur->state_test(CInode::STATE_RECOVERING) && + (cap->wanted() & (CEPH_CAP_FILE_RD | + CEPH_CAP_FILE_WR))) { + mds->mdcache->recovery_queue.prioritize(cur); + } + + if (mdcache->open_file_table.should_log_open(cur)) { + ceph_assert(cur->last == CEPH_NOSNAP); + EOpen *le = new EOpen(mds->mdlog); + mds->mdlog->start_entry(le); + le->add_clean_inode(cur); + mds->mdlog->submit_entry(le); + } + } +} + +void Locker::snapflush_nudge(CInode *in) +{ + ceph_assert(in->last != CEPH_NOSNAP); + if (in->client_snap_caps.empty()) + return; + + CInode *head = mdcache->get_inode(in->ino()); + // head inode gets unpinned when snapflush starts. It might get trimmed + // before snapflush finishes. + if (!head) + return; + + ceph_assert(head->is_auth()); + if (head->client_need_snapflush.empty()) + return; + + SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE); + if (hlock->get_state() == LOCK_SYNC || !hlock->is_stable()) { + hlock = NULL; + for (int i = 0; i < num_cinode_locks; i++) { + SimpleLock *lock = head->get_lock(cinode_lock_info[i].lock); + if (lock->get_state() != LOCK_SYNC && lock->is_stable()) { + hlock = lock; + break; + } + } + } + if (hlock) { + _rdlock_kick(hlock, true); + } else { + // also, requeue, in case of unstable lock + need_snapflush_inodes.push_back(&in->item_caps); + } +} + +void Locker::mark_need_snapflush_inode(CInode *in) +{ + ceph_assert(in->last != CEPH_NOSNAP); + if (!in->item_caps.is_on_list()) { + need_snapflush_inodes.push_back(&in->item_caps); + utime_t now = ceph_clock_now(); + in->last_dirstat_prop = now; + dout(10) << "mark_need_snapflush_inode " << *in << " - added at " << now << dendl; + } +} + +bool Locker::is_revoking_any_caps_from(client_t client) +{ + auto it = revoking_caps_by_client.find(client); + if (it == revoking_caps_by_client.end()) + return false; + return !it->second.empty(); +} + +void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last) +{ + dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl; + for (auto p = head_in->client_need_snapflush.begin(); + p != head_in->client_need_snapflush.end() && p->first < last; ) { + snapid_t snapid = p->first; + auto &clients = p->second; + ++p; // be careful, q loop below depends on this + + if (clients.count(client)) { + dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl; + CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1); + ceph_assert(sin); + ceph_assert(sin->first <= snapid); + _do_snap_update(sin, snapid, 0, sin->first - 1, client, ref_t<MClientCaps>(), ref_t<MClientCaps>()); + head_in->remove_need_snapflush(sin, snapid, client); + } + } +} + + +bool Locker::should_defer_client_cap_frozen(CInode *in) +{ + if (in->is_frozen()) + return true; + + /* + * This policy needs to be AT LEAST as permissive as allowing a client + * request to go forward, or else a client request can release something, + * the release gets deferred, but the request gets processed and deadlocks + * because when the caps can't get revoked. + * + * No auth_pin implies that there is no unstable lock and @in is not auth + * pinnned by client request. If parent dirfrag is auth pinned by a lock + * cache, later request from lock cache owner may forcibly auth pin the @in. + */ + if (in->is_freezing() && in->get_num_auth_pins() == 0) { + CDir* dir = in->get_parent_dir(); + if (!dir || !dir->is_auth_pinned_by_lock_cache()) + return true; + } + return false; +} + +void Locker::handle_client_caps(const cref_t<MClientCaps> &m) +{ + client_t client = m->get_source().num(); + snapid_t follows = m->get_snap_follows(); + auto op = m->get_op(); + auto dirty = m->get_dirty(); + dout(7) << "handle_client_caps " + << " on " << m->get_ino() + << " tid " << m->get_client_tid() << " follows " << follows + << " op " << ceph_cap_op_name(op) + << " flags 0x" << std::hex << m->flags << std::dec << dendl; + + Session *session = mds->get_session(m); + if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) { + if (!session) { + dout(5) << " no session, dropping " << *m << dendl; + return; + } + if (session->is_closed() || + session->is_closing() || + session->is_killing()) { + dout(7) << " session closed|closing|killing, dropping " << *m << dendl; + return; + } + if ((mds->is_reconnect() || mds->get_want_state() == MDSMap::STATE_RECONNECT) && + dirty && m->get_client_tid() > 0 && + !session->have_completed_flush(m->get_client_tid())) { + mdcache->set_reconnected_dirty_caps(client, m->get_ino(), dirty, + op == CEPH_CAP_OP_FLUSHSNAP); + } + mds->wait_for_replay(new C_MDS_RetryMessage(mds, m)); + return; + } + + if (mds->logger) mds->logger->inc(l_mdss_handle_client_caps); + if (dirty) { + if (mds->logger) mds->logger->inc(l_mdss_handle_client_caps_dirty); + } + + if (m->get_client_tid() > 0 && session && + session->have_completed_flush(m->get_client_tid())) { + dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid() + << " for client." << client << dendl; + ref_t<MClientCaps> ack; + if (op == CEPH_CAP_OP_FLUSHSNAP) { + if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flushsnap_ack); + ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier()); + } else { + if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flush_ack); + ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(), m->get_seq(), m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier()); + } + ack->set_snap_follows(follows); + ack->set_client_tid(m->get_client_tid()); + mds->send_message_client_counted(ack, m->get_connection()); + if (op == CEPH_CAP_OP_FLUSHSNAP) { + return; + } else { + // fall-thru because the message may release some caps + dirty = false; + op = CEPH_CAP_OP_UPDATE; + } + } + + // "oldest flush tid" > 0 means client uses unique TID for each flush + if (m->get_oldest_flush_tid() > 0 && session) { + if (session->trim_completed_flushes(m->get_oldest_flush_tid())) { + mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name); + + if (session->get_num_trim_flushes_warnings() > 0 && + session->get_num_completed_flushes() * 2 < g_conf()->mds_max_completed_flushes) + session->reset_num_trim_flushes_warnings(); + } else { + if (session->get_num_completed_flushes() >= + (g_conf()->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) { + session->inc_num_trim_flushes_warnings(); + CachedStackStringStream css; + *css << "client." << session->get_client() << " does not advance its oldest_flush_tid (" + << m->get_oldest_flush_tid() << "), " + << session->get_num_completed_flushes() + << " completed flushes recorded in session"; + mds->clog->warn() << css->strv(); + dout(20) << __func__ << " " << css->strv() << dendl; + } + } + } + + CInode *head_in = mdcache->get_inode(m->get_ino()); + if (!head_in) { + if (mds->is_clientreplay()) { + dout(7) << "handle_client_caps on unknown ino " << m->get_ino() + << ", will try again after replayed client requests" << dendl; + mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m)); + return; + } + + /* + * "handle_client_caps on unknown ino xxx” is normal after migrating a subtree + * Sequence of events that cause this are: + * - client sends caps message to mds.a + * - mds finishes subtree migration, send cap export to client + * - mds trim its cache + * - mds receives cap messages from client + */ + dout(7) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl; + return; + } + + if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) { + // Pause RADOS operations until we see the required epoch + mds->objecter->set_epoch_barrier(m->osd_epoch_barrier); + } + + if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) { + // Record the barrier so that we will retransmit it to clients + mds->set_osd_epoch_barrier(m->osd_epoch_barrier); + } + + dout(10) << " head inode " << *head_in << dendl; + + Capability *cap = 0; + cap = head_in->get_client_cap(client); + if (!cap) { + dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl; + return; + } + ceph_assert(cap); + + // freezing|frozen? + if (should_defer_client_cap_frozen(head_in)) { + dout(7) << "handle_client_caps freezing|frozen on " << *head_in << dendl; + head_in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m)); + return; + } + if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) { + dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq() + << ", dropping" << dendl; + return; + } + + bool need_unpin = false; + + // flushsnap? + if (op == CEPH_CAP_OP_FLUSHSNAP) { + if (!head_in->is_auth()) { + dout(7) << " not auth, ignoring flushsnap on " << *head_in << dendl; + goto out; + } + + SnapRealm *realm = head_in->find_snaprealm(); + snapid_t snap = realm->get_snap_following(follows); + dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl; + + auto p = head_in->client_need_snapflush.begin(); + if (p != head_in->client_need_snapflush.end() && p->first < snap) { + head_in->auth_pin(this); // prevent subtree frozen + need_unpin = true; + _do_null_snapflush(head_in, client, snap); + } + + CInode *in = head_in; + if (snap != CEPH_NOSNAP) { + in = mdcache->pick_inode_snap(head_in, snap - 1); + if (in != head_in) + dout(10) << " snapped inode " << *in << dendl; + } + + // we can prepare the ack now, since this FLUSHEDSNAP is independent of any + // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst + // case we get a dup response, so whatever.) + ref_t<MClientCaps> ack; + if (dirty) { + ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier()); + ack->set_snap_follows(follows); + ack->set_client_tid(m->get_client_tid()); + ack->set_oldest_flush_tid(m->get_oldest_flush_tid()); + } + + if (in == head_in || + (head_in->client_need_snapflush.count(snap) && + head_in->client_need_snapflush[snap].count(client))) { + dout(7) << " flushsnap snap " << snap + << " client." << client << " on " << *in << dendl; + + // this cap now follows a later snap (i.e. the one initiating this flush, or later) + if (in == head_in) + cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq(); + + _do_snap_update(in, snap, dirty, follows, client, m, ack); + + if (in != head_in) + head_in->remove_need_snapflush(in, snap, client); + } else { + dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl; + if (ack) { + if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flushsnap_ack); + mds->send_message_client_counted(ack, m->get_connection()); + } + } + goto out; + } + + if (cap->get_cap_id() != m->get_cap_id()) { + dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl; + } else { + CInode *in = head_in; + if (follows > 0) { + in = mdcache->pick_inode_snap(head_in, follows); + // intermediate snap inodes + while (in != head_in) { + ceph_assert(in->last != CEPH_NOSNAP); + if (in->is_auth() && dirty) { + dout(10) << " updating intermediate snapped inode " << *in << dendl; + _do_cap_update(in, NULL, dirty, follows, m, ref_t<MClientCaps>()); + } + in = mdcache->pick_inode_snap(head_in, in->last); + } + } + + // head inode, and cap + ref_t<MClientCaps> ack; + + int caps = m->get_caps(); + if (caps & ~cap->issued()) { + dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl; + caps &= cap->issued(); + } + + int revoked = cap->confirm_receipt(m->get_seq(), caps); + dout(10) << " follows " << follows + << " retains " << ccap_string(m->get_caps()) + << " dirty " << ccap_string(dirty) + << " on " << *in << dendl; + + if (revoked & CEPH_CAP_ANY_DIR_OPS) + eval_lock_caches(cap); + + // missing/skipped snapflush? + // The client MAY send a snapflush if it is issued WR/EXCL caps, but + // presently only does so when it has actual dirty metadata. But, we + // set up the need_snapflush stuff based on the issued caps. + // We can infer that the client WONT send a FLUSHSNAP once they have + // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap + // update/release). + if (!head_in->client_need_snapflush.empty()) { + if (!(cap->issued() & CEPH_CAP_ANY_FILE_WR) && + !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP)) { + head_in->auth_pin(this); // prevent subtree frozen + need_unpin = true; + _do_null_snapflush(head_in, client); + } else { + dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl; + } + } + if (cap->need_snapflush() && !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP)) + cap->clear_needsnapflush(); + + if (dirty && in->is_auth()) { + dout(7) << " flush client." << client << " dirty " << ccap_string(dirty) + << " seq " << m->get_seq() << " on " << *in << dendl; + ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(), + m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier()); + ack->set_client_tid(m->get_client_tid()); + ack->set_oldest_flush_tid(m->get_oldest_flush_tid()); + } + + // filter wanted based on what we could ever give out (given auth/replica status) + bool need_flush = m->flags & MClientCaps::FLAG_SYNC; + int new_wanted = m->get_wanted(); + if (new_wanted != cap->wanted()) { + if (!need_flush && in->is_auth() && (new_wanted & ~cap->pending())) { + // exapnding caps. make sure we aren't waiting for a log flush + need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending()); + } + + adjust_cap_wanted(cap, new_wanted, m->get_issue_seq()); + } + + if (in->is_auth() && + _do_cap_update(in, cap, dirty, follows, m, ack, &need_flush)) { + // updated + eval(in, CEPH_CAP_LOCKS); + + if (!need_flush && (cap->wanted() & ~cap->pending())) + need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending()); + } else { + // no update, ack now. + if (ack) { + if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flush_ack); + mds->send_message_client_counted(ack, m->get_connection()); + } + + bool did_issue = eval(in, CEPH_CAP_LOCKS); + if (!did_issue && (cap->wanted() & ~cap->pending())) + issue_caps(in, cap); + + if (cap->get_last_seq() == 0 && + (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) { + share_inode_max_size(in, cap); + } + } + + if (need_flush) + mds->mdlog->flush(); + } + + out: + if (need_unpin) + head_in->auth_unpin(this); +} + + +class C_Locker_RetryRequestCapRelease : public LockerContext { + client_t client; + ceph_mds_request_release item; +public: + C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) : + LockerContext(l), client(c), item(it) { } + void finish(int r) override { + string dname; + MDRequestRef null_ref; + locker->process_request_cap_release(null_ref, client, item, dname); + } +}; + +void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item, + std::string_view dname) +{ + inodeno_t ino = (uint64_t)item.ino; + uint64_t cap_id = item.cap_id; + int caps = item.caps; + int wanted = item.wanted; + int seq = item.seq; + int issue_seq = item.issue_seq; + int mseq = item.mseq; + + CInode *in = mdcache->get_inode(ino); + if (!in) + return; + + if (dname.length()) { + frag_t fg = in->pick_dirfrag(dname); + CDir *dir = in->get_dirfrag(fg); + if (dir) { + CDentry *dn = dir->lookup(dname); + if (dn) { + ClientLease *l = dn->get_client_lease(client); + if (l) { + dout(10) << __func__ << " removing lease on " << *dn << dendl; + dn->remove_client_lease(l, this); + } else { + dout(7) << __func__ << " client." << client + << " doesn't have lease on " << *dn << dendl; + } + } else { + dout(7) << __func__ << " client." << client << " released lease on dn " + << dir->dirfrag() << "/" << dname << " which dne" << dendl; + } + } + } + + Capability *cap = in->get_client_cap(client); + if (!cap) + return; + + dout(10) << __func__ << " client." << client << " " << ccap_string(caps) << " on " << *in + << (mdr ? "" : " (DEFERRED, no mdr)") + << dendl; + + if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) { + dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl; + return; + } + + if (cap->get_cap_id() != cap_id) { + dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl; + return; + } + + if (should_defer_client_cap_frozen(in)) { + dout(7) << " frozen, deferring" << dendl; + in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item)); + return; + } + + if (mds->logger) mds->logger->inc(l_mdss_process_request_cap_release); + + if (caps & ~cap->issued()) { + dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl; + caps &= cap->issued(); + } + int revoked = cap->confirm_receipt(seq, caps); + if (revoked & CEPH_CAP_ANY_DIR_OPS) + eval_lock_caches(cap); + + if (!in->client_need_snapflush.empty() && + (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) { + _do_null_snapflush(in, client); + } + + adjust_cap_wanted(cap, wanted, issue_seq); + + if (mdr) + cap->inc_suppress(); + eval(in, CEPH_CAP_LOCKS); + if (mdr) + cap->dec_suppress(); + + // take note; we may need to reissue on this cap later + if (mdr) + mdr->cap_releases[in->vino()] = cap->get_last_seq(); +} + +class C_Locker_RetryKickIssueCaps : public LockerContext { + CInode *in; + client_t client; + ceph_seq_t seq; +public: + C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) : + LockerContext(l), in(i), client(c), seq(s) { + in->get(CInode::PIN_PTRWAITER); + } + void finish(int r) override { + locker->kick_issue_caps(in, client, seq); + in->put(CInode::PIN_PTRWAITER); + } +}; + +void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq) +{ + Capability *cap = in->get_client_cap(client); + if (!cap || cap->get_last_seq() != seq) + return; + if (in->is_frozen()) { + dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl; + in->add_waiter(CInode::WAIT_UNFREEZE, + new C_Locker_RetryKickIssueCaps(this, in, client, seq)); + return; + } + dout(10) << "kick_issue_caps released at current seq " << seq + << ", reissuing" << dendl; + issue_caps(in, cap); +} + +void Locker::kick_cap_releases(MDRequestRef& mdr) +{ + client_t client = mdr->get_client(); + for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin(); + p != mdr->cap_releases.end(); + ++p) { + CInode *in = mdcache->get_inode(p->first); + if (!in) + continue; + kick_issue_caps(in, client, p->second); + } +} + +/** + * m and ack might be NULL, so don't dereference them unless dirty != 0 + */ +void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, const cref_t<MClientCaps> &m, const ref_t<MClientCaps> &ack) +{ + dout(10) << "_do_snap_update dirty " << ccap_string(dirty) + << " follows " << follows << " snap " << snap + << " on " << *in << dendl; + + if (snap == CEPH_NOSNAP) { + // hmm, i guess snap was already deleted? just ack! + dout(10) << " wow, the snap following " << follows + << " was already deleted. nothing to record, just ack." << dendl; + if (ack) { + if (ack->get_op() == CEPH_CAP_OP_FLUSHSNAP_ACK) { + if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flushsnap_ack); + } + mds->send_message_client_counted(ack, m->get_connection()); + } + return; + } + + EUpdate *le = new EUpdate(mds->mdlog, "snap flush"); + mds->mdlog->start_entry(le); + MutationRef mut = new MutationImpl(); + mut->ls = mds->mdlog->get_current_segment(); + + // normal metadata updates that we can apply to the head as well. + + // update xattrs? + CInode::mempool_xattr_map *px = nullptr; + bool xattrs = (dirty & CEPH_CAP_XATTR_EXCL) && + m->xattrbl.length() && + m->head.xattr_version > in->get_projected_inode()->xattr_version; + + CInode::mempool_old_inode *oi = nullptr; + CInode::old_inode_map_ptr _old_inodes; + if (in->is_any_old_inodes()) { + auto last = in->pick_old_inode(snap); + if (last) { + _old_inodes = CInode::allocate_old_inode_map(*in->get_old_inodes()); + oi = &_old_inodes->at(last); + if (snap > oi->first) { + (*_old_inodes)[snap - 1] = *oi;; + oi->first = snap; + } + } + } + + CInode::mempool_inode *i; + if (oi) { + dout(10) << " writing into old inode" << dendl; + auto pi = in->project_inode(mut); + pi.inode->version = in->pre_dirty(); + i = &oi->inode; + if (xattrs) + px = &oi->xattrs; + } else { + auto pi = in->project_inode(mut, xattrs); + pi.inode->version = in->pre_dirty(); + i = pi.inode.get(); + if (xattrs) + px = pi.xattrs.get(); + } + + _update_cap_fields(in, dirty, m, i); + + // xattr + if (xattrs) { + dout(7) << " xattrs v" << i->xattr_version << " -> " << m->head.xattr_version + << " len " << m->xattrbl.length() << dendl; + i->xattr_version = m->head.xattr_version; + auto p = m->xattrbl.cbegin(); + decode(*px, p); + } + + { + auto it = i->client_ranges.find(client); + if (it != i->client_ranges.end()) { + if (in->last == snap) { + dout(10) << " removing client_range entirely" << dendl; + i->client_ranges.erase(it); + } else { + dout(10) << " client_range now follows " << snap << dendl; + it->second.follows = snap; + } + } + } + + if (_old_inodes) + in->reset_old_inodes(std::move(_old_inodes)); + + mut->auth_pin(in); + mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows); + mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows); + + // "oldest flush tid" > 0 means client uses unique TID for each flush + if (ack && ack->get_oldest_flush_tid() > 0) + le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()), + ack->get_oldest_flush_tid()); + + mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, UPDATE_SNAPFLUSH, + ack, client)); +} + +void Locker::_update_cap_fields(CInode *in, int dirty, const cref_t<MClientCaps> &m, CInode::mempool_inode *pi) +{ + if (dirty == 0) + return; + + /* m must be valid if there are dirty caps */ + ceph_assert(m); + uint64_t features = m->get_connection()->get_features(); + + if (m->get_ctime() > pi->ctime) { + dout(7) << " ctime " << pi->ctime << " -> " << m->get_ctime() + << " for " << *in << dendl; + pi->ctime = m->get_ctime(); + if (m->get_ctime() > pi->rstat.rctime) + pi->rstat.rctime = m->get_ctime(); + } + + if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) && + m->get_change_attr() > pi->change_attr) { + dout(7) << " change_attr " << pi->change_attr << " -> " << m->get_change_attr() + << " for " << *in << dendl; + pi->change_attr = m->get_change_attr(); + } + + // file + if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) { + utime_t atime = m->get_atime(); + utime_t mtime = m->get_mtime(); + uint64_t size = m->get_size(); + version_t inline_version = m->inline_version; + + if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) || + ((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) { + dout(7) << " mtime " << pi->mtime << " -> " << mtime + << " for " << *in << dendl; + pi->mtime = mtime; + if (mtime > pi->rstat.rctime) + pi->rstat.rctime = mtime; + } + if (in->is_file() && // ONLY if regular file + size > pi->size) { + dout(7) << " size " << pi->size << " -> " << size + << " for " << *in << dendl; + pi->size = size; + pi->rstat.rbytes = size; + } + if (in->is_file() && + (dirty & CEPH_CAP_FILE_WR) && + inline_version > pi->inline_data.version) { + pi->inline_data.version = inline_version; + if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0) + pi->inline_data.set_data(m->inline_data); + else + pi->inline_data.free_data(); + } + if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) { + dout(7) << " atime " << pi->atime << " -> " << atime + << " for " << *in << dendl; + pi->atime = atime; + } + if ((dirty & CEPH_CAP_FILE_EXCL) && + ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) { + dout(7) << " time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq() + << " for " << *in << dendl; + pi->time_warp_seq = m->get_time_warp_seq(); + } + } + // auth + if (dirty & CEPH_CAP_AUTH_EXCL) { + if (m->head.uid != pi->uid) { + dout(7) << " uid " << pi->uid + << " -> " << m->head.uid + << " for " << *in << dendl; + pi->uid = m->head.uid; + } + if (m->head.gid != pi->gid) { + dout(7) << " gid " << pi->gid + << " -> " << m->head.gid + << " for " << *in << dendl; + pi->gid = m->head.gid; + } + if (m->head.mode != pi->mode) { + dout(7) << " mode " << oct << pi->mode + << " -> " << m->head.mode << dec + << " for " << *in << dendl; + pi->mode = m->head.mode; + } + if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) { + dout(7) << " btime " << oct << pi->btime + << " -> " << m->get_btime() << dec + << " for " << *in << dendl; + pi->btime = m->get_btime(); + } + } +} + +/* + * update inode based on cap flush|flushsnap|wanted. + * adjust max_size, if needed. + * if we update, return true; otherwise, false (no updated needed). + */ +bool Locker::_do_cap_update(CInode *in, Capability *cap, + int dirty, snapid_t follows, + const cref_t<MClientCaps> &m, const ref_t<MClientCaps> &ack, + bool *need_flush) +{ + dout(10) << "_do_cap_update dirty " << ccap_string(dirty) + << " issued " << ccap_string(cap ? cap->issued() : 0) + << " wanted " << ccap_string(cap ? cap->wanted() : 0) + << " on " << *in << dendl; + ceph_assert(in->is_auth()); + client_t client = m->get_source().num(); + const auto& latest = in->get_projected_inode(); + + // increase or zero max_size? + uint64_t size = m->get_size(); + bool change_max = false; + uint64_t old_max = latest->get_client_range(client); + uint64_t new_max = old_max; + + if (in->is_file()) { + bool forced_change_max = false; + dout(20) << "inode is file" << dendl; + if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) { + dout(20) << "client has write caps; m->get_max_size=" + << m->get_max_size() << "; old_max=" << old_max << dendl; + if (m->get_max_size() > new_max) { + dout(10) << "client requests file_max " << m->get_max_size() + << " > max " << old_max << dendl; + change_max = true; + forced_change_max = true; + new_max = calc_new_max_size(latest, m->get_max_size()); + } else { + new_max = calc_new_max_size(latest, size); + + if (new_max > old_max) + change_max = true; + else + new_max = old_max; + } + } else { + if (old_max) { + change_max = true; + new_max = 0; + } + } + + if (in->last == CEPH_NOSNAP && + change_max && + !in->filelock.can_wrlock(client) && + !in->filelock.can_force_wrlock(client)) { + dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl; + if (in->filelock.is_stable()) { + bool need_issue = false; + if (cap) + cap->inc_suppress(); + if (in->get_mds_caps_wanted().empty() && + (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) { + if (in->filelock.get_state() != LOCK_EXCL) + file_excl(&in->filelock, &need_issue); + } else + simple_lock(&in->filelock, &need_issue); + if (need_issue) + issue_caps(in); + if (cap) + cap->dec_suppress(); + } + if (!in->filelock.can_wrlock(client) && + !in->filelock.can_force_wrlock(client)) { + C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in, + forced_change_max ? new_max : 0, + 0, utime_t()); + + in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms); + change_max = false; + } + } + } + + if (m->flockbl.length()) { + int32_t num_locks; + auto bli = m->flockbl.cbegin(); + decode(num_locks, bli); + for ( int i=0; i < num_locks; ++i) { + ceph_filelock decoded_lock; + decode(decoded_lock, bli); + in->get_fcntl_lock_state()->held_locks. + insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock)); + ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)]; + } + decode(num_locks, bli); + for ( int i=0; i < num_locks; ++i) { + ceph_filelock decoded_lock; + decode(decoded_lock, bli); + in->get_flock_lock_state()->held_locks. + insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock)); + ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)]; + } + } + + if (!dirty && !change_max) + return false; + + Session *session = mds->get_session(m); + if (session->check_access(in, MAY_WRITE, + m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) { + dout(10) << "check_access failed, dropping cap update on " << *in << dendl; + return false; + } + + // do the update. + EUpdate *le = new EUpdate(mds->mdlog, "cap update"); + mds->mdlog->start_entry(le); + + bool xattr = (dirty & CEPH_CAP_XATTR_EXCL) && + m->xattrbl.length() && + m->head.xattr_version > in->get_projected_inode()->xattr_version; + + MutationRef mut(new MutationImpl()); + mut->ls = mds->mdlog->get_current_segment(); + + auto pi = in->project_inode(mut, xattr); + pi.inode->version = in->pre_dirty(); + + _update_cap_fields(in, dirty, m, pi.inode.get()); + + if (change_max) { + dout(7) << " max_size " << old_max << " -> " << new_max + << " for " << *in << dendl; + if (new_max) { + auto &cr = pi.inode->client_ranges[client]; + cr.range.first = 0; + cr.range.last = new_max; + cr.follows = in->first - 1; + in->mark_clientwriteable(); + if (cap) + cap->mark_clientwriteable(); + } else { + pi.inode->client_ranges.erase(client); + if (pi.inode->client_ranges.empty()) + in->clear_clientwriteable(); + if (cap) + cap->clear_clientwriteable(); + } + } + + if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) + wrlock_force(&in->filelock, mut); // wrlock for duration of journal + + // auth + if (dirty & CEPH_CAP_AUTH_EXCL) + wrlock_force(&in->authlock, mut); + + // xattrs update? + if (xattr) { + dout(7) << " xattrs v" << pi.inode->xattr_version << " -> " << m->head.xattr_version << dendl; + pi.inode->xattr_version = m->head.xattr_version; + auto p = m->xattrbl.cbegin(); + decode_noshare(*pi.xattrs, p); + wrlock_force(&in->xattrlock, mut); + } + + mut->auth_pin(in); + mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows); + mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows); + + // "oldest flush tid" > 0 means client uses unique TID for each flush + if (ack && ack->get_oldest_flush_tid() > 0) + le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()), + ack->get_oldest_flush_tid()); + + unsigned update_flags = 0; + if (change_max) + update_flags |= UPDATE_SHAREMAX; + if (cap) + update_flags |= UPDATE_NEEDSISSUE; + mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, update_flags, + ack, client)); + if (need_flush && !*need_flush && + ((change_max && new_max) || // max INCREASE + _need_flush_mdlog(in, dirty))) + *need_flush = true; + + return true; +} + +void Locker::handle_client_cap_release(const cref_t<MClientCapRelease> &m) +{ + client_t client = m->get_source().num(); + dout(10) << "handle_client_cap_release " << *m << dendl; + + if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) { + mds->wait_for_replay(new C_MDS_RetryMessage(mds, m)); + return; + } + + if (mds->logger) mds->logger->inc(l_mdss_handle_client_cap_release); + + if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) { + // Pause RADOS operations until we see the required epoch + mds->objecter->set_epoch_barrier(m->osd_epoch_barrier); + } + + if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) { + // Record the barrier so that we will retransmit it to clients + mds->set_osd_epoch_barrier(m->osd_epoch_barrier); + } + + Session *session = mds->get_session(m); + + for (const auto &cap : m->caps) { + _do_cap_release(client, inodeno_t((uint64_t)cap.ino) , cap.cap_id, cap.migrate_seq, cap.seq); + } + + if (session) { + session->notify_cap_release(m->caps.size()); + } +} + +class C_Locker_RetryCapRelease : public LockerContext { + client_t client; + inodeno_t ino; + uint64_t cap_id; + ceph_seq_t migrate_seq; + ceph_seq_t issue_seq; +public: + C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id, + ceph_seq_t mseq, ceph_seq_t seq) : + LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {} + void finish(int r) override { + locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq); + } +}; + +void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id, + ceph_seq_t mseq, ceph_seq_t seq) +{ + CInode *in = mdcache->get_inode(ino); + if (!in) { + dout(7) << "_do_cap_release missing ino " << ino << dendl; + return; + } + Capability *cap = in->get_client_cap(client); + if (!cap) { + dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl; + return; + } + + dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl; + if (cap->get_cap_id() != cap_id) { + dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl; + return; + } + if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) { + dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl; + return; + } + if (should_defer_client_cap_frozen(in)) { + dout(7) << " freezing|frozen, deferring" << dendl; + in->add_waiter(CInode::WAIT_UNFREEZE, + new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq)); + return; + } + if (seq != cap->get_last_issue()) { + dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl; + // clean out any old revoke history + cap->clean_revoke_from(seq); + eval_cap_gather(in); + return; + } + remove_client_cap(in, cap); +} + +void Locker::remove_client_cap(CInode *in, Capability *cap, bool kill) +{ + client_t client = cap->get_client(); + // clean out any pending snapflush state + if (!in->client_need_snapflush.empty()) + _do_null_snapflush(in, client); + + while (!cap->lock_caches.empty()) { + MDLockCache* lock_cache = cap->lock_caches.front(); + lock_cache->client_cap = nullptr; + invalidate_lock_cache(lock_cache); + } + + bool notable = cap->is_notable(); + in->remove_client_cap(client); + if (!notable) + return; + + if (in->is_auth()) { + // make sure we clear out the client byte range + if (in->get_projected_inode()->client_ranges.count(client) && + !(in->get_inode()->nlink == 0 && !in->is_any_caps())) { // unless it's unlink + stray + if (kill) + in->state_set(CInode::STATE_NEEDSRECOVER); + else + check_inode_max_size(in); + } + } else { + request_inode_file_caps(in); + } + + try_eval(in, CEPH_CAP_LOCKS); +} + + +/** + * Return true if any currently revoking caps exceed the + * session_timeout threshold. + */ +bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking, + double timeout) const +{ + xlist<Capability*>::const_iterator p = revoking.begin(); + if (p.end()) { + // No revoking caps at the moment + return false; + } else { + utime_t now = ceph_clock_now(); + utime_t age = now - (*p)->get_last_revoke_stamp(); + if (age <= timeout) { + return false; + } else { + return true; + } + } +} + +std::set<client_t> Locker::get_late_revoking_clients(double timeout) const +{ + std::set<client_t> result; + + if (any_late_revoking_caps(revoking_caps, timeout)) { + // Slow path: execute in O(N_clients) + for (auto &p : revoking_caps_by_client) { + if (any_late_revoking_caps(p.second, timeout)) { + result.insert(p.first); + } + } + } else { + // Fast path: no misbehaving clients, execute in O(1) + } + return result; +} + +// Hard-code instead of surfacing a config settings because this is +// really a hack that should go away at some point when we have better +// inspection tools for getting at detailed cap state (#7316) +#define MAX_WARN_CAPS 100 + +void Locker::caps_tick() +{ + utime_t now = ceph_clock_now(); + + if (!need_snapflush_inodes.empty()) { + // snap inodes that needs flush are auth pinned, they affect + // subtree/difrarg freeze. + utime_t cutoff = now; + cutoff -= g_conf()->mds_freeze_tree_timeout / 3; + + CInode *last = need_snapflush_inodes.back(); + while (!need_snapflush_inodes.empty()) { + CInode *in = need_snapflush_inodes.front(); + if (in->last_dirstat_prop >= cutoff) + break; + in->item_caps.remove_myself(); + snapflush_nudge(in); + if (in == last) + break; + } + } + + dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl; + + now = ceph_clock_now(); + int n = 0; + for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) { + Capability *cap = *p; + + utime_t age = now - cap->get_last_revoke_stamp(); + dout(20) << __func__ << " age = " << age << " client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl; + if (age <= mds->mdsmap->get_session_timeout()) { + dout(20) << __func__ << " age below timeout " << mds->mdsmap->get_session_timeout() << dendl; + break; + } else { + ++n; + if (n > MAX_WARN_CAPS) { + dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late" + << "revoking, ignoring subsequent caps" << dendl; + break; + } + } + // exponential backoff of warning intervals + if (age > mds->mdsmap->get_session_timeout() * (1 << cap->get_num_revoke_warnings())) { + cap->inc_num_revoke_warnings(); + CachedStackStringStream css; + *css << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino " + << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending()) + << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago"; + mds->clog->warn() << css->strv(); + dout(20) << __func__ << " " << css->strv() << dendl; + } else { + dout(20) << __func__ << " silencing log message (backoff) for " << "client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl; + } + } +} + + +void Locker::handle_client_lease(const cref_t<MClientLease> &m) +{ + dout(10) << "handle_client_lease " << *m << dendl; + + ceph_assert(m->get_source().is_client()); + client_t client = m->get_source().num(); + + CInode *in = mdcache->get_inode(m->get_ino(), m->get_last()); + if (!in) { + dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl; + return; + } + CDentry *dn = 0; + + frag_t fg = in->pick_dirfrag(m->dname); + CDir *dir = in->get_dirfrag(fg); + if (dir) + dn = dir->lookup(m->dname); + if (!dn) { + dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl; + return; + } + dout(10) << " on " << *dn << dendl; + + // replica and lock + ClientLease *l = dn->get_client_lease(client); + if (!l) { + dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl; + return; + } + + switch (m->get_action()) { + case CEPH_MDS_LEASE_REVOKE_ACK: + case CEPH_MDS_LEASE_RELEASE: + if (l->seq != m->get_seq()) { + dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl; + } else { + dout(7) << "handle_client_lease client." << client + << " on " << *dn << dendl; + dn->remove_client_lease(l, this); + } + break; + + case CEPH_MDS_LEASE_RENEW: + { + dout(7) << "handle_client_lease client." << client << " renew on " << *dn + << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl; + if (dn->lock.can_lease(client)) { + auto reply = make_message<MClientLease>(*m); + int pool = 1; // fixme.. do something smart! + reply->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]); + reply->h.seq = ++l->seq; + reply->clear_payload(); + + utime_t now = ceph_clock_now(); + now += mdcache->client_lease_durations[pool]; + mdcache->touch_client_lease(l, pool, now); + + mds->send_message_client_counted(reply, m->get_connection()); + } + } + break; + + default: + ceph_abort(); // implement me + break; + } +} + + +void Locker::issue_client_lease(CDentry *dn, CInode *in, MDRequestRef &mdr, utime_t now, + bufferlist &bl) +{ + client_t client = mdr->get_client(); + Session *session = mdr->session; + + CInode *diri = dn->get_dir()->get_inode(); + if (mdr->snapid == CEPH_NOSNAP && + dn->lock.can_lease(client) && + !diri->is_stray() && // do not issue dn leases in stray dir! + !diri->filelock.can_lease(client) && + !(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL))) { + int mask = 0; + CDentry::linkage_t *dnl = dn->get_linkage(client, mdr); + if (dnl->is_primary()) { + ceph_assert(dnl->get_inode() == in); + mask = CEPH_LEASE_PRIMARY_LINK; + } else { + if (dnl->is_remote()) + ceph_assert(dnl->get_remote_ino() == in->ino()); + else + ceph_assert(!in); + } + // issue a dentry lease + ClientLease *l = dn->add_client_lease(client, session); + session->touch_lease(l); + + int pool = 1; // fixme.. do something smart! + now += mdcache->client_lease_durations[pool]; + mdcache->touch_client_lease(l, pool, now); + + LeaseStat lstat; + lstat.mask = CEPH_LEASE_VALID | mask; + lstat.duration_ms = (uint32_t)(1000 * mdcache->client_lease_durations[pool]); + lstat.seq = ++l->seq; + lstat.alternate_name = std::string(dn->alternate_name); + encode_lease(bl, session->info, lstat); + dout(20) << "issue_client_lease seq " << lstat.seq << " dur " << lstat.duration_ms << "ms " + << " on " << *dn << dendl; + } else { + // null lease + LeaseStat lstat; + lstat.mask = 0; + lstat.alternate_name = std::string(dn->alternate_name); + encode_lease(bl, session->info, lstat); + dout(20) << "issue_client_lease no/null lease on " << *dn << dendl; + } +} + + +void Locker::revoke_client_leases(SimpleLock *lock) +{ + int n = 0; + CDentry *dn = static_cast<CDentry*>(lock->get_parent()); + for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin(); + p != dn->client_lease_map.end(); + ++p) { + ClientLease *l = p->second; + + n++; + ceph_assert(lock->get_type() == CEPH_LOCK_DN); + + CDentry *dn = static_cast<CDentry*>(lock->get_parent()); + int mask = 1 | CEPH_LOCK_DN; // old and new bits + + // i should also revoke the dir ICONTENT lease, if they have it! + CInode *diri = dn->get_dir()->get_inode(); + auto lease = make_message<MClientLease>(CEPH_MDS_LEASE_REVOKE, l->seq, mask, diri->ino(), diri->first, CEPH_NOSNAP, dn->get_name()); + mds->send_message_client_counted(lease, l->client); + } +} + +void Locker::encode_lease(bufferlist& bl, const session_info_t& info, + const LeaseStat& ls) +{ + if (info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) { + ENCODE_START(2, 1, bl); + encode(ls.mask, bl); + encode(ls.duration_ms, bl); + encode(ls.seq, bl); + encode(ls.alternate_name, bl); + ENCODE_FINISH(bl); + } + else { + encode(ls.mask, bl); + encode(ls.duration_ms, bl); + encode(ls.seq, bl); + } +} + +// locks ---------------------------------------------------------------- + +SimpleLock *Locker::get_lock(int lock_type, const MDSCacheObjectInfo &info) +{ + switch (lock_type) { + case CEPH_LOCK_DN: + { + // be careful; info.dirfrag may have incorrect frag; recalculate based on dname. + CInode *diri = mdcache->get_inode(info.dirfrag.ino); + frag_t fg; + CDir *dir = 0; + CDentry *dn = 0; + if (diri) { + fg = diri->pick_dirfrag(info.dname); + dir = diri->get_dirfrag(fg); + if (dir) + dn = dir->lookup(info.dname, info.snapid); + } + if (!dn) { + dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl; + return 0; + } + return &dn->lock; + } + + case CEPH_LOCK_IAUTH: + case CEPH_LOCK_ILINK: + case CEPH_LOCK_IDFT: + case CEPH_LOCK_IFILE: + case CEPH_LOCK_INEST: + case CEPH_LOCK_IXATTR: + case CEPH_LOCK_ISNAP: + case CEPH_LOCK_IFLOCK: + case CEPH_LOCK_IPOLICY: + { + CInode *in = mdcache->get_inode(info.ino, info.snapid); + if (!in) { + dout(7) << "get_lock don't have ino " << info.ino << dendl; + return 0; + } + switch (lock_type) { + case CEPH_LOCK_IAUTH: return &in->authlock; + case CEPH_LOCK_ILINK: return &in->linklock; + case CEPH_LOCK_IDFT: return &in->dirfragtreelock; + case CEPH_LOCK_IFILE: return &in->filelock; + case CEPH_LOCK_INEST: return &in->nestlock; + case CEPH_LOCK_IXATTR: return &in->xattrlock; + case CEPH_LOCK_ISNAP: return &in->snaplock; + case CEPH_LOCK_IFLOCK: return &in->flocklock; + case CEPH_LOCK_IPOLICY: return &in->policylock; + } + } + + default: + dout(7) << "get_lock don't know lock_type " << lock_type << dendl; + ceph_abort(); + break; + } + + return 0; +} + +void Locker::handle_lock(const cref_t<MLock> &m) +{ + // nobody should be talking to us during recovery. + ceph_assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping()); + + SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info()); + if (!lock) { + dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl; + return; + } + + switch (lock->get_type()) { + case CEPH_LOCK_DN: + case CEPH_LOCK_IAUTH: + case CEPH_LOCK_ILINK: + case CEPH_LOCK_ISNAP: + case CEPH_LOCK_IXATTR: + case CEPH_LOCK_IFLOCK: + case CEPH_LOCK_IPOLICY: + handle_simple_lock(lock, m); + break; + + case CEPH_LOCK_IDFT: + case CEPH_LOCK_INEST: + //handle_scatter_lock((ScatterLock*)lock, m); + //break; + + case CEPH_LOCK_IFILE: + handle_file_lock(static_cast<ScatterLock*>(lock), m); + break; + + default: + dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl; + ceph_abort(); + break; + } +} + + + + + +// ========================================================================== +// simple lock + +/** This function may take a reference to m if it needs one, but does + * not put references. */ +void Locker::handle_reqrdlock(SimpleLock *lock, const cref_t<MLock> &m) +{ + MDSCacheObject *parent = lock->get_parent(); + if (parent->is_auth() && + lock->get_state() != LOCK_SYNC && + !parent->is_frozen()) { + dout(7) << "handle_reqrdlock got rdlock request on " << *lock + << " on " << *parent << dendl; + ceph_assert(parent->is_auth()); // replica auth pinned if they're doing this! + if (lock->is_stable()) { + simple_sync(lock); + } else { + dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl; + lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE, + new C_MDS_RetryMessage(mds, m)); + } + } else { + dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock + << " on " << *parent << dendl; + // replica should retry + } +} + +void Locker::handle_simple_lock(SimpleLock *lock, const cref_t<MLock> &m) +{ + int from = m->get_asker(); + + dout(10) << "handle_simple_lock " << *m + << " on " << *lock << " " << *lock->get_parent() << dendl; + + if (mds->is_rejoin()) { + if (lock->get_parent()->is_rejoining()) { + dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent() + << ", dropping " << *m << dendl; + return; + } + } + + switch (m->get_action()) { + // -- replica -- + case LOCK_AC_SYNC: + ceph_assert(lock->get_state() == LOCK_LOCK); + lock->decode_locked_state(m->get_data()); + lock->set_state(LOCK_SYNC); + lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE); + break; + + case LOCK_AC_LOCK: + ceph_assert(lock->get_state() == LOCK_SYNC); + lock->set_state(LOCK_SYNC_LOCK); + if (lock->is_leased()) + revoke_client_leases(lock); + eval_gather(lock, true); + if (lock->is_unstable_and_locked()) { + if (lock->is_cached()) + invalidate_lock_caches(lock); + mds->mdlog->flush(); + } + break; + + + // -- auth -- + case LOCK_AC_LOCKACK: + ceph_assert(lock->get_state() == LOCK_SYNC_LOCK || + lock->get_state() == LOCK_SYNC_EXCL); + ceph_assert(lock->is_gathering(from)); + lock->remove_gather(from); + + if (lock->is_gathering()) { + dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from + << ", still gathering " << lock->get_gather_set() << dendl; + } else { + dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from + << ", last one" << dendl; + eval_gather(lock); + } + break; + + case LOCK_AC_REQRDLOCK: + handle_reqrdlock(lock, m); + break; + + } +} + +/* unused, currently. + +class C_Locker_SimpleEval : public Context { + Locker *locker; + SimpleLock *lock; +public: + C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {} + void finish(int r) { + locker->try_simple_eval(lock); + } +}; + +void Locker::try_simple_eval(SimpleLock *lock) +{ + // unstable and ambiguous auth? + if (!lock->is_stable() && + lock->get_parent()->is_ambiguous_auth()) { + dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl; + //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH)) + lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock)); + return; + } + + if (!lock->get_parent()->is_auth()) { + dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl; + return; + } + + if (!lock->get_parent()->can_auth_pin()) { + dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl; + //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH)) + lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock)); + return; + } + + if (lock->is_stable()) + simple_eval(lock); +} +*/ + + +void Locker::simple_eval(SimpleLock *lock, bool *need_issue) +{ + dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl; + + ceph_assert(lock->get_parent()->is_auth()); + ceph_assert(lock->is_stable()); + + if (lock->get_parent()->is_freezing_or_frozen()) { + // dentry/snap lock in unreadable state can block path traverse + if ((lock->get_type() != CEPH_LOCK_DN && + lock->get_type() != CEPH_LOCK_ISNAP && + lock->get_type() != CEPH_LOCK_IPOLICY) || + lock->get_state() == LOCK_SYNC || + lock->get_parent()->is_frozen()) + return; + } + + if (mdcache->is_readonly()) { + if (lock->get_state() != LOCK_SYNC) { + dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl; + simple_sync(lock, need_issue); + } + return; + } + + CInode *in = 0; + int wanted = 0; + if (lock->get_cap_shift()) { + in = static_cast<CInode*>(lock->get_parent()); + in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift()); + } + + // -> excl? + if (lock->get_state() != LOCK_EXCL && + in && in->get_target_loner() >= 0 && + (wanted & CEPH_CAP_GEXCL)) { + dout(7) << "simple_eval stable, going to excl " << *lock + << " on " << *lock->get_parent() << dendl; + simple_excl(lock, need_issue); + } + + // stable -> sync? + else if (lock->get_state() != LOCK_SYNC && + !lock->is_wrlocked() && + ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) || + (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) { + dout(7) << "simple_eval stable, syncing " << *lock + << " on " << *lock->get_parent() << dendl; + simple_sync(lock, need_issue); + } +} + + +// mid + +bool Locker::simple_sync(SimpleLock *lock, bool *need_issue) +{ + dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl; + ceph_assert(lock->get_parent()->is_auth()); + ceph_assert(lock->is_stable()); + + CInode *in = 0; + if (lock->get_cap_shift()) + in = static_cast<CInode *>(lock->get_parent()); + + int old_state = lock->get_state(); + + if (old_state != LOCK_TSYN) { + + switch (lock->get_state()) { + case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break; + case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break; + case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break; + case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break; + default: ceph_abort(); + } + + int gather = 0; + if (lock->is_wrlocked()) { + gather++; + if (lock->is_cached()) + invalidate_lock_caches(lock); + + // After a client request is early replied the mdlog won't be flushed + // immediately, but before safe replied the request will hold the write + // locks. So if the client sends another request to a different MDS + // daemon, which then needs to request read lock from current MDS daemon, + // then that daemon maybe stuck at most for 5 seconds. Which will lead + // the client stuck at most 5 seconds. + // + // Let's try to flush the mdlog when the write lock is held, which will + // release the write locks after mdlog is successfully flushed. + mds->mdlog->flush(); + } + + if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) { + send_lock_message(lock, LOCK_AC_SYNC); + lock->init_gather(); + gather++; + } + + if (in && in->is_head()) { + if (in->issued_caps_need_gather(lock)) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + gather++; + } + } + + bool need_recover = false; + if (lock->get_type() == CEPH_LOCK_IFILE) { + ceph_assert(in); + if (in->state_test(CInode::STATE_NEEDSRECOVER)) { + mds->mdcache->queue_file_recover(in); + need_recover = true; + gather++; + } + } + + if (!gather && lock->is_dirty()) { + lock->get_parent()->auth_pin(lock); + scatter_writebehind(static_cast<ScatterLock*>(lock)); + return false; + } + + if (gather) { + lock->get_parent()->auth_pin(lock); + if (need_recover) + mds->mdcache->do_file_recover(); + return false; + } + } + + if (lock->get_parent()->is_replicated()) { // FIXME + bufferlist data; + lock->encode_locked_state(data); + send_lock_message(lock, LOCK_AC_SYNC, data); + } + lock->set_state(LOCK_SYNC); + lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE); + if (in && in->is_head()) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + } + return true; +} + +void Locker::simple_excl(SimpleLock *lock, bool *need_issue) +{ + dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl; + ceph_assert(lock->get_parent()->is_auth()); + ceph_assert(lock->is_stable()); + + CInode *in = 0; + if (lock->get_cap_shift()) + in = static_cast<CInode *>(lock->get_parent()); + + switch (lock->get_state()) { + case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break; + case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break; + case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break; + default: ceph_abort(); + } + + int gather = 0; + if (lock->is_rdlocked()) + gather++; + if (lock->is_wrlocked()) + gather++; + if (gather && lock->is_cached()) + invalidate_lock_caches(lock); + + if (lock->get_parent()->is_replicated() && + lock->get_state() != LOCK_LOCK_EXCL && + lock->get_state() != LOCK_XSYN_EXCL) { + send_lock_message(lock, LOCK_AC_LOCK); + lock->init_gather(); + gather++; + } + + if (in && in->is_head()) { + if (in->issued_caps_need_gather(lock)) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + gather++; + } + } + + if (gather) { + lock->get_parent()->auth_pin(lock); + } else { + lock->set_state(LOCK_EXCL); + lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE); + if (in) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + } + } +} + +void Locker::simple_lock(SimpleLock *lock, bool *need_issue) +{ + dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl; + ceph_assert(lock->get_parent()->is_auth()); + ceph_assert(lock->is_stable()); + ceph_assert(lock->get_state() != LOCK_LOCK); + + CInode *in = 0; + if (lock->get_cap_shift()) + in = static_cast<CInode *>(lock->get_parent()); + + int old_state = lock->get_state(); + + switch (lock->get_state()) { + case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break; + case LOCK_XSYN: lock->set_state(LOCK_XSYN_LOCK); break; + case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break; + case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); + (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted(); + break; + case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break; + default: ceph_abort(); + } + + int gather = 0; + if (lock->is_leased()) { + gather++; + revoke_client_leases(lock); + } + if (lock->is_rdlocked()) { + if (lock->is_cached()) + invalidate_lock_caches(lock); + gather++; + } + if (in && in->is_head()) { + if (in->issued_caps_need_gather(lock)) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + gather++; + } + } + + bool need_recover = false; + if (lock->get_type() == CEPH_LOCK_IFILE) { + ceph_assert(in); + if(in->state_test(CInode::STATE_NEEDSRECOVER)) { + mds->mdcache->queue_file_recover(in); + need_recover = true; + gather++; + } + } + + if (lock->get_parent()->is_replicated() && + lock->get_state() == LOCK_MIX_LOCK && + gather) { + dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl; + } else { + // move to second stage of gather now, so we don't send the lock action later. + if (lock->get_state() == LOCK_MIX_LOCK) + lock->set_state(LOCK_MIX_LOCK2); + + if (lock->get_parent()->is_replicated() && + lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) { // replica may already be LOCK + gather++; + send_lock_message(lock, LOCK_AC_LOCK); + lock->init_gather(); + } + } + + if (!gather && lock->is_dirty()) { + lock->get_parent()->auth_pin(lock); + scatter_writebehind(static_cast<ScatterLock*>(lock)); + return; + } + + if (gather) { + lock->get_parent()->auth_pin(lock); + if (need_recover) + mds->mdcache->do_file_recover(); + } else { + lock->set_state(LOCK_LOCK); + lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE); + } +} + + +void Locker::simple_xlock(SimpleLock *lock) +{ + dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl; + ceph_assert(lock->get_parent()->is_auth()); + //assert(lock->is_stable()); + ceph_assert(lock->get_state() != LOCK_XLOCK); + + CInode *in = 0; + if (lock->get_cap_shift()) + in = static_cast<CInode *>(lock->get_parent()); + + if (lock->is_stable()) + lock->get_parent()->auth_pin(lock); + + switch (lock->get_state()) { + case LOCK_LOCK: + case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break; + default: ceph_abort(); + } + + int gather = 0; + if (lock->is_rdlocked()) + gather++; + if (lock->is_wrlocked()) + gather++; + if (gather && lock->is_cached()) + invalidate_lock_caches(lock); + + if (in && in->is_head()) { + if (in->issued_caps_need_gather(lock)) { + issue_caps(in); + gather++; + } + } + + if (!gather) { + lock->set_state(LOCK_PREXLOCK); + //assert("shouldn't be called if we are already xlockable" == 0); + } +} + + + + + +// ========================================================================== +// scatter lock + +/* + +Some notes on scatterlocks. + + - The scatter/gather is driven by the inode lock. The scatter always + brings in the latest metadata from the fragments. + + - When in a scattered/MIX state, fragments are only allowed to + update/be written to if the accounted stat matches the inode's + current version. + + - That means, on gather, we _only_ assimilate diffs for frag metadata + that match the current version, because those are the only ones + written during this scatter/gather cycle. (Others didn't permit + it.) We increment the version and journal this to disk. + + - When possible, we also simultaneously update our local frag + accounted stats to match. + + - On scatter, the new inode info is broadcast to frags, both local + and remote. If possible (auth and !frozen), the dirfrag auth + should update the accounted state (if it isn't already up to date). + Note that this may occur on both the local inode auth node and + inode replicas, so there are two potential paths. If it is NOT + possible, they need to mark_stale to prevent any possible writes. + + - A scatter can be to MIX (potentially writeable) or to SYNC (read + only). Both are opportunities to update the frag accounted stats, + even though only the MIX case is affected by a stale dirfrag. + + - Because many scatter/gather cycles can potentially go by without a + frag being able to update its accounted stats (due to being frozen + by exports/refragments in progress), the frag may have (even very) + old stat versions. That's fine. If when we do want to update it, + we can update accounted_* and the version first. + +*/ + +class C_Locker_ScatterWB : public LockerLogContext { + ScatterLock *lock; + MutationRef mut; +public: + C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) : + LockerLogContext(l), lock(sl), mut(m) {} + void finish(int r) override { + locker->scatter_writebehind_finish(lock, mut); + } +}; + +void Locker::scatter_writebehind(ScatterLock *lock) +{ + CInode *in = static_cast<CInode*>(lock->get_parent()); + dout(10) << "scatter_writebehind " << in->get_inode()->mtime << " on " << *lock << " on " << *in << dendl; + + // journal + MutationRef mut(new MutationImpl()); + mut->ls = mds->mdlog->get_current_segment(); + + // forcefully take a wrlock + lock->get_wrlock(true); + mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK); + + in->pre_cow_old_inode(); // avoid cow mayhem + + auto pi = in->project_inode(mut); + pi.inode->version = in->pre_dirty(); + + in->finish_scatter_gather_update(lock->get_type(), mut); + lock->start_flush(); + + EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind"); + mds->mdlog->start_entry(le); + + mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY); + mdcache->journal_dirty_inode(mut.get(), &le->metablob, in); + + in->finish_scatter_gather_update_accounted(lock->get_type(), &le->metablob); + + mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut)); + mds->mdlog->flush(); +} + +void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut) +{ + CInode *in = static_cast<CInode*>(lock->get_parent()); + dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl; + + mut->apply(); + + lock->finish_flush(); + + // if replicas may have flushed in a mix->lock state, send another + // message so they can finish_flush(). + if (in->is_replicated()) { + switch (lock->get_state()) { + case LOCK_MIX_LOCK: + case LOCK_MIX_LOCK2: + case LOCK_MIX_EXCL: + case LOCK_MIX_TSYN: + send_lock_message(lock, LOCK_AC_LOCKFLUSHED); + } + } + + drop_locks(mut.get()); + mut->cleanup(); + + if (lock->is_stable()) + lock->finish_waiters(ScatterLock::WAIT_STABLE); + + //scatter_eval_gather(lock); +} + +void Locker::scatter_eval(ScatterLock *lock, bool *need_issue) +{ + dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl; + + ceph_assert(lock->get_parent()->is_auth()); + ceph_assert(lock->is_stable()); + + if (lock->get_parent()->is_freezing_or_frozen()) { + dout(20) << " freezing|frozen" << dendl; + return; + } + + if (mdcache->is_readonly()) { + if (lock->get_state() != LOCK_SYNC) { + dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl; + simple_sync(lock, need_issue); + } + return; + } + + if (!lock->is_rdlocked() && + lock->get_state() != LOCK_MIX && + lock->get_scatter_wanted()) { + dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock + << " on " << *lock->get_parent() << dendl; + scatter_mix(lock, need_issue); + return; + } + + if (lock->get_type() == CEPH_LOCK_INEST) { + // in general, we want to keep INEST writable at all times. + if (!lock->is_rdlocked()) { + if (lock->get_parent()->is_replicated()) { + if (lock->get_state() != LOCK_MIX) + scatter_mix(lock, need_issue); + } else { + if (lock->get_state() != LOCK_LOCK) + simple_lock(lock, need_issue); + } + } + return; + } + + CInode *in = static_cast<CInode*>(lock->get_parent()); + if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) { + // i _should_ be sync. + if (!lock->is_wrlocked() && + lock->get_state() != LOCK_SYNC) { + dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl; + simple_sync(lock, need_issue); + } + } +} + + +/* + * mark a scatterlock to indicate that the dir fnode has some dirty data + */ +void Locker::mark_updated_scatterlock(ScatterLock *lock) +{ + lock->mark_dirty(); + if (lock->get_updated_item()->is_on_list()) { + dout(10) << "mark_updated_scatterlock " << *lock + << " - already on list since " << lock->get_update_stamp() << dendl; + } else { + updated_scatterlocks.push_back(lock->get_updated_item()); + utime_t now = ceph_clock_now(); + lock->set_update_stamp(now); + dout(10) << "mark_updated_scatterlock " << *lock + << " - added at " << now << dendl; + } +} + +/* + * this is called by scatter_tick and LogSegment::try_to_trim() when + * trying to flush dirty scattered data (i.e. updated fnode) back to + * the inode. + * + * we need to lock|scatter in order to push fnode changes into the + * inode.dirstat. + */ +void Locker::scatter_nudge(ScatterLock *lock, MDSContext *c, bool forcelockchange) +{ + CInode *p = static_cast<CInode *>(lock->get_parent()); + + if (p->is_frozen() || p->is_freezing()) { + dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl; + if (c) + p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c); + else if (lock->is_dirty()) + // just requeue. not ideal.. starvation prone.. + updated_scatterlocks.push_back(lock->get_updated_item()); + return; + } + + if (p->is_ambiguous_auth()) { + dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl; + if (c) + p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c); + else if (lock->is_dirty()) + // just requeue. not ideal.. starvation prone.. + updated_scatterlocks.push_back(lock->get_updated_item()); + return; + } + + if (p->is_auth()) { + int count = 0; + while (true) { + if (lock->is_stable()) { + // can we do it now? + // (only if we're not replicated.. if we are, we really do need + // to nudge the lock state!) + /* + actually, even if we're not replicated, we can't stay in MIX, because another mds + could discover and replicate us at any time. if that happens while we're flushing, + they end up in MIX but their inode has the old scatterstat version. + + if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) { + dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl; + scatter_writebehind(lock); + if (c) + lock->add_waiter(SimpleLock::WAIT_STABLE, c); + return; + } + */ + + if (mdcache->is_readonly()) { + if (lock->get_state() != LOCK_SYNC) { + dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl; + simple_sync(static_cast<ScatterLock*>(lock)); + } + break; + } + + // adjust lock state + dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl; + switch (lock->get_type()) { + case CEPH_LOCK_IFILE: + if (p->is_replicated() && lock->get_state() != LOCK_MIX) + scatter_mix(static_cast<ScatterLock*>(lock)); + else if (lock->get_state() != LOCK_LOCK) + simple_lock(static_cast<ScatterLock*>(lock)); + else + simple_sync(static_cast<ScatterLock*>(lock)); + break; + + case CEPH_LOCK_IDFT: + case CEPH_LOCK_INEST: + if (p->is_replicated() && lock->get_state() != LOCK_MIX) + scatter_mix(lock); + else if (lock->get_state() != LOCK_LOCK) + simple_lock(lock); + else + simple_sync(lock); + break; + default: + ceph_abort(); + } + ++count; + if (lock->is_stable() && count == 2) { + dout(10) << "scatter_nudge oh, stable after two cycles." << dendl; + // this should only realy happen when called via + // handle_file_lock due to AC_NUDGE, because the rest of the + // time we are replicated or have dirty data and won't get + // called. bailing here avoids an infinite loop. + ceph_assert(!c); + break; + } + } else { + dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl; + if (c) + lock->add_waiter(SimpleLock::WAIT_STABLE, c); + return; + } + } + } else { + dout(10) << "scatter_nudge replica, requesting scatter/unscatter of " + << *lock << " on " << *p << dendl; + // request unscatter? + mds_rank_t auth = lock->get_parent()->authority().first; + if (!mds->is_cluster_degraded() || mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) { + mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth); + } + + // wait... + if (c) + lock->add_waiter(SimpleLock::WAIT_STABLE, c); + + // also, requeue, in case we had wrong auth or something + if (lock->is_dirty()) + updated_scatterlocks.push_back(lock->get_updated_item()); + } +} + +void Locker::scatter_tick() +{ + dout(10) << "scatter_tick" << dendl; + + // updated + utime_t now = ceph_clock_now(); + int n = updated_scatterlocks.size(); + while (!updated_scatterlocks.empty()) { + ScatterLock *lock = updated_scatterlocks.front(); + + if (n-- == 0) break; // scatter_nudge() may requeue; avoid looping + + if (!lock->is_dirty()) { + updated_scatterlocks.pop_front(); + dout(10) << " removing from updated_scatterlocks " + << *lock << " " << *lock->get_parent() << dendl; + continue; + } + if (now - lock->get_update_stamp() < g_conf()->mds_scatter_nudge_interval) + break; + updated_scatterlocks.pop_front(); + scatter_nudge(lock, 0); + } + mds->mdlog->flush(); +} + + +void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue) +{ + dout(10) << "scatter_tempsync " << *lock + << " on " << *lock->get_parent() << dendl; + ceph_assert(lock->get_parent()->is_auth()); + ceph_assert(lock->is_stable()); + + ceph_abort_msg("not fully implemented, at least not for filelock"); + + CInode *in = static_cast<CInode *>(lock->get_parent()); + + switch (lock->get_state()) { + case LOCK_SYNC: ceph_abort(); // this shouldn't happen + case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break; + case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break; + default: ceph_abort(); + } + + int gather = 0; + if (lock->is_wrlocked()) { + if (lock->is_cached()) + invalidate_lock_caches(lock); + gather++; + } + + if (lock->get_cap_shift() && + in->is_head() && + in->issued_caps_need_gather(lock)) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + gather++; + } + + if (lock->get_state() == LOCK_MIX_TSYN && + in->is_replicated()) { + lock->init_gather(); + send_lock_message(lock, LOCK_AC_LOCK); + gather++; + } + + if (gather) { + in->auth_pin(lock); + } else { + // do tempsync + lock->set_state(LOCK_TSYN); + lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE); + if (lock->get_cap_shift()) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + } + } +} + + + +// ========================================================================== +// local lock + +void Locker::local_wrlock_grab(LocalLockC *lock, MutationRef& mut) +{ + dout(7) << "local_wrlock_grab on " << *lock + << " on " << *lock->get_parent() << dendl; + + ceph_assert(lock->get_parent()->is_auth()); + ceph_assert(lock->can_wrlock()); + lock->get_wrlock(mut->get_client()); + + auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK); + ceph_assert(it->is_wrlock()); +} + +bool Locker::local_wrlock_start(LocalLockC *lock, MDRequestRef& mut) +{ + dout(7) << "local_wrlock_start on " << *lock + << " on " << *lock->get_parent() << dendl; + + ceph_assert(lock->get_parent()->is_auth()); + if (lock->can_wrlock()) { + lock->get_wrlock(mut->get_client()); + auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK); + ceph_assert(it->is_wrlock()); + return true; + } else { + lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut)); + return false; + } +} + +void Locker::local_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut) +{ + ceph_assert(it->is_wrlock()); + LocalLockC *lock = static_cast<LocalLockC*>(it->lock); + dout(7) << "local_wrlock_finish on " << *lock + << " on " << *lock->get_parent() << dendl; + lock->put_wrlock(); + mut->locks.erase(it); + if (lock->get_num_wrlocks() == 0) { + lock->finish_waiters(SimpleLock::WAIT_STABLE | + SimpleLock::WAIT_WR | + SimpleLock::WAIT_RD); + } +} + +bool Locker::local_xlock_start(LocalLockC *lock, MDRequestRef& mut) +{ + dout(7) << "local_xlock_start on " << *lock + << " on " << *lock->get_parent() << dendl; + + ceph_assert(lock->get_parent()->is_auth()); + if (!lock->can_xlock_local()) { + lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut)); + return false; + } + + lock->get_xlock(mut, mut->get_client()); + mut->emplace_lock(lock, MutationImpl::LockOp::XLOCK); + return true; +} + +void Locker::local_xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut) +{ + ceph_assert(it->is_xlock()); + LocalLockC *lock = static_cast<LocalLockC*>(it->lock); + dout(7) << "local_xlock_finish on " << *lock + << " on " << *lock->get_parent() << dendl; + lock->put_xlock(); + mut->locks.erase(it); + + lock->finish_waiters(SimpleLock::WAIT_STABLE | + SimpleLock::WAIT_WR | + SimpleLock::WAIT_RD); +} + + + +// ========================================================================== +// file lock + + +void Locker::file_eval(ScatterLock *lock, bool *need_issue) +{ + CInode *in = static_cast<CInode*>(lock->get_parent()); + int loner_wanted, other_wanted; + int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE); + dout(7) << "file_eval wanted=" << gcap_string(wanted) + << " loner_wanted=" << gcap_string(loner_wanted) + << " other_wanted=" << gcap_string(other_wanted) + << " filelock=" << *lock << " on " << *lock->get_parent() + << dendl; + + ceph_assert(lock->get_parent()->is_auth()); + ceph_assert(lock->is_stable()); + + if (lock->get_parent()->is_freezing_or_frozen()) + return; + + if (mdcache->is_readonly()) { + if (lock->get_state() != LOCK_SYNC) { + dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl; + simple_sync(lock, need_issue); + } + return; + } + + // excl -> *? + if (lock->get_state() == LOCK_EXCL) { + dout(20) << " is excl" << dendl; + int loner_issued, other_issued, xlocker_issued; + in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE); + dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued) + << " other_issued=" << gcap_string(other_issued) + << " xlocker_issued=" << gcap_string(xlocker_issued) + << dendl; + if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) || + (other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) || + (in->is_dir() && in->multiple_nonstale_caps())) { // FIXME.. :/ + dout(20) << " should lose it" << dendl; + // we should lose it. + // loner other want + // R R SYNC + // R R|W MIX + // R W MIX + // R|W R MIX + // R|W R|W MIX + // R|W W MIX + // W R MIX + // W R|W MIX + // W W MIX + // -> any writer means MIX; RD doesn't matter. + if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) || + lock->is_waiter_for(SimpleLock::WAIT_WR)) + scatter_mix(lock, need_issue); + else if (!lock->is_wrlocked()) // let excl wrlocks drain first + simple_sync(lock, need_issue); + else + dout(10) << " waiting for wrlock to drain" << dendl; + } + } + + // * -> excl? + else if (lock->get_state() != LOCK_EXCL && + !lock->is_rdlocked() && + //!lock->is_waiter_for(SimpleLock::WAIT_WR) && + in->get_target_loner() >= 0 && + (in->is_dir() ? + !in->has_subtree_or_exporting_dirfrag() : + (wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))) { + dout(7) << "file_eval stable, bump to loner " << *lock + << " on " << *lock->get_parent() << dendl; + file_excl(lock, need_issue); + } + + // * -> mixed? + else if (lock->get_state() != LOCK_MIX && + !lock->is_rdlocked() && + //!lock->is_waiter_for(SimpleLock::WAIT_WR) && + (lock->get_scatter_wanted() || + (in->get_target_loner() < 0 && (wanted & CEPH_CAP_GWR)))) { + dout(7) << "file_eval stable, bump to mixed " << *lock + << " on " << *lock->get_parent() << dendl; + scatter_mix(lock, need_issue); + } + + // * -> sync? + else if (lock->get_state() != LOCK_SYNC && + !lock->is_wrlocked() && // drain wrlocks first! + !lock->is_waiter_for(SimpleLock::WAIT_WR) && + !(wanted & CEPH_CAP_GWR) && + !((lock->get_state() == LOCK_MIX) && + in->is_dir() && in->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are + //((wanted & CEPH_CAP_RD) || + //in->is_replicated() || + //lock->is_leased() || + //(!loner && lock->get_state() == LOCK_EXCL)) && + ) { + dout(7) << "file_eval stable, bump to sync " << *lock + << " on " << *lock->get_parent() << dendl; + simple_sync(lock, need_issue); + } + else if (in->state_test(CInode::STATE_NEEDSRECOVER)) { + mds->mdcache->queue_file_recover(in); + mds->mdcache->do_file_recover(); + } +} + + + +void Locker::scatter_mix(ScatterLock *lock, bool *need_issue) +{ + dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl; + + CInode *in = static_cast<CInode*>(lock->get_parent()); + ceph_assert(in->is_auth()); + ceph_assert(lock->is_stable()); + + if (lock->get_state() == LOCK_LOCK) { + in->start_scatter(lock); + if (in->is_replicated()) { + // data + bufferlist softdata; + lock->encode_locked_state(softdata); + + // bcast to replicas + send_lock_message(lock, LOCK_AC_MIX, softdata); + } + + // change lock + lock->set_state(LOCK_MIX); + lock->clear_scatter_wanted(); + if (lock->get_cap_shift()) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + } + } else { + // gather? + switch (lock->get_state()) { + case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break; + case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break; + case LOCK_XSYN: lock->set_state(LOCK_XSYN_MIX); break; + case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break; + default: ceph_abort(); + } + + int gather = 0; + if (lock->is_rdlocked()) { + if (lock->is_cached()) + invalidate_lock_caches(lock); + gather++; + } + if (in->is_replicated()) { + if (lock->get_state() == LOCK_SYNC_MIX) { // for the rest states, replicas are already LOCK + send_lock_message(lock, LOCK_AC_MIX); + lock->init_gather(); + gather++; + } + } + if (lock->is_leased()) { + revoke_client_leases(lock); + gather++; + } + if (lock->get_cap_shift() && + in->is_head() && + in->issued_caps_need_gather(lock)) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + gather++; + } + bool need_recover = false; + if (in->state_test(CInode::STATE_NEEDSRECOVER)) { + mds->mdcache->queue_file_recover(in); + need_recover = true; + gather++; + } + + if (gather) { + lock->get_parent()->auth_pin(lock); + if (need_recover) + mds->mdcache->do_file_recover(); + } else { + in->start_scatter(lock); + lock->set_state(LOCK_MIX); + lock->clear_scatter_wanted(); + if (in->is_replicated()) { + bufferlist softdata; + lock->encode_locked_state(softdata); + send_lock_message(lock, LOCK_AC_MIX, softdata); + } + if (lock->get_cap_shift()) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + } + } + } +} + + +void Locker::file_excl(ScatterLock *lock, bool *need_issue) +{ + CInode *in = static_cast<CInode*>(lock->get_parent()); + dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl; + + ceph_assert(in->is_auth()); + ceph_assert(lock->is_stable()); + + ceph_assert((in->get_loner() >= 0 && in->get_mds_caps_wanted().empty()) || + (lock->get_state() == LOCK_XSYN)); // must do xsyn -> excl -> <anything else> + + switch (lock->get_state()) { + case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break; + case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break; + case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break; + case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break; + default: ceph_abort(); + } + int gather = 0; + + if (lock->is_rdlocked()) + gather++; + if (lock->is_wrlocked()) + gather++; + if (gather && lock->is_cached()) + invalidate_lock_caches(lock); + + if (in->is_replicated() && + lock->get_state() != LOCK_LOCK_EXCL && + lock->get_state() != LOCK_XSYN_EXCL) { // if we were lock, replicas are already lock. + send_lock_message(lock, LOCK_AC_LOCK); + lock->init_gather(); + gather++; + } + if (lock->is_leased()) { + revoke_client_leases(lock); + gather++; + } + if (in->is_head() && + in->issued_caps_need_gather(lock)) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + gather++; + } + bool need_recover = false; + if (in->state_test(CInode::STATE_NEEDSRECOVER)) { + mds->mdcache->queue_file_recover(in); + need_recover = true; + gather++; + } + + if (gather) { + lock->get_parent()->auth_pin(lock); + if (need_recover) + mds->mdcache->do_file_recover(); + } else { + lock->set_state(LOCK_EXCL); + if (need_issue) + *need_issue = true; + else + issue_caps(in); + } +} + +void Locker::file_xsyn(SimpleLock *lock, bool *need_issue) +{ + dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl; + CInode *in = static_cast<CInode *>(lock->get_parent()); + ceph_assert(in->is_auth()); + ceph_assert(in->get_loner() >= 0 && in->get_mds_caps_wanted().empty()); + + switch (lock->get_state()) { + case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break; + default: ceph_abort(); + } + + int gather = 0; + if (lock->is_wrlocked()) { + if (lock->is_cached()) + invalidate_lock_caches(lock); + gather++; + } + + if (in->is_head() && + in->issued_caps_need_gather(lock)) { + if (need_issue) + *need_issue = true; + else + issue_caps(in); + gather++; + } + + if (gather) { + lock->get_parent()->auth_pin(lock); + } else { + lock->set_state(LOCK_XSYN); + lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE); + if (need_issue) + *need_issue = true; + else + issue_caps(in); + } +} + +void Locker::file_recover(ScatterLock *lock) +{ + CInode *in = static_cast<CInode *>(lock->get_parent()); + dout(7) << "file_recover " << *lock << " on " << *in << dendl; + + ceph_assert(in->is_auth()); + //assert(lock->is_stable()); + ceph_assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover() + + int gather = 0; + + /* + if (in->is_replicated() + lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) { + send_lock_message(lock, LOCK_AC_LOCK); + lock->init_gather(); + gather++; + } + */ + if (in->is_head() && + in->issued_caps_need_gather(lock)) { + issue_caps(in); + gather++; + } + + lock->set_state(LOCK_SCAN); + if (gather) + in->state_set(CInode::STATE_NEEDSRECOVER); + else + mds->mdcache->queue_file_recover(in); +} + + +// messenger +void Locker::handle_file_lock(ScatterLock *lock, const cref_t<MLock> &m) +{ + CInode *in = static_cast<CInode*>(lock->get_parent()); + int from = m->get_asker(); + + if (mds->is_rejoin()) { + if (in->is_rejoining()) { + dout(7) << "handle_file_lock still rejoining " << *in + << ", dropping " << *m << dendl; + return; + } + } + + dout(7) << "handle_file_lock a=" << lock->get_lock_action_name(m->get_action()) + << " on " << *lock + << " from mds." << from << " " + << *in << dendl; + + bool caps = lock->get_cap_shift(); + + switch (m->get_action()) { + // -- replica -- + case LOCK_AC_SYNC: + ceph_assert(lock->get_state() == LOCK_LOCK || + lock->get_state() == LOCK_MIX || + lock->get_state() == LOCK_MIX_SYNC2); + + if (lock->get_state() == LOCK_MIX) { + lock->set_state(LOCK_MIX_SYNC); + eval_gather(lock, true); + if (lock->is_unstable_and_locked()) { + if (lock->is_cached()) + invalidate_lock_caches(lock); + mds->mdlog->flush(); + } + break; + } + + (static_cast<ScatterLock *>(lock))->finish_flush(); + (static_cast<ScatterLock *>(lock))->clear_flushed(); + + // ok + lock->decode_locked_state(m->get_data()); + lock->set_state(LOCK_SYNC); + + lock->get_rdlock(); + if (caps) + issue_caps(in); + lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE); + lock->put_rdlock(); + break; + + case LOCK_AC_LOCK: + switch (lock->get_state()) { + case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break; + case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break; + default: ceph_abort(); + } + + eval_gather(lock, true); + if (lock->is_unstable_and_locked()) { + if (lock->is_cached()) + invalidate_lock_caches(lock); + mds->mdlog->flush(); + } + + break; + + case LOCK_AC_LOCKFLUSHED: + (static_cast<ScatterLock *>(lock))->finish_flush(); + (static_cast<ScatterLock *>(lock))->clear_flushed(); + // wake up scatter_nudge waiters + if (lock->is_stable()) + lock->finish_waiters(SimpleLock::WAIT_STABLE); + break; + + case LOCK_AC_MIX: + ceph_assert(lock->get_state() == LOCK_SYNC || + lock->get_state() == LOCK_LOCK || + lock->get_state() == LOCK_SYNC_MIX2); + + if (lock->get_state() == LOCK_SYNC) { + // MIXED + lock->set_state(LOCK_SYNC_MIX); + eval_gather(lock, true); + if (lock->is_unstable_and_locked()) { + if (lock->is_cached()) + invalidate_lock_caches(lock); + mds->mdlog->flush(); + } + break; + } + + // ok + lock->set_state(LOCK_MIX); + lock->decode_locked_state(m->get_data()); + + if (caps) + issue_caps(in); + + lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE); + break; + + + // -- auth -- + case LOCK_AC_LOCKACK: + ceph_assert(lock->get_state() == LOCK_SYNC_LOCK || + lock->get_state() == LOCK_MIX_LOCK || + lock->get_state() == LOCK_MIX_LOCK2 || + lock->get_state() == LOCK_MIX_EXCL || + lock->get_state() == LOCK_SYNC_EXCL || + lock->get_state() == LOCK_SYNC_MIX || + lock->get_state() == LOCK_MIX_TSYN); + ceph_assert(lock->is_gathering(from)); + lock->remove_gather(from); + + if (lock->get_state() == LOCK_MIX_LOCK || + lock->get_state() == LOCK_MIX_LOCK2 || + lock->get_state() == LOCK_MIX_EXCL || + lock->get_state() == LOCK_MIX_TSYN) { + lock->decode_locked_state(m->get_data()); + // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not + // delay calling scatter_writebehind(). + lock->clear_flushed(); + } + + if (lock->is_gathering()) { + dout(7) << "handle_file_lock " << *in << " from " << from + << ", still gathering " << lock->get_gather_set() << dendl; + } else { + dout(7) << "handle_file_lock " << *in << " from " << from + << ", last one" << dendl; + eval_gather(lock); + } + break; + + case LOCK_AC_SYNCACK: + ceph_assert(lock->get_state() == LOCK_MIX_SYNC); + ceph_assert(lock->is_gathering(from)); + lock->remove_gather(from); + + lock->decode_locked_state(m->get_data()); + + if (lock->is_gathering()) { + dout(7) << "handle_file_lock " << *in << " from " << from + << ", still gathering " << lock->get_gather_set() << dendl; + } else { + dout(7) << "handle_file_lock " << *in << " from " << from + << ", last one" << dendl; + eval_gather(lock); + } + break; + + case LOCK_AC_MIXACK: + ceph_assert(lock->get_state() == LOCK_SYNC_MIX); + ceph_assert(lock->is_gathering(from)); + lock->remove_gather(from); + + if (lock->is_gathering()) { + dout(7) << "handle_file_lock " << *in << " from " << from + << ", still gathering " << lock->get_gather_set() << dendl; + } else { + dout(7) << "handle_file_lock " << *in << " from " << from + << ", last one" << dendl; + eval_gather(lock); + } + break; + + + // requests.... + case LOCK_AC_REQSCATTER: + if (lock->is_stable()) { + /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing) + * because the replica should be holding an auth_pin if they're + * doing this (and thus, we are freezing, not frozen, and indefinite + * starvation isn't an issue). + */ + dout(7) << "handle_file_lock got scatter request on " << *lock + << " on " << *lock->get_parent() << dendl; + if (lock->get_state() != LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter + scatter_mix(lock); + } else { + dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock + << " on " << *lock->get_parent() << dendl; + lock->set_scatter_wanted(); + } + break; + + case LOCK_AC_REQUNSCATTER: + if (lock->is_stable()) { + /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing) + * because the replica should be holding an auth_pin if they're + * doing this (and thus, we are freezing, not frozen, and indefinite + * starvation isn't an issue). + */ + dout(7) << "handle_file_lock got unscatter request on " << *lock + << " on " << *lock->get_parent() << dendl; + if (lock->get_state() == LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter + simple_lock(lock); // FIXME tempsync? + } else { + dout(7) << "handle_file_lock ignoring unscatter request on " << *lock + << " on " << *lock->get_parent() << dendl; + lock->set_unscatter_wanted(); + } + break; + + case LOCK_AC_REQRDLOCK: + handle_reqrdlock(lock, m); + break; + + case LOCK_AC_NUDGE: + if (!lock->get_parent()->is_auth()) { + dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock + << " on " << *lock->get_parent() << dendl; + } else if (!lock->get_parent()->is_replicated()) { + dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock + << " on " << *lock->get_parent() << dendl; + } else { + dout(7) << "handle_file_lock trying nudge on " << *lock + << " on " << *lock->get_parent() << dendl; + scatter_nudge(lock, 0, true); + mds->mdlog->flush(); + } + break; + + default: + ceph_abort(); + } +} |