summaryrefslogtreecommitdiffstats
path: root/src/osd/OpQueueItem.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/osd/OpQueueItem.h
parentInitial commit. (diff)
downloadceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz
ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/osd/OpQueueItem.h')
-rw-r--r--src/osd/OpQueueItem.h342
1 files changed, 342 insertions, 0 deletions
diff --git a/src/osd/OpQueueItem.h b/src/osd/OpQueueItem.h
new file mode 100644
index 00000000..558c5c88
--- /dev/null
+++ b/src/osd/OpQueueItem.h
@@ -0,0 +1,342 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <ostream>
+
+#include "include/types.h"
+#include "include/utime.h"
+#include "osd/OpRequest.h"
+#include "osd/PG.h"
+#include "PGPeeringEvent.h"
+
+class OSD;
+class OSDShard;
+
+class OpQueueItem {
+public:
+ class OrderLocker {
+ public:
+ using Ref = unique_ptr<OrderLocker>;
+ virtual void lock() = 0;
+ virtual void unlock() = 0;
+ virtual ~OrderLocker() {}
+ };
+ // Abstraction for operations queueable in the op queue
+ class OpQueueable {
+ public:
+ enum class op_type_t {
+ client_op,
+ peering_event,
+ bg_snaptrim,
+ bg_recovery,
+ bg_scrub,
+ bg_pg_delete
+ };
+ using Ref = std::unique_ptr<OpQueueable>;
+
+ /// Items with the same queue token will end up in the same shard
+ virtual uint32_t get_queue_token() const = 0;
+
+ /* Items will be dequeued and locked atomically w.r.t. other items with the
+ * same ordering token */
+ virtual const spg_t& get_ordering_token() const = 0;
+ virtual OrderLocker::Ref get_order_locker(PGRef pg) = 0;
+ virtual op_type_t get_op_type() const = 0;
+ virtual boost::optional<OpRequestRef> maybe_get_op() const {
+ return boost::none;
+ }
+
+ virtual uint64_t get_reserved_pushes() const {
+ return 0;
+ }
+
+ virtual bool is_peering() const {
+ return false;
+ }
+ virtual bool peering_requires_pg() const {
+ ceph_abort();
+ }
+ virtual const PGCreateInfo *creates_pg() const {
+ return nullptr;
+ }
+
+ virtual ostream &print(ostream &rhs) const = 0;
+
+ virtual void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) = 0;
+ virtual ~OpQueueable() {}
+ friend ostream& operator<<(ostream& out, const OpQueueable& q) {
+ return q.print(out);
+ }
+
+ };
+
+private:
+ OpQueueable::Ref qitem;
+ int cost;
+ unsigned priority;
+ utime_t start_time;
+ uint64_t owner; ///< global id (e.g., client.XXX)
+ epoch_t map_epoch; ///< an epoch we expect the PG to exist in
+
+public:
+ OpQueueItem(
+ OpQueueable::Ref &&item,
+ int cost,
+ unsigned priority,
+ utime_t start_time,
+ uint64_t owner,
+ epoch_t e)
+ : qitem(std::move(item)),
+ cost(cost),
+ priority(priority),
+ start_time(start_time),
+ owner(owner),
+ map_epoch(e)
+ {}
+ OpQueueItem(OpQueueItem &&) = default;
+ OpQueueItem(const OpQueueItem &) = delete;
+ OpQueueItem &operator=(OpQueueItem &&) = default;
+ OpQueueItem &operator=(const OpQueueItem &) = delete;
+
+ OrderLocker::Ref get_order_locker(PGRef pg) {
+ return qitem->get_order_locker(pg);
+ }
+ uint32_t get_queue_token() const {
+ return qitem->get_queue_token();
+ }
+ const spg_t& get_ordering_token() const {
+ return qitem->get_ordering_token();
+ }
+ using op_type_t = OpQueueable::op_type_t;
+ OpQueueable::op_type_t get_op_type() const {
+ return qitem->get_op_type();
+ }
+ boost::optional<OpRequestRef> maybe_get_op() const {
+ return qitem->maybe_get_op();
+ }
+ uint64_t get_reserved_pushes() const {
+ return qitem->get_reserved_pushes();
+ }
+ void run(OSD *osd, OSDShard *sdata,PGRef& pg, ThreadPool::TPHandle &handle) {
+ qitem->run(osd, sdata, pg, handle);
+ }
+ unsigned get_priority() const { return priority; }
+ int get_cost() const { return cost; }
+ utime_t get_start_time() const { return start_time; }
+ uint64_t get_owner() const { return owner; }
+ epoch_t get_map_epoch() const { return map_epoch; }
+
+ bool is_peering() const {
+ return qitem->is_peering();
+ }
+
+ const PGCreateInfo *creates_pg() const {
+ return qitem->creates_pg();
+ }
+
+ bool peering_requires_pg() const {
+ return qitem->peering_requires_pg();
+ }
+
+ friend ostream& operator<<(ostream& out, const OpQueueItem& item) {
+ out << "OpQueueItem("
+ << item.get_ordering_token() << " " << *item.qitem
+ << " prio " << item.get_priority()
+ << " cost " << item.get_cost()
+ << " e" << item.get_map_epoch();
+ if (item.get_reserved_pushes()) {
+ out << " reserved_pushes " << item.get_reserved_pushes();
+ }
+ return out << ")";
+ }
+}; // class OpQueueItem
+
+/// Implements boilerplate for operations queued for the pg lock
+class PGOpQueueable : public OpQueueItem::OpQueueable {
+ spg_t pgid;
+protected:
+ const spg_t& get_pgid() const {
+ return pgid;
+ }
+public:
+ explicit PGOpQueueable(spg_t pg) : pgid(pg) {}
+ uint32_t get_queue_token() const override final {
+ return get_pgid().ps();
+ }
+
+ const spg_t& get_ordering_token() const override final {
+ return get_pgid();
+ }
+
+ OpQueueItem::OrderLocker::Ref get_order_locker(PGRef pg) override final {
+ class Locker : public OpQueueItem::OrderLocker {
+ PGRef pg;
+ public:
+ explicit Locker(PGRef pg) : pg(pg) {}
+ void lock() override final {
+ pg->lock();
+ }
+ void unlock() override final {
+ pg->unlock();
+ }
+ };
+ return OpQueueItem::OrderLocker::Ref(
+ new Locker(pg));
+ }
+};
+
+class PGOpItem : public PGOpQueueable {
+ OpRequestRef op;
+public:
+ PGOpItem(spg_t pg, OpRequestRef op) : PGOpQueueable(pg), op(std::move(op)) {}
+ op_type_t get_op_type() const override final {
+ return op_type_t::client_op;
+ }
+ ostream &print(ostream &rhs) const override final {
+ return rhs << "PGOpItem(op=" << *(op->get_req()) << ")";
+ }
+ boost::optional<OpRequestRef> maybe_get_op() const override final {
+ return op;
+ }
+ void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
+};
+
+class PGPeeringItem : public PGOpQueueable {
+ PGPeeringEventRef evt;
+public:
+ PGPeeringItem(spg_t pg, PGPeeringEventRef e) : PGOpQueueable(pg), evt(e) {}
+ op_type_t get_op_type() const override final {
+ return op_type_t::peering_event;
+ }
+ ostream &print(ostream &rhs) const override final {
+ return rhs << "PGPeeringEvent(" << evt->get_desc() << ")";
+ }
+ void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
+ bool is_peering() const override {
+ return true;
+ }
+ bool peering_requires_pg() const override {
+ return evt->requires_pg;
+ }
+ const PGCreateInfo *creates_pg() const override {
+ return evt->create_info.get();
+ }
+};
+
+class PGSnapTrim : public PGOpQueueable {
+ epoch_t epoch_queued;
+public:
+ PGSnapTrim(
+ spg_t pg,
+ epoch_t epoch_queued)
+ : PGOpQueueable(pg), epoch_queued(epoch_queued) {}
+ op_type_t get_op_type() const override final {
+ return op_type_t::bg_snaptrim;
+ }
+ ostream &print(ostream &rhs) const override final {
+ return rhs << "PGSnapTrim(pgid=" << get_pgid()
+ << "epoch_queued=" << epoch_queued
+ << ")";
+ }
+ void run(
+ OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
+};
+
+class PGScrub : public PGOpQueueable {
+ epoch_t epoch_queued;
+public:
+ PGScrub(
+ spg_t pg,
+ epoch_t epoch_queued)
+ : PGOpQueueable(pg), epoch_queued(epoch_queued) {}
+ op_type_t get_op_type() const override final {
+ return op_type_t::bg_scrub;
+ }
+ ostream &print(ostream &rhs) const override final {
+ return rhs << "PGScrub(pgid=" << get_pgid()
+ << "epoch_queued=" << epoch_queued
+ << ")";
+ }
+ void run(
+ OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
+};
+
+class PGRecovery : public PGOpQueueable {
+ epoch_t epoch_queued;
+ uint64_t reserved_pushes;
+public:
+ PGRecovery(
+ spg_t pg,
+ epoch_t epoch_queued,
+ uint64_t reserved_pushes)
+ : PGOpQueueable(pg),
+ epoch_queued(epoch_queued),
+ reserved_pushes(reserved_pushes) {}
+ op_type_t get_op_type() const override final {
+ return op_type_t::bg_recovery;
+ }
+ virtual ostream &print(ostream &rhs) const override final {
+ return rhs << "PGRecovery(pgid=" << get_pgid()
+ << "epoch_queued=" << epoch_queued
+ << "reserved_pushes=" << reserved_pushes
+ << ")";
+ }
+ virtual uint64_t get_reserved_pushes() const override final {
+ return reserved_pushes;
+ }
+ virtual void run(
+ OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
+};
+
+class PGRecoveryContext : public PGOpQueueable {
+ unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
+ epoch_t epoch;
+public:
+ PGRecoveryContext(spg_t pgid,
+ GenContext<ThreadPool::TPHandle&> *c, epoch_t epoch)
+ : PGOpQueueable(pgid),
+ c(c), epoch(epoch) {}
+ op_type_t get_op_type() const override final {
+ return op_type_t::bg_recovery;
+ }
+ ostream &print(ostream &rhs) const override final {
+ return rhs << "PGRecoveryContext(pgid=" << get_pgid()
+ << " c=" << c.get() << " epoch=" << epoch
+ << ")";
+ }
+ void run(
+ OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
+};
+
+class PGDelete : public PGOpQueueable {
+ epoch_t epoch_queued;
+public:
+ PGDelete(
+ spg_t pg,
+ epoch_t epoch_queued)
+ : PGOpQueueable(pg),
+ epoch_queued(epoch_queued) {}
+ op_type_t get_op_type() const override final {
+ return op_type_t::bg_pg_delete;
+ }
+ ostream &print(ostream &rhs) const override final {
+ return rhs << "PGDelete(" << get_pgid()
+ << " e" << epoch_queued
+ << ")";
+ }
+ void run(
+ OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
+};