diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/crimson/osd | |
parent | Initial commit. (diff) | |
download | ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/crimson/osd/CMakeLists.txt | 10 | ||||
-rw-r--r-- | src/crimson/osd/chained_dispatchers.cc | 72 | ||||
-rw-r--r-- | src/crimson/osd/chained_dispatchers.h | 32 | ||||
-rw-r--r-- | src/crimson/osd/heartbeat.cc | 415 | ||||
-rw-r--r-- | src/crimson/osd/heartbeat.h | 121 | ||||
-rw-r--r-- | src/crimson/osd/main.cc | 89 | ||||
-rw-r--r-- | src/crimson/osd/osd.cc | 658 | ||||
-rw-r--r-- | src/crimson/osd/osd.h | 125 | ||||
-rw-r--r-- | src/crimson/osd/osd_meta.cc | 80 | ||||
-rw-r--r-- | src/crimson/osd/osd_meta.h | 53 | ||||
-rw-r--r-- | src/crimson/osd/osdmap_service.h | 19 | ||||
-rw-r--r-- | src/crimson/osd/pg.cc | 6 | ||||
-rw-r--r-- | src/crimson/osd/pg.h | 21 | ||||
-rw-r--r-- | src/crimson/osd/pg_meta.cc | 104 | ||||
-rw-r--r-- | src/crimson/osd/pg_meta.h | 22 | ||||
-rw-r--r-- | src/crimson/osd/state.h | 71 |
16 files changed, 1898 insertions, 0 deletions
diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt new file mode 100644 index 00000000..e86a11b5 --- /dev/null +++ b/src/crimson/osd/CMakeLists.txt @@ -0,0 +1,10 @@ +add_executable(crimson-osd + chained_dispatchers.cc + heartbeat.cc + main.cc + osd.cc + osd_meta.cc + pg.cc + pg_meta.cc) +target_link_libraries(crimson-osd + crimson-common crimson-os crimson) diff --git a/src/crimson/osd/chained_dispatchers.cc b/src/crimson/osd/chained_dispatchers.cc new file mode 100644 index 00000000..da4aa269 --- /dev/null +++ b/src/crimson/osd/chained_dispatchers.cc @@ -0,0 +1,72 @@ +#include "chained_dispatchers.h" +#include "crimson/net/Connection.h" + + +seastar::future<> +ChainedDispatchers::ms_dispatch(ceph::net::ConnectionRef conn, + MessageRef m) { + return seastar::do_for_each(dispatchers, [conn, m](Dispatcher* dispatcher) { + return dispatcher->ms_dispatch(conn, m); + }); +} + +seastar::future<> +ChainedDispatchers::ms_handle_accept(ceph::net::ConnectionRef conn) { + return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { + return dispatcher->ms_handle_accept(conn); + }); +} + +seastar::future<> +ChainedDispatchers::ms_handle_connect(ceph::net::ConnectionRef conn) { + return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { + return dispatcher->ms_handle_connect(conn); + }); +} + +seastar::future<> +ChainedDispatchers::ms_handle_reset(ceph::net::ConnectionRef conn) { + return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { + return dispatcher->ms_handle_reset(conn); + }); +} + +seastar::future<> +ChainedDispatchers::ms_handle_remote_reset(ceph::net::ConnectionRef conn) { + return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { + return dispatcher->ms_handle_remote_reset(conn); + }); +} + +seastar::future<std::unique_ptr<AuthAuthorizer>> +ChainedDispatchers::ms_get_authorizer(peer_type_t peer_type) +{ + // since dispatcher returns a nullptr if it does not have the authorizer, + // let's use the chain-of-responsibility pattern here. + struct Params { + peer_type_t peer_type; + std::deque<Dispatcher*>::iterator first, last; + } params = {peer_type, dispatchers.begin(), dispatchers.end()}; + return seastar::do_with(Params{params}, [this] (Params& params) { + using result_t = std::unique_ptr<AuthAuthorizer>; + return seastar::repeat_until_value([&] () { + auto& first = params.first; + if (first == params.last) { + // just give up + return seastar::make_ready_future<std::optional<result_t>>(result_t{}); + } else { + return (*first)->ms_get_authorizer(params.peer_type) + .then([&] (auto&& auth)-> std::optional<result_t> { + if (auth) { + // hooray! + return std::move(auth); + } else { + // try next one + ++first; + return {}; + } + }); + } + }); + }); +} diff --git a/src/crimson/osd/chained_dispatchers.h b/src/crimson/osd/chained_dispatchers.h new file mode 100644 index 00000000..c3040836 --- /dev/null +++ b/src/crimson/osd/chained_dispatchers.h @@ -0,0 +1,32 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <deque> +#include "crimson/net/Dispatcher.h" + +// in existing Messenger, dispatchers are put into a chain as described by +// chain-of-responsibility pattern. we could do the same to stop processing +// the message once any of the dispatchers claims this message, and prevent +// other dispatchers from reading it. but this change is more involved as +// it requires changing the ms_ methods to return a bool. so as an intermediate +// solution, we are using an observer dispatcher to notify all the interested +// or unintersted parties. +class ChainedDispatchers : public ceph::net::Dispatcher { + std::deque<Dispatcher*> dispatchers; +public: + void push_front(Dispatcher* dispatcher) { + dispatchers.push_front(dispatcher); + } + void push_back(Dispatcher* dispatcher) { + dispatchers.push_back(dispatcher); + } + seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) override; + seastar::future<> ms_handle_accept(ceph::net::ConnectionRef conn) override; + seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override; + seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override; + seastar::future<> ms_handle_remote_reset(ceph::net::ConnectionRef conn) override; + seastar::future<std::unique_ptr<AuthAuthorizer>> + ms_get_authorizer(peer_type_t peer_type) override; +}; diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc new file mode 100644 index 00000000..6dfefb3b --- /dev/null +++ b/src/crimson/osd/heartbeat.cc @@ -0,0 +1,415 @@ +#include "heartbeat.h" + +#include <boost/range/join.hpp> + +#include "messages/MOSDPing.h" +#include "messages/MOSDFailure.h" + +#include "crimson/common/config_proxy.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Messenger.h" +#include "crimson/osd/osdmap_service.h" +#include "crimson/mon/MonClient.h" + +#include "osd/OSDMap.h" + +using ceph::common::local_conf; + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_osd); + } + + template<typename Message, typename... Args> + Ref<Message> make_message(Args&&... args) + { + return {new Message{std::forward<Args>(args)...}, false}; + } +} + +Heartbeat::Heartbeat(int whoami, + uint32_t nonce, + const OSDMapService& service, + ceph::mon::Client& monc) + : whoami{whoami}, + nonce{nonce}, + service{service}, + monc{monc}, + timer{[this] {send_heartbeats();}} +{} + +seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs, + entity_addrvec_t back_addrs) +{ + logger().info("heartbeat: start"); + // i only care about the address, so any unused port would work + for (auto& addr : boost::join(front_addrs.v, back_addrs.v)) { + addr.set_port(0); + } + return seastar::when_all_succeed( + ceph::net::Messenger::create(entity_name_t::OSD(whoami), + "hb_front", + nonce, + seastar::engine().cpu_id()) + .then([this, front_addrs] (auto msgr) { + front_msgr = msgr; + return start_messenger(front_msgr, front_addrs); + }), + ceph::net::Messenger::create(entity_name_t::OSD(whoami), + "hb_back", + nonce, + seastar::engine().cpu_id()) + .then([this, back_addrs] (auto msgr) { + back_msgr = msgr; + return start_messenger(back_msgr, back_addrs); + })) + .then([this] { + timer.arm_periodic( + std::chrono::seconds(local_conf()->osd_heartbeat_interval)); + }); +} + +seastar::future<> +Heartbeat::start_messenger(ceph::net::Messenger* msgr, + const entity_addrvec_t& addrs) +{ + if (local_conf()->ms_crc_data) { + msgr->set_crc_data(); + } + if (local_conf()->ms_crc_header) { + msgr->set_crc_header(); + } + return msgr->try_bind(addrs, + local_conf()->ms_bind_port_min, + local_conf()->ms_bind_port_max).then([msgr, this] { + return msgr->start(this); + }); +} + +seastar::future<> Heartbeat::stop() +{ + return seastar::when_all_succeed(front_msgr->shutdown(), + back_msgr->shutdown()); +} + +const entity_addrvec_t& Heartbeat::get_front_addrs() const +{ + return front_msgr->get_myaddrs(); +} + +const entity_addrvec_t& Heartbeat::get_back_addrs() const +{ + return back_msgr->get_myaddrs(); +} + +seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch) +{ + auto found = peers.find(peer); + if (found == peers.end()) { + logger().info("add_peer({})", peer); + auto osdmap = service.get_map(); + // TODO: msgr v2 + return seastar::when_all_succeed( + front_msgr->connect(osdmap->get_hb_front_addrs(peer).legacy_addr(), + CEPH_ENTITY_TYPE_OSD), + back_msgr->connect(osdmap->get_hb_back_addrs(peer).legacy_addr(), + CEPH_ENTITY_TYPE_OSD)) + .then([this, peer, epoch] (auto xcon_front, auto xcon_back) { + PeerInfo info; + // sharded-messenger compatible mode + info.con_front = xcon_front->release(); + info.con_back = xcon_back->release(); + info.epoch = epoch; + peers.emplace(peer, std::move(info)); + }); + } else { + found->second.epoch = epoch; + return seastar::now(); + } +} + +seastar::future<Heartbeat::osds_t> Heartbeat::remove_down_peers() +{ + osds_t osds; + for (auto& peer : peers) { + osds.push_back(peer.first); + } + return seastar::map_reduce(std::move(osds), + [this](auto& osd) { + auto osdmap = service.get_map(); + if (!osdmap->is_up(osd)) { + return remove_peer(osd).then([] { + return seastar::make_ready_future<osd_id_t>(-1); + }); + } else if (peers[osd].epoch < osdmap->get_epoch()) { + return seastar::make_ready_future<osd_id_t>(osd); + } else { + return seastar::make_ready_future<osd_id_t>(-1); + } + }, osds_t{}, + [this](osds_t&& extras, osd_id_t extra) { + if (extra >= 0) { + extras.push_back(extra); + } + return extras; + }); +} + +void Heartbeat::add_reporter_peers(int whoami) +{ + auto osdmap = service.get_map(); + // include next and previous up osds to ensure we have a fully-connected set + set<int> want; + if (auto next = osdmap->get_next_up_osd_after(whoami); next >= 0) { + want.insert(next); + } + if (auto prev = osdmap->get_previous_up_osd_before(whoami); prev >= 0) { + want.insert(prev); + } + // make sure we have at least **min_down** osds coming from different + // subtree level (e.g., hosts) for fast failure detection. + auto min_down = local_conf().get_val<uint64_t>("mon_osd_min_down_reporters"); + auto subtree = local_conf().get_val<string>("mon_osd_reporter_subtree_level"); + osdmap->get_random_up_osds_by_subtree( + whoami, subtree, min_down, want, &want); + for (auto osd : want) { + add_peer(osd, osdmap->get_epoch()); + } +} + +seastar::future<> Heartbeat::update_peers(int whoami) +{ + const auto min_peers = static_cast<size_t>( + local_conf().get_val<int64_t>("osd_heartbeat_min_peers")); + return remove_down_peers().then([=](osds_t&& extra) { + add_reporter_peers(whoami); + // too many? + struct iteration_state { + osds_t::const_iterator where; + osds_t::const_iterator end; + }; + return seastar::do_with(iteration_state{extra.begin(),extra.end()}, + [=](iteration_state& s) { + return seastar::do_until( + [min_peers, &s, this] { + return peers.size() < min_peers || s.where == s.end; }, + [&s, this] { + return remove_peer(*s.where); } + ); + }); + }).then([=] { + // or too few? + auto osdmap = service.get_map(); + for (auto next = osdmap->get_next_up_osd_after(whoami); + peers.size() < min_peers && next >= 0 && next != whoami; + next = osdmap->get_next_up_osd_after(next)) { + add_peer(next, osdmap->get_epoch()); + } + return seastar::now(); + }); +} + +seastar::future<> Heartbeat::remove_peer(osd_id_t peer) +{ + auto found = peers.find(peer); + assert(found != peers.end()); + logger().info("remove_peer({})", peer); + return seastar::when_all_succeed(found->second.con_front->close(), + found->second.con_back->close()).then( + [this, peer] { + peers.erase(peer); + return seastar::now(); + }); +} + +seastar::future<> Heartbeat::ms_dispatch(ceph::net::ConnectionRef conn, + MessageRef m) +{ + logger().info("heartbeat: ms_dispatch {} from {}", + *m, m->get_source()); + switch (m->get_type()) { + case CEPH_MSG_PING: + return handle_osd_ping(conn, boost::static_pointer_cast<MOSDPing>(m)); + default: + return seastar::now(); + } +} + +seastar::future<> Heartbeat::handle_osd_ping(ceph::net::ConnectionRef conn, + Ref<MOSDPing> m) +{ + switch (m->op) { + case MOSDPing::PING: + return handle_ping(conn, m); + case MOSDPing::PING_REPLY: + return handle_reply(conn, m); + case MOSDPing::YOU_DIED: + return handle_you_died(); + default: + return seastar::now(); + } +} + +seastar::future<> Heartbeat::handle_ping(ceph::net::ConnectionRef conn, + Ref<MOSDPing> m) +{ + auto min_message = static_cast<uint32_t>( + local_conf()->osd_heartbeat_min_size); + auto reply = + make_message<MOSDPing>(m->fsid, + service.get_map()->get_epoch(), + MOSDPing::PING_REPLY, + m->stamp, + min_message); + return conn->send(reply); +} + +seastar::future<> Heartbeat::handle_reply(ceph::net::ConnectionRef conn, + Ref<MOSDPing> m) +{ + const osd_id_t from = m->get_source().num(); + auto found = peers.find(from); + if (found == peers.end()) { + // stale reply + return seastar::now(); + } + auto& peer = found->second; + auto ping = peer.ping_history.find(m->stamp); + if (ping == peer.ping_history.end()) { + // old replies, deprecated by newly sent pings. + return seastar::now(); + } + const auto now = clock::now(); + auto& unacked = ping->second.unacknowledged; + if (conn == peer.con_back) { + peer.last_rx_back = now; + unacked--; + } else if (conn == peer.con_front) { + peer.last_rx_front = now; + unacked--; + } + if (unacked == 0) { + peer.ping_history.erase(peer.ping_history.begin(), ++ping); + } + if (peer.is_healthy(now)) { + // cancel false reports + failure_queue.erase(from); + if (auto pending = failure_pending.find(from); + pending != failure_pending.end()) { + return send_still_alive(from, pending->second.addrs); + } + } + return seastar::now(); +} + +seastar::future<> Heartbeat::handle_you_died() +{ + // TODO: ask for newer osdmap + return seastar::now(); +} + +seastar::future<> Heartbeat::send_heartbeats() +{ + using peers_item_t = typename peers_map_t::value_type; + return seastar::parallel_for_each(peers, + [this](peers_item_t& item) { + const auto now = clock::now(); + const auto deadline = + now + std::chrono::seconds(local_conf()->osd_heartbeat_grace); + auto& [peer, info] = item; + info.last_tx = now; + if (clock::is_zero(info.first_tx)) { + info.first_tx = now; + } + const utime_t sent_stamp{now}; + auto [reply, added] = info.ping_history.emplace(sent_stamp, + reply_t{deadline, 0}); + std::vector<ceph::net::ConnectionRef> conns{info.con_front, + info.con_back}; + return seastar::parallel_for_each(std::move(conns), + [sent_stamp, &reply=reply->second, this] (auto con) { + if (con) { + auto min_message = static_cast<uint32_t>( + local_conf()->osd_heartbeat_min_size); + auto ping = make_message<MOSDPing>(monc.get_fsid(), + service.get_map()->get_epoch(), + MOSDPing::PING, + sent_stamp, + min_message); + return con->send(ping).then([&reply] { + reply.unacknowledged++; + return seastar::now(); + }); + } else { + return seastar::now(); + } + }); + }); +} + +seastar::future<> Heartbeat::send_failures() +{ + using failure_item_t = typename failure_queue_t::value_type; + return seastar::parallel_for_each(failure_queue, + [this](failure_item_t& failure_item) { + auto [osd, failed_since] = failure_item; + if (failure_pending.count(osd)) { + return seastar::now(); + } + auto failed_for = chrono::duration_cast<chrono::seconds>( + clock::now() - failed_since).count(); + auto osdmap = service.get_map(); + auto failure_report = + make_message<MOSDFailure>(monc.get_fsid(), + osd, + osdmap->get_addrs(osd), + static_cast<int>(failed_for), + osdmap->get_epoch()); + failure_pending.emplace(osd, failure_info_t{failed_since, + osdmap->get_addrs(osd)}); + return monc.send_message(failure_report); + }).then([this] { + failure_queue.clear(); + return seastar::now(); + }); +} + +seastar::future<> Heartbeat::send_still_alive(osd_id_t osd, + const entity_addrvec_t& addrs) +{ + auto still_alive = make_message<MOSDFailure>(monc.get_fsid(), + osd, + addrs, + 0, + service.get_map()->get_epoch(), + MOSDFailure::FLAG_ALIVE); + return monc.send_message(still_alive).then([=] { + failure_pending.erase(osd); + return seastar::now(); + }); +} + +bool Heartbeat::PeerInfo::is_unhealthy(clock::time_point now) const +{ + if (ping_history.empty()) { + // we haven't sent a ping yet or we have got all replies, + // in either way we are safe and healthy for now + return false; + } else { + auto oldest_ping = ping_history.begin(); + return now > oldest_ping->second.deadline; + } +} + +bool Heartbeat::PeerInfo::is_healthy(clock::time_point now) const +{ + if (con_front && clock::is_zero(last_rx_front)) { + return false; + } + if (con_back && clock::is_zero(last_rx_back)) { + return false; + } + // only declare to be healthy until we have received the first + // replies from both front/back connections + return !is_unhealthy(now); +} diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h new file mode 100644 index 00000000..b5eb0f7c --- /dev/null +++ b/src/crimson/osd/heartbeat.h @@ -0,0 +1,121 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <cstdint> +#include <seastar/core/future.hh> +#include "common/ceph_time.h" +#include "crimson/net/Dispatcher.h" +#include "crimson/net/Fwd.h" + +class MOSDPing; +class OSDMapService; + +namespace ceph::mon { + class Client; +} + +template<typename Message> using Ref = boost::intrusive_ptr<Message>; + +class Heartbeat : public ceph::net::Dispatcher { +public: + using osd_id_t = int; + + Heartbeat(int whoami, + uint32_t nonce, + const OSDMapService& service, + ceph::mon::Client& monc); + + seastar::future<> start(entity_addrvec_t front, + entity_addrvec_t back); + seastar::future<> stop(); + + seastar::future<> add_peer(osd_id_t peer, epoch_t epoch); + seastar::future<> update_peers(int whoami); + seastar::future<> remove_peer(osd_id_t peer); + + seastar::future<> send_heartbeats(); + seastar::future<> send_failures(); + + const entity_addrvec_t& get_front_addrs() const; + const entity_addrvec_t& get_back_addrs() const; + + // Dispatcher methods + seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, + MessageRef m) override; + +private: + seastar::future<> handle_osd_ping(ceph::net::ConnectionRef conn, + Ref<MOSDPing> m); + seastar::future<> handle_ping(ceph::net::ConnectionRef conn, + Ref<MOSDPing> m); + seastar::future<> handle_reply(ceph::net::ConnectionRef conn, + Ref<MOSDPing> m); + seastar::future<> handle_you_died(); + + seastar::future<> send_still_alive(osd_id_t, const entity_addrvec_t&); + + using osds_t = std::vector<osd_id_t>; + /// remove down OSDs + /// @return peers not needed in this epoch + seastar::future<osds_t> remove_down_peers(); + /// add enough reporters for fast failure detection + void add_reporter_peers(int whoami); + + seastar::future<> start_messenger(ceph::net::Messenger* msgr, + const entity_addrvec_t& addrs); +private: + const int whoami; + const uint32_t nonce; + ceph::net::Messenger* front_msgr = nullptr; + ceph::net::Messenger* back_msgr = nullptr; + const OSDMapService& service; + ceph::mon::Client& monc; + + seastar::timer<seastar::lowres_clock> timer; + // use real_clock so it can be converted to utime_t + using clock = ceph::coarse_real_clock; + + struct reply_t { + clock::time_point deadline; + // one sent over front conn, another sent over back conn + uint8_t unacknowledged = 0; + }; + struct PeerInfo { + /// peer connection (front) + ceph::net::ConnectionRef con_front; + /// peer connection (back) + ceph::net::ConnectionRef con_back; + /// time we sent our first ping request + clock::time_point first_tx; + /// last time we sent a ping request + clock::time_point last_tx; + /// last time we got a ping reply on the front side + clock::time_point last_rx_front; + /// last time we got a ping reply on the back side + clock::time_point last_rx_back; + /// most recent epoch we wanted this peer + epoch_t epoch; + /// history of inflight pings, arranging by timestamp we sent + std::map<utime_t, reply_t> ping_history; + + bool is_unhealthy(clock::time_point now) const; + bool is_healthy(clock::time_point now) const; + }; + using peers_map_t = std::map<osd_id_t, PeerInfo>; + peers_map_t peers; + + // osds which are considered failed + // osd_id => when was the last time that both front and back pings were acked + // use for calculating how long the OSD has been unresponsive + using failure_queue_t = std::map<osd_id_t, clock::time_point>; + failure_queue_t failure_queue; + struct failure_info_t { + clock::time_point failed_since; + entity_addrvec_t addrs; + }; + // osds we've reported to monior as failed ones, but they are not marked down + // yet + std::map<osd_id_t, failure_info_t> failure_pending; +}; diff --git a/src/crimson/osd/main.cc b/src/crimson/osd/main.cc new file mode 100644 index 00000000..04a0c97d --- /dev/null +++ b/src/crimson/osd/main.cc @@ -0,0 +1,89 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#include <sys/types.h> +#include <unistd.h> + +#include <iostream> + +#include <seastar/core/app-template.hh> +#include <seastar/core/thread.hh> + +#include "common/ceph_argparse.h" +#include "crimson/common/config_proxy.h" + +#include "osd.h" + +using config_t = ceph::common::ConfigProxy; + +void usage(const char* prog) { + std::cout << "usage: " << prog << " -i <ID>" << std::endl; + generic_server_usage(); +} + +int main(int argc, char* argv[]) +{ + std::vector<const char*> args{argv + 1, argv + argc}; + if (ceph_argparse_need_usage(args)) { + usage(argv[0]); + return EXIT_SUCCESS; + } + std::string cluster; + std::string conf_file_list; + // ceph_argparse_early_args() could _exit(), while local_conf() won't ready + // until it's started. so do the boilerplate-settings parsing here. + auto init_params = ceph_argparse_early_args(args, + CEPH_ENTITY_TYPE_OSD, + &cluster, + &conf_file_list); + seastar::app_template app; + app.add_options() + ("mkfs", "create a [new] data directory"); + seastar::sharded<OSD> osd; + + using ceph::common::sharded_conf; + using ceph::common::sharded_perf_coll; + using ceph::common::local_conf; + + args.insert(begin(args), argv[0]); + try { + return app.run_deprecated(args.size(), const_cast<char**>(args.data()), [&] { + auto& config = app.configuration(); + seastar::engine().at_exit([] { + return sharded_conf().stop(); + }); + seastar::engine().at_exit([] { + return sharded_perf_coll().stop(); + }); + seastar::engine().at_exit([&] { + return osd.stop(); + }); + return sharded_conf().start(init_params.name, cluster).then([] { + return sharded_perf_coll().start(); + }).then([&conf_file_list] { + return local_conf().parse_config_files(conf_file_list); + }).then([&] { + return osd.start_single(std::stoi(local_conf()->name.get_id()), + static_cast<uint32_t>(getpid())); + }).then([&osd, mkfs = config.count("mkfs")] { + if (mkfs) { + return osd.invoke_on(0, &OSD::mkfs, + local_conf().get_val<uuid_d>("fsid")); + } else { + return osd.invoke_on(0, &OSD::start); + } + }); + }); + } catch (...) { + seastar::fprint(std::cerr, "FATAL: Exception during startup, aborting: %s\n", std::current_exception()); + return EXIT_FAILURE; + } +} + +/* + * Local Variables: + * compile-command: "make -j4 \ + * -C ../../../build \ + * crimson-osd" + * End: + */ diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc new file mode 100644 index 00000000..bdb1642e --- /dev/null +++ b/src/crimson/osd/osd.cc @@ -0,0 +1,658 @@ +#include "osd.h" + +#include <boost/range/join.hpp> +#include <boost/smart_ptr/make_local_shared.hpp> + +#include "common/pick_address.h" +#include "messages/MOSDBeacon.h" +#include "messages/MOSDBoot.h" +#include "messages/MOSDMap.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Messenger.h" +#include "crimson/os/cyan_collection.h" +#include "crimson/os/cyan_object.h" +#include "crimson/os/cyan_store.h" +#include "crimson/os/Transaction.h" +#include "crimson/osd/heartbeat.h" +#include "crimson/osd/osd_meta.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/pg_meta.h" + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_osd); + } + + template<typename Message, typename... Args> + Ref<Message> make_message(Args&&... args) + { + return {new Message{std::forward<Args>(args)...}, false}; + } + static constexpr int TICK_INTERVAL = 1; +} + +using ceph::common::local_conf; +using ceph::os::CyanStore; + +OSD::OSD(int id, uint32_t nonce) + : whoami{id}, + nonce{nonce}, + beacon_timer{[this] { send_beacon(); }}, + heartbeat_timer{[this] { update_heartbeat_peers(); }} +{ + osdmaps[0] = boost::make_local_shared<OSDMap>(); +} + +OSD::~OSD() = default; + +namespace { +// Initial features in new superblock. +// Features here are also automatically upgraded +CompatSet get_osd_initial_compat_set() +{ + CompatSet::FeatureSet ceph_osd_feature_compat; + CompatSet::FeatureSet ceph_osd_feature_ro_compat; + CompatSet::FeatureSet ceph_osd_feature_incompat; + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGINFO); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_OLOC); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEC); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_CATEGORIES); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HOBJECTPOOL); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BIGINFO); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBINFO); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBLOG); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SNAPMAPPER); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HINTS); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGMETA); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_MISSING); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_FASTINFO); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_RECOVERY_DELETES); + return CompatSet(ceph_osd_feature_compat, + ceph_osd_feature_ro_compat, + ceph_osd_feature_incompat); +} +} + +seastar::future<> OSD::mkfs(uuid_d cluster_fsid) +{ + const auto data_path = local_conf().get_val<std::string>("osd_data"); + store = std::make_unique<ceph::os::CyanStore>(data_path); + uuid_d osd_fsid; + osd_fsid.generate_random(); + return store->mkfs(osd_fsid).then([this] { + return store->mount(); + }).then([cluster_fsid, osd_fsid, this] { + superblock.cluster_fsid = cluster_fsid; + superblock.osd_fsid = osd_fsid; + superblock.whoami = whoami; + superblock.compat_features = get_osd_initial_compat_set(); + + meta_coll = make_unique<OSDMeta>( + store->create_new_collection(coll_t::meta()), store.get()); + ceph::os::Transaction t; + meta_coll->create(t); + meta_coll->store_superblock(t, superblock); + return store->do_transaction(meta_coll->collection(), std::move(t)); + }).then([cluster_fsid, this] { + store->write_meta("ceph_fsid", cluster_fsid.to_string()); + store->write_meta("whoami", std::to_string(whoami)); + return seastar::now(); + }); +} + +namespace { + entity_addrvec_t pick_addresses(int what) { + entity_addrvec_t addrs; + CephContext cct; + if (int r = ::pick_addresses(&cct, what, &addrs, -1); r < 0) { + throw std::runtime_error("failed to pick address"); + } + // TODO: v2: ::pick_addresses() returns v2 addresses, but crimson-msgr does + // not support v2 yet. remove following set_type() once v2 support is ready. + for (auto addr : addrs.v) { + addr.set_type(addr.TYPE_LEGACY); + logger().info("picked address {}", addr); + } + return addrs; + } + std::pair<entity_addrvec_t, bool> + replace_unknown_addrs(entity_addrvec_t maybe_unknowns, + const entity_addrvec_t& knowns) { + bool changed = false; + auto maybe_replace = [&](entity_addr_t addr) { + if (!addr.is_blank_ip()) { + return addr; + } + for (auto& b : knowns.v) { + if (addr.get_family() == b.get_family()) { + auto a = b; + a.set_nonce(addr.get_nonce()); + a.set_type(addr.get_type()); + a.set_port(addr.get_port()); + changed = true; + return a; + } + } + throw std::runtime_error("failed to replace unknown address"); + }; + entity_addrvec_t replaced; + std::transform(maybe_unknowns.v.begin(), + maybe_unknowns.v.end(), + std::back_inserter(replaced.v), + maybe_replace); + return {replaced, changed}; + } +} + +seastar::future<> OSD::start() +{ + logger().info("start"); + + return seastar::when_all_succeed( + ceph::net::Messenger::create(entity_name_t::OSD(whoami), + "cluster", + nonce, + seastar::engine().cpu_id()) + .then([this] (auto msgr) { cluster_msgr = msgr; }), + ceph::net::Messenger::create(entity_name_t::OSD(whoami), + "client", + nonce, + seastar::engine().cpu_id()) + .then([this] (auto msgr) { public_msgr = msgr; })) + .then([this] { + monc.reset(new ceph::mon::Client{*public_msgr}); + heartbeat.reset(new Heartbeat{whoami, nonce, *this, *monc}); + + for (auto msgr : {cluster_msgr, public_msgr}) { + if (local_conf()->ms_crc_data) { + msgr->set_crc_data(); + } + if (local_conf()->ms_crc_header) { + msgr->set_crc_header(); + } + } + dispatchers.push_front(this); + dispatchers.push_front(monc.get()); + + const auto data_path = local_conf().get_val<std::string>("osd_data"); + store = std::make_unique<ceph::os::CyanStore>(data_path); + return store->mount(); + }).then([this] { + meta_coll = make_unique<OSDMeta>(store->open_collection(coll_t::meta()), + store.get()); + return meta_coll->load_superblock(); + }).then([this](OSDSuperblock&& sb) { + superblock = std::move(sb); + return get_map(superblock.current_epoch); + }).then([this](cached_map_t&& map) { + osdmap = std::move(map); + return load_pgs(); + }).then([this] { + return seastar::when_all_succeed( + cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER), + local_conf()->ms_bind_port_min, + local_conf()->ms_bind_port_max) + .then([this] { return cluster_msgr->start(&dispatchers); }), + public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC), + local_conf()->ms_bind_port_min, + local_conf()->ms_bind_port_max) + .then([this] { return public_msgr->start(&dispatchers); })); + }).then([this] { + return monc->start(); + }).then([this] { + monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0); + monc->sub_want("mgrmap", 0, 0); + monc->sub_want("osdmap", 0, 0); + return monc->renew_subs(); + }).then([this] { + if (auto [addrs, changed] = + replace_unknown_addrs(cluster_msgr->get_myaddrs(), + public_msgr->get_myaddrs()); changed) { + cluster_msgr->set_myaddrs(addrs); + } + return heartbeat->start(public_msgr->get_myaddrs(), + cluster_msgr->get_myaddrs()); + }).then([this] { + return start_boot(); + }); +} + +seastar::future<> OSD::start_boot() +{ + state.set_preboot(); + return monc->get_version("osdmap").then([this](version_t newest, version_t oldest) { + return _preboot(oldest, newest); + }); +} + +seastar::future<> OSD::_preboot(version_t oldest, version_t newest) +{ + logger().info("osd.{}: _preboot", whoami); + if (osdmap->get_epoch() == 0) { + logger().warn("waiting for initial osdmap"); + } else if (osdmap->is_destroyed(whoami)) { + logger().warn("osdmap says I am destroyed"); + // provide a small margin so we don't livelock seeing if we + // un-destroyed ourselves. + if (osdmap->get_epoch() > newest - 1) { + throw std::runtime_error("i am destroyed"); + } + } else if (osdmap->is_noup(whoami)) { + logger().warn("osdmap NOUP flag is set, waiting for it to clear"); + } else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) { + logger().error("osdmap SORTBITWISE OSDMap flag is NOT set; please set it"); + } else if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) { + logger().error("osdmap require_osd_release < luminous; please upgrade to luminous"); + } else if (false) { + // TODO: update mon if current fullness state is different from osdmap + } else if (version_t n = local_conf()->osd_map_message_max; + osdmap->get_epoch() >= oldest - 1 && + osdmap->get_epoch() + n > newest) { + return _send_boot(); + } + // get all the latest maps + if (osdmap->get_epoch() + 1 >= oldest) { + return osdmap_subscribe(osdmap->get_epoch() + 1, false); + } else { + return osdmap_subscribe(oldest - 1, true); + } +} + +seastar::future<> OSD::_send_boot() +{ + state.set_booting(); + + logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs()); + logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs()); + logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr()); + auto m = make_message<MOSDBoot>(superblock, + osdmap->get_epoch(), + osdmap->get_epoch(), + heartbeat->get_back_addrs(), + heartbeat->get_front_addrs(), + cluster_msgr->get_myaddrs(), + CEPH_FEATURES_ALL); + return monc->send_message(m); +} + +seastar::future<> OSD::stop() +{ + // see also OSD::shutdown() + state.set_stopping(); + return gate.close().then([this] { + return heartbeat->stop(); + }).then([this] { + return monc->stop(); + }).then([this] { + return public_msgr->shutdown(); + }).then([this] { + return cluster_msgr->shutdown(); + }); +} + +seastar::future<> OSD::load_pgs() +{ + return seastar::parallel_for_each(store->list_collections(), + [this](auto coll) { + spg_t pgid; + if (coll.is_pg(&pgid)) { + return load_pg(pgid).then([pgid, this](auto&& pg) { + logger().info("load_pgs: loaded {}", pgid); + pgs.emplace(pgid, std::move(pg)); + return seastar::now(); + }); + } else if (coll.is_temp(&pgid)) { + // TODO: remove the collection + return seastar::now(); + } else { + logger().warn("ignoring unrecognized collection: {}", coll); + return seastar::now(); + } + }); +} + +seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid) +{ + using ec_profile_t = map<string,string>; + return PGMeta{store.get(), pgid}.get_epoch().then([this](epoch_t e) { + return get_map(e); + }).then([pgid, this] (auto&& create_map) { + if (create_map->have_pg_pool(pgid.pool())) { + pg_pool_t pi = *create_map->get_pg_pool(pgid.pool()); + string name = create_map->get_pool_name(pgid.pool()); + ec_profile_t ec_profile; + if (pi.is_erasure()) { + ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile); + } + return seastar::make_ready_future<pg_pool_t, + string, + ec_profile_t>(std::move(pi), + std::move(name), + std::move(ec_profile)); + } else { + // pool was deleted; grab final pg_pool_t off disk. + return meta_coll->load_final_pool_info(pgid.pool()); + } + }).then([this](pg_pool_t&& pool, string&& name, ec_profile_t&& ec_profile) { + Ref<PG> pg{new PG{std::move(pool), + std::move(name), + std::move(ec_profile)}}; + return seastar::make_ready_future<Ref<PG>>(std::move(pg)); + }); +} + +seastar::future<> OSD::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) +{ + logger().info("ms_dispatch {}", *m); + if (state.is_stopping()) { + return seastar::now(); + } + + switch (m->get_type()) { + case CEPH_MSG_OSD_MAP: + return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m)); + default: + return seastar::now(); + } +} + +seastar::future<> OSD::ms_handle_connect(ceph::net::ConnectionRef conn) +{ + if (conn->get_peer_type() != CEPH_ENTITY_TYPE_MON) { + return seastar::now(); + } else { + return seastar::now(); + } +} + +seastar::future<> OSD::ms_handle_reset(ceph::net::ConnectionRef conn) +{ + // TODO: cleanup the session attached to this connection + logger().warn("ms_handle_reset"); + return seastar::now(); +} + +seastar::future<> OSD::ms_handle_remote_reset(ceph::net::ConnectionRef conn) +{ + logger().warn("ms_handle_remote_reset"); + return seastar::now(); +} + +OSD::cached_map_t OSD::get_map() const +{ + return osdmap; +} + +seastar::future<OSD::cached_map_t> OSD::get_map(epoch_t e) +{ + // TODO: use LRU cache for managing osdmap, fallback to disk if we have to + if (auto found = osdmaps.find(e); found) { + return seastar::make_ready_future<cached_map_t>(std::move(found)); + } else { + return load_map_bl(e).then([e, this](bufferlist bl) { + auto osdmap = std::make_unique<OSDMap>(); + osdmap->decode(bl); + return seastar::make_ready_future<cached_map_t>( + osdmaps.insert(e, std::move(osdmap))); + }); + } +} + +void OSD::store_map_bl(ceph::os::Transaction& t, + epoch_t e, bufferlist&& bl) +{ + meta_coll->store_map(t, e, bl); + map_bl_cache.insert(e, std::move(bl)); +} + +seastar::future<bufferlist> OSD::load_map_bl(epoch_t e) +{ + if (std::optional<bufferlist> found = map_bl_cache.find(e); found) { + return seastar::make_ready_future<bufferlist>(*found); + } else { + return meta_coll->load_map(e); + } +} + +seastar::future<> OSD::store_maps(ceph::os::Transaction& t, + epoch_t start, Ref<MOSDMap> m) +{ + return seastar::do_for_each(boost::counting_iterator<epoch_t>(start), + boost::counting_iterator<epoch_t>(m->get_last() + 1), + [&t, m, this](epoch_t e) { + if (auto p = m->maps.find(e); p != m->maps.end()) { + auto o = std::make_unique<OSDMap>(); + o->decode(p->second); + logger().info("store_maps osdmap.{}", e); + store_map_bl(t, e, std::move(std::move(p->second))); + osdmaps.insert(e, std::move(o)); + return seastar::now(); + } else if (auto p = m->incremental_maps.find(e); + p != m->incremental_maps.end()) { + OSDMap::Incremental inc; + auto i = p->second.cbegin(); + inc.decode(i); + return load_map_bl(e - 1) + .then([&t, e, inc=std::move(inc), this](bufferlist bl) { + auto o = std::make_unique<OSDMap>(); + o->decode(bl); + o->apply_incremental(inc); + bufferlist fbl; + o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED); + store_map_bl(t, e, std::move(fbl)); + osdmaps.insert(e, std::move(o)); + return seastar::now(); + }); + } else { + logger().error("MOSDMap lied about what maps it had?"); + return seastar::now(); + } + }); +} + +seastar::future<> OSD::osdmap_subscribe(version_t epoch, bool force_request) +{ + logger().info("{}({})", __func__, epoch); + if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) || + force_request) { + return monc->renew_subs(); + } else { + return seastar::now(); + } +} + +seastar::future<> OSD::handle_osd_map(ceph::net::ConnectionRef conn, + Ref<MOSDMap> m) +{ + logger().info("handle_osd_map {}", *m); + if (m->fsid != superblock.cluster_fsid) { + logger().warn("fsid mismatched"); + return seastar::now(); + } + if (state.is_initializing()) { + logger().warn("i am still initializing"); + return seastar::now(); + } + + const auto first = m->get_first(); + const auto last = m->get_last(); + + // make sure there is something new, here, before we bother flushing + // the queues and such + if (last <= superblock.newest_map) { + return seastar::now(); + } + // missing some? + bool skip_maps = false; + epoch_t start = superblock.newest_map + 1; + if (first > start) { + logger().info("handle_osd_map message skips epochs {}..{}", + start, first - 1); + if (m->oldest_map <= start) { + return osdmap_subscribe(start, false); + } + // always try to get the full range of maps--as many as we can. this + // 1- is good to have + // 2- is at present the only way to ensure that we get a *full* map as + // the first map! + if (m->oldest_map < first) { + return osdmap_subscribe(m->oldest_map - 1, true); + } + skip_maps = true; + start = first; + } + + return seastar::do_with(ceph::os::Transaction{}, + [=](auto& t) { + return store_maps(t, start, m).then([=, &t] { + // even if this map isn't from a mon, we may have satisfied our subscription + monc->sub_got("osdmap", last); + if (!superblock.oldest_map || skip_maps) { + superblock.oldest_map = first; + } + superblock.newest_map = last; + superblock.current_epoch = last; + + // note in the superblock that we were clean thru the prior epoch + if (boot_epoch && boot_epoch >= superblock.mounted) { + superblock.mounted = boot_epoch; + superblock.clean_thru = last; + } + meta_coll->store_superblock(t, superblock); + return store->do_transaction(meta_coll->collection(), std::move(t)); + }); + }).then([=] { + // TODO: write to superblock and commit the transaction + return committed_osd_maps(start, last, m); + }); +} + +seastar::future<> OSD::committed_osd_maps(version_t first, + version_t last, + Ref<MOSDMap> m) +{ + logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last); + // advance through the new maps + return seastar::parallel_for_each(boost::irange(first, last + 1), + [this](epoch_t cur) { + return get_map(cur).then([this](cached_map_t&& o) { + osdmap = std::move(o); + if (up_epoch != 0 && + osdmap->is_up(whoami) && + osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) { + up_epoch = osdmap->get_epoch(); + if (!boot_epoch) { + boot_epoch = osdmap->get_epoch(); + } + } + }); + }).then([m, this] { + if (osdmap->is_up(whoami) && + osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() && + bind_epoch < osdmap->get_up_from(whoami)) { + if (state.is_booting()) { + logger().info("osd.{}: activating...", whoami); + state.set_active(); + beacon_timer.arm_periodic( + std::chrono::seconds(local_conf()->osd_beacon_report_interval)); + heartbeat_timer.arm_periodic( + std::chrono::seconds(TICK_INTERVAL)); + } + } + + if (state.is_active()) { + logger().info("osd.{}: now active", whoami); + if (!osdmap->exists(whoami)) { + return shutdown(); + } + if (should_restart()) { + return restart(); + } else { + return seastar::now(); + } + } else if (state.is_preboot()) { + logger().info("osd.{}: now preboot", whoami); + + if (m->get_source().is_mon()) { + return _preboot(m->oldest_map, m->newest_map); + } else { + logger().info("osd.{}: start_boot", whoami); + return start_boot(); + } + } else { + logger().info("osd.{}: now {}", whoami, state); + // XXX + return seastar::now(); + } + }); +} + +bool OSD::should_restart() const +{ + if (!osdmap->is_up(whoami)) { + logger().info("map e {} marked osd.{} down", + osdmap->get_epoch(), whoami); + return true; + } else if (osdmap->get_addrs(whoami) != public_msgr->get_myaddrs()) { + logger().error("map e {} had wrong client addr ({} != my {})", + osdmap->get_epoch(), + osdmap->get_addrs(whoami), + public_msgr->get_myaddrs()); + return true; + } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) { + logger().error("map e {} had wrong cluster addr ({} != my {})", + osdmap->get_epoch(), + osdmap->get_cluster_addrs(whoami), + cluster_msgr->get_myaddrs()); + return true; + } else { + return false; + } +} + +seastar::future<> OSD::restart() +{ + up_epoch = 0; + bind_epoch = osdmap->get_epoch(); + // TODO: promote to shutdown if being marked down for multiple times + // rebind messengers + return start_boot(); +} + +seastar::future<> OSD::shutdown() +{ + // TODO + superblock.mounted = boot_epoch; + superblock.clean_thru = osdmap->get_epoch(); + return seastar::now(); +} + +seastar::future<> OSD::send_beacon() +{ + // FIXME: min lec should be calculated from pg_stat + // and should set m->pgs + epoch_t min_last_epoch_clean = osdmap->get_epoch(); + auto m = make_message<MOSDBeacon>(osdmap->get_epoch(), + min_last_epoch_clean); + return monc->send_message(m); +} + +void OSD::update_heartbeat_peers() +{ + if (!state.is_active()) { + return; + } + for (auto& pg : pgs) { + vector<int> up, acting; + osdmap->pg_to_up_acting_osds(pg.first.pgid, + &up, nullptr, + &acting, nullptr); + for (auto osd : boost::join(up, acting)) { + if (osd != CRUSH_ITEM_NONE) { + heartbeat->add_peer(osd, osdmap->get_epoch()); + } + } + } + heartbeat->update_peers(whoami); +} diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h new file mode 100644 index 00000000..c5aff5e2 --- /dev/null +++ b/src/crimson/osd/osd.h @@ -0,0 +1,125 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <map> +#include <seastar/core/future.hh> +#include <seastar/core/gate.hh> +#include <seastar/core/shared_ptr.hh> +#include <seastar/core/timer.hh> + +#include "crimson/common/simple_lru.h" +#include "crimson/common/shared_lru.h" +#include "crimson/mon/MonClient.h" +#include "crimson/net/Dispatcher.h" +#include "crimson/osd/chained_dispatchers.h" +#include "crimson/osd/osdmap_service.h" +#include "crimson/osd/state.h" + +#include "osd/OSDMap.h" + +class MOSDMap; +class OSDMap; +class OSDMeta; +class PG; +class Heartbeat; + +namespace ceph::net { + class Messenger; +} + +namespace ceph::os { + class CyanStore; + struct Collection; + class Transaction; +} + +template<typename T> using Ref = boost::intrusive_ptr<T>; + +class OSD : public ceph::net::Dispatcher, + private OSDMapService { + seastar::gate gate; + const int whoami; + const uint32_t nonce; + seastar::timer<seastar::lowres_clock> beacon_timer; + // talk with osd + ceph::net::Messenger* cluster_msgr = nullptr; + // talk with client/mon/mgr + ceph::net::Messenger* public_msgr = nullptr; + ChainedDispatchers dispatchers; + std::unique_ptr<ceph::mon::Client> monc; + + std::unique_ptr<Heartbeat> heartbeat; + seastar::timer<seastar::lowres_clock> heartbeat_timer; + + SharedLRU<epoch_t, OSDMap> osdmaps; + SimpleLRU<epoch_t, bufferlist, false> map_bl_cache; + cached_map_t osdmap; + // TODO: use a wrapper for ObjectStore + std::unique_ptr<ceph::os::CyanStore> store; + std::unique_ptr<OSDMeta> meta_coll; + + std::unordered_map<spg_t, Ref<PG>> pgs; + OSDState state; + + /// _first_ epoch we were marked up (after this process started) + epoch_t boot_epoch = 0; + /// _most_recent_ epoch we were marked up + epoch_t up_epoch = 0; + //< epoch we last did a bind to new ip:ports + epoch_t bind_epoch = 0; + //< since when there is no more pending pg creates from mon + epoch_t last_pg_create_epoch = 0; + + OSDSuperblock superblock; + + // Dispatcher methods + seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) override; + seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override; + seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override; + seastar::future<> ms_handle_remote_reset(ceph::net::ConnectionRef conn) override; + +public: + OSD(int id, uint32_t nonce); + ~OSD() override; + + seastar::future<> mkfs(uuid_d fsid); + + seastar::future<> start(); + seastar::future<> stop(); + +private: + seastar::future<> start_boot(); + seastar::future<> _preboot(version_t oldest_osdmap, version_t newest_osdmap); + seastar::future<> _send_boot(); + + seastar::future<Ref<PG>> load_pg(spg_t pgid); + seastar::future<> load_pgs(); + + // OSDMapService methods + seastar::future<cached_map_t> get_map(epoch_t e) override; + cached_map_t get_map() const override; + + seastar::future<bufferlist> load_map_bl(epoch_t e); + void store_map_bl(ceph::os::Transaction& t, + epoch_t e, bufferlist&& bl); + seastar::future<> store_maps(ceph::os::Transaction& t, + epoch_t start, Ref<MOSDMap> m); + seastar::future<> osdmap_subscribe(version_t epoch, bool force_request); + + void write_superblock(ceph::os::Transaction& t); + seastar::future<> read_superblock(); + + seastar::future<> handle_osd_map(ceph::net::ConnectionRef conn, + Ref<MOSDMap> m); + seastar::future<> committed_osd_maps(version_t first, + version_t last, + Ref<MOSDMap> m); + bool should_restart() const; + seastar::future<> restart(); + seastar::future<> shutdown(); + + seastar::future<> send_beacon(); + void update_heartbeat_peers(); +}; diff --git a/src/crimson/osd/osd_meta.cc b/src/crimson/osd/osd_meta.cc new file mode 100644 index 00000000..6eb225fe --- /dev/null +++ b/src/crimson/osd/osd_meta.cc @@ -0,0 +1,80 @@ +#include "osd_meta.h" + +#include "crimson/os/cyan_collection.h" +#include "crimson/os/cyan_store.h" +#include "crimson/os/Transaction.h" + +void OSDMeta::create(ceph::os::Transaction& t) +{ + t.create_collection(coll->cid, 0); +} + +void OSDMeta::store_map(ceph::os::Transaction& t, + epoch_t e, const bufferlist& m) +{ + t.write(coll->cid, osdmap_oid(e), 0, m.length(), m); +} + +seastar::future<bufferlist> OSDMeta::load_map(epoch_t e) +{ + return store->read(coll, + osdmap_oid(e), 0, 0, + CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); +} + +void OSDMeta::store_superblock(ceph::os::Transaction& t, + const OSDSuperblock& superblock) +{ + bufferlist bl; + encode(superblock, bl); + t.write(coll->cid, superblock_oid(), 0, bl.length(), bl); +} + +seastar::future<OSDSuperblock> OSDMeta::load_superblock() +{ + return store->read(coll, superblock_oid(), 0, 0) + .then([this] (bufferlist&& bl) { + auto p = bl.cbegin(); + OSDSuperblock superblock; + decode(superblock, p); + return seastar::make_ready_future<OSDSuperblock>(std::move(superblock)); + }); +} + +seastar::future<pg_pool_t, + std::string, + OSDMeta::ec_profile_t> +OSDMeta::load_final_pool_info(int64_t pool) { + return store->read(coll, final_pool_info_oid(pool), + 0, 0).then([this] (bufferlist&& bl) { + auto p = bl.cbegin(); + pg_pool_t pi; + string name; + ec_profile_t ec_profile; + decode(pi, p); + decode(name, p); + decode(ec_profile, p); + return seastar::make_ready_future<pg_pool_t, + string, + ec_profile_t>(std::move(pi), + std::move(name), + std::move(ec_profile)); + }); +} + +ghobject_t OSDMeta::osdmap_oid(epoch_t epoch) +{ + string name = fmt::format("osdmap.{}", epoch); + return ghobject_t(hobject_t(sobject_t(object_t(name), 0))); +} + +ghobject_t OSDMeta::final_pool_info_oid(int64_t pool) +{ + string name = fmt::format("final_pool_{}", pool); + return ghobject_t(hobject_t(sobject_t(object_t(name), CEPH_NOSNAP))); +} + +ghobject_t OSDMeta::superblock_oid() +{ + return ghobject_t(hobject_t(sobject_t(object_t("osd_superblock"), 0))); +} diff --git a/src/crimson/osd/osd_meta.h b/src/crimson/osd/osd_meta.h new file mode 100644 index 00000000..936d9548 --- /dev/null +++ b/src/crimson/osd/osd_meta.h @@ -0,0 +1,53 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <map> +#include <string> +#include <seastar/core/future.hh> +#include "osd/osd_types.h" + +namespace ceph::os { + class CyanStore; + class Collection; + class Transaction; +} + +/// metadata shared across PGs, or put in another way, +/// metadata not specific to certain PGs. +class OSDMeta { + template<typename T> using Ref = boost::intrusive_ptr<T>; + + ceph::os::CyanStore* store; + Ref<ceph::os::Collection> coll; + +public: + OSDMeta(Ref<ceph::os::Collection> coll, + ceph::os::CyanStore* store) + : store{store}, coll{coll} + {} + + + auto collection() { + return coll; + } + void create(ceph::os::Transaction& t); + + void store_map(ceph::os::Transaction& t, + epoch_t e, const bufferlist& m); + seastar::future<bufferlist> load_map(epoch_t e); + + void store_superblock(ceph::os::Transaction& t, + const OSDSuperblock& sb); + seastar::future<OSDSuperblock> load_superblock(); + + using ec_profile_t = std::map<std::string, std::string>; + seastar::future<pg_pool_t, + std::string, + ec_profile_t> load_final_pool_info(int64_t pool); +private: + static ghobject_t osdmap_oid(epoch_t epoch); + static ghobject_t final_pool_info_oid(int64_t pool); + static ghobject_t superblock_oid(); +}; diff --git a/src/crimson/osd/osdmap_service.h b/src/crimson/osd/osdmap_service.h new file mode 100644 index 00000000..0a3aaed3 --- /dev/null +++ b/src/crimson/osd/osdmap_service.h @@ -0,0 +1,19 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <boost/smart_ptr/local_shared_ptr.hpp> + +#include "include/types.h" + +class OSDMap; + +class OSDMapService { +public: + using cached_map_t = boost::local_shared_ptr<OSDMap>; + virtual ~OSDMapService() = default; + virtual seastar::future<cached_map_t> get_map(epoch_t e) = 0; + /// get the latest map + virtual cached_map_t get_map() const = 0; +}; diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc new file mode 100644 index 00000000..bc7b8c2d --- /dev/null +++ b/src/crimson/osd/pg.cc @@ -0,0 +1,6 @@ +#include "pg.h" + +PG::PG(pg_pool_t&& pool, std::string&& name, ec_profile_t&& ec_profile) +{ + // TODO +} diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h new file mode 100644 index 00000000..4bb672f4 --- /dev/null +++ b/src/crimson/osd/pg.h @@ -0,0 +1,21 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <boost/intrusive_ptr.hpp> +#include <boost/smart_ptr/intrusive_ref_counter.hpp> +#include <seastar/core/future.hh> + +#include "osd/osd_types.h" + +template<typename T> using Ref = boost::intrusive_ptr<T>; + +class PG : public boost::intrusive_ref_counter< + PG, + boost::thread_unsafe_counter> +{ + using ec_profile_t = std::map<std::string,std::string>; +public: + PG(pg_pool_t&& pool, std::string&& name, ec_profile_t&& ec_profile); +}; diff --git a/src/crimson/osd/pg_meta.cc b/src/crimson/osd/pg_meta.cc new file mode 100644 index 00000000..2098e50a --- /dev/null +++ b/src/crimson/osd/pg_meta.cc @@ -0,0 +1,104 @@ +#include "pg_meta.h" + +#include <string_view> + +#include "crimson/os/cyan_collection.h" +#include "crimson/os/cyan_store.h" + +// prefix pgmeta_oid keys with _ so that PGLog::read_log_and_missing() can +// easily skip them + +static const string_view infover_key = "_infover"sv; +static const string_view info_key = "_info"sv; +static const string_view biginfo_key = "_biginfo"sv; +static const string_view epoch_key = "_epoch"sv; +static const string_view fastinfo_key = "_fastinfo"sv; + +using ceph::os::CyanStore; + +PGMeta::PGMeta(CyanStore* store, spg_t pgid) + : store{store}, + pgid{pgid} +{} + +namespace { + template<typename T> + std::optional<T> find_value(const CyanStore::omap_values_t& values, + string_view key) + { + auto found = values.find(key); + if (found == values.end()) { + return {}; + } + auto p = found->second.cbegin(); + T value; + decode(value, p); + return std::make_optional(std::move(value)); + } +} +seastar::future<epoch_t> PGMeta::get_epoch() +{ + auto ch = store->open_collection(coll_t{pgid}); + return store->omap_get_values(ch, + pgid.make_pgmeta_oid(), + {string{infover_key}, + string{epoch_key}}).then( + [](auto&& values) { + { + // sanity check + auto infover = find_value<__u8>(values, infover_key); + assert(infover); + if (*infover < 10) { + throw std::runtime_error("incompatible pg meta"); + } + } + { + auto epoch = find_value<epoch_t>(values, epoch_key); + assert(epoch); + return seastar::make_ready_future<epoch_t>(*epoch); + } + }); +} + +seastar::future<pg_info_t, PastIntervals> PGMeta::load() +{ + auto ch = store->open_collection(coll_t{pgid}); + return store->omap_get_values(ch, + pgid.make_pgmeta_oid(), + {string{infover_key}, + string{info_key}, + string{biginfo_key}, + string{fastinfo_key}}).then( + [this](auto&& values) { + { + // sanity check + auto infover = find_value<__u8>(values, infover_key); + assert(infover); + if (infover < 10) { + throw std::runtime_error("incompatible pg meta"); + } + } + pg_info_t info; + { + auto found = find_value<pg_info_t>(values, info_key); + assert(found); + info = *std::move(found); + } + PastIntervals past_intervals; + { + using biginfo_t = std::pair<PastIntervals, decltype(info.purged_snaps)>; + auto big_info = find_value<biginfo_t>(values, biginfo_key); + assert(big_info); + past_intervals = std::move(big_info->first); + info.purged_snaps = std::move(big_info->second); + } + { + auto fast_info = find_value<pg_fast_info_t>(values, fastinfo_key); + assert(fast_info); + fast_info->try_apply_to(&info); + } + return seastar::make_ready_future<pg_info_t, PastIntervals>( + std::move(info), + std::move(past_intervals)); + }); +} diff --git a/src/crimson/osd/pg_meta.h b/src/crimson/osd/pg_meta.h new file mode 100644 index 00000000..10f2234a --- /dev/null +++ b/src/crimson/osd/pg_meta.h @@ -0,0 +1,22 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/future.hh> +#include "osd/osd_types.h" + +namespace ceph::os { + class CyanStore; +} + +/// PG related metadata +class PGMeta +{ + ceph::os::CyanStore* store; + const spg_t pgid; +public: + PGMeta(ceph::os::CyanStore *store, spg_t pgid); + seastar::future<epoch_t> get_epoch(); + seastar::future<pg_info_t, PastIntervals> load(); +}; diff --git a/src/crimson/osd/state.h b/src/crimson/osd/state.h new file mode 100644 index 00000000..4c445348 --- /dev/null +++ b/src/crimson/osd/state.h @@ -0,0 +1,71 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <string_view> +#include <ostream> + +class OSDMap; + +class OSDState { + + enum class State { + INITIALIZING, + PREBOOT, + BOOTING, + ACTIVE, + STOPPING, + WAITING_FOR_HEALTHY, + }; + + State state = State::INITIALIZING; + +public: + bool is_initializing() const { + return state == State::INITIALIZING; + } + bool is_preboot() const { + return state == State::PREBOOT; + } + bool is_booting() const { + return state == State::BOOTING; + } + bool is_active() const { + return state == State::ACTIVE; + } + bool is_stopping() const { + return state == State::STOPPING; + } + bool is_waiting_for_healthy() const { + return state == State::WAITING_FOR_HEALTHY; + } + void set_preboot() { + state = State::PREBOOT; + } + void set_booting() { + state = State::BOOTING; + } + void set_active() { + state = State::ACTIVE; + } + void set_stopping() { + state = State::STOPPING; + } + std::string_view to_string() const { + switch (state) { + case State::INITIALIZING: return "initializing"; + case State::PREBOOT: return "preboot"; + case State::BOOTING: return "booting"; + case State::ACTIVE: return "active"; + case State::STOPPING: return "stopping"; + case State::WAITING_FOR_HEALTHY: return "waiting_for_healthy"; + default: return "???"; + } + } +}; + +inline std::ostream& +operator<<(std::ostream& os, const OSDState& s) { + return os << s.to_string(); +} |