diff options
Diffstat (limited to '')
-rw-r--r-- | src/mon/Session.h | 280 |
1 files changed, 280 insertions, 0 deletions
diff --git a/src/mon/Session.h b/src/mon/Session.h new file mode 100644 index 00000000..43fe38ab --- /dev/null +++ b/src/mon/Session.h @@ -0,0 +1,280 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MON_SESSION_H +#define CEPH_MON_SESSION_H + +#include "global/global_context.h" +#include "include/xlist.h" +#include "msg/msg_types.h" +#include "mon/mon_types.h" + +#include "auth/AuthServiceHandler.h" +#include "osd/OSDMap.h" + +#include "MonCap.h" + +struct MonSession; + +struct Subscription { + MonSession *session; + string type; + xlist<Subscription*>::item type_item; + version_t next; + bool onetime; + bool incremental_onetime; // has CEPH_FEATURE_INCSUBOSDMAP + + Subscription(MonSession *s, const string& t) : session(s), type(t), type_item(this), + next(0), onetime(false), incremental_onetime(false) {} +}; + +struct MonSession : public RefCountedObject { + ConnectionRef con; + int con_type = 0; + uint64_t con_features = 0; // zero if AnonConnection + entity_name_t name; + entity_addrvec_t addrs; + entity_addr_t socket_addr; + utime_t session_timeout; + bool closed = false; + xlist<MonSession*>::item item; + set<uint64_t> routed_request_tids; + MonCap caps; + + bool authenticated = false; ///< true if auth handshake is complete + + map<string, Subscription*> sub_map; + epoch_t osd_epoch = 0; ///< the osdmap epoch sent to the mon client + + AuthServiceHandler *auth_handler = nullptr; + EntityName entity_name; + uint64_t global_id = 0; + global_id_status_t global_id_status = global_id_status_t::NONE; + + ConnectionRef proxy_con; + uint64_t proxy_tid = 0; + + string remote_host; ///< remote host name + map<string,string> last_config; ///< most recently shared config + bool any_config = false; + + MonSession(Connection *c) + : RefCountedObject(g_ceph_context), + con(c), + item(this) { } + + void _ident(const entity_name_t& n, const entity_addrvec_t& av) { + con_type = con->get_peer_type(); + name = n; + addrs = av; + socket_addr = con->get_peer_socket_addr(); + if (con->get_messenger()) { + // only fill in features if this is a non-anonymous connection + con_features = con->get_features(); + } + } + + ~MonSession() override { + //generic_dout(0) << "~MonSession " << this << dendl; + // we should have been removed before we get destructed; see MonSessionMap::remove_session() + ceph_assert(!item.is_on_list()); + ceph_assert(sub_map.empty()); + delete auth_handler; + } + + bool is_capable(string service, int mask) { + map<string,string> args; + return caps.is_capable( + g_ceph_context, + entity_name, + service, "", args, + mask & MON_CAP_R, mask & MON_CAP_W, mask & MON_CAP_X, + get_peer_socket_addr()); + } + + const entity_addr_t& get_peer_socket_addr() { + return socket_addr; + } + + void dump(Formatter *f) const { + f->dump_stream("name") << name; + f->dump_stream("entity_name") << entity_name; + f->dump_object("addrs", addrs); + f->dump_object("socket_addr", socket_addr); + f->dump_string("con_type", ceph_entity_type_name(con_type)); + f->dump_unsigned("con_features", con_features); + f->dump_stream("con_features_hex") << std::hex << con_features << std::dec; + f->dump_string("con_features_release", + ceph_release_name(ceph_release_from_features(con_features))); + f->dump_bool("open", !closed); + f->dump_object("caps", caps); + f->dump_bool("authenticated", authenticated); + f->dump_unsigned("global_id", global_id); + f->dump_stream("global_id_status") << global_id_status; + f->dump_unsigned("osd_epoch", osd_epoch); + f->dump_string("remote_host", remote_host); + } +}; + + +struct MonSessionMap { + xlist<MonSession*> sessions; + map<string, xlist<Subscription*>* > subs; + multimap<int, MonSession*> by_osd; + FeatureMap feature_map; // type -> features -> count + + MonSessionMap() {} + ~MonSessionMap() { + while (!subs.empty()) { + ceph_assert(subs.begin()->second->empty()); + delete subs.begin()->second; + subs.erase(subs.begin()); + } + } + + unsigned get_size() const { + return sessions.size(); + } + + void remove_session(MonSession *s) { + ceph_assert(!s->closed); + for (map<string,Subscription*>::iterator p = s->sub_map.begin(); p != s->sub_map.end(); ++p) { + p->second->type_item.remove_myself(); + delete p->second; + } + s->sub_map.clear(); + s->item.remove_myself(); + if (s->name.is_osd() && + s->name.num() >= 0) { + for (multimap<int,MonSession*>::iterator p = by_osd.find(s->name.num()); + p->first == s->name.num(); + ++p) + if (p->second == s) { + by_osd.erase(p); + break; + } + } + if (s->con_features) { + feature_map.rm(s->con_type, s->con_features); + } + s->closed = true; + s->put(); + } + + MonSession *new_session(const entity_name_t& n, + const entity_addrvec_t& av, + Connection *c) { + MonSession *s = new MonSession(c); + ceph_assert(s); + s->_ident(n, av); + add_session(s); + return s; + } + + void add_session(MonSession *s) { + s->session_timeout = ceph_clock_now(); + s->session_timeout += g_conf()->mon_session_timeout; + + sessions.push_back(&s->item); + s->get(); + if (s->name.is_osd() && + s->name.num() >= 0) { + by_osd.insert(pair<int,MonSession*>(s->name.num(), s)); + } + if (s->con_features) { + feature_map.add(s->con_type, s->con_features); + } + } + + MonSession *get_random_osd_session(OSDMap *osdmap) { + // ok, this isn't actually random, but close enough. + if (by_osd.empty()) + return 0; + int n = by_osd.rbegin()->first + 1; + int r = rand() % n; + + multimap<int,MonSession*>::iterator p = by_osd.lower_bound(r); + if (p == by_osd.end()) + --p; + + if (!osdmap) { + return p->second; + } + + MonSession *s = NULL; + + multimap<int,MonSession*>::iterator b = p, f = p; + bool backward = true, forward = true; + while (backward || forward) { + if (backward) { + if (osdmap->is_up(b->first) && + osdmap->get_addrs(b->first) == b->second->con->get_peer_addrs()) { + s = b->second; + break; + } + if (b != by_osd.begin()) + --b; + else + backward = false; + } + + forward = (f != by_osd.end()); + if (forward) { + if (osdmap->is_up(f->first)) { + s = f->second; + break; + } + ++f; + } + } + + return s; + } + + void add_update_sub(MonSession *s, const string& what, version_t start, bool onetime, bool incremental_onetime) { + Subscription *sub = 0; + if (s->sub_map.count(what)) { + sub = s->sub_map[what]; + } else { + sub = new Subscription(s, what); + s->sub_map[what] = sub; + + if (!subs.count(what)) + subs[what] = new xlist<Subscription*>; + subs[what]->push_back(&sub->type_item); + } + sub->next = start; + sub->onetime = onetime; + sub->incremental_onetime = onetime && incremental_onetime; + } + + void remove_sub(Subscription *sub) { + sub->session->sub_map.erase(sub->type); + sub->type_item.remove_myself(); + delete sub; + } +}; + +inline ostream& operator<<(ostream& out, const MonSession& s) +{ + out << "MonSession(" << s.name << " " << s.addrs + << " is " << (s.closed ? "closed" : "open") + << " " << s.caps + << ", features 0x" << std::hex << s.con_features << std::dec + << " (" << ceph_release_name(ceph_release_from_features(s.con_features)) + << "))"; + return out; +} + +#endif |