author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-27 18:24:20 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-27 18:24:20 +0000
commit     483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
tree       e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/crimson/osd
parent     Initial commit. (diff)
download   ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz
           ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip

Adding upstream version 14.2.21.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/osd')
-rw-r--r--  src/crimson/osd/CMakeLists.txt          |  10
-rw-r--r--  src/crimson/osd/chained_dispatchers.cc  |  72
-rw-r--r--  src/crimson/osd/chained_dispatchers.h   |  32
-rw-r--r--  src/crimson/osd/heartbeat.cc            | 415
-rw-r--r--  src/crimson/osd/heartbeat.h             | 121
-rw-r--r--  src/crimson/osd/main.cc                 |  89
-rw-r--r--  src/crimson/osd/osd.cc                  | 658
-rw-r--r--  src/crimson/osd/osd.h                   | 125
-rw-r--r--  src/crimson/osd/osd_meta.cc             |  80
-rw-r--r--  src/crimson/osd/osd_meta.h              |  53
-rw-r--r--  src/crimson/osd/osdmap_service.h        |  19
-rw-r--r--  src/crimson/osd/pg.cc                   |   6
-rw-r--r--  src/crimson/osd/pg.h                    |  21
-rw-r--r--  src/crimson/osd/pg_meta.cc              | 104
-rw-r--r--  src/crimson/osd/pg_meta.h               |  22
-rw-r--r--  src/crimson/osd/state.h                 |  71
16 files changed, 1898 insertions, 0 deletions
diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt
new file mode 100644
index 00000000..e86a11b5
--- /dev/null
+++ b/src/crimson/osd/CMakeLists.txt
@@ -0,0 +1,10 @@
+add_executable(crimson-osd
+ chained_dispatchers.cc
+ heartbeat.cc
+ main.cc
+ osd.cc
+ osd_meta.cc
+ pg.cc
+ pg_meta.cc)
+target_link_libraries(crimson-osd
+ crimson-common crimson-os crimson)
diff --git a/src/crimson/osd/chained_dispatchers.cc b/src/crimson/osd/chained_dispatchers.cc
new file mode 100644
index 00000000..da4aa269
--- /dev/null
+++ b/src/crimson/osd/chained_dispatchers.cc
@@ -0,0 +1,72 @@
+#include "chained_dispatchers.h"
+#include "crimson/net/Connection.h"
+
+
+seastar::future<>
+ChainedDispatchers::ms_dispatch(ceph::net::ConnectionRef conn,
+ MessageRef m) {
+ return seastar::do_for_each(dispatchers, [conn, m](Dispatcher* dispatcher) {
+ return dispatcher->ms_dispatch(conn, m);
+ });
+}
+
+seastar::future<>
+ChainedDispatchers::ms_handle_accept(ceph::net::ConnectionRef conn) {
+ return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) {
+ return dispatcher->ms_handle_accept(conn);
+ });
+}
+
+seastar::future<>
+ChainedDispatchers::ms_handle_connect(ceph::net::ConnectionRef conn) {
+ return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) {
+ return dispatcher->ms_handle_connect(conn);
+ });
+}
+
+seastar::future<>
+ChainedDispatchers::ms_handle_reset(ceph::net::ConnectionRef conn) {
+ return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) {
+ return dispatcher->ms_handle_reset(conn);
+ });
+}
+
+seastar::future<>
+ChainedDispatchers::ms_handle_remote_reset(ceph::net::ConnectionRef conn) {
+ return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) {
+ return dispatcher->ms_handle_remote_reset(conn);
+ });
+}
+
+seastar::future<std::unique_ptr<AuthAuthorizer>>
+ChainedDispatchers::ms_get_authorizer(peer_type_t peer_type)
+{
+  // since a dispatcher returns nullptr if it does not have the authorizer,
+  // let's use the chain-of-responsibility pattern here.
+ struct Params {
+ peer_type_t peer_type;
+ std::deque<Dispatcher*>::iterator first, last;
+ } params = {peer_type, dispatchers.begin(), dispatchers.end()};
+ return seastar::do_with(Params{params}, [this] (Params& params) {
+ using result_t = std::unique_ptr<AuthAuthorizer>;
+ return seastar::repeat_until_value([&] () {
+ auto& first = params.first;
+ if (first == params.last) {
+ // just give up
+ return seastar::make_ready_future<std::optional<result_t>>(result_t{});
+ } else {
+ return (*first)->ms_get_authorizer(params.peer_type)
+ .then([&] (auto&& auth)-> std::optional<result_t> {
+ if (auth) {
+ // hooray!
+ return std::move(auth);
+ } else {
+ // try next one
+ ++first;
+ return {};
+ }
+ });
+ }
+ });
+ });
+}
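
The ms_get_authorizer() implementation above walks the dispatcher chain with seastar::repeat_until_value() because each step yields a future; a plain for loop cannot express "stop at the first non-null result" across asynchronous calls, hence the iterator pair carried in Params. A minimal synchronous sketch of the same chain-of-responsibility idea, with stand-in types rather than the crimson API:

#include <deque>
#include <memory>

struct Authorizer {};                        // stand-in for AuthAuthorizer

struct Handler {                             // stand-in for Dispatcher
  virtual ~Handler() = default;
  // returns nullptr if this handler has no authorizer for the peer
  virtual std::unique_ptr<Authorizer> get_authorizer(int peer_type) = 0;
};

std::unique_ptr<Authorizer>
first_authorizer(std::deque<Handler*>& handlers, int peer_type)
{
  for (auto* h : handlers) {
    if (auto auth = h->get_authorizer(peer_type)) {
      return auth;                           // first handler that claims it wins
    }
  }
  return nullptr;                            // nobody had one: give up
}
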
diff --git a/src/crimson/osd/chained_dispatchers.h b/src/crimson/osd/chained_dispatchers.h
new file mode 100644
index 00000000..c3040836
--- /dev/null
+++ b/src/crimson/osd/chained_dispatchers.h
@@ -0,0 +1,32 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <deque>
+#include "crimson/net/Dispatcher.h"
+
+// in the existing Messenger, dispatchers are put into a chain following the
+// chain-of-responsibility pattern. we could do the same to stop processing
+// a message once any of the dispatchers claims it, and to prevent the other
+// dispatchers from reading it. but that change is more involved, as it
+// requires changing the ms_* methods to return a bool. so as an intermediate
+// solution, we are using an observer dispatcher that notifies all the
+// interested or uninterested parties.
+class ChainedDispatchers : public ceph::net::Dispatcher {
+ std::deque<Dispatcher*> dispatchers;
+public:
+ void push_front(Dispatcher* dispatcher) {
+ dispatchers.push_front(dispatcher);
+ }
+ void push_back(Dispatcher* dispatcher) {
+ dispatchers.push_back(dispatcher);
+ }
+ seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) override;
+ seastar::future<> ms_handle_accept(ceph::net::ConnectionRef conn) override;
+ seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override;
+ seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override;
+ seastar::future<> ms_handle_remote_reset(ceph::net::ConnectionRef conn) override;
+ seastar::future<std::unique_ptr<AuthAuthorizer>>
+ ms_get_authorizer(peer_type_t peer_type) override;
+};
diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc
new file mode 100644
index 00000000..6dfefb3b
--- /dev/null
+++ b/src/crimson/osd/heartbeat.cc
@@ -0,0 +1,415 @@
+#include "heartbeat.h"
+
+#include <boost/range/join.hpp>
+
+#include "messages/MOSDPing.h"
+#include "messages/MOSDFailure.h"
+
+#include "crimson/common/config_proxy.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Messenger.h"
+#include "crimson/osd/osdmap_service.h"
+#include "crimson/mon/MonClient.h"
+
+#include "osd/OSDMap.h"
+
+using ceph::common::local_conf;
+
+namespace {
+ seastar::logger& logger() {
+ return ceph::get_logger(ceph_subsys_osd);
+ }
+
+ template<typename Message, typename... Args>
+ Ref<Message> make_message(Args&&... args)
+ {
+ return {new Message{std::forward<Args>(args)...}, false};
+ }
+}
+
+Heartbeat::Heartbeat(int whoami,
+ uint32_t nonce,
+ const OSDMapService& service,
+ ceph::mon::Client& monc)
+ : whoami{whoami},
+ nonce{nonce},
+ service{service},
+ monc{monc},
+ timer{[this] {send_heartbeats();}}
+{}
+
+seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
+ entity_addrvec_t back_addrs)
+{
+ logger().info("heartbeat: start");
+ // i only care about the address, so any unused port would work
+ for (auto& addr : boost::join(front_addrs.v, back_addrs.v)) {
+ addr.set_port(0);
+ }
+ return seastar::when_all_succeed(
+ ceph::net::Messenger::create(entity_name_t::OSD(whoami),
+ "hb_front",
+ nonce,
+ seastar::engine().cpu_id())
+ .then([this, front_addrs] (auto msgr) {
+ front_msgr = msgr;
+ return start_messenger(front_msgr, front_addrs);
+ }),
+ ceph::net::Messenger::create(entity_name_t::OSD(whoami),
+ "hb_back",
+ nonce,
+ seastar::engine().cpu_id())
+ .then([this, back_addrs] (auto msgr) {
+ back_msgr = msgr;
+ return start_messenger(back_msgr, back_addrs);
+ }))
+ .then([this] {
+ timer.arm_periodic(
+ std::chrono::seconds(local_conf()->osd_heartbeat_interval));
+ });
+}
+
+seastar::future<>
+Heartbeat::start_messenger(ceph::net::Messenger* msgr,
+ const entity_addrvec_t& addrs)
+{
+ if (local_conf()->ms_crc_data) {
+ msgr->set_crc_data();
+ }
+ if (local_conf()->ms_crc_header) {
+ msgr->set_crc_header();
+ }
+ return msgr->try_bind(addrs,
+ local_conf()->ms_bind_port_min,
+ local_conf()->ms_bind_port_max).then([msgr, this] {
+ return msgr->start(this);
+ });
+}
+
+seastar::future<> Heartbeat::stop()
+{
+ return seastar::when_all_succeed(front_msgr->shutdown(),
+ back_msgr->shutdown());
+}
+
+const entity_addrvec_t& Heartbeat::get_front_addrs() const
+{
+ return front_msgr->get_myaddrs();
+}
+
+const entity_addrvec_t& Heartbeat::get_back_addrs() const
+{
+ return back_msgr->get_myaddrs();
+}
+
+seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
+{
+ auto found = peers.find(peer);
+ if (found == peers.end()) {
+ logger().info("add_peer({})", peer);
+ auto osdmap = service.get_map();
+ // TODO: msgr v2
+ return seastar::when_all_succeed(
+ front_msgr->connect(osdmap->get_hb_front_addrs(peer).legacy_addr(),
+ CEPH_ENTITY_TYPE_OSD),
+ back_msgr->connect(osdmap->get_hb_back_addrs(peer).legacy_addr(),
+ CEPH_ENTITY_TYPE_OSD))
+ .then([this, peer, epoch] (auto xcon_front, auto xcon_back) {
+ PeerInfo info;
+ // sharded-messenger compatible mode
+ info.con_front = xcon_front->release();
+ info.con_back = xcon_back->release();
+ info.epoch = epoch;
+ peers.emplace(peer, std::move(info));
+ });
+ } else {
+ found->second.epoch = epoch;
+ return seastar::now();
+ }
+}
+
+seastar::future<Heartbeat::osds_t> Heartbeat::remove_down_peers()
+{
+ osds_t osds;
+ for (auto& peer : peers) {
+ osds.push_back(peer.first);
+ }
+ return seastar::map_reduce(std::move(osds),
+ [this](auto& osd) {
+ auto osdmap = service.get_map();
+ if (!osdmap->is_up(osd)) {
+ return remove_peer(osd).then([] {
+ return seastar::make_ready_future<osd_id_t>(-1);
+ });
+ } else if (peers[osd].epoch < osdmap->get_epoch()) {
+ return seastar::make_ready_future<osd_id_t>(osd);
+ } else {
+ return seastar::make_ready_future<osd_id_t>(-1);
+ }
+ }, osds_t{},
+ [this](osds_t&& extras, osd_id_t extra) {
+ if (extra >= 0) {
+ extras.push_back(extra);
+ }
+ return extras;
+ });
+}
+
+void Heartbeat::add_reporter_peers(int whoami)
+{
+ auto osdmap = service.get_map();
+ // include next and previous up osds to ensure we have a fully-connected set
+ set<int> want;
+ if (auto next = osdmap->get_next_up_osd_after(whoami); next >= 0) {
+ want.insert(next);
+ }
+ if (auto prev = osdmap->get_previous_up_osd_before(whoami); prev >= 0) {
+ want.insert(prev);
+ }
+  // make sure we have at least **min_down** osds coming from different
+  // subtree levels (e.g., hosts) for fast failure detection.
+ auto min_down = local_conf().get_val<uint64_t>("mon_osd_min_down_reporters");
+ auto subtree = local_conf().get_val<string>("mon_osd_reporter_subtree_level");
+ osdmap->get_random_up_osds_by_subtree(
+ whoami, subtree, min_down, want, &want);
+ for (auto osd : want) {
+ add_peer(osd, osdmap->get_epoch());
+ }
+}
+
+seastar::future<> Heartbeat::update_peers(int whoami)
+{
+ const auto min_peers = static_cast<size_t>(
+ local_conf().get_val<int64_t>("osd_heartbeat_min_peers"));
+ return remove_down_peers().then([=](osds_t&& extra) {
+ add_reporter_peers(whoami);
+ // too many?
+ struct iteration_state {
+ osds_t::const_iterator where;
+ osds_t::const_iterator end;
+ };
+ return seastar::do_with(iteration_state{extra.begin(),extra.end()},
+ [=](iteration_state& s) {
+ return seastar::do_until(
+ [min_peers, &s, this] {
+ return peers.size() < min_peers || s.where == s.end; },
+ [&s, this] {
+ return remove_peer(*s.where); }
+ );
+ });
+ }).then([=] {
+ // or too few?
+ auto osdmap = service.get_map();
+ for (auto next = osdmap->get_next_up_osd_after(whoami);
+ peers.size() < min_peers && next >= 0 && next != whoami;
+ next = osdmap->get_next_up_osd_after(next)) {
+ add_peer(next, osdmap->get_epoch());
+ }
+ return seastar::now();
+ });
+}
+
+seastar::future<> Heartbeat::remove_peer(osd_id_t peer)
+{
+ auto found = peers.find(peer);
+ assert(found != peers.end());
+ logger().info("remove_peer({})", peer);
+ return seastar::when_all_succeed(found->second.con_front->close(),
+ found->second.con_back->close()).then(
+ [this, peer] {
+ peers.erase(peer);
+ return seastar::now();
+ });
+}
+
+seastar::future<> Heartbeat::ms_dispatch(ceph::net::ConnectionRef conn,
+ MessageRef m)
+{
+ logger().info("heartbeat: ms_dispatch {} from {}",
+ *m, m->get_source());
+ switch (m->get_type()) {
+ case CEPH_MSG_PING:
+ return handle_osd_ping(conn, boost::static_pointer_cast<MOSDPing>(m));
+ default:
+ return seastar::now();
+ }
+}
+
+seastar::future<> Heartbeat::handle_osd_ping(ceph::net::ConnectionRef conn,
+ Ref<MOSDPing> m)
+{
+ switch (m->op) {
+ case MOSDPing::PING:
+ return handle_ping(conn, m);
+ case MOSDPing::PING_REPLY:
+ return handle_reply(conn, m);
+ case MOSDPing::YOU_DIED:
+ return handle_you_died();
+ default:
+ return seastar::now();
+ }
+}
+
+seastar::future<> Heartbeat::handle_ping(ceph::net::ConnectionRef conn,
+ Ref<MOSDPing> m)
+{
+ auto min_message = static_cast<uint32_t>(
+ local_conf()->osd_heartbeat_min_size);
+ auto reply =
+ make_message<MOSDPing>(m->fsid,
+ service.get_map()->get_epoch(),
+ MOSDPing::PING_REPLY,
+ m->stamp,
+ min_message);
+ return conn->send(reply);
+}
+
+seastar::future<> Heartbeat::handle_reply(ceph::net::ConnectionRef conn,
+ Ref<MOSDPing> m)
+{
+ const osd_id_t from = m->get_source().num();
+ auto found = peers.find(from);
+ if (found == peers.end()) {
+ // stale reply
+ return seastar::now();
+ }
+ auto& peer = found->second;
+ auto ping = peer.ping_history.find(m->stamp);
+ if (ping == peer.ping_history.end()) {
+ // old replies, deprecated by newly sent pings.
+ return seastar::now();
+ }
+ const auto now = clock::now();
+ auto& unacked = ping->second.unacknowledged;
+ if (conn == peer.con_back) {
+ peer.last_rx_back = now;
+ unacked--;
+ } else if (conn == peer.con_front) {
+ peer.last_rx_front = now;
+ unacked--;
+ }
+ if (unacked == 0) {
+ peer.ping_history.erase(peer.ping_history.begin(), ++ping);
+ }
+ if (peer.is_healthy(now)) {
+ // cancel false reports
+ failure_queue.erase(from);
+ if (auto pending = failure_pending.find(from);
+ pending != failure_pending.end()) {
+ return send_still_alive(from, pending->second.addrs);
+ }
+ }
+ return seastar::now();
+}
+
+seastar::future<> Heartbeat::handle_you_died()
+{
+ // TODO: ask for newer osdmap
+ return seastar::now();
+}
+
+seastar::future<> Heartbeat::send_heartbeats()
+{
+ using peers_item_t = typename peers_map_t::value_type;
+ return seastar::parallel_for_each(peers,
+ [this](peers_item_t& item) {
+ const auto now = clock::now();
+ const auto deadline =
+ now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
+ auto& [peer, info] = item;
+ info.last_tx = now;
+ if (clock::is_zero(info.first_tx)) {
+ info.first_tx = now;
+ }
+ const utime_t sent_stamp{now};
+ auto [reply, added] = info.ping_history.emplace(sent_stamp,
+ reply_t{deadline, 0});
+ std::vector<ceph::net::ConnectionRef> conns{info.con_front,
+ info.con_back};
+ return seastar::parallel_for_each(std::move(conns),
+ [sent_stamp, &reply=reply->second, this] (auto con) {
+ if (con) {
+ auto min_message = static_cast<uint32_t>(
+ local_conf()->osd_heartbeat_min_size);
+ auto ping = make_message<MOSDPing>(monc.get_fsid(),
+ service.get_map()->get_epoch(),
+ MOSDPing::PING,
+ sent_stamp,
+ min_message);
+ return con->send(ping).then([&reply] {
+ reply.unacknowledged++;
+ return seastar::now();
+ });
+ } else {
+ return seastar::now();
+ }
+ });
+ });
+}
+
+seastar::future<> Heartbeat::send_failures()
+{
+ using failure_item_t = typename failure_queue_t::value_type;
+ return seastar::parallel_for_each(failure_queue,
+ [this](failure_item_t& failure_item) {
+ auto [osd, failed_since] = failure_item;
+ if (failure_pending.count(osd)) {
+ return seastar::now();
+ }
+ auto failed_for = chrono::duration_cast<chrono::seconds>(
+ clock::now() - failed_since).count();
+ auto osdmap = service.get_map();
+ auto failure_report =
+ make_message<MOSDFailure>(monc.get_fsid(),
+ osd,
+ osdmap->get_addrs(osd),
+ static_cast<int>(failed_for),
+ osdmap->get_epoch());
+ failure_pending.emplace(osd, failure_info_t{failed_since,
+ osdmap->get_addrs(osd)});
+ return monc.send_message(failure_report);
+ }).then([this] {
+ failure_queue.clear();
+ return seastar::now();
+ });
+}
+
+seastar::future<> Heartbeat::send_still_alive(osd_id_t osd,
+ const entity_addrvec_t& addrs)
+{
+ auto still_alive = make_message<MOSDFailure>(monc.get_fsid(),
+ osd,
+ addrs,
+ 0,
+ service.get_map()->get_epoch(),
+ MOSDFailure::FLAG_ALIVE);
+ return monc.send_message(still_alive).then([=] {
+ failure_pending.erase(osd);
+ return seastar::now();
+ });
+}
+
+bool Heartbeat::PeerInfo::is_unhealthy(clock::time_point now) const
+{
+ if (ping_history.empty()) {
+    // we either haven't sent a ping yet or have gotten all the replies;
+    // either way we are safe and healthy for now
+ return false;
+ } else {
+ auto oldest_ping = ping_history.begin();
+ return now > oldest_ping->second.deadline;
+ }
+}
+
+bool Heartbeat::PeerInfo::is_healthy(clock::time_point now) const
+{
+ if (con_front && clock::is_zero(last_rx_front)) {
+ return false;
+ }
+ if (con_back && clock::is_zero(last_rx_back)) {
+ return false;
+ }
+  // we cannot be healthy until we have received the first replies on
+  // both the front and back connections
+ return !is_unhealthy(now);
+}
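
To see the ping bookkeeping above in one place: send_heartbeats() records a deadline plus the number of outstanding acks per ping, handle_reply() prunes a fully-acked ping together with everything older, and is_unhealthy() fires once the oldest in-flight ping passes its deadline. A plain-C++ sketch of that lifecycle, with toy types and no seastar:

#include <chrono>
#include <cstdint>
#include <iterator>
#include <map>

using hb_clock = std::chrono::steady_clock;

struct reply_t {
  hb_clock::time_point deadline;
  uint8_t unacknowledged = 0;   // one per connection the ping went out on
};

struct peer_t {
  std::map<hb_clock::time_point, reply_t> ping_history;  // keyed by send time

  // cf. send_heartbeats()
  void sent(hb_clock::time_point stamp, hb_clock::time_point deadline,
            uint8_t nr_conns) {
    ping_history.emplace(stamp, reply_t{deadline, nr_conns});
  }
  // cf. handle_reply()
  void acked(hb_clock::time_point stamp) {
    if (auto p = ping_history.find(stamp); p != ping_history.end()) {
      if (--p->second.unacknowledged == 0) {
        // fully acknowledged: drop this ping and every older one
        ping_history.erase(ping_history.begin(), std::next(p));
      }
    }
  }
  // cf. PeerInfo::is_unhealthy()
  bool is_unhealthy(hb_clock::time_point now) const {
    return !ping_history.empty() &&
           now > ping_history.begin()->second.deadline;
  }
};
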
diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h
new file mode 100644
index 00000000..b5eb0f7c
--- /dev/null
+++ b/src/crimson/osd/heartbeat.h
@@ -0,0 +1,121 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <cstdint>
+#include <seastar/core/future.hh>
+#include "common/ceph_time.h"
+#include "crimson/net/Dispatcher.h"
+#include "crimson/net/Fwd.h"
+
+class MOSDPing;
+class OSDMapService;
+
+namespace ceph::mon {
+ class Client;
+}
+
+template<typename Message> using Ref = boost::intrusive_ptr<Message>;
+
+class Heartbeat : public ceph::net::Dispatcher {
+public:
+ using osd_id_t = int;
+
+ Heartbeat(int whoami,
+ uint32_t nonce,
+ const OSDMapService& service,
+ ceph::mon::Client& monc);
+
+ seastar::future<> start(entity_addrvec_t front,
+ entity_addrvec_t back);
+ seastar::future<> stop();
+
+ seastar::future<> add_peer(osd_id_t peer, epoch_t epoch);
+ seastar::future<> update_peers(int whoami);
+ seastar::future<> remove_peer(osd_id_t peer);
+
+ seastar::future<> send_heartbeats();
+ seastar::future<> send_failures();
+
+ const entity_addrvec_t& get_front_addrs() const;
+ const entity_addrvec_t& get_back_addrs() const;
+
+ // Dispatcher methods
+ seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn,
+ MessageRef m) override;
+
+private:
+ seastar::future<> handle_osd_ping(ceph::net::ConnectionRef conn,
+ Ref<MOSDPing> m);
+ seastar::future<> handle_ping(ceph::net::ConnectionRef conn,
+ Ref<MOSDPing> m);
+ seastar::future<> handle_reply(ceph::net::ConnectionRef conn,
+ Ref<MOSDPing> m);
+ seastar::future<> handle_you_died();
+
+ seastar::future<> send_still_alive(osd_id_t, const entity_addrvec_t&);
+
+ using osds_t = std::vector<osd_id_t>;
+ /// remove down OSDs
+ /// @return peers not needed in this epoch
+ seastar::future<osds_t> remove_down_peers();
+ /// add enough reporters for fast failure detection
+ void add_reporter_peers(int whoami);
+
+ seastar::future<> start_messenger(ceph::net::Messenger* msgr,
+ const entity_addrvec_t& addrs);
+private:
+ const int whoami;
+ const uint32_t nonce;
+ ceph::net::Messenger* front_msgr = nullptr;
+ ceph::net::Messenger* back_msgr = nullptr;
+ const OSDMapService& service;
+ ceph::mon::Client& monc;
+
+ seastar::timer<seastar::lowres_clock> timer;
+ // use real_clock so it can be converted to utime_t
+ using clock = ceph::coarse_real_clock;
+
+ struct reply_t {
+ clock::time_point deadline;
+ // one sent over front conn, another sent over back conn
+ uint8_t unacknowledged = 0;
+ };
+ struct PeerInfo {
+ /// peer connection (front)
+ ceph::net::ConnectionRef con_front;
+ /// peer connection (back)
+ ceph::net::ConnectionRef con_back;
+ /// time we sent our first ping request
+ clock::time_point first_tx;
+ /// last time we sent a ping request
+ clock::time_point last_tx;
+ /// last time we got a ping reply on the front side
+ clock::time_point last_rx_front;
+ /// last time we got a ping reply on the back side
+ clock::time_point last_rx_back;
+ /// most recent epoch we wanted this peer
+ epoch_t epoch;
+    /// history of inflight pings, arranged by the timestamp we sent them
+ std::map<utime_t, reply_t> ping_history;
+
+ bool is_unhealthy(clock::time_point now) const;
+ bool is_healthy(clock::time_point now) const;
+ };
+ using peers_map_t = std::map<osd_id_t, PeerInfo>;
+ peers_map_t peers;
+
+ // osds which are considered failed
+  // osd_id => the last time both front and back pings were acked;
+  // used for calculating how long an OSD has been unresponsive
+ using failure_queue_t = std::map<osd_id_t, clock::time_point>;
+ failure_queue_t failure_queue;
+ struct failure_info_t {
+ clock::time_point failed_since;
+ entity_addrvec_t addrs;
+ };
+  // osds we've reported to the monitor as failed, but which are not yet
+  // marked down
+ std::map<osd_id_t, failure_info_t> failure_pending;
+};
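
failure_queue and failure_pending form a small two-stage pipeline: a peer sits in failure_queue once its pings go unanswered, moves to failure_pending when send_failures() reports it to the monitor, and leaves failure_pending when a still-alive message cancels the report. A toy sketch of that flow, under assumed types:

#include <chrono>
#include <map>

using osd_id_t = int;
using time_point = std::chrono::steady_clock::time_point;

struct FailureTracker {
  std::map<osd_id_t, time_point> queue;    // failed, not yet reported
  std::map<osd_id_t, time_point> pending;  // reported, not yet marked down

  // cf. send_failures(): report everything queued, remembering what we sent
  void report_all() {
    for (auto& [osd, since] : queue) {
      pending.emplace(osd, since);         // no-op if a report is in flight
    }
    queue.clear();
  }
  // cf. handle_reply() + send_still_alive(): healthy again, cancel the report
  void still_alive(osd_id_t osd) {
    queue.erase(osd);
    pending.erase(osd);
  }
};
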
diff --git a/src/crimson/osd/main.cc b/src/crimson/osd/main.cc
new file mode 100644
index 00000000..04a0c97d
--- /dev/null
+++ b/src/crimson/osd/main.cc
@@ -0,0 +1,89 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <iostream>
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/thread.hh>
+
+#include "common/ceph_argparse.h"
+#include "crimson/common/config_proxy.h"
+
+#include "osd.h"
+
+using config_t = ceph::common::ConfigProxy;
+
+void usage(const char* prog) {
+ std::cout << "usage: " << prog << " -i <ID>" << std::endl;
+ generic_server_usage();
+}
+
+int main(int argc, char* argv[])
+{
+ std::vector<const char*> args{argv + 1, argv + argc};
+ if (ceph_argparse_need_usage(args)) {
+ usage(argv[0]);
+ return EXIT_SUCCESS;
+ }
+ std::string cluster;
+ std::string conf_file_list;
+  // ceph_argparse_early_args() could _exit(), and local_conf() won't be ready
+  // until it's started, so do the boilerplate-settings parsing here.
+ auto init_params = ceph_argparse_early_args(args,
+ CEPH_ENTITY_TYPE_OSD,
+ &cluster,
+ &conf_file_list);
+ seastar::app_template app;
+ app.add_options()
+ ("mkfs", "create a [new] data directory");
+ seastar::sharded<OSD> osd;
+
+ using ceph::common::sharded_conf;
+ using ceph::common::sharded_perf_coll;
+ using ceph::common::local_conf;
+
+ args.insert(begin(args), argv[0]);
+ try {
+ return app.run_deprecated(args.size(), const_cast<char**>(args.data()), [&] {
+ auto& config = app.configuration();
+ seastar::engine().at_exit([] {
+ return sharded_conf().stop();
+ });
+ seastar::engine().at_exit([] {
+ return sharded_perf_coll().stop();
+ });
+ seastar::engine().at_exit([&] {
+ return osd.stop();
+ });
+ return sharded_conf().start(init_params.name, cluster).then([] {
+ return sharded_perf_coll().start();
+ }).then([&conf_file_list] {
+ return local_conf().parse_config_files(conf_file_list);
+ }).then([&] {
+ return osd.start_single(std::stoi(local_conf()->name.get_id()),
+ static_cast<uint32_t>(getpid()));
+ }).then([&osd, mkfs = config.count("mkfs")] {
+ if (mkfs) {
+ return osd.invoke_on(0, &OSD::mkfs,
+ local_conf().get_val<uuid_d>("fsid"));
+ } else {
+ return osd.invoke_on(0, &OSD::start);
+ }
+ });
+ });
+ } catch (...) {
+ seastar::fprint(std::cerr, "FATAL: Exception during startup, aborting: %s\n", std::current_exception());
+ return EXIT_FAILURE;
+ }
+}
+
+/*
+ * Local Variables:
+ * compile-command: "make -j4 \
+ * -C ../../../build \
+ * crimson-osd"
+ * End:
+ */
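
One detail worth noting in main() above: seastar runs at_exit() callbacks in reverse registration order, so registering sharded_conf() first and the osd last means the OSD stops before the services it depends on. A toy program (not the OSD) demonstrating the ordering:

#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/print.hh>
#include <seastar/core/reactor.hh>

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run_deprecated(argc, argv, [] {
    seastar::engine().at_exit([] {
      seastar::print("stopped last: config service\n");
      return seastar::now();
    });
    seastar::engine().at_exit([] {
      seastar::print("stopped first: osd\n");
      return seastar::now();
    });
    seastar::engine().exit(0);   // trigger shutdown right away
  });
}
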
diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc
new file mode 100644
index 00000000..bdb1642e
--- /dev/null
+++ b/src/crimson/osd/osd.cc
@@ -0,0 +1,658 @@
+#include "osd.h"
+
+#include <boost/range/join.hpp>
+#include <boost/smart_ptr/make_local_shared.hpp>
+
+#include "common/pick_address.h"
+#include "messages/MOSDBeacon.h"
+#include "messages/MOSDBoot.h"
+#include "messages/MOSDMap.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Messenger.h"
+#include "crimson/os/cyan_collection.h"
+#include "crimson/os/cyan_object.h"
+#include "crimson/os/cyan_store.h"
+#include "crimson/os/Transaction.h"
+#include "crimson/osd/heartbeat.h"
+#include "crimson/osd/osd_meta.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/pg_meta.h"
+
+namespace {
+ seastar::logger& logger() {
+ return ceph::get_logger(ceph_subsys_osd);
+ }
+
+ template<typename Message, typename... Args>
+ Ref<Message> make_message(Args&&... args)
+ {
+ return {new Message{std::forward<Args>(args)...}, false};
+ }
+ static constexpr int TICK_INTERVAL = 1;
+}
+
+using ceph::common::local_conf;
+using ceph::os::CyanStore;
+
+OSD::OSD(int id, uint32_t nonce)
+ : whoami{id},
+ nonce{nonce},
+ beacon_timer{[this] { send_beacon(); }},
+ heartbeat_timer{[this] { update_heartbeat_peers(); }}
+{
+ osdmaps[0] = boost::make_local_shared<OSDMap>();
+}
+
+OSD::~OSD() = default;
+
+namespace {
+// Initial features in new superblock.
+// Features here are also automatically upgraded
+CompatSet get_osd_initial_compat_set()
+{
+ CompatSet::FeatureSet ceph_osd_feature_compat;
+ CompatSet::FeatureSet ceph_osd_feature_ro_compat;
+ CompatSet::FeatureSet ceph_osd_feature_incompat;
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGINFO);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_OLOC);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEC);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_CATEGORIES);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HOBJECTPOOL);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BIGINFO);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBINFO);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBLOG);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SNAPMAPPER);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HINTS);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGMETA);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_MISSING);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_FASTINFO);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_RECOVERY_DELETES);
+ return CompatSet(ceph_osd_feature_compat,
+ ceph_osd_feature_ro_compat,
+ ceph_osd_feature_incompat);
+}
+}
+
+seastar::future<> OSD::mkfs(uuid_d cluster_fsid)
+{
+ const auto data_path = local_conf().get_val<std::string>("osd_data");
+ store = std::make_unique<ceph::os::CyanStore>(data_path);
+ uuid_d osd_fsid;
+ osd_fsid.generate_random();
+ return store->mkfs(osd_fsid).then([this] {
+ return store->mount();
+ }).then([cluster_fsid, osd_fsid, this] {
+ superblock.cluster_fsid = cluster_fsid;
+ superblock.osd_fsid = osd_fsid;
+ superblock.whoami = whoami;
+ superblock.compat_features = get_osd_initial_compat_set();
+
+ meta_coll = make_unique<OSDMeta>(
+ store->create_new_collection(coll_t::meta()), store.get());
+ ceph::os::Transaction t;
+ meta_coll->create(t);
+ meta_coll->store_superblock(t, superblock);
+ return store->do_transaction(meta_coll->collection(), std::move(t));
+ }).then([cluster_fsid, this] {
+ store->write_meta("ceph_fsid", cluster_fsid.to_string());
+ store->write_meta("whoami", std::to_string(whoami));
+ return seastar::now();
+ });
+}
+
+namespace {
+ entity_addrvec_t pick_addresses(int what) {
+ entity_addrvec_t addrs;
+ CephContext cct;
+ if (int r = ::pick_addresses(&cct, what, &addrs, -1); r < 0) {
+ throw std::runtime_error("failed to pick address");
+ }
+ // TODO: v2: ::pick_addresses() returns v2 addresses, but crimson-msgr does
+ // not support v2 yet. remove following set_type() once v2 support is ready.
+    for (auto& addr : addrs.v) {
+ addr.set_type(addr.TYPE_LEGACY);
+ logger().info("picked address {}", addr);
+ }
+ return addrs;
+ }
+ std::pair<entity_addrvec_t, bool>
+ replace_unknown_addrs(entity_addrvec_t maybe_unknowns,
+ const entity_addrvec_t& knowns) {
+ bool changed = false;
+ auto maybe_replace = [&](entity_addr_t addr) {
+ if (!addr.is_blank_ip()) {
+ return addr;
+ }
+ for (auto& b : knowns.v) {
+ if (addr.get_family() == b.get_family()) {
+ auto a = b;
+ a.set_nonce(addr.get_nonce());
+ a.set_type(addr.get_type());
+ a.set_port(addr.get_port());
+ changed = true;
+ return a;
+ }
+ }
+ throw std::runtime_error("failed to replace unknown address");
+ };
+ entity_addrvec_t replaced;
+ std::transform(maybe_unknowns.v.begin(),
+ maybe_unknowns.v.end(),
+ std::back_inserter(replaced.v),
+ maybe_replace);
+ return {replaced, changed};
+ }
+}
+
+seastar::future<> OSD::start()
+{
+ logger().info("start");
+
+ return seastar::when_all_succeed(
+ ceph::net::Messenger::create(entity_name_t::OSD(whoami),
+ "cluster",
+ nonce,
+ seastar::engine().cpu_id())
+ .then([this] (auto msgr) { cluster_msgr = msgr; }),
+ ceph::net::Messenger::create(entity_name_t::OSD(whoami),
+ "client",
+ nonce,
+ seastar::engine().cpu_id())
+ .then([this] (auto msgr) { public_msgr = msgr; }))
+ .then([this] {
+ monc.reset(new ceph::mon::Client{*public_msgr});
+ heartbeat.reset(new Heartbeat{whoami, nonce, *this, *monc});
+
+ for (auto msgr : {cluster_msgr, public_msgr}) {
+ if (local_conf()->ms_crc_data) {
+ msgr->set_crc_data();
+ }
+ if (local_conf()->ms_crc_header) {
+ msgr->set_crc_header();
+ }
+ }
+ dispatchers.push_front(this);
+ dispatchers.push_front(monc.get());
+
+ const auto data_path = local_conf().get_val<std::string>("osd_data");
+ store = std::make_unique<ceph::os::CyanStore>(data_path);
+ return store->mount();
+ }).then([this] {
+ meta_coll = make_unique<OSDMeta>(store->open_collection(coll_t::meta()),
+ store.get());
+ return meta_coll->load_superblock();
+ }).then([this](OSDSuperblock&& sb) {
+ superblock = std::move(sb);
+ return get_map(superblock.current_epoch);
+ }).then([this](cached_map_t&& map) {
+ osdmap = std::move(map);
+ return load_pgs();
+ }).then([this] {
+ return seastar::when_all_succeed(
+ cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
+ local_conf()->ms_bind_port_min,
+ local_conf()->ms_bind_port_max)
+ .then([this] { return cluster_msgr->start(&dispatchers); }),
+ public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
+ local_conf()->ms_bind_port_min,
+ local_conf()->ms_bind_port_max)
+ .then([this] { return public_msgr->start(&dispatchers); }));
+ }).then([this] {
+ return monc->start();
+ }).then([this] {
+ monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0);
+ monc->sub_want("mgrmap", 0, 0);
+ monc->sub_want("osdmap", 0, 0);
+ return monc->renew_subs();
+ }).then([this] {
+ if (auto [addrs, changed] =
+ replace_unknown_addrs(cluster_msgr->get_myaddrs(),
+ public_msgr->get_myaddrs()); changed) {
+ cluster_msgr->set_myaddrs(addrs);
+ }
+ return heartbeat->start(public_msgr->get_myaddrs(),
+ cluster_msgr->get_myaddrs());
+ }).then([this] {
+ return start_boot();
+ });
+}
+
+seastar::future<> OSD::start_boot()
+{
+ state.set_preboot();
+ return monc->get_version("osdmap").then([this](version_t newest, version_t oldest) {
+ return _preboot(oldest, newest);
+ });
+}
+
+seastar::future<> OSD::_preboot(version_t oldest, version_t newest)
+{
+ logger().info("osd.{}: _preboot", whoami);
+ if (osdmap->get_epoch() == 0) {
+ logger().warn("waiting for initial osdmap");
+ } else if (osdmap->is_destroyed(whoami)) {
+ logger().warn("osdmap says I am destroyed");
+ // provide a small margin so we don't livelock seeing if we
+ // un-destroyed ourselves.
+ if (osdmap->get_epoch() > newest - 1) {
+ throw std::runtime_error("i am destroyed");
+ }
+ } else if (osdmap->is_noup(whoami)) {
+ logger().warn("osdmap NOUP flag is set, waiting for it to clear");
+ } else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
+ logger().error("osdmap SORTBITWISE OSDMap flag is NOT set; please set it");
+ } else if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+ logger().error("osdmap require_osd_release < luminous; please upgrade to luminous");
+ } else if (false) {
+ // TODO: update mon if current fullness state is different from osdmap
+ } else if (version_t n = local_conf()->osd_map_message_max;
+ osdmap->get_epoch() >= oldest - 1 &&
+ osdmap->get_epoch() + n > newest) {
+ return _send_boot();
+ }
+ // get all the latest maps
+ if (osdmap->get_epoch() + 1 >= oldest) {
+ return osdmap_subscribe(osdmap->get_epoch() + 1, false);
+ } else {
+ return osdmap_subscribe(oldest - 1, true);
+ }
+}
+
+seastar::future<> OSD::_send_boot()
+{
+ state.set_booting();
+
+ logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs());
+ logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs());
+ logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr());
+ auto m = make_message<MOSDBoot>(superblock,
+ osdmap->get_epoch(),
+ osdmap->get_epoch(),
+ heartbeat->get_back_addrs(),
+ heartbeat->get_front_addrs(),
+ cluster_msgr->get_myaddrs(),
+ CEPH_FEATURES_ALL);
+ return monc->send_message(m);
+}
+
+seastar::future<> OSD::stop()
+{
+ // see also OSD::shutdown()
+ state.set_stopping();
+ return gate.close().then([this] {
+ return heartbeat->stop();
+ }).then([this] {
+ return monc->stop();
+ }).then([this] {
+ return public_msgr->shutdown();
+ }).then([this] {
+ return cluster_msgr->shutdown();
+ });
+}
+
+seastar::future<> OSD::load_pgs()
+{
+ return seastar::parallel_for_each(store->list_collections(),
+ [this](auto coll) {
+ spg_t pgid;
+ if (coll.is_pg(&pgid)) {
+ return load_pg(pgid).then([pgid, this](auto&& pg) {
+ logger().info("load_pgs: loaded {}", pgid);
+ pgs.emplace(pgid, std::move(pg));
+ return seastar::now();
+ });
+ } else if (coll.is_temp(&pgid)) {
+ // TODO: remove the collection
+ return seastar::now();
+ } else {
+ logger().warn("ignoring unrecognized collection: {}", coll);
+ return seastar::now();
+ }
+ });
+}
+
+seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
+{
+ using ec_profile_t = map<string,string>;
+ return PGMeta{store.get(), pgid}.get_epoch().then([this](epoch_t e) {
+ return get_map(e);
+ }).then([pgid, this] (auto&& create_map) {
+ if (create_map->have_pg_pool(pgid.pool())) {
+ pg_pool_t pi = *create_map->get_pg_pool(pgid.pool());
+ string name = create_map->get_pool_name(pgid.pool());
+ ec_profile_t ec_profile;
+ if (pi.is_erasure()) {
+ ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile);
+ }
+ return seastar::make_ready_future<pg_pool_t,
+ string,
+ ec_profile_t>(std::move(pi),
+ std::move(name),
+ std::move(ec_profile));
+ } else {
+ // pool was deleted; grab final pg_pool_t off disk.
+ return meta_coll->load_final_pool_info(pgid.pool());
+ }
+ }).then([this](pg_pool_t&& pool, string&& name, ec_profile_t&& ec_profile) {
+ Ref<PG> pg{new PG{std::move(pool),
+ std::move(name),
+ std::move(ec_profile)}};
+ return seastar::make_ready_future<Ref<PG>>(std::move(pg));
+ });
+}
+
+seastar::future<> OSD::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m)
+{
+ logger().info("ms_dispatch {}", *m);
+ if (state.is_stopping()) {
+ return seastar::now();
+ }
+
+ switch (m->get_type()) {
+ case CEPH_MSG_OSD_MAP:
+ return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
+ default:
+ return seastar::now();
+ }
+}
+
+seastar::future<> OSD::ms_handle_connect(ceph::net::ConnectionRef conn)
+{
+ if (conn->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
+ return seastar::now();
+ } else {
+ return seastar::now();
+ }
+}
+
+seastar::future<> OSD::ms_handle_reset(ceph::net::ConnectionRef conn)
+{
+ // TODO: cleanup the session attached to this connection
+ logger().warn("ms_handle_reset");
+ return seastar::now();
+}
+
+seastar::future<> OSD::ms_handle_remote_reset(ceph::net::ConnectionRef conn)
+{
+ logger().warn("ms_handle_remote_reset");
+ return seastar::now();
+}
+
+OSD::cached_map_t OSD::get_map() const
+{
+ return osdmap;
+}
+
+seastar::future<OSD::cached_map_t> OSD::get_map(epoch_t e)
+{
+ // TODO: use LRU cache for managing osdmap, fallback to disk if we have to
+ if (auto found = osdmaps.find(e); found) {
+ return seastar::make_ready_future<cached_map_t>(std::move(found));
+ } else {
+ return load_map_bl(e).then([e, this](bufferlist bl) {
+ auto osdmap = std::make_unique<OSDMap>();
+ osdmap->decode(bl);
+ return seastar::make_ready_future<cached_map_t>(
+ osdmaps.insert(e, std::move(osdmap)));
+ });
+ }
+}
+
+void OSD::store_map_bl(ceph::os::Transaction& t,
+ epoch_t e, bufferlist&& bl)
+{
+ meta_coll->store_map(t, e, bl);
+ map_bl_cache.insert(e, std::move(bl));
+}
+
+seastar::future<bufferlist> OSD::load_map_bl(epoch_t e)
+{
+ if (std::optional<bufferlist> found = map_bl_cache.find(e); found) {
+ return seastar::make_ready_future<bufferlist>(*found);
+ } else {
+ return meta_coll->load_map(e);
+ }
+}
+
+seastar::future<> OSD::store_maps(ceph::os::Transaction& t,
+ epoch_t start, Ref<MOSDMap> m)
+{
+ return seastar::do_for_each(boost::counting_iterator<epoch_t>(start),
+ boost::counting_iterator<epoch_t>(m->get_last() + 1),
+ [&t, m, this](epoch_t e) {
+ if (auto p = m->maps.find(e); p != m->maps.end()) {
+ auto o = std::make_unique<OSDMap>();
+ o->decode(p->second);
+ logger().info("store_maps osdmap.{}", e);
+      store_map_bl(t, e, std::move(p->second));
+ osdmaps.insert(e, std::move(o));
+ return seastar::now();
+ } else if (auto p = m->incremental_maps.find(e);
+ p != m->incremental_maps.end()) {
+ OSDMap::Incremental inc;
+ auto i = p->second.cbegin();
+ inc.decode(i);
+ return load_map_bl(e - 1)
+ .then([&t, e, inc=std::move(inc), this](bufferlist bl) {
+ auto o = std::make_unique<OSDMap>();
+ o->decode(bl);
+ o->apply_incremental(inc);
+ bufferlist fbl;
+ o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
+ store_map_bl(t, e, std::move(fbl));
+ osdmaps.insert(e, std::move(o));
+ return seastar::now();
+ });
+ } else {
+ logger().error("MOSDMap lied about what maps it had?");
+ return seastar::now();
+ }
+ });
+}
+
+seastar::future<> OSD::osdmap_subscribe(version_t epoch, bool force_request)
+{
+ logger().info("{}({})", __func__, epoch);
+ if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
+ force_request) {
+ return monc->renew_subs();
+ } else {
+ return seastar::now();
+ }
+}
+
+seastar::future<> OSD::handle_osd_map(ceph::net::ConnectionRef conn,
+ Ref<MOSDMap> m)
+{
+ logger().info("handle_osd_map {}", *m);
+ if (m->fsid != superblock.cluster_fsid) {
+ logger().warn("fsid mismatched");
+ return seastar::now();
+ }
+ if (state.is_initializing()) {
+ logger().warn("i am still initializing");
+ return seastar::now();
+ }
+
+ const auto first = m->get_first();
+ const auto last = m->get_last();
+
+ // make sure there is something new, here, before we bother flushing
+ // the queues and such
+ if (last <= superblock.newest_map) {
+ return seastar::now();
+ }
+ // missing some?
+ bool skip_maps = false;
+ epoch_t start = superblock.newest_map + 1;
+ if (first > start) {
+ logger().info("handle_osd_map message skips epochs {}..{}",
+ start, first - 1);
+ if (m->oldest_map <= start) {
+ return osdmap_subscribe(start, false);
+ }
+ // always try to get the full range of maps--as many as we can. this
+ // 1- is good to have
+ // 2- is at present the only way to ensure that we get a *full* map as
+ // the first map!
+ if (m->oldest_map < first) {
+ return osdmap_subscribe(m->oldest_map - 1, true);
+ }
+ skip_maps = true;
+ start = first;
+ }
+
+ return seastar::do_with(ceph::os::Transaction{},
+ [=](auto& t) {
+ return store_maps(t, start, m).then([=, &t] {
+ // even if this map isn't from a mon, we may have satisfied our subscription
+ monc->sub_got("osdmap", last);
+ if (!superblock.oldest_map || skip_maps) {
+ superblock.oldest_map = first;
+ }
+ superblock.newest_map = last;
+ superblock.current_epoch = last;
+
+ // note in the superblock that we were clean thru the prior epoch
+ if (boot_epoch && boot_epoch >= superblock.mounted) {
+ superblock.mounted = boot_epoch;
+ superblock.clean_thru = last;
+ }
+ meta_coll->store_superblock(t, superblock);
+ return store->do_transaction(meta_coll->collection(), std::move(t));
+ });
+ }).then([=] {
+ // TODO: write to superblock and commit the transaction
+ return committed_osd_maps(start, last, m);
+ });
+}
+
+seastar::future<> OSD::committed_osd_maps(version_t first,
+ version_t last,
+ Ref<MOSDMap> m)
+{
+ logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last);
+ // advance through the new maps
+ return seastar::parallel_for_each(boost::irange(first, last + 1),
+ [this](epoch_t cur) {
+ return get_map(cur).then([this](cached_map_t&& o) {
+ osdmap = std::move(o);
+ if (up_epoch != 0 &&
+ osdmap->is_up(whoami) &&
+ osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
+ up_epoch = osdmap->get_epoch();
+ if (!boot_epoch) {
+ boot_epoch = osdmap->get_epoch();
+ }
+ }
+ });
+ }).then([m, this] {
+ if (osdmap->is_up(whoami) &&
+ osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
+ bind_epoch < osdmap->get_up_from(whoami)) {
+ if (state.is_booting()) {
+ logger().info("osd.{}: activating...", whoami);
+ state.set_active();
+ beacon_timer.arm_periodic(
+ std::chrono::seconds(local_conf()->osd_beacon_report_interval));
+ heartbeat_timer.arm_periodic(
+ std::chrono::seconds(TICK_INTERVAL));
+ }
+ }
+
+ if (state.is_active()) {
+ logger().info("osd.{}: now active", whoami);
+ if (!osdmap->exists(whoami)) {
+ return shutdown();
+ }
+ if (should_restart()) {
+ return restart();
+ } else {
+ return seastar::now();
+ }
+ } else if (state.is_preboot()) {
+ logger().info("osd.{}: now preboot", whoami);
+
+ if (m->get_source().is_mon()) {
+ return _preboot(m->oldest_map, m->newest_map);
+ } else {
+ logger().info("osd.{}: start_boot", whoami);
+ return start_boot();
+ }
+ } else {
+ logger().info("osd.{}: now {}", whoami, state);
+ // XXX
+ return seastar::now();
+ }
+ });
+}
+
+bool OSD::should_restart() const
+{
+ if (!osdmap->is_up(whoami)) {
+ logger().info("map e {} marked osd.{} down",
+ osdmap->get_epoch(), whoami);
+ return true;
+ } else if (osdmap->get_addrs(whoami) != public_msgr->get_myaddrs()) {
+ logger().error("map e {} had wrong client addr ({} != my {})",
+ osdmap->get_epoch(),
+ osdmap->get_addrs(whoami),
+ public_msgr->get_myaddrs());
+ return true;
+ } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) {
+ logger().error("map e {} had wrong cluster addr ({} != my {})",
+ osdmap->get_epoch(),
+ osdmap->get_cluster_addrs(whoami),
+ cluster_msgr->get_myaddrs());
+ return true;
+ } else {
+ return false;
+ }
+}
+
+seastar::future<> OSD::restart()
+{
+ up_epoch = 0;
+ bind_epoch = osdmap->get_epoch();
+ // TODO: promote to shutdown if being marked down for multiple times
+ // rebind messengers
+ return start_boot();
+}
+
+seastar::future<> OSD::shutdown()
+{
+ // TODO
+ superblock.mounted = boot_epoch;
+ superblock.clean_thru = osdmap->get_epoch();
+ return seastar::now();
+}
+
+seastar::future<> OSD::send_beacon()
+{
+ // FIXME: min lec should be calculated from pg_stat
+ // and should set m->pgs
+ epoch_t min_last_epoch_clean = osdmap->get_epoch();
+ auto m = make_message<MOSDBeacon>(osdmap->get_epoch(),
+ min_last_epoch_clean);
+ return monc->send_message(m);
+}
+
+void OSD::update_heartbeat_peers()
+{
+ if (!state.is_active()) {
+ return;
+ }
+ for (auto& pg : pgs) {
+ vector<int> up, acting;
+ osdmap->pg_to_up_acting_osds(pg.first.pgid,
+ &up, nullptr,
+ &acting, nullptr);
+ for (auto osd : boost::join(up, acting)) {
+ if (osd != CRUSH_ITEM_NONE) {
+ heartbeat->add_peer(osd, osdmap->get_epoch());
+ }
+ }
+ }
+ heartbeat->update_peers(whoami);
+}
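
get_map(e) above is a find-or-load cache: hit the SharedLRU first, otherwise load the map off disk, decode it, and insert it so later callers share the same instance. A minimal synchronous sketch of the idiom, with a toy map standing in for SharedLRU and the disk load:

#include <map>
#include <memory>

struct OSDMapLite { unsigned epoch = 0; };   // stand-in for OSDMap

class MapCache {                             // toy stand-in for SharedLRU
  std::map<unsigned, std::shared_ptr<OSDMapLite>> cache;
public:
  std::shared_ptr<OSDMapLite> find(unsigned e) {
    auto i = cache.find(e);
    return i == cache.end() ? nullptr : i->second;
  }
  std::shared_ptr<OSDMapLite> insert(unsigned e,
                                     std::unique_ptr<OSDMapLite> m) {
    return cache[e] = std::move(m);
  }
};

std::shared_ptr<OSDMapLite> get_map(MapCache& cache, unsigned e)
{
  if (auto found = cache.find(e); found) {
    return found;                            // cache hit
  }
  auto m = std::make_unique<OSDMapLite>();   // stands in for load + decode
  m->epoch = e;
  return cache.insert(e, std::move(m));      // share with later callers
}
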
diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h
new file mode 100644
index 00000000..c5aff5e2
--- /dev/null
+++ b/src/crimson/osd/osd.h
@@ -0,0 +1,125 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <map>
+#include <seastar/core/future.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/timer.hh>
+
+#include "crimson/common/simple_lru.h"
+#include "crimson/common/shared_lru.h"
+#include "crimson/mon/MonClient.h"
+#include "crimson/net/Dispatcher.h"
+#include "crimson/osd/chained_dispatchers.h"
+#include "crimson/osd/osdmap_service.h"
+#include "crimson/osd/state.h"
+
+#include "osd/OSDMap.h"
+
+class MOSDMap;
+class OSDMap;
+class OSDMeta;
+class PG;
+class Heartbeat;
+
+namespace ceph::net {
+ class Messenger;
+}
+
+namespace ceph::os {
+ class CyanStore;
+ struct Collection;
+ class Transaction;
+}
+
+template<typename T> using Ref = boost::intrusive_ptr<T>;
+
+class OSD : public ceph::net::Dispatcher,
+ private OSDMapService {
+ seastar::gate gate;
+ const int whoami;
+ const uint32_t nonce;
+ seastar::timer<seastar::lowres_clock> beacon_timer;
+ // talk with osd
+ ceph::net::Messenger* cluster_msgr = nullptr;
+ // talk with client/mon/mgr
+ ceph::net::Messenger* public_msgr = nullptr;
+ ChainedDispatchers dispatchers;
+ std::unique_ptr<ceph::mon::Client> monc;
+
+ std::unique_ptr<Heartbeat> heartbeat;
+ seastar::timer<seastar::lowres_clock> heartbeat_timer;
+
+ SharedLRU<epoch_t, OSDMap> osdmaps;
+ SimpleLRU<epoch_t, bufferlist, false> map_bl_cache;
+ cached_map_t osdmap;
+ // TODO: use a wrapper for ObjectStore
+ std::unique_ptr<ceph::os::CyanStore> store;
+ std::unique_ptr<OSDMeta> meta_coll;
+
+ std::unordered_map<spg_t, Ref<PG>> pgs;
+ OSDState state;
+
+ /// _first_ epoch we were marked up (after this process started)
+ epoch_t boot_epoch = 0;
+ /// _most_recent_ epoch we were marked up
+ epoch_t up_epoch = 0;
+  ///< epoch we last did a bind to new ip:ports
+  epoch_t bind_epoch = 0;
+  ///< since when there is no more pending pg creates from mon
+  epoch_t last_pg_create_epoch = 0;
+
+ OSDSuperblock superblock;
+
+ // Dispatcher methods
+ seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) override;
+ seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override;
+ seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override;
+ seastar::future<> ms_handle_remote_reset(ceph::net::ConnectionRef conn) override;
+
+public:
+ OSD(int id, uint32_t nonce);
+ ~OSD() override;
+
+ seastar::future<> mkfs(uuid_d fsid);
+
+ seastar::future<> start();
+ seastar::future<> stop();
+
+private:
+ seastar::future<> start_boot();
+ seastar::future<> _preboot(version_t oldest_osdmap, version_t newest_osdmap);
+ seastar::future<> _send_boot();
+
+ seastar::future<Ref<PG>> load_pg(spg_t pgid);
+ seastar::future<> load_pgs();
+
+ // OSDMapService methods
+ seastar::future<cached_map_t> get_map(epoch_t e) override;
+ cached_map_t get_map() const override;
+
+ seastar::future<bufferlist> load_map_bl(epoch_t e);
+ void store_map_bl(ceph::os::Transaction& t,
+ epoch_t e, bufferlist&& bl);
+ seastar::future<> store_maps(ceph::os::Transaction& t,
+ epoch_t start, Ref<MOSDMap> m);
+ seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
+
+ void write_superblock(ceph::os::Transaction& t);
+ seastar::future<> read_superblock();
+
+ seastar::future<> handle_osd_map(ceph::net::ConnectionRef conn,
+ Ref<MOSDMap> m);
+ seastar::future<> committed_osd_maps(version_t first,
+ version_t last,
+ Ref<MOSDMap> m);
+ bool should_restart() const;
+ seastar::future<> restart();
+ seastar::future<> shutdown();
+
+ seastar::future<> send_beacon();
+ void update_heartbeat_peers();
+};
diff --git a/src/crimson/osd/osd_meta.cc b/src/crimson/osd/osd_meta.cc
new file mode 100644
index 00000000..6eb225fe
--- /dev/null
+++ b/src/crimson/osd/osd_meta.cc
@@ -0,0 +1,80 @@
+#include "osd_meta.h"
+
+#include "crimson/os/cyan_collection.h"
+#include "crimson/os/cyan_store.h"
+#include "crimson/os/Transaction.h"
+
+void OSDMeta::create(ceph::os::Transaction& t)
+{
+ t.create_collection(coll->cid, 0);
+}
+
+void OSDMeta::store_map(ceph::os::Transaction& t,
+ epoch_t e, const bufferlist& m)
+{
+ t.write(coll->cid, osdmap_oid(e), 0, m.length(), m);
+}
+
+seastar::future<bufferlist> OSDMeta::load_map(epoch_t e)
+{
+ return store->read(coll,
+ osdmap_oid(e), 0, 0,
+ CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+}
+
+void OSDMeta::store_superblock(ceph::os::Transaction& t,
+ const OSDSuperblock& superblock)
+{
+ bufferlist bl;
+ encode(superblock, bl);
+ t.write(coll->cid, superblock_oid(), 0, bl.length(), bl);
+}
+
+seastar::future<OSDSuperblock> OSDMeta::load_superblock()
+{
+ return store->read(coll, superblock_oid(), 0, 0)
+ .then([this] (bufferlist&& bl) {
+ auto p = bl.cbegin();
+ OSDSuperblock superblock;
+ decode(superblock, p);
+ return seastar::make_ready_future<OSDSuperblock>(std::move(superblock));
+ });
+}
+
+seastar::future<pg_pool_t,
+ std::string,
+ OSDMeta::ec_profile_t>
+OSDMeta::load_final_pool_info(int64_t pool) {
+ return store->read(coll, final_pool_info_oid(pool),
+ 0, 0).then([this] (bufferlist&& bl) {
+ auto p = bl.cbegin();
+ pg_pool_t pi;
+ string name;
+ ec_profile_t ec_profile;
+ decode(pi, p);
+ decode(name, p);
+ decode(ec_profile, p);
+ return seastar::make_ready_future<pg_pool_t,
+ string,
+ ec_profile_t>(std::move(pi),
+ std::move(name),
+ std::move(ec_profile));
+ });
+}
+
+ghobject_t OSDMeta::osdmap_oid(epoch_t epoch)
+{
+ string name = fmt::format("osdmap.{}", epoch);
+ return ghobject_t(hobject_t(sobject_t(object_t(name), 0)));
+}
+
+ghobject_t OSDMeta::final_pool_info_oid(int64_t pool)
+{
+ string name = fmt::format("final_pool_{}", pool);
+ return ghobject_t(hobject_t(sobject_t(object_t(name), CEPH_NOSNAP)));
+}
+
+ghobject_t OSDMeta::superblock_oid()
+{
+ return ghobject_t(hobject_t(sobject_t(object_t("osd_superblock"), 0)));
+}
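
The oid helpers at the bottom derive object names purely from the epoch or pool id, so a given epoch always resolves to the same metadata object. fmt is the formatting library used throughout Ceph; a quick check of the naming scheme:

#include <cassert>
#include <cstdint>
#include <fmt/format.h>

int main() {
  assert(fmt::format("osdmap.{}", 42u) == "osdmap.42");
  assert(fmt::format("final_pool_{}", int64_t{7}) == "final_pool_7");
}
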
diff --git a/src/crimson/osd/osd_meta.h b/src/crimson/osd/osd_meta.h
new file mode 100644
index 00000000..936d9548
--- /dev/null
+++ b/src/crimson/osd/osd_meta.h
@@ -0,0 +1,53 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <map>
+#include <string>
+#include <seastar/core/future.hh>
+#include "osd/osd_types.h"
+
+namespace ceph::os {
+ class CyanStore;
+ class Collection;
+ class Transaction;
+}
+
+/// metadata shared across PGs, or, put another way,
+/// metadata not specific to a certain PG.
+class OSDMeta {
+ template<typename T> using Ref = boost::intrusive_ptr<T>;
+
+ ceph::os::CyanStore* store;
+ Ref<ceph::os::Collection> coll;
+
+public:
+ OSDMeta(Ref<ceph::os::Collection> coll,
+ ceph::os::CyanStore* store)
+ : store{store}, coll{coll}
+ {}
+
+
+ auto collection() {
+ return coll;
+ }
+ void create(ceph::os::Transaction& t);
+
+ void store_map(ceph::os::Transaction& t,
+ epoch_t e, const bufferlist& m);
+ seastar::future<bufferlist> load_map(epoch_t e);
+
+ void store_superblock(ceph::os::Transaction& t,
+ const OSDSuperblock& sb);
+ seastar::future<OSDSuperblock> load_superblock();
+
+ using ec_profile_t = std::map<std::string, std::string>;
+ seastar::future<pg_pool_t,
+ std::string,
+ ec_profile_t> load_final_pool_info(int64_t pool);
+private:
+ static ghobject_t osdmap_oid(epoch_t epoch);
+ static ghobject_t final_pool_info_oid(int64_t pool);
+ static ghobject_t superblock_oid();
+};
diff --git a/src/crimson/osd/osdmap_service.h b/src/crimson/osd/osdmap_service.h
new file mode 100644
index 00000000..0a3aaed3
--- /dev/null
+++ b/src/crimson/osd/osdmap_service.h
@@ -0,0 +1,19 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/smart_ptr/local_shared_ptr.hpp>
+
+#include "include/types.h"
+
+class OSDMap;
+
+class OSDMapService {
+public:
+ using cached_map_t = boost::local_shared_ptr<OSDMap>;
+ virtual ~OSDMapService() = default;
+ virtual seastar::future<cached_map_t> get_map(epoch_t e) = 0;
+ /// get the latest map
+ virtual cached_map_t get_map() const = 0;
+};
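
cached_map_t is a boost::local_shared_ptr rather than a std::shared_ptr: crimson keeps each OSDMap on a single reactor shard, so the non-atomic reference count of local_shared_ptr suffices and avoids the atomic operations std::shared_ptr would pay on every copy. Minimal usage, mirroring the make_local_shared() call in osd.cc:

#include <boost/smart_ptr/local_shared_ptr.hpp>
#include <boost/smart_ptr/make_local_shared.hpp>

struct OSDMapLite { unsigned epoch = 0; };   // stand-in for OSDMap

int main() {
  boost::local_shared_ptr<OSDMapLite> map =
      boost::make_local_shared<OSDMapLite>();
  auto alias = map;    // cheap copy: non-atomic refcount bump
  return alias->epoch;
}
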
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
new file mode 100644
index 00000000..bc7b8c2d
--- /dev/null
+++ b/src/crimson/osd/pg.cc
@@ -0,0 +1,6 @@
+#include "pg.h"
+
+PG::PG(pg_pool_t&& pool, std::string&& name, ec_profile_t&& ec_profile)
+{
+ // TODO
+}
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
new file mode 100644
index 00000000..4bb672f4
--- /dev/null
+++ b/src/crimson/osd/pg.h
@@ -0,0 +1,21 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include <seastar/core/future.hh>
+
+#include "osd/osd_types.h"
+
+template<typename T> using Ref = boost::intrusive_ptr<T>;
+
+class PG : public boost::intrusive_ref_counter<
+ PG,
+ boost::thread_unsafe_counter>
+{
+ using ec_profile_t = std::map<std::string,std::string>;
+public:
+ PG(pg_pool_t&& pool, std::string&& name, ec_profile_t&& ec_profile);
+};
diff --git a/src/crimson/osd/pg_meta.cc b/src/crimson/osd/pg_meta.cc
new file mode 100644
index 00000000..2098e50a
--- /dev/null
+++ b/src/crimson/osd/pg_meta.cc
@@ -0,0 +1,104 @@
+#include "pg_meta.h"
+
+#include <string_view>
+
+#include "crimson/os/cyan_collection.h"
+#include "crimson/os/cyan_store.h"
+
+// prefix pgmeta_oid keys with _ so that PGLog::read_log_and_missing() can
+// easily skip them
+
+static const string_view infover_key = "_infover"sv;
+static const string_view info_key = "_info"sv;
+static const string_view biginfo_key = "_biginfo"sv;
+static const string_view epoch_key = "_epoch"sv;
+static const string_view fastinfo_key = "_fastinfo"sv;
+
+using ceph::os::CyanStore;
+
+PGMeta::PGMeta(CyanStore* store, spg_t pgid)
+ : store{store},
+ pgid{pgid}
+{}
+
+namespace {
+ template<typename T>
+ std::optional<T> find_value(const CyanStore::omap_values_t& values,
+ string_view key)
+ {
+ auto found = values.find(key);
+ if (found == values.end()) {
+ return {};
+ }
+ auto p = found->second.cbegin();
+ T value;
+ decode(value, p);
+ return std::make_optional(std::move(value));
+ }
+}
+seastar::future<epoch_t> PGMeta::get_epoch()
+{
+ auto ch = store->open_collection(coll_t{pgid});
+ return store->omap_get_values(ch,
+ pgid.make_pgmeta_oid(),
+ {string{infover_key},
+ string{epoch_key}}).then(
+ [](auto&& values) {
+ {
+ // sanity check
+ auto infover = find_value<__u8>(values, infover_key);
+ assert(infover);
+ if (*infover < 10) {
+ throw std::runtime_error("incompatible pg meta");
+ }
+ }
+ {
+ auto epoch = find_value<epoch_t>(values, epoch_key);
+ assert(epoch);
+ return seastar::make_ready_future<epoch_t>(*epoch);
+ }
+ });
+}
+
+seastar::future<pg_info_t, PastIntervals> PGMeta::load()
+{
+ auto ch = store->open_collection(coll_t{pgid});
+ return store->omap_get_values(ch,
+ pgid.make_pgmeta_oid(),
+ {string{infover_key},
+ string{info_key},
+ string{biginfo_key},
+ string{fastinfo_key}}).then(
+ [this](auto&& values) {
+ {
+ // sanity check
+ auto infover = find_value<__u8>(values, infover_key);
+ assert(infover);
+      if (*infover < 10) {
+ throw std::runtime_error("incompatible pg meta");
+ }
+ }
+ pg_info_t info;
+ {
+ auto found = find_value<pg_info_t>(values, info_key);
+ assert(found);
+ info = *std::move(found);
+ }
+ PastIntervals past_intervals;
+ {
+ using biginfo_t = std::pair<PastIntervals, decltype(info.purged_snaps)>;
+ auto big_info = find_value<biginfo_t>(values, biginfo_key);
+ assert(big_info);
+ past_intervals = std::move(big_info->first);
+ info.purged_snaps = std::move(big_info->second);
+ }
+ {
+ auto fast_info = find_value<pg_fast_info_t>(values, fastinfo_key);
+ assert(fast_info);
+ fast_info->try_apply_to(&info);
+ }
+ return seastar::make_ready_future<pg_info_t, PastIntervals>(
+ std::move(info),
+ std::move(past_intervals));
+ });
+}
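
find_value() above is a typed accessor over the raw omap: look the key up, decode into the requested type on a hit, and return an empty optional on a miss so the caller chooses between asserting and defaulting. The same shape with plain std::string payloads and a caller-supplied parser standing in for ceph::decode():

#include <map>
#include <optional>
#include <string>

using omap_values_t = std::map<std::string, std::string>;

template <typename T, typename Parse>
std::optional<T> find_value(const omap_values_t& values,
                            const std::string& key, Parse parse)
{
  auto found = values.find(key);
  if (found == values.end()) {
    return std::nullopt;          // key absent: let the caller decide
  }
  return parse(found->second);    // stand-in for ceph::decode()
}

// usage:
//   auto epoch = find_value<int>(values, "_epoch",
//       [](const std::string& s) { return std::stoi(s); });
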
diff --git a/src/crimson/osd/pg_meta.h b/src/crimson/osd/pg_meta.h
new file mode 100644
index 00000000..10f2234a
--- /dev/null
+++ b/src/crimson/osd/pg_meta.h
@@ -0,0 +1,22 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/future.hh>
+#include "osd/osd_types.h"
+
+namespace ceph::os {
+ class CyanStore;
+}
+
+/// PG related metadata
+class PGMeta
+{
+ ceph::os::CyanStore* store;
+ const spg_t pgid;
+public:
+ PGMeta(ceph::os::CyanStore *store, spg_t pgid);
+ seastar::future<epoch_t> get_epoch();
+ seastar::future<pg_info_t, PastIntervals> load();
+};
diff --git a/src/crimson/osd/state.h b/src/crimson/osd/state.h
new file mode 100644
index 00000000..4c445348
--- /dev/null
+++ b/src/crimson/osd/state.h
@@ -0,0 +1,71 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <string_view>
+#include <ostream>
+
+class OSDMap;
+
+class OSDState {
+
+ enum class State {
+ INITIALIZING,
+ PREBOOT,
+ BOOTING,
+ ACTIVE,
+ STOPPING,
+ WAITING_FOR_HEALTHY,
+ };
+
+ State state = State::INITIALIZING;
+
+public:
+ bool is_initializing() const {
+ return state == State::INITIALIZING;
+ }
+ bool is_preboot() const {
+ return state == State::PREBOOT;
+ }
+ bool is_booting() const {
+ return state == State::BOOTING;
+ }
+ bool is_active() const {
+ return state == State::ACTIVE;
+ }
+ bool is_stopping() const {
+ return state == State::STOPPING;
+ }
+ bool is_waiting_for_healthy() const {
+ return state == State::WAITING_FOR_HEALTHY;
+ }
+ void set_preboot() {
+ state = State::PREBOOT;
+ }
+ void set_booting() {
+ state = State::BOOTING;
+ }
+ void set_active() {
+ state = State::ACTIVE;
+ }
+ void set_stopping() {
+ state = State::STOPPING;
+ }
+ std::string_view to_string() const {
+ switch (state) {
+ case State::INITIALIZING: return "initializing";
+ case State::PREBOOT: return "preboot";
+ case State::BOOTING: return "booting";
+ case State::ACTIVE: return "active";
+ case State::STOPPING: return "stopping";
+ case State::WAITING_FOR_HEALTHY: return "waiting_for_healthy";
+ default: return "???";
+ }
+ }
+};
+
+inline std::ostream&
+operator<<(std::ostream& os, const OSDState& s) {
+ return os << s.to_string();
+}