summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/watch.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/osd/watch.cc')
-rw-r--r--src/crimson/osd/watch.cc354
1 files changed, 354 insertions, 0 deletions
diff --git a/src/crimson/osd/watch.cc b/src/crimson/osd/watch.cc
new file mode 100644
index 000000000..4573333c3
--- /dev/null
+++ b/src/crimson/osd/watch.cc
@@ -0,0 +1,354 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <algorithm>
+
+#include <boost/range/adaptor/transformed.hpp>
+#include <boost/range/algorithm_ext/insert.hpp>
+
+#include "crimson/osd/watch.h"
+#include "crimson/osd/osd_operations/internal_client_request.h"
+
+#include "messages/MWatchNotify.h"
+
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+// a watcher can remove itself if it has not seen a notification after a period of time.
+// in the case, we need to drop it also from the persisted `ObjectState` instance.
+// this operation resembles a bit the `_UNWATCH` subop.
+class WatchTimeoutRequest final : public InternalClientRequest {
+public:
+ WatchTimeoutRequest(WatchRef watch, Ref<PG> pg)
+ : InternalClientRequest(std::move(pg)),
+ watch(std::move(watch)) {
+ }
+
+ const hobject_t& get_target_oid() const final;
+ PG::do_osd_ops_params_t get_do_osd_ops_params() const final;
+ std::vector<OSDOp> create_osd_ops() final;
+
+private:
+ WatchRef watch;
+};
+
+const hobject_t& WatchTimeoutRequest::get_target_oid() const
+{
+ assert(watch->obc);
+ return watch->obc->get_oid();
+}
+
+PG::do_osd_ops_params_t
+WatchTimeoutRequest::get_do_osd_ops_params() const
+{
+ osd_reqid_t reqid;
+ reqid.name = watch->entity_name;
+ PG::do_osd_ops_params_t params{
+ watch->conn,
+ reqid,
+ ceph_clock_now(),
+ get_pg().get_osdmap_epoch(),
+ entity_inst_t{ watch->entity_name, watch->winfo.addr },
+ 0
+ };
+ logger().debug("{}: params.reqid={}", __func__, params.reqid);
+ return params;
+}
+
+std::vector<OSDOp> WatchTimeoutRequest::create_osd_ops()
+{
+ logger().debug("{}", __func__);
+ assert(watch);
+ OSDOp osd_op;
+ osd_op.op.op = CEPH_OSD_OP_WATCH;
+ osd_op.op.flags = 0;
+ osd_op.op.watch.op = CEPH_OSD_WATCH_OP_UNWATCH;
+ osd_op.op.watch.cookie = watch->winfo.cookie;
+ return std::vector{std::move(osd_op)};
+}
+
+Watch::~Watch()
+{
+ logger().debug("{} gid={} cookie={}", __func__, get_watcher_gid(), get_cookie());
+}
+
+seastar::future<> Watch::connect(crimson::net::ConnectionRef conn, bool)
+{
+ if (this->conn == conn) {
+ logger().debug("conn={} already connected", conn);
+ return seastar::now();
+ }
+ timeout_timer.cancel();
+ timeout_timer.arm(std::chrono::seconds{winfo.timeout_seconds});
+ this->conn = std::move(conn);
+ return seastar::now();
+}
+
+void Watch::disconnect()
+{
+ ceph_assert(!conn);
+ timeout_timer.cancel();
+ timeout_timer.arm(std::chrono::seconds{winfo.timeout_seconds});
+}
+
+seastar::future<> Watch::send_notify_msg(NotifyRef notify)
+{
+ logger().info("{} for notify(id={})", __func__, notify->ninfo.notify_id);
+ return conn->send(crimson::make_message<MWatchNotify>(
+ winfo.cookie,
+ notify->user_version,
+ notify->ninfo.notify_id,
+ CEPH_WATCH_EVENT_NOTIFY,
+ notify->ninfo.bl,
+ notify->client_gid));
+}
+
+seastar::future<> Watch::start_notify(NotifyRef notify)
+{
+ logger().debug("{} gid={} cookie={} starting notify(id={})",
+ __func__, get_watcher_gid(), get_cookie(),
+ notify->ninfo.notify_id);
+ auto [ it, emplaced ] = in_progress_notifies.emplace(std::move(notify));
+ ceph_assert(emplaced);
+ ceph_assert(is_alive());
+ return is_connected() ? send_notify_msg(*it) : seastar::now();
+}
+
+seastar::future<> Watch::notify_ack(
+ const uint64_t notify_id,
+ const ceph::bufferlist& reply_bl)
+{
+ logger().debug("{} gid={} cookie={} notify_id={}",
+ __func__, get_watcher_gid(), get_cookie(), notify_id);
+ const auto it = in_progress_notifies.find(notify_id);
+ if (it == std::end(in_progress_notifies)) {
+ logger().error("{} notify_id={} not found on the in-progess list."
+ " Supressing but this should not happen.",
+ __func__, notify_id);
+ return seastar::now();
+ }
+ auto notify = *it;
+ logger().debug("Watch::notify_ack gid={} cookie={} found notify(id={})",
+ get_watcher_gid(),
+ get_cookie(),
+ notify->get_id());
+ // let's ensure we're extending the life-time till end of this method
+ static_assert(std::is_same_v<decltype(notify), NotifyRef>);
+ in_progress_notifies.erase(it);
+ return notify->complete_watcher(shared_from_this(), reply_bl);
+}
+
+seastar::future<> Watch::send_disconnect_msg()
+{
+ if (!is_connected()) {
+ return seastar::now();
+ }
+ ceph::bufferlist empty;
+ return conn->send(crimson::make_message<MWatchNotify>(
+ winfo.cookie,
+ 0,
+ 0,
+ CEPH_WATCH_EVENT_DISCONNECT,
+ empty));
+}
+
+void Watch::discard_state()
+{
+ logger().debug("{} gid={} cookie={}", __func__, get_watcher_gid(), get_cookie());
+ ceph_assert(obc);
+ in_progress_notifies.clear();
+ timeout_timer.cancel();
+}
+
+void Watch::got_ping(utime_t)
+{
+ if (is_connected()) {
+ // using cancel() + arm() as rearm() has no overload for time delta.
+ timeout_timer.cancel();
+ timeout_timer.arm(std::chrono::seconds{winfo.timeout_seconds});
+ }
+}
+
+seastar::future<> Watch::remove()
+{
+ logger().debug("{} gid={} cookie={}", __func__, get_watcher_gid(), get_cookie());
+ // in contrast to ceph-osd crimson sends CEPH_WATCH_EVENT_DISCONNECT directly
+ // from the timeout handler and _after_ CEPH_WATCH_EVENT_NOTIFY_COMPLETE.
+ // this simplifies the Watch::remove() interface as callers aren't obliged
+ // anymore to decide whether EVENT_DISCONNECT needs to be send or not -- it
+ // becomes an implementation detail of Watch.
+ return seastar::do_for_each(in_progress_notifies,
+ [this_shared=shared_from_this()] (auto notify) {
+ logger().debug("Watch::remove gid={} cookie={} notify(id={})",
+ this_shared->get_watcher_gid(),
+ this_shared->get_cookie(),
+ notify->ninfo.notify_id);
+ return notify->remove_watcher(this_shared);
+ }).then([this] {
+ discard_state();
+ return seastar::now();
+ });
+}
+
+void Watch::cancel_notify(const uint64_t notify_id)
+{
+ logger().debug("{} gid={} cookie={} notify(id={})",
+ __func__, get_watcher_gid(), get_cookie(),
+ notify_id);
+ const auto it = in_progress_notifies.find(notify_id);
+ assert(it != std::end(in_progress_notifies));
+ in_progress_notifies.erase(it);
+}
+
+void Watch::do_watch_timeout()
+{
+ assert(pg);
+ auto [op, fut] = pg->get_shard_services().start_operation<WatchTimeoutRequest>(
+ shared_from_this(), pg);
+ std::ignore = std::move(fut).then([op=std::move(op), this] {
+ return send_disconnect_msg();
+ });
+}
+
+bool notify_reply_t::operator<(const notify_reply_t& rhs) const
+{
+ // comparing std::pairs to emphasize our legacy. ceph-osd stores
+ // notify_replies as std::multimap<std::pair<gid, cookie>, bl>.
+ // unfortunately, what seems to be an implementation detail, got
+ // exposed as part of our public API (the `reply_buffer` parameter
+ // of the `rados_notify` family).
+ const auto lhsp = std::make_pair(watcher_gid, watcher_cookie);
+ const auto rhsp = std::make_pair(rhs.watcher_gid, rhs.watcher_cookie);
+ return lhsp < rhsp;
+}
+
+std::ostream &operator<<(std::ostream &out, const notify_reply_t &rhs)
+{
+ out << "notify_reply_t{watcher_gid=" << rhs.watcher_gid
+ << ", watcher_cookie=" << rhs.watcher_cookie << "}";
+ return out;
+}
+
+Notify::Notify(crimson::net::ConnectionRef conn,
+ const notify_info_t& ninfo,
+ const uint64_t client_gid,
+ const uint64_t user_version)
+ : ninfo(ninfo),
+ conn(std::move(conn)),
+ client_gid(client_gid),
+ user_version(user_version)
+{}
+
+Notify::~Notify()
+{
+ logger().debug("{} for notify(id={})", __func__, ninfo.notify_id);
+}
+
+seastar::future<> Notify::remove_watcher(WatchRef watch)
+{
+ logger().debug("{} for notify(id={})", __func__, ninfo.notify_id);
+
+ if (discarded || complete) {
+ logger().debug("{} for notify(id={}) discarded/complete already"
+ " discarded: {} complete: {}", __func__,
+ ninfo.notify_id, discarded ,complete);
+ return seastar::now();
+ }
+ [[maybe_unused]] const auto num_removed = watchers.erase(watch);
+ assert(num_removed > 0);
+ if (watchers.empty()) {
+ complete = true;
+ [[maybe_unused]] bool was_armed = timeout_timer.cancel();
+ assert(was_armed);
+ return send_completion();
+ } else {
+ return seastar::now();
+ }
+}
+
+
+seastar::future<> Notify::complete_watcher(
+ WatchRef watch,
+ const ceph::bufferlist& reply_bl)
+{
+ logger().debug("{} for notify(id={})", __func__, ninfo.notify_id);
+
+ if (discarded || complete) {
+ logger().debug("{} for notify(id={}) discarded/complete already"
+ " discarded: {} complete: {}", __func__,
+ ninfo.notify_id, discarded ,complete);
+ return seastar::now();
+ }
+ notify_replies.emplace(notify_reply_t{
+ watch->get_watcher_gid(),
+ watch->get_cookie(),
+ reply_bl});
+ return remove_watcher(std::move(watch));
+}
+
+seastar::future<> Notify::send_completion(
+ std::set<WatchRef> timedout_watchers)
+{
+ logger().info("{} -- {} in progress watchers, timedout watchers {}",
+ __func__, watchers.size(), timedout_watchers.size());
+ logger().debug("{} sending notify replies: {}", __func__, notify_replies);
+
+ ceph::bufferlist empty;
+ auto reply = crimson::make_message<MWatchNotify>(
+ ninfo.cookie,
+ user_version,
+ ninfo.notify_id,
+ CEPH_WATCH_EVENT_NOTIFY_COMPLETE,
+ empty,
+ client_gid);
+ ceph::bufferlist reply_bl;
+ {
+ std::vector<std::pair<uint64_t,uint64_t>> missed;
+ missed.reserve(std::size(timedout_watchers));
+ boost::insert(
+ missed, std::begin(missed),
+ timedout_watchers | boost::adaptors::transformed([] (auto w) {
+ return std::make_pair(w->get_watcher_gid(), w->get_cookie());
+ }));
+ ceph::encode(notify_replies, reply_bl);
+ ceph::encode(missed, reply_bl);
+ }
+ reply->set_data(std::move(reply_bl));
+ if (!timedout_watchers.empty()) {
+ reply->return_code = -ETIMEDOUT;
+ }
+ return conn->send(std::move(reply));
+}
+
+void Notify::do_notify_timeout()
+{
+ logger().debug("{} complete={}", __func__, complete);
+ if (complete) {
+ return;
+ }
+ // it might be that `this` is kept alive only because of the reference
+ // a watcher stores and which is being removed by `cancel_notify()`.
+ // to avoid use-after-free we bump up the ref counter with `guard_ptr`.
+ [[maybe_unused]] auto guard_ptr = shared_from_this();
+ for (auto& watcher : watchers) {
+ logger().debug("canceling watcher cookie={} gid={} use_count={}",
+ watcher->get_cookie(),
+ watcher->get_watcher_gid(),
+ watcher->use_count());
+ watcher->cancel_notify(ninfo.notify_id);
+ }
+ std::ignore = send_completion(std::move(watchers));
+ watchers.clear();
+}
+
+} // namespace crimson::osd
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::osd::WatchTimeoutRequest> : fmt::ostream_formatter {};
+#endif