diff options
Diffstat (limited to 'src/mds/SessionMap.cc')
-rw-r--r-- | src/mds/SessionMap.cc | 1226 |
1 files changed, 1226 insertions, 0 deletions
diff --git a/src/mds/SessionMap.cc b/src/mds/SessionMap.cc new file mode 100644 index 00000000..56c71de8 --- /dev/null +++ b/src/mds/SessionMap.cc @@ -0,0 +1,1226 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "MDSRank.h" +#include "MDCache.h" +#include "Mutation.h" +#include "SessionMap.h" +#include "osdc/Filer.h" +#include "common/Finisher.h" + +#include "common/config.h" +#include "common/errno.h" +#include "common/DecayCounter.h" +#include "include/ceph_assert.h" +#include "include/stringify.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mds +#undef dout_prefix +#define dout_prefix *_dout << "mds." << rank << ".sessionmap " + +namespace { +class SessionMapIOContext : public MDSIOContextBase +{ + protected: + SessionMap *sessionmap; + MDSRank *get_mds() override {return sessionmap->mds;} + public: + explicit SessionMapIOContext(SessionMap *sessionmap_) : sessionmap(sessionmap_) { + ceph_assert(sessionmap != NULL); + } +}; +}; + +void SessionMap::register_perfcounters() +{ + PerfCountersBuilder plb(g_ceph_context, "mds_sessions", + l_mdssm_first, l_mdssm_last); + + plb.add_u64(l_mdssm_session_count, "session_count", + "Session count", "sess", PerfCountersBuilder::PRIO_INTERESTING); + + plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL); + plb.add_u64_counter(l_mdssm_session_add, "session_add", + "Sessions added"); + plb.add_u64_counter(l_mdssm_session_remove, "session_remove", + "Sessions removed"); + plb.add_u64(l_mdssm_session_open, "sessions_open", + "Sessions currently open"); + plb.add_u64(l_mdssm_session_stale, "sessions_stale", + "Sessions currently stale"); + plb.add_u64(l_mdssm_total_load, "total_load", "Total Load"); + plb.add_u64(l_mdssm_avg_load, "average_load", "Average Load"); + plb.add_u64(l_mdssm_avg_session_uptime, "avg_session_uptime", + "Average session uptime"); + + logger = plb.create_perf_counters(); + g_ceph_context->get_perfcounters_collection()->add(logger); +} + +void SessionMap::dump() +{ + dout(10) << "dump" << dendl; + for (ceph::unordered_map<entity_name_t,Session*>::iterator p = session_map.begin(); + p != session_map.end(); + ++p) + dout(10) << p->first << " " << p->second + << " state " << p->second->get_state_name() + << " completed " << p->second->info.completed_requests + << " prealloc_inos " << p->second->info.prealloc_inos + << " used_inos " << p->second->info.used_inos + << dendl; +} + + +// ---------------- +// LOAD + + +object_t SessionMap::get_object_name() const +{ + char s[30]; + snprintf(s, sizeof(s), "mds%d_sessionmap", int(mds->get_nodeid())); + return object_t(s); +} + +namespace { +class C_IO_SM_Load : public SessionMapIOContext { +public: + const bool first; //< Am I the initial (header) load? + int header_r; //< Return value from OMAP header read + int values_r; //< Return value from OMAP value read + bufferlist header_bl; + std::map<std::string, bufferlist> session_vals; + bool more_session_vals = false; + + C_IO_SM_Load(SessionMap *cm, const bool f) + : SessionMapIOContext(cm), first(f), header_r(0), values_r(0) {} + + void finish(int r) override { + sessionmap->_load_finish(r, header_r, values_r, first, header_bl, session_vals, + more_session_vals); + } + void print(ostream& out) const override { + out << "session_load"; + } +}; +} + + +/** + * Decode OMAP header. Call this once when loading. + */ +void SessionMapStore::decode_header( + bufferlist &header_bl) +{ + auto q = header_bl.cbegin(); + DECODE_START(1, q) + decode(version, q); + DECODE_FINISH(q); +} + +void SessionMapStore::encode_header( + bufferlist *header_bl) +{ + ENCODE_START(1, 1, *header_bl); + encode(version, *header_bl); + ENCODE_FINISH(*header_bl); +} + +/** + * Decode and insert some serialized OMAP values. Call this + * repeatedly to insert batched loads. + */ +void SessionMapStore::decode_values(std::map<std::string, bufferlist> &session_vals) +{ + for (std::map<std::string, bufferlist>::iterator i = session_vals.begin(); + i != session_vals.end(); ++i) { + + entity_inst_t inst; + + bool parsed = inst.name.parse(i->first); + if (!parsed) { + derr << "Corrupt entity name '" << i->first << "' in sessionmap" << dendl; + throw buffer::malformed_input("Corrupt entity name in sessionmap"); + } + + Session *s = get_or_add_session(inst); + if (s->is_closed()) { + s->set_state(Session::STATE_OPEN); + s->set_load_avg_decay_rate(decay_rate); + } + auto q = i->second.cbegin(); + s->decode(q); + } +} + +/** + * An OMAP read finished. + */ +void SessionMap::_load_finish( + int operation_r, + int header_r, + int values_r, + bool first, + bufferlist &header_bl, + std::map<std::string, bufferlist> &session_vals, + bool more_session_vals) +{ + if (operation_r < 0) { + derr << "_load_finish got " << cpp_strerror(operation_r) << dendl; + mds->clog->error() << "error reading sessionmap '" << get_object_name() + << "' " << operation_r << " (" + << cpp_strerror(operation_r) << ")"; + mds->damaged(); + ceph_abort(); // Should be unreachable because damaged() calls respawn() + } + + // Decode header + if (first) { + if (header_r != 0) { + derr << __func__ << ": header error: " << cpp_strerror(header_r) << dendl; + mds->clog->error() << "error reading sessionmap header " + << header_r << " (" << cpp_strerror(header_r) << ")"; + mds->damaged(); + ceph_abort(); // Should be unreachable because damaged() calls respawn() + } + + if(header_bl.length() == 0) { + dout(4) << __func__ << ": header missing, loading legacy..." << dendl; + load_legacy(); + return; + } + + try { + decode_header(header_bl); + } catch (buffer::error &e) { + mds->clog->error() << "corrupt sessionmap header: " << e.what(); + mds->damaged(); + ceph_abort(); // Should be unreachable because damaged() calls respawn() + } + dout(10) << __func__ << " loaded version " << version << dendl; + } + + if (values_r != 0) { + derr << __func__ << ": error reading values: " + << cpp_strerror(values_r) << dendl; + mds->clog->error() << "error reading sessionmap values: " + << values_r << " (" << cpp_strerror(values_r) << ")"; + mds->damaged(); + ceph_abort(); // Should be unreachable because damaged() calls respawn() + } + + // Decode session_vals + try { + decode_values(session_vals); + } catch (buffer::error &e) { + mds->clog->error() << "corrupt sessionmap values: " << e.what(); + mds->damaged(); + ceph_abort(); // Should be unreachable because damaged() calls respawn() + } + + if (more_session_vals) { + // Issue another read if we're not at the end of the omap + const std::string last_key = session_vals.rbegin()->first; + dout(10) << __func__ << ": continue omap load from '" + << last_key << "'" << dendl; + object_t oid = get_object_name(); + object_locator_t oloc(mds->mdsmap->get_metadata_pool()); + C_IO_SM_Load *c = new C_IO_SM_Load(this, false); + ObjectOperation op; + op.omap_get_vals(last_key, "", g_conf()->mds_sessionmap_keys_per_op, + &c->session_vals, &c->more_session_vals, &c->values_r); + mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0, + new C_OnFinisher(c, mds->finisher)); + } else { + // I/O is complete. Update `by_state` + dout(10) << __func__ << ": omap load complete" << dendl; + for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin(); + i != session_map.end(); ++i) { + Session *s = i->second; + auto by_state_entry = by_state.find(s->get_state()); + if (by_state_entry == by_state.end()) + by_state_entry = by_state.emplace(s->get_state(), + new xlist<Session*>).first; + by_state_entry->second->push_back(&s->item_session_list); + } + + // Population is complete. Trigger load waiters. + dout(10) << __func__ << ": v " << version + << ", " << session_map.size() << " sessions" << dendl; + projected = committing = committed = version; + dump(); + finish_contexts(g_ceph_context, waiting_for_load); + } +} + +/** + * Populate session state from OMAP records in this + * rank's sessionmap object. + */ +void SessionMap::load(MDSContext *onload) +{ + dout(10) << "load" << dendl; + + if (onload) + waiting_for_load.push_back(onload); + + C_IO_SM_Load *c = new C_IO_SM_Load(this, true); + object_t oid = get_object_name(); + object_locator_t oloc(mds->mdsmap->get_metadata_pool()); + + ObjectOperation op; + op.omap_get_header(&c->header_bl, &c->header_r); + op.omap_get_vals("", "", g_conf()->mds_sessionmap_keys_per_op, + &c->session_vals, &c->more_session_vals, &c->values_r); + + mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0, new C_OnFinisher(c, mds->finisher)); +} + +namespace { +class C_IO_SM_LoadLegacy : public SessionMapIOContext { +public: + bufferlist bl; + explicit C_IO_SM_LoadLegacy(SessionMap *cm) : SessionMapIOContext(cm) {} + void finish(int r) override { + sessionmap->_load_legacy_finish(r, bl); + } + void print(ostream& out) const override { + out << "session_load_legacy"; + } +}; +} + + +/** + * Load legacy (object data blob) SessionMap format, assuming + * that waiting_for_load has already been populated with + * the relevant completion. This is the fallback if we do not + * find an OMAP header when attempting to load normally. + */ +void SessionMap::load_legacy() +{ + dout(10) << __func__ << dendl; + + C_IO_SM_LoadLegacy *c = new C_IO_SM_LoadLegacy(this); + object_t oid = get_object_name(); + object_locator_t oloc(mds->mdsmap->get_metadata_pool()); + + mds->objecter->read_full(oid, oloc, CEPH_NOSNAP, &c->bl, 0, + new C_OnFinisher(c, mds->finisher)); +} + +void SessionMap::_load_legacy_finish(int r, bufferlist &bl) +{ + auto blp = bl.cbegin(); + if (r < 0) { + derr << "_load_finish got " << cpp_strerror(r) << dendl; + ceph_abort_msg("failed to load sessionmap"); + } + dump(); + decode_legacy(blp); // note: this sets last_cap_renew = now() + dout(10) << "_load_finish v " << version + << ", " << session_map.size() << " sessions, " + << bl.length() << " bytes" + << dendl; + projected = committing = committed = version; + dump(); + + // Mark all sessions dirty, so that on next save() we will write + // a complete OMAP version of the data loaded from the legacy format + for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin(); + i != session_map.end(); ++i) { + // Don't use mark_dirty because on this occasion we want to ignore the + // keys_per_op limit and do one big write (upgrade must be atomic) + dirty_sessions.insert(i->first); + } + loaded_legacy = true; + + finish_contexts(g_ceph_context, waiting_for_load); +} + + +// ---------------- +// SAVE + +namespace { +class C_IO_SM_Save : public SessionMapIOContext { + version_t version; +public: + C_IO_SM_Save(SessionMap *cm, version_t v) : SessionMapIOContext(cm), version(v) {} + void finish(int r) override { + if (r != 0) { + get_mds()->handle_write_error(r); + } else { + sessionmap->_save_finish(version); + } + } + void print(ostream& out) const override { + out << "session_save"; + } +}; +} + +void SessionMap::save(MDSContext *onsave, version_t needv) +{ + dout(10) << __func__ << ": needv " << needv << ", v " << version << dendl; + + if (needv && committing >= needv) { + ceph_assert(committing > committed); + commit_waiters[committing].push_back(onsave); + return; + } + + commit_waiters[version].push_back(onsave); + + committing = version; + SnapContext snapc; + object_t oid = get_object_name(); + object_locator_t oloc(mds->mdsmap->get_metadata_pool()); + + ObjectOperation op; + + /* Compose OSD OMAP transaction for full write */ + bufferlist header_bl; + encode_header(&header_bl); + op.omap_set_header(header_bl); + + /* If we loaded a legacy sessionmap, then erase the old data. If + * an old-versioned MDS tries to read it, it'll fail out safely + * with an end_of_buffer exception */ + if (loaded_legacy) { + dout(4) << __func__ << " erasing legacy sessionmap" << dendl; + op.truncate(0); + loaded_legacy = false; // only need to truncate once. + } + + dout(20) << " updating keys:" << dendl; + map<string, bufferlist> to_set; + for(std::set<entity_name_t>::iterator i = dirty_sessions.begin(); + i != dirty_sessions.end(); ++i) { + const entity_name_t name = *i; + Session *session = session_map[name]; + + if (session->is_open() || + session->is_closing() || + session->is_stale() || + session->is_killing()) { + dout(20) << " " << name << dendl; + // Serialize K + std::ostringstream k; + k << name; + + // Serialize V + bufferlist bl; + session->info.encode(bl, mds->mdsmap->get_up_features()); + + // Add to RADOS op + to_set[k.str()] = bl; + + session->clear_dirty_completed_requests(); + } else { + dout(20) << " " << name << " (ignoring)" << dendl; + } + } + if (!to_set.empty()) { + op.omap_set(to_set); + } + + dout(20) << " removing keys:" << dendl; + set<string> to_remove; + for(std::set<entity_name_t>::const_iterator i = null_sessions.begin(); + i != null_sessions.end(); ++i) { + dout(20) << " " << *i << dendl; + std::ostringstream k; + k << *i; + to_remove.insert(k.str()); + } + if (!to_remove.empty()) { + op.omap_rm_keys(to_remove); + } + + dirty_sessions.clear(); + null_sessions.clear(); + + mds->objecter->mutate(oid, oloc, op, snapc, + ceph::real_clock::now(), + 0, + new C_OnFinisher(new C_IO_SM_Save(this, version), + mds->finisher)); +} + +void SessionMap::_save_finish(version_t v) +{ + dout(10) << "_save_finish v" << v << dendl; + committed = v; + + finish_contexts(g_ceph_context, commit_waiters[v]); + commit_waiters.erase(v); +} + + +/** + * Deserialize sessions, and update by_state index + */ +void SessionMap::decode_legacy(bufferlist::const_iterator &p) +{ + // Populate `sessions` + SessionMapStore::decode_legacy(p); + + // Update `by_state` + for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin(); + i != session_map.end(); ++i) { + Session *s = i->second; + auto by_state_entry = by_state.find(s->get_state()); + if (by_state_entry == by_state.end()) + by_state_entry = by_state.emplace(s->get_state(), + new xlist<Session*>).first; + by_state_entry->second->push_back(&s->item_session_list); + } +} + +uint64_t SessionMap::set_state(Session *session, int s) { + if (session->state != s) { + session->set_state(s); + auto by_state_entry = by_state.find(s); + if (by_state_entry == by_state.end()) + by_state_entry = by_state.emplace(s, new xlist<Session*>).first; + by_state_entry->second->push_back(&session->item_session_list); + + if (session->is_open() || session->is_stale()) { + session->set_load_avg_decay_rate(decay_rate); + } + + // refresh number of sessions for states which have perf + // couters associated + logger->set(l_mdssm_session_open, + get_session_count_in_state(Session::STATE_OPEN)); + logger->set(l_mdssm_session_stale, + get_session_count_in_state(Session::STATE_STALE)); + } + + return session->get_state_seq(); +} + +void SessionMapStore::decode_legacy(bufferlist::const_iterator& p) +{ + auto now = clock::now(); + uint64_t pre; + decode(pre, p); + if (pre == (uint64_t)-1) { + DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, p); + ceph_assert(struct_v >= 2); + + decode(version, p); + + while (!p.end()) { + entity_inst_t inst; + decode(inst.name, p); + Session *s = get_or_add_session(inst); + if (s->is_closed()) { + s->set_state(Session::STATE_OPEN); + s->set_load_avg_decay_rate(decay_rate); + } + s->decode(p); + } + + DECODE_FINISH(p); + } else { + // --- old format ---- + version = pre; + + // this is a meaningless upper bound. can be ignored. + __u32 n; + decode(n, p); + + while (n-- && !p.end()) { + auto p2 = p; + Session *s = new Session(ConnectionRef()); + s->info.decode(p); + { + auto& name = s->info.inst.name; + auto it = session_map.find(name); + if (it != session_map.end()) { + // eager client connected too fast! aie. + dout(10) << " already had session for " << name << ", recovering" << dendl; + delete s; + s = it->second; + p = p2; + s->info.decode(p); + } else { + it->second = s; + } + } + s->set_state(Session::STATE_OPEN); + s->set_load_avg_decay_rate(decay_rate); + s->last_cap_renew = now; + } + } +} + +void Session::dump(Formatter *f) const +{ + f->dump_int("id", info.inst.name.num()); + f->dump_object("entity", info.inst); + f->dump_string("state", get_state_name()); + f->dump_int("num_leases", leases.size()); + f->dump_int("num_caps", caps.size()); + if (is_open() || is_stale()) { + f->dump_unsigned("request_load_avg", get_load_avg()); + } + f->dump_float("uptime", get_session_uptime()); + f->dump_unsigned("requests_in_flight", get_request_count()); + f->dump_unsigned("completed_requests", get_num_completed_requests()); + f->dump_bool("reconnecting", reconnecting); + f->dump_object("recall_caps", recall_caps); + f->dump_object("release_caps", release_caps); + f->dump_object("recall_caps_throttle", recall_caps_throttle); + f->dump_object("recall_caps_throttle2o", recall_caps_throttle2o); + f->dump_object("session_cache_liveness", session_cache_liveness); + f->dump_object("cap_acquisition", cap_acquisition); + info.dump(f); +} + +void SessionMapStore::dump(Formatter *f) const +{ + f->open_array_section("sessions"); + for (const auto& p : session_map) { + f->dump_object("session", *p.second); + } + f->close_section(); // Sessions +} + +void SessionMapStore::generate_test_instances(list<SessionMapStore*>& ls) +{ + // pretty boring for now + ls.push_back(new SessionMapStore()); +} + +void SessionMap::wipe() +{ + dout(1) << "wipe start" << dendl; + dump(); + while (!session_map.empty()) { + Session *s = session_map.begin()->second; + remove_session(s); + } + version = ++projected; + dout(1) << "wipe result" << dendl; + dump(); + dout(1) << "wipe done" << dendl; +} + +void SessionMap::wipe_ino_prealloc() +{ + for (ceph::unordered_map<entity_name_t,Session*>::iterator p = session_map.begin(); + p != session_map.end(); + ++p) { + p->second->pending_prealloc_inos.clear(); + p->second->info.prealloc_inos.clear(); + p->second->info.used_inos.clear(); + } + projected = ++version; +} + +void SessionMap::add_session(Session *s) +{ + dout(10) << __func__ << " s=" << s << " name=" << s->info.inst.name << dendl; + + ceph_assert(session_map.count(s->info.inst.name) == 0); + session_map[s->info.inst.name] = s; + auto by_state_entry = by_state.find(s->state); + if (by_state_entry == by_state.end()) + by_state_entry = by_state.emplace(s->state, new xlist<Session*>).first; + by_state_entry->second->push_back(&s->item_session_list); + s->get(); + + update_average_birth_time(*s); + + logger->set(l_mdssm_session_count, session_map.size()); + logger->inc(l_mdssm_session_add); +} + +void SessionMap::remove_session(Session *s) +{ + dout(10) << __func__ << " s=" << s << " name=" << s->info.inst.name << dendl; + + update_average_birth_time(*s, false); + + s->trim_completed_requests(0); + s->item_session_list.remove_myself(); + session_map.erase(s->info.inst.name); + dirty_sessions.erase(s->info.inst.name); + null_sessions.insert(s->info.inst.name); + s->put(); + + logger->set(l_mdssm_session_count, session_map.size()); + logger->inc(l_mdssm_session_remove); +} + +void SessionMap::touch_session(Session *session) +{ + dout(10) << __func__ << " s=" << session << " name=" << session->info.inst.name << dendl; + + // Move to the back of the session list for this state (should + // already be on a list courtesy of add_session and set_state) + ceph_assert(session->item_session_list.is_on_list()); + auto by_state_entry = by_state.find(session->state); + if (by_state_entry == by_state.end()) + by_state_entry = by_state.emplace(session->state, + new xlist<Session*>).first; + by_state_entry->second->push_back(&session->item_session_list); + + session->last_cap_renew = clock::now(); +} + +void SessionMap::_mark_dirty(Session *s, bool may_save) +{ + if (dirty_sessions.count(s->info.inst.name)) + return; + + if (may_save && + dirty_sessions.size() >= g_conf()->mds_sessionmap_keys_per_op) { + // Pre-empt the usual save() call from journal segment trim, in + // order to avoid building up an oversized OMAP update operation + // from too many sessions modified at once + save(new C_MDSInternalNoop, version); + } + + null_sessions.erase(s->info.inst.name); + dirty_sessions.insert(s->info.inst.name); +} + +void SessionMap::mark_dirty(Session *s, bool may_save) +{ + dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name + << " v=" << version << dendl; + + _mark_dirty(s, may_save); + version++; + s->pop_pv(version); +} + +void SessionMap::replay_dirty_session(Session *s) +{ + dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name + << " v=" << version << dendl; + + _mark_dirty(s, false); + + replay_advance_version(); +} + +void SessionMap::replay_advance_version() +{ + version++; + projected = version; +} + +void SessionMap::replay_open_sessions(version_t event_cmapv, + map<client_t,entity_inst_t>& client_map, + map<client_t,client_metadata_t>& client_metadata_map) +{ + unsigned already_saved; + + if (version + client_map.size() < event_cmapv) + goto bad; + + // Server::finish_force_open_sessions() marks sessions dirty one by one. + // Marking a session dirty may flush all existing dirty sessions. So it's + // possible that some sessions are already saved in sessionmap. + already_saved = client_map.size() - (event_cmapv - version); + for (const auto& p : client_map) { + Session *s = get_or_add_session(p.second); + auto q = client_metadata_map.find(p.first); + if (q != client_metadata_map.end()) + s->info.client_metadata.merge(q->second); + + if (already_saved > 0) { + if (s->is_closed()) + goto bad; + + --already_saved; + continue; + } + + set_state(s, Session::STATE_OPEN); + replay_dirty_session(s); + } + return; + +bad: + mds->clog->error() << "error replaying open sessions(" << client_map.size() + << ") sessionmap v " << event_cmapv << " table " << version; + ceph_assert(g_conf()->mds_wipe_sessions); + mds->sessionmap.wipe(); + mds->sessionmap.set_version(event_cmapv); +} + +version_t SessionMap::mark_projected(Session *s) +{ + dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name + << " pv=" << projected << " -> " << projected + 1 << dendl; + ++projected; + s->push_pv(projected); + return projected; +} + +namespace { +class C_IO_SM_Save_One : public SessionMapIOContext { + MDSContext *on_safe; +public: + C_IO_SM_Save_One(SessionMap *cm, MDSContext *on_safe_) + : SessionMapIOContext(cm), on_safe(on_safe_) {} + void finish(int r) override { + if (r != 0) { + get_mds()->handle_write_error(r); + } else { + on_safe->complete(r); + } + } + void print(ostream& out) const override { + out << "session_save_one"; + } +}; +} + + +void SessionMap::save_if_dirty(const std::set<entity_name_t> &tgt_sessions, + MDSGatherBuilder *gather_bld) +{ + ceph_assert(gather_bld != NULL); + + std::vector<entity_name_t> write_sessions; + + // Decide which sessions require a write + for (std::set<entity_name_t>::iterator i = tgt_sessions.begin(); + i != tgt_sessions.end(); ++i) { + const entity_name_t &session_id = *i; + + if (session_map.count(session_id) == 0) { + // Session isn't around any more, never mind. + continue; + } + + Session *session = session_map[session_id]; + if (!session->has_dirty_completed_requests()) { + // Session hasn't had completed_requests + // modified since last write, no need to + // write it now. + continue; + } + + if (dirty_sessions.count(session_id) > 0) { + // Session is already dirtied, will be written, no + // need to pre-empt that. + continue; + } + // Okay, passed all our checks, now we write + // this session out. The version we write + // into the OMAP may now be higher-versioned + // than the version in the header, but that's + // okay because it's never a problem to have + // an overly-fresh copy of a session. + write_sessions.push_back(*i); + } + + dout(4) << __func__ << ": writing " << write_sessions.size() << dendl; + + // Batch writes into mds_sessionmap_keys_per_op + const uint32_t kpo = g_conf()->mds_sessionmap_keys_per_op; + map<string, bufferlist> to_set; + for (uint32_t i = 0; i < write_sessions.size(); ++i) { + const entity_name_t &session_id = write_sessions[i]; + Session *session = session_map[session_id]; + session->clear_dirty_completed_requests(); + + // Serialize K + std::ostringstream k; + k << session_id; + + // Serialize V + bufferlist bl; + session->info.encode(bl, mds->mdsmap->get_up_features()); + + // Add to RADOS op + to_set[k.str()] = bl; + + // Complete this write transaction? + if (i == write_sessions.size() - 1 + || i % kpo == kpo - 1) { + ObjectOperation op; + op.omap_set(to_set); + to_set.clear(); // clear to start a new transaction + + SnapContext snapc; + object_t oid = get_object_name(); + object_locator_t oloc(mds->mdsmap->get_metadata_pool()); + MDSContext *on_safe = gather_bld->new_sub(); + mds->objecter->mutate(oid, oloc, op, snapc, + ceph::real_clock::now(), 0, + new C_OnFinisher( + new C_IO_SM_Save_One(this, on_safe), + mds->finisher)); + } + } +} + +// ================= +// Session + +#undef dout_prefix +#define dout_prefix *_dout << "Session " + +/** + * Calculate the length of the `requests` member list, + * because elist does not have a size() method. + * + * O(N) runtime. + */ +size_t Session::get_request_count() const +{ + size_t result = 0; + for (auto p = requests.begin(); !p.end(); ++p) + ++result; + return result; +} + +/** + * Capped in response to a CEPH_MSG_CLIENT_CAPRELEASE message, + * with n_caps equal to the number of caps that were released + * in the message. Used to update state about how many caps a + * client has released since it was last instructed to RECALL_STATE. + */ +void Session::notify_cap_release(size_t n_caps) +{ + recall_caps.hit(-(double)n_caps); + release_caps.hit(n_caps); +} + +/** + * Called when a CEPH_MSG_CLIENT_SESSION->CEPH_SESSION_RECALL_STATE + * message is sent to the client. Update our recall-related state + * in order to generate health metrics if the session doesn't see + * a commensurate number of calls to ::notify_cap_release + */ +uint64_t Session::notify_recall_sent(size_t new_limit) +{ + const auto num_caps = caps.size(); + ceph_assert(new_limit < num_caps); // Behaviour of Server::recall_client_state + const auto count = num_caps-new_limit; + uint64_t new_change; + if (recall_limit != new_limit) { + new_change = count; + } else { + new_change = 0; /* no change! */ + } + + /* Always hit the session counter as a RECALL message is still sent to the + * client and we do not want the MDS to burn its global counter tokens on a + * session that is not releasing caps (i.e. allow the session counter to + * throttle future RECALL messages). + */ + recall_caps_throttle.hit(count); + recall_caps_throttle2o.hit(count); + recall_caps.hit(count); + return new_change; +} + +/** + * Use client metadata to generate a somewhat-friendlier + * name for the client than its session ID. + * + * This is *not* guaranteed to be unique, and any machine + * consumers of session-related output should always use + * the session ID as a primary capacity and use this only + * as a presentation hint. + */ +void Session::_update_human_name() +{ + auto info_client_metadata_entry = info.client_metadata.find("hostname"); + if (info_client_metadata_entry != info.client_metadata.end()) { + // Happy path, refer to clients by hostname + human_name = info_client_metadata_entry->second; + if (!info.auth_name.has_default_id()) { + // When a non-default entity ID is set by the user, assume they + // would like to see it in references to the client, if it's + // reasonable short. Limit the length because we don't want + // to put e.g. uuid-generated names into a "human readable" + // rendering. + const int arbitrarily_short = 16; + if (info.auth_name.get_id().size() < arbitrarily_short) { + human_name += std::string(":") + info.auth_name.get_id(); + } + } + } else { + // Fallback, refer to clients by ID e.g. client.4567 + human_name = stringify(info.inst.name.num()); + } +} + +void Session::decode(bufferlist::const_iterator &p) +{ + info.decode(p); + + _update_human_name(); +} + +int Session::check_access(CInode *in, unsigned mask, + int caller_uid, int caller_gid, + const vector<uint64_t> *caller_gid_list, + int new_uid, int new_gid) +{ + string path; + CInode *diri = NULL; + if (!in->is_base()) + diri = in->get_projected_parent_dn()->get_dir()->get_inode(); + if (diri && diri->is_stray()){ + path = in->get_projected_inode()->stray_prior_path; + dout(20) << __func__ << " stray_prior_path " << path << dendl; + } else { + in->make_path_string(path, true); + dout(20) << __func__ << " path " << path << dendl; + } + if (path.length()) + path = path.substr(1); // drop leading / + + if (in->inode.is_dir() && + in->inode.has_layout() && + in->inode.layout.pool_ns.length() && + !connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) { + dout(10) << __func__ << " client doesn't support FS_FILE_LAYOUT_V2" << dendl; + return -EIO; + } + + if (!auth_caps.is_capable(path, in->inode.uid, in->inode.gid, in->inode.mode, + caller_uid, caller_gid, caller_gid_list, mask, + new_uid, new_gid, + info.inst.addr)) { + return -EACCES; + } + return 0; +} + +// track total and per session load +void SessionMap::hit_session(Session *session) { + uint64_t sessions = get_session_count_in_state(Session::STATE_OPEN) + + get_session_count_in_state(Session::STATE_STALE) + + get_session_count_in_state(Session::STATE_CLOSING); + ceph_assert(sessions != 0); + + double total_load = total_load_avg.hit(); + double avg_load = total_load / sessions; + + logger->set(l_mdssm_total_load, (uint64_t)total_load); + logger->set(l_mdssm_avg_load, (uint64_t)avg_load); + + session->hit_session(); +} + +void SessionMap::handle_conf_change(const std::set<std::string>& changed) +{ + auto apply_to_open_sessions = [this](auto f) { + if (auto it = by_state.find(Session::STATE_OPEN); it != by_state.end()) { + for (const auto &session : *(it->second)) { + f(session); + } + } + if (auto it = by_state.find(Session::STATE_STALE); it != by_state.end()) { + for (const auto &session : *(it->second)) { + f(session); + } + } + }; + + if (changed.count("mds_request_load_average_decay_rate")) { + auto d = g_conf().get_val<double>("mds_request_load_average_decay_rate"); + + decay_rate = d; + total_load_avg = DecayCounter(d); + + auto mut = [d](auto s) { + s->set_load_avg_decay_rate(d); + }; + apply_to_open_sessions(mut); + } + if (changed.count("mds_recall_max_decay_rate")) { + auto d = g_conf().get_val<double>("mds_recall_max_decay_rate"); + auto mut = [d](auto s) { + s->recall_caps_throttle = DecayCounter(d); + }; + apply_to_open_sessions(mut); + } + if (changed.count("mds_recall_warning_decay_rate")) { + auto d = g_conf().get_val<double>("mds_recall_warning_decay_rate"); + auto mut = [d](auto s) { + s->recall_caps = DecayCounter(d); + s->release_caps = DecayCounter(d); + }; + apply_to_open_sessions(mut); + } + if (changed.count("mds_session_cache_liveness_decay_rate")) { + auto d = g_conf().get_val<double>("mds_session_cache_liveness_decay_rate"); + auto mut = [d](auto s) { + s->session_cache_liveness = DecayCounter(d); + s->session_cache_liveness.hit(s->caps.size()); /* so the MDS doesn't immediately start trimming a new session */ + }; + apply_to_open_sessions(mut); + } + if (changed.count("mds_session_cap_acquisition_decay_rate")) { + auto d = g_conf().get_val<double>("mds_session_cap_acquisition_decay_rate"); + auto mut = [d](auto s) { + s->cap_acquisition = DecayCounter(d); + }; + apply_to_open_sessions(mut); + } +} + +void SessionMap::update_average_session_age() { + if (!session_map.size()) { + return; + } + + double avg_uptime = std::chrono::duration<double>(clock::now()-avg_birth_time).count(); + logger->set(l_mdssm_avg_session_uptime, (uint64_t)avg_uptime); +} + +int SessionFilter::parse( + const std::vector<std::string> &args, + std::stringstream *ss) +{ + ceph_assert(ss != NULL); + + for (const auto &s : args) { + dout(20) << __func__ << " parsing filter '" << s << "'" << dendl; + + auto eq = s.find("="); + if (eq == std::string::npos || eq == s.size()) { + *ss << "Invalid filter '" << s << "'"; + return -EINVAL; + } + + // Keys that start with this are to be taken as referring + // to freeform client metadata fields. + const std::string metadata_prefix("client_metadata."); + + auto k = s.substr(0, eq); + auto v = s.substr(eq + 1); + + dout(20) << __func__ << " parsed k='" << k << "', v='" << v << "'" << dendl; + + if (k.compare(0, metadata_prefix.size(), metadata_prefix) == 0 + && k.size() > metadata_prefix.size()) { + // Filter on arbitrary metadata key (no fixed schema for this, + // so anything after the dot is a valid field to filter on) + auto metadata_key = k.substr(metadata_prefix.size()); + metadata.insert(std::make_pair(metadata_key, v)); + } else if (k == "auth_name") { + // Filter on client entity name + auth_name = v; + } else if (k == "state") { + state = v; + } else if (k == "id") { + std::string err; + id = strict_strtoll(v.c_str(), 10, &err); + if (!err.empty()) { + *ss << err; + return -EINVAL; + } + } else if (k == "reconnecting") { + + /** + * Strict boolean parser. Allow true/false/0/1. + * Anything else is -EINVAL. + */ + auto is_true = [](std::string_view bstr, bool *out) -> bool + { + ceph_assert(out != nullptr); + + if (bstr == "true" || bstr == "1") { + *out = true; + return 0; + } else if (bstr == "false" || bstr == "0") { + *out = false; + return 0; + } else { + return -EINVAL; + } + }; + + bool bval; + int r = is_true(v, &bval); + if (r == 0) { + set_reconnecting(bval); + } else { + *ss << "Invalid boolean value '" << v << "'"; + return -EINVAL; + } + } else { + *ss << "Invalid filter key '" << k << "'"; + return -EINVAL; + } + } + + return 0; +} + +bool SessionFilter::match( + const Session &session, + std::function<bool(client_t)> is_reconnecting) const +{ + for (const auto &m : metadata) { + const auto &k = m.first; + const auto &v = m.second; + auto it = session.info.client_metadata.find(k); + if (it == session.info.client_metadata.end()) { + return false; + } + if (it->second != v) { + return false; + } + } + + if (!auth_name.empty() && auth_name != session.info.auth_name.get_id()) { + return false; + } + + if (!state.empty() && state != session.get_state_name()) { + return false; + } + + if (id != 0 && id != session.info.inst.name.num()) { + return false; + } + + if (reconnecting.first) { + const bool am_reconnecting = is_reconnecting(session.info.inst.name.num()); + if (reconnecting.second != am_reconnecting) { + return false; + } + } + + return true; +} + +std::ostream& operator<<(std::ostream &out, const Session &s) +{ + if (s.get_human_name() == stringify(s.get_client())) { + out << s.get_human_name(); + } else { + out << s.get_human_name() << " (" << std::dec << s.get_client() << ")"; + } + return out; +} + |