summaryrefslogtreecommitdiffstats
path: root/src/osd/Session.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/osd/Session.h240
1 files changed, 240 insertions, 0 deletions
diff --git a/src/osd/Session.h b/src/osd/Session.h
new file mode 100644
index 000000000..a42d37bfe
--- /dev/null
+++ b/src/osd/Session.h
@@ -0,0 +1,240 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_OSD_SESSION_H
+#define CEPH_OSD_SESSION_H
+
+#include "common/RefCountedObj.h"
+#include "common/ceph_mutex.h"
+#include "global/global_context.h"
+#include "include/spinlock.h"
+#include "OSDCap.h"
+#include "Watch.h"
+#include "OSDMap.h"
+#include "PeeringState.h"
+
+//#define PG_DEBUG_REFS
+
+class PG;
+#ifdef PG_DEBUG_REFS
+#include "common/tracked_int_ptr.hpp"
+typedef TrackedIntPtr<PG> PGRef;
+#else
+typedef boost::intrusive_ptr<PG> PGRef;
+#endif
+
+/*
+ * A Backoff represents one instance of either a PG or an OID
+ * being plugged at the client. It's refcounted and linked from
+ * the PG {pg_oid}_backoffs map and from the client Session
+ * object.
+ *
+ * The Backoff has a lock that protects it's internal fields.
+ *
+ * The PG has a backoff_lock that protects it's maps to Backoffs.
+ * This lock is *inside* of Backoff::lock.
+ *
+ * The Session has a backoff_lock that protects it's map of pg and
+ * oid backoffs. This lock is *inside* the Backoff::lock *and*
+ * PG::backoff_lock.
+ *
+ * That's
+ *
+ * Backoff::lock
+ * PG::backoff_lock
+ * Session::backoff_lock
+ *
+ * When the Session goes away, we move our backoff lists aside,
+ * then we lock each of the Backoffs we
+ * previously referenced and clear the Session* pointer. If the PG
+ * is still linked, we unlink it, too.
+ *
+ * When the PG clears the backoff, it will send an unblock message
+ * if the Session* is still non-null, and unlink the session.
+ *
+ */
+
+struct Backoff : public RefCountedObject {
+ enum {
+ STATE_NEW = 1, ///< backoff in flight to client
+ STATE_ACKED = 2, ///< backoff acked
+ STATE_DELETING = 3 ///< backoff deleted, but un-acked
+ };
+ std::atomic<int> state = {STATE_NEW};
+ spg_t pgid; ///< owning pgid
+ uint64_t id = 0; ///< unique id (within the Session)
+
+ bool is_new() const {
+ return state.load() == STATE_NEW;
+ }
+ bool is_acked() const {
+ return state.load() == STATE_ACKED;
+ }
+ bool is_deleting() const {
+ return state.load() == STATE_DELETING;
+ }
+ const char *get_state_name() const {
+ switch (state.load()) {
+ case STATE_NEW: return "new";
+ case STATE_ACKED: return "acked";
+ case STATE_DELETING: return "deleting";
+ default: return "???";
+ }
+ }
+
+ ceph::mutex lock = ceph::make_mutex("Backoff::lock");
+ // NOTE: the owning PG and session are either
+ // - *both* set, or
+ // - both null (teardown), or
+ // - only session is set (and state == DELETING)
+ PGRef pg; ///< owning pg
+ ceph::ref_t<struct Session> session; ///< owning session
+ hobject_t begin, end; ///< [) range to block, unless ==, then single obj
+
+ friend ostream& operator<<(ostream& out, const Backoff& b) {
+ return out << "Backoff(" << &b << " " << b.pgid << " " << b.id
+ << " " << b.get_state_name()
+ << " [" << b.begin << "," << b.end << ") "
+ << " session " << b.session
+ << " pg " << b.pg << ")";
+ }
+
+private:
+ FRIEND_MAKE_REF(Backoff);
+ Backoff(spg_t pgid, PGRef pg, ceph::ref_t<Session> s,
+ uint64_t i,
+ const hobject_t& b, const hobject_t& e)
+ : RefCountedObject(g_ceph_context),
+ pgid(pgid),
+ id(i),
+ pg(pg),
+ session(std::move(s)),
+ begin(b),
+ end(e) {}
+};
+
+
+
+struct Session : public RefCountedObject {
+ EntityName entity_name;
+ OSDCap caps;
+ ConnectionRef con;
+ entity_addr_t socket_addr;
+ WatchConState wstate;
+
+ ceph::mutex session_dispatch_lock =
+ ceph::make_mutex("Session::session_dispatch_lock");
+ boost::intrusive::list<OpRequest> waiting_on_map;
+
+ ceph::spinlock sent_epoch_lock;
+ epoch_t last_sent_epoch = 0;
+
+ /// protects backoffs; orders inside Backoff::lock *and* PG::backoff_lock
+ ceph::mutex backoff_lock = ceph::make_mutex("Session::backoff_lock");
+ std::atomic<int> backoff_count= {0}; ///< simple count of backoffs
+ std::map<spg_t, std::map<hobject_t, std::set<ceph::ref_t<Backoff>>>> backoffs;
+
+ std::atomic<uint64_t> backoff_seq = {0};
+
+ // for heartbeat connections only
+ int peer = -1;
+ HeartbeatStampsRef stamps;
+
+ entity_addr_t& get_peer_socket_addr() {
+ return socket_addr;
+ }
+
+ void ack_backoff(
+ CephContext *cct,
+ spg_t pgid,
+ uint64_t id,
+ const hobject_t& start,
+ const hobject_t& end);
+
+ ceph::ref_t<Backoff> have_backoff(spg_t pgid, const hobject_t& oid) {
+ if (!backoff_count.load()) {
+ return nullptr;
+ }
+ std::lock_guard l(backoff_lock);
+ ceph_assert(!backoff_count == backoffs.empty());
+ auto i = backoffs.find(pgid);
+ if (i == backoffs.end()) {
+ return nullptr;
+ }
+ auto p = i->second.lower_bound(oid);
+ if (p != i->second.begin() &&
+ (p == i->second.end() || p->first > oid)) {
+ --p;
+ }
+ if (p != i->second.end()) {
+ int r = cmp(oid, p->first);
+ if (r == 0 || r > 0) {
+ for (auto& q : p->second) {
+ if (r == 0 || oid < q->end) {
+ return &(*q);
+ }
+ }
+ }
+ }
+ return nullptr;
+ }
+
+ bool check_backoff(
+ CephContext *cct, spg_t pgid, const hobject_t& oid, const Message *m);
+
+ void add_backoff(ceph::ref_t<Backoff> b) {
+ std::lock_guard l(backoff_lock);
+ ceph_assert(!backoff_count == backoffs.empty());
+ backoffs[b->pgid][b->begin].insert(std::move(b));
+ ++backoff_count;
+ }
+
+ // called by PG::release_*_backoffs and PG::clear_backoffs()
+ void rm_backoff(const ceph::ref_t<Backoff>& b) {
+ std::lock_guard l(backoff_lock);
+ ceph_assert(ceph_mutex_is_locked_by_me(b->lock));
+ ceph_assert(b->session == this);
+ auto i = backoffs.find(b->pgid);
+ if (i != backoffs.end()) {
+ // may race with clear_backoffs()
+ auto p = i->second.find(b->begin);
+ if (p != i->second.end()) {
+ auto q = p->second.find(b);
+ if (q != p->second.end()) {
+ p->second.erase(q);
+ --backoff_count;
+ if (p->second.empty()) {
+ i->second.erase(p);
+ if (i->second.empty()) {
+ backoffs.erase(i);
+ }
+ }
+ }
+ }
+ }
+ ceph_assert(!backoff_count == backoffs.empty());
+ }
+ void clear_backoffs();
+
+private:
+ FRIEND_MAKE_REF(Session);
+ explicit Session(CephContext *cct, Connection *con_) :
+ RefCountedObject(cct),
+ con(con_),
+ socket_addr(con_->get_peer_socket_addr()),
+ wstate(cct)
+ {}
+};
+
+#endif