From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 7 Apr 2024 20:45:59 +0200
Subject: Adding upstream version 16.2.11+ds.

Signed-off-by: Daniel Baumann
---
 src/mds/SessionMap.h | 844 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 844 insertions(+)
 create mode 100644 src/mds/SessionMap.h

diff --git a/src/mds/SessionMap.h b/src/mds/SessionMap.h
new file mode 100644
index 000000000..e59f7f264
--- /dev/null
+++ b/src/mds/SessionMap.h
@@ -0,0 +1,844 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MDS_SESSIONMAP_H
+#define CEPH_MDS_SESSIONMAP_H
+
+#include <set>
+
+#include "include/unordered_map.h"
+
+#include "include/Context.h"
+#include "include/xlist.h"
+#include "include/elist.h"
+#include "include/interval_set.h"
+#include "mdstypes.h"
+#include "mds/MDSAuthCaps.h"
+#include "common/perf_counters.h"
+#include "common/DecayCounter.h"
+
+#include "CInode.h"
+#include "Capability.h"
+#include "MDSContext.h"
+#include "msg/Message.h"
+
+struct MDRequestImpl;
+
+enum {
+  l_mdssm_first = 5500,
+  l_mdssm_session_count,
+  l_mdssm_session_add,
+  l_mdssm_session_remove,
+  l_mdssm_session_open,
+  l_mdssm_session_stale,
+  l_mdssm_total_load,
+  l_mdssm_avg_load,
+  l_mdssm_avg_session_uptime,
+  l_mdssm_last,
+};
+
+class CInode;
+
+/*
+ * session
+ */
+
+class Session : public RefCountedObject {
+  // -- state etc --
+public:
+  /*
+
+        <deleted> <-- closed <------------+
+             ^         |                  |
+             |         v                  |
+          killing <-- opening <----+      |
+             ^         |           |      |
+             |         v           |      |
+           stale <--> open --> closing ---+
+    + additional dimension of 'importing' (with counter)
+
+  */
+
+  using clock = ceph::coarse_mono_clock;
+  using time = ceph::coarse_mono_time;
+
+  enum {
+    STATE_CLOSED = 0,
+    STATE_OPENING = 1,   // journaling open
+    STATE_OPEN = 2,
+    STATE_CLOSING = 3,   // journaling close
+    STATE_STALE = 4,
+    STATE_KILLING = 5
+  };
+
+  Session() = delete;
+  Session(ConnectionRef con) :
+    item_session_list(this),
+    requests(member_offset(MDRequestImpl, item_session_request)),
+    recall_caps(g_conf().get_val<double>("mds_recall_warning_decay_rate")),
+    release_caps(g_conf().get_val<double>("mds_recall_warning_decay_rate")),
+    recall_caps_throttle(g_conf().get_val<double>("mds_recall_max_decay_rate")),
+    recall_caps_throttle2o(0.5),
+    session_cache_liveness(g_conf().get_val<double>("mds_session_cache_liveness_decay_rate")),
+    cap_acquisition(g_conf().get_val<double>("mds_session_cap_acquisition_decay_rate")),
+    birth_time(clock::now())
+  {
+    set_connection(std::move(con));
+  }
+  ~Session() override {
+    ceph_assert(!item_session_list.is_on_list());
+    preopen_out_queue.clear();
+  }
+
+  static std::string_view get_state_name(int s) {
+    switch (s) {
+    case STATE_CLOSED: return "closed";
+    case STATE_OPENING: return "opening";
+    case STATE_OPEN: return "open";
+    case STATE_CLOSING: return "closing";
+    case STATE_STALE: return "stale";
+    case STATE_KILLING: return "killing";
+    default: return "???";
+    }
+  }
+
+  void dump(ceph::Formatter *f, bool cap_dump=false) const;
+  void push_pv(version_t pv)
+  {
+    ceph_assert(projected.empty() || projected.back() != pv);
+    projected.push_back(pv);
+  }
+
+  void pop_pv(version_t v)
+  {
+    ceph_assert(!projected.empty());
+    ceph_assert(projected.front() == v);
+    projected.pop_front();
+  }
+
+  int get_state() const { return state; }
+  void set_state(int new_state)
+  {
+    if (state != new_state) {
+      state = new_state;
+      state_seq++;
+    }
+  }
+
+  void set_reconnecting(bool s) { reconnecting = s; }
+
+  void decode(ceph::buffer::list::const_iterator &p);
+  template<typename T>
+  void set_client_metadata(T&& meta)
+  {
+    info.client_metadata = std::forward<T>(meta);
+    _update_human_name();
+  }
+
+  const std::string& get_human_name() const {return human_name;}
+
+  size_t get_request_count() const;
+
+  void notify_cap_release(size_t n_caps);
+  uint64_t notify_recall_sent(size_t new_limit);
+  auto get_recall_caps_throttle() const {
+    return recall_caps_throttle.get();
+  }
+  auto get_recall_caps_throttle2o() const {
+    return recall_caps_throttle2o.get();
+  }
+  auto get_recall_caps() const {
+    return recall_caps.get();
+  }
+  auto get_release_caps() const {
+    return release_caps.get();
+  }
+  auto get_session_cache_liveness() const {
+    return session_cache_liveness.get();
+  }
+  auto get_cap_acquisition() const {
+    return cap_acquisition.get();
+  }
+
+  inodeno_t take_ino(inodeno_t ino = 0) {
+    if (ino) {
+      if (!info.prealloc_inos.contains(ino))
+        return 0;
+      if (delegated_inos.contains(ino)) {
+        delegated_inos.erase(ino);
+      } else if (free_prealloc_inos.contains(ino)) {
+        free_prealloc_inos.erase(ino);
+      } else {
+        ceph_assert(0);
+      }
+    } else if (!free_prealloc_inos.empty()) {
+      ino = free_prealloc_inos.range_start();
+      free_prealloc_inos.erase(ino);
+    }
+    return ino;
+  }
+
+  void delegate_inos(int want, interval_set<inodeno_t>& inos) {
+    want -= (int)delegated_inos.size();
+    if (want <= 0)
+      return;
+
+    for (auto it = free_prealloc_inos.begin(); it != free_prealloc_inos.end(); ) {
+      if (want < (int)it.get_len()) {
+        inos.insert(it.get_start(), (inodeno_t)want);
+        delegated_inos.insert(it.get_start(), (inodeno_t)want);
+        free_prealloc_inos.erase(it.get_start(), (inodeno_t)want);
+        break;
+      }
+      want -= (int)it.get_len();
+      inos.insert(it.get_start(), it.get_len());
+      delegated_inos.insert(it.get_start(), it.get_len());
+      free_prealloc_inos.erase(it++);
+      if (want <= 0)
+        break;
+    }
+  }
+
+  // sans any delegated ones
+  int get_num_prealloc_inos() const {
+    return free_prealloc_inos.size();
+  }
+
+  int get_num_projected_prealloc_inos() const {
+    return get_num_prealloc_inos() + pending_prealloc_inos.size();
+  }
+
+  client_t get_client() const {
+    return info.get_client();
+  }
+
+  std::string_view get_state_name() const { return get_state_name(state); }
+  uint64_t get_state_seq() const { return state_seq; }
+  bool is_closed() const { return state == STATE_CLOSED; }
+  bool is_opening() const { return state == STATE_OPENING; }
+  bool is_open() const { return state == STATE_OPEN; }
+  bool is_closing() const { return state == STATE_CLOSING; }
+  bool is_stale() const { return state == STATE_STALE; }
+  bool is_killing() const { return state == STATE_KILLING; }
+
+  void inc_importing() {
+    ++importing_count;
+  }
+  void dec_importing() {
+    ceph_assert(importing_count > 0);
+    --importing_count;
+  }
+  bool is_importing() const { return importing_count > 0; }
+
+  void set_load_avg_decay_rate(double rate) {
+    ceph_assert(is_open() || is_stale());
+    load_avg = DecayCounter(rate);
+  }
+  uint64_t get_load_avg() const {
+    return (uint64_t)load_avg.get();
+  }
+  void hit_session() {
+    load_avg.adjust();
+  }
+
+  double get_session_uptime() const {
+    std::chrono::duration<double> uptime = clock::now() - birth_time;
+    return uptime.count();
+  }
+
+  time get_birth_time() const {
+    return birth_time;
+  }
+
+  void inc_cap_gen() { ++cap_gen; }
+  uint32_t get_cap_gen() const { return cap_gen; }
+
+  version_t inc_push_seq() { return ++cap_push_seq; }
+  version_t get_push_seq() const { return cap_push_seq; }
+
+  version_t wait_for_flush(MDSContext* c) {
+    waitfor_flush[get_push_seq()].push_back(c);
+    return get_push_seq();
+  }
+  void finish_flush(version_t seq, MDSContext::vec& ls) {
+    while (!waitfor_flush.empty()) {
+      auto it = waitfor_flush.begin();
+      if (it->first > seq)
+        break;
+      auto& v = it->second;
+      ls.insert(ls.end(), v.begin(), v.end());
+      waitfor_flush.erase(it);
+    }
+  }
+
+  void touch_readdir_cap(uint32_t count) {
+    cap_acquisition.hit(count);
+  }
+
+  void touch_cap(Capability *cap) {
+    session_cache_liveness.hit(1.0);
+    caps.push_front(&cap->item_session_caps);
+  }
+
+  void touch_cap_bottom(Capability *cap) {
+    session_cache_liveness.hit(1.0);
+    caps.push_back(&cap->item_session_caps);
+  }
+
+  void touch_lease(ClientLease *r) {
+    session_cache_liveness.hit(1.0);
+    leases.push_back(&r->item_session_lease);
+  }
+
+  bool is_any_flush_waiter() {
+    return !waitfor_flush.empty();
+  }
+
+  void add_completed_request(ceph_tid_t t, inodeno_t created) {
+    info.completed_requests[t] = created;
+    completed_requests_dirty = true;
+  }
+  bool trim_completed_requests(ceph_tid_t mintid) {
+    // trim
+    bool erased_any = false;
+    while (!info.completed_requests.empty() &&
+	   (mintid == 0 || info.completed_requests.begin()->first < mintid)) {
+      info.completed_requests.erase(info.completed_requests.begin());
+      erased_any = true;
+    }
+
+    if (erased_any) {
+      completed_requests_dirty = true;
+    }
+    return erased_any;
+  }
+  bool have_completed_request(ceph_tid_t tid, inodeno_t *pcreated) const {
+    auto p = info.completed_requests.find(tid);
+    if (p == info.completed_requests.end())
+      return false;
+    if (pcreated)
+      *pcreated = p->second;
+    return true;
+  }
+
+  void add_completed_flush(ceph_tid_t tid) {
+    info.completed_flushes.insert(tid);
+  }
+  bool trim_completed_flushes(ceph_tid_t mintid) {
+    bool erased_any = false;
+    while (!info.completed_flushes.empty() &&
+	   (mintid == 0 || *info.completed_flushes.begin() < mintid)) {
+      info.completed_flushes.erase(info.completed_flushes.begin());
+      erased_any = true;
+    }
+    if (erased_any) {
+      completed_requests_dirty = true;
+    }
+    return erased_any;
+  }
+  bool have_completed_flush(ceph_tid_t tid) const {
+    return info.completed_flushes.count(tid);
+  }
+
+  uint64_t get_num_caps() const {
+    return caps.size();
+  }
+
+  unsigned get_num_completed_flushes() const { return info.completed_flushes.size(); }
+  unsigned get_num_trim_flushes_warnings() const {
+    return num_trim_flushes_warnings;
+  }
+  void inc_num_trim_flushes_warnings() { ++num_trim_flushes_warnings; }
+  void reset_num_trim_flushes_warnings() { num_trim_flushes_warnings = 0; }
+
+  unsigned get_num_completed_requests() const { return info.completed_requests.size(); }
+  unsigned get_num_trim_requests_warnings() const {
+    return num_trim_requests_warnings;
+  }
+  void inc_num_trim_requests_warnings() { ++num_trim_requests_warnings; }
+  void reset_num_trim_requests_warnings() { num_trim_requests_warnings = 0; }
+
+  bool has_dirty_completed_requests() const
+  {
+    return completed_requests_dirty;
+  }
+
+  void clear_dirty_completed_requests()
+  {
+    completed_requests_dirty = false;
+  }
+
+  int check_access(CInode *in, unsigned mask, int caller_uid, int caller_gid,
+		   const std::vector<uint64_t> *gid_list, int new_uid, int new_gid);
+
+  bool fs_name_capable(std::string_view fs_name, unsigned mask) const {
+    return auth_caps.fs_name_capable(fs_name, mask);
+  }
+
+  void set_connection(ConnectionRef con) {
+    connection = std::move(con);
+    auto& c = connection;
+    if (c) {
+      info.auth_name = c->get_peer_entity_name();
+      info.inst.addr = c->get_peer_socket_addr();
+      info.inst.name = entity_name_t(c->get_peer_type(), c->get_peer_global_id());
+    }
+  }
+  const ConnectionRef& get_connection() const {
+    return connection;
+  }
+
+  void clear() {
+    pending_prealloc_inos.clear();
+    free_prealloc_inos.clear();
+    delegated_inos.clear();
+    info.clear_meta();
+
+    cap_push_seq = 0;
+    last_cap_renew = clock::zero();
+  }
+
+  Session *reclaiming_from = nullptr;
+  session_info_t info;                         ///< durable bits
+  MDSAuthCaps auth_caps;
+
+  xlist<Session*>::item item_session_list;
+
+  std::list<ceph::ref_t<Message>> preopen_out_queue;  ///< messages for client, queued before they connect
+
+  /* This is mutable to allow get_request_count to be const. elist does not
+   * support const iterators yet.
+   */
+  mutable elist<MDRequestImpl*> requests;
+
+  interval_set<inodeno_t> pending_prealloc_inos; // journaling prealloc, will be added to prealloc_inos
+  interval_set<inodeno_t> free_prealloc_inos; //
+  interval_set<inodeno_t> delegated_inos; // hand these out to client
+
+  xlist<Capability*> caps;     // inodes with caps; front=most recently used
+  xlist<ClientLease*> leases;  // metadata leases to clients
+  time last_cap_renew = clock::zero();
+  time last_seen = clock::zero();
+
+  // -- leases --
+  uint32_t lease_seq = 0;
+
+protected:
+  ConnectionRef connection;
+
+private:
+  friend class SessionMap;
+
+  // Human (friendly) name is soft state generated from client metadata
+  void _update_human_name();
+
+  int state = STATE_CLOSED;
+  bool reconnecting = false;
+  uint64_t state_seq = 0;
+  int importing_count = 0;
+
+  std::string human_name;
+
+  // Versions in this session was projected: used to verify
+  // that appropriate mark_dirty calls follow.
+  std::deque<version_t> projected;
+
+  // request load average for this session
+  DecayCounter load_avg;
+
+  // Ephemeral state for tracking progress of capability recalls
+  // caps being recalled recently by this session; used for Beacon warnings
+  DecayCounter recall_caps;
+  // caps that have been released
+  DecayCounter release_caps;
+  // throttle on caps recalled
+  DecayCounter recall_caps_throttle;
+  // second order throttle that prevents recalling too quickly
+  DecayCounter recall_caps_throttle2o;
+  // New limit in SESSION_RECALL
+  uint32_t recall_limit = 0;
+
+  // session caps liveness
+  DecayCounter session_cache_liveness;
+
+  // cap acquisition via readdir
+  DecayCounter cap_acquisition;
+
+  // session start time -- used to track average session time
+  // note that this is initialized in the constructor rather
+  // than at the time of adding a session to the sessionmap
+  // as journal replay of sessionmap will not call add_session().
+  time birth_time;
+
+  // -- caps --
+  uint32_t cap_gen = 0;
+  version_t cap_push_seq = 0;                  // cap push seq #
+  std::map<version_t, MDSContext::vec> waitfor_flush; // flush session messages
+
+  // Has completed_requests been modified since the last time we
+  // wrote this session out?
+  bool completed_requests_dirty = false;
+
+  unsigned num_trim_flushes_warnings = 0;
+  unsigned num_trim_requests_warnings = 0;
+};
+
+class SessionFilter
+{
+public:
+  SessionFilter() : reconnecting(false, false) {}
+
+  bool match(
+      const Session &session,
+      std::function<bool(client_t)> is_reconnecting) const;
+  int parse(const std::vector<std::string> &args, std::ostream *ss);
+  void set_reconnecting(bool v)
+  {
+    reconnecting.first = true;
+    reconnecting.second = v;
+  }
+
+  std::map<std::string, std::string> metadata;
+  std::string auth_name;
+  std::string state;
+  int64_t id = 0;
+protected:
+  // First is whether to filter, second is filter value
+  std::pair<bool, bool> reconnecting;
+};
+
+/*
+ * session map
+ */
+
+class MDSRank;
+
+/**
+ * Encapsulate the serialized state associated with SessionMap.  Allows
+ * encode/decode outside of live MDS instance.
+ */
+class SessionMapStore {
+public:
+  using clock = Session::clock;
+  using time = Session::time;
+
+  SessionMapStore(): total_load_avg(decay_rate) {}
+  virtual ~SessionMapStore() {};
+
+  version_t get_version() const {return version;}
+
+  virtual void encode_header(ceph::buffer::list *header_bl);
+  virtual void decode_header(ceph::buffer::list &header_bl);
+  virtual void decode_values(std::map<std::string, ceph::buffer::list> &session_vals);
+  virtual void decode_legacy(ceph::buffer::list::const_iterator& blp);
+  void dump(ceph::Formatter *f) const;
+
+  void set_rank(mds_rank_t r)
+  {
+    rank = r;
+  }
+
+  Session* get_or_add_session(const entity_inst_t& i) {
+    Session *s;
+    auto session_map_entry = session_map.find(i.name);
+    if (session_map_entry != session_map.end()) {
+      s = session_map_entry->second;
+    } else {
+      s = session_map[i.name] = new Session(ConnectionRef());
+      s->info.inst = i;
+      s->last_cap_renew = Session::clock::now();
+      if (logger) {
+        logger->set(l_mdssm_session_count, session_map.size());
+        logger->inc(l_mdssm_session_add);
+      }
+    }
+
+    return s;
+  }
+
+  static void generate_test_instances(std::list<SessionMapStore*>& ls);
+
+  void reset_state()
+  {
+    session_map.clear();
+  }
+
+  mds_rank_t rank = MDS_RANK_NONE;
+
+protected:
+  version_t version = 0;
+  ceph::unordered_map<entity_name_t, Session*> session_map;
+  PerfCounters *logger = nullptr;
+
+  // total request load avg
+  double decay_rate = g_conf().get_val<double>("mds_request_load_average_decay_rate");
+  DecayCounter total_load_avg;
+};
+
+class SessionMap : public SessionMapStore {
+public:
+  SessionMap() = delete;
+  explicit SessionMap(MDSRank *m) : mds(m) {}
+
+  ~SessionMap() override
+  {
+    for (auto p : by_state)
+      delete p.second;
+
+    if (logger) {
+      g_ceph_context->get_perfcounters_collection()->remove(logger);
+    }
+
+    delete logger;
+  }
+
+  uint64_t set_state(Session *session, int state);
+  void update_average_session_age();
+
+  void register_perfcounters();
+
+  void set_version(const version_t v)
+  {
+    version = projected = v;
+  }
+
+  void set_projected(const version_t v)
+  {
+    projected = v;
+  }
+
+  version_t get_projected() const
+  {
+    return projected;
+  }
+
+  version_t get_committed() const
+  {
+    return committed;
+  }
+
+  version_t get_committing() const
+  {
+    return committing;
+  }
+
+  // sessions
+  void decode_legacy(ceph::buffer::list::const_iterator& blp) override;
+  bool empty() const { return session_map.empty(); }
+  const auto& get_sessions() const {
+    return session_map;
+  }
+
+  bool is_any_state(int state) const {
+    auto it = by_state.find(state);
+    if (it == by_state.end() || it->second->empty())
+      return false;
+    return true;
+  }
+
+  bool have_unclosed_sessions() const {
+    return
+      is_any_state(Session::STATE_OPENING) ||
+      is_any_state(Session::STATE_OPEN) ||
+      is_any_state(Session::STATE_CLOSING) ||
+      is_any_state(Session::STATE_STALE) ||
+      is_any_state(Session::STATE_KILLING);
+  }
+  bool have_session(entity_name_t w) const {
+    return session_map.count(w);
+  }
+  Session* get_session(entity_name_t w) {
+    auto session_map_entry = session_map.find(w);
+    return (session_map_entry != session_map.end() ?
+	    session_map_entry->second : nullptr);
+  }
+  const Session* get_session(entity_name_t w) const {
+    ceph::unordered_map<entity_name_t, Session*>::const_iterator p = session_map.find(w);
+    if (p == session_map.end()) {
+      return NULL;
+    } else {
+      return p->second;
+    }
+  }
+
+  void add_session(Session *s);
+  void remove_session(Session *s);
+  void touch_session(Session *session);
+
+  Session *get_oldest_session(int state) {
+    auto by_state_entry = by_state.find(state);
+    if (by_state_entry == by_state.end() || by_state_entry->second->empty())
+      return 0;
+    return by_state_entry->second->front();
+  }
+
+  void dump();
+
+  template<typename F>
+  void get_client_sessions(F&& f) const {
+    for (const auto& p : session_map) {
+      auto& session = p.second;
+      if (session->info.inst.name.is_client())
+	f(session);
+    }
+  }
+  template<typename C>
+  void get_client_session_set(C& c) const {
+    auto f = [&c](auto& s) {
+      c.insert(s);
+    };
+    get_client_sessions(f);
+  }
+
+  // helpers
+  entity_inst_t& get_inst(entity_name_t w) {
+    ceph_assert(session_map.count(w));
+    return session_map[w]->info.inst;
+  }
+  version_t get_push_seq(client_t client) {
+    return get_session(entity_name_t::CLIENT(client.v))->get_push_seq();
+  }
+  bool have_completed_request(metareqid_t rid) {
+    Session *session = get_session(rid.name);
+    return session && session->have_completed_request(rid.tid, NULL);
+  }
+  void trim_completed_requests(entity_name_t c, ceph_tid_t tid) {
+    Session *session = get_session(c);
+    ceph_assert(session);
+    session->trim_completed_requests(tid);
+  }
+
+  void wipe();
+  void wipe_ino_prealloc();
+
+  object_t get_object_name() const;
+
+  void load(MDSContext *onload);
+  void _load_finish(
+      int operation_r,
+      int header_r,
+      int values_r,
+      bool first,
+      ceph::buffer::list &header_bl,
+      std::map<std::string, ceph::buffer::list> &session_vals,
+      bool more_session_vals);
+
+  void load_legacy();
+  void _load_legacy_finish(int r, ceph::buffer::list &bl);
+
+  void save(MDSContext *onsave, version_t needv=0);
+  void _save_finish(version_t v);
+
+  /**
+   * Advance the version, and mark this session
+   * as dirty within the new version.
+   *
+   * Dirty means journalled but needing writeback
+   * to the backing store.  Must have called
+   * mark_projected previously for this session.
+   */
+  void mark_dirty(Session *session, bool may_save=true);
+
+  /**
+   * Advance the projected version, and mark this
+   * session as projected within the new version
+   *
+   * Projected means the session is updated in memory
+   * but we're waiting for the journal write of the update
+   * to finish.  Must subsequently call mark_dirty
+   * for sessions in the same global order as calls
+   * to mark_projected.
+   */
+  version_t mark_projected(Session *session);
+
+  /**
+   * During replay, advance versions to account
+   * for a session modification, and mark the
+   * session dirty.
+   */
+  void replay_dirty_session(Session *session);
+
+  /**
+   * During replay, if a session no longer present
+   * would have consumed a version, advance `version`
+   * and `projected` to account for that.
+   */
+  void replay_advance_version();
+
+  /**
+   * During replay, open sessions, advance versions and
+   * mark these sessions as dirty.
+ */ + void replay_open_sessions(version_t event_cmapv, + std::map& client_map, + std::map& client_metadata_map); + + /** + * For these session IDs, if a session exists with this ID, and it has + * dirty completed_requests, then persist it immediately + * (ahead of usual project/dirty versioned writes + * of the map). + */ + void save_if_dirty(const std::set &tgt_sessions, + MDSGatherBuilder *gather_bld); + + void hit_session(Session *session); + void handle_conf_change(const std::set &changed); + + MDSRank *mds; + std::map*> by_state; + std::map commit_waiters; + + // -- loading, saving -- + inodeno_t ino; + MDSContext::vec waiting_for_load; + +protected: + void _mark_dirty(Session *session, bool may_save); + + version_t projected = 0, committing = 0, committed = 0; + std::set dirty_sessions; + std::set null_sessions; + bool loaded_legacy = false; + +private: + uint64_t get_session_count_in_state(int state) { + return !is_any_state(state) ? 0 : by_state[state]->size(); + } + + void update_average_birth_time(const Session &s, bool added=true) { + uint32_t sessions = session_map.size(); + time birth_time = s.get_birth_time(); + + if (sessions == 1) { + avg_birth_time = added ? birth_time : clock::zero(); + return; + } + + if (added) { + avg_birth_time = clock::time_point( + ((avg_birth_time - clock::zero()) / sessions) * (sessions - 1) + + (birth_time - clock::zero()) / sessions); + } else { + avg_birth_time = clock::time_point( + ((avg_birth_time - clock::zero()) / (sessions - 1)) * sessions - + (birth_time - clock::zero()) / (sessions - 1)); + } + } + + time avg_birth_time = clock::zero(); +}; + +std::ostream& operator<<(std::ostream &out, const Session &s); +#endif -- cgit v1.2.3