author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit    e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree      64f88b554b444a49f656b6c656111a145cbbaa28 /src/osd/OSDMapMapping.h
parent    Initial commit. (diff)
Adding upstream version 18.2.2. (upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--  src/osd/OSDMapMapping.h | 354
1 file changed, 354 insertions(+), 0 deletions(-)
diff --git a/src/osd/OSDMapMapping.h b/src/osd/OSDMapMapping.h
new file mode 100644
index 000000000..216c30446
--- /dev/null
+++ b/src/osd/OSDMapMapping.h
@@ -0,0 +1,354 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+
+#ifndef CEPH_OSDMAPMAPPING_H
+#define CEPH_OSDMAPMAPPING_H
+
+#include <vector>
+#include <map>
+
+#include "osd/osd_types.h"
+#include "common/WorkQueue.h"
+#include "common/Cond.h"
+
+class OSDMap;
+
+/// work queue to perform work on batches of pgids on multiple CPUs
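+/// A Job is split into Items (either explicit pg lists or per-pool ranges of
+/// placement seeds), which are pushed onto a ThreadPool work queue and
+/// processed concurrently; Job::shards counts the Items still outstanding,
+/// and the Job's finish event (if any) fires once that count drops to zero.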
+class ParallelPGMapper {
+public:
+ struct Job {
+ utime_t start, finish;
+ unsigned shards = 0;
+ const OSDMap *osdmap;
+ bool aborted = false;
+ Context *onfinish = nullptr;
+
+ ceph::mutex lock = ceph::make_mutex("ParallelPGMapper::Job::lock");
+ ceph::condition_variable cond;
+
+ Job(const OSDMap *om) : start(ceph_clock_now()), osdmap(om) {}
+ virtual ~Job() {
+ ceph_assert(shards == 0);
+ }
+
+    // a child must override both forms of process(); only the form matching
+    // how the job was queued is invoked, so the other may be a no-op
+    // (see OSDMapMapping::MappingJob below)
+ virtual void process(const std::vector<pg_t>& pgs) = 0;
+ virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0;
+ virtual void complete() = 0;
+
+ void set_finish_event(Context *fin) {
+ lock.lock();
+ if (shards == 0) {
+ // already done.
+ lock.unlock();
+ fin->complete(0);
+ } else {
+ // set finisher
+ onfinish = fin;
+ lock.unlock();
+ }
+ }
+ bool is_done() {
+ std::lock_guard l(lock);
+ return shards == 0;
+ }
+ utime_t get_duration() {
+ return finish - start;
+ }
+ void wait() {
+ std::unique_lock l(lock);
+ cond.wait(l, [this] { return shards == 0; });
+ }
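+    // note: the deadline below is measured from the Job's creation time
+    // (start), not from the moment wait_for() is called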
+ bool wait_for(double duration) {
+ utime_t until = start;
+ until += duration;
+ std::unique_lock l(lock);
+ while (shards > 0) {
+ if (ceph_clock_now() >= until) {
+ return false;
+ }
+ cond.wait(l);
+ }
+ return true;
+ }
+ void abort() {
+ Context *fin = nullptr;
+ {
+ std::unique_lock l(lock);
+ aborted = true;
+ fin = onfinish;
+ onfinish = nullptr;
+ cond.wait(l, [this] { return shards == 0; });
+ }
+ if (fin) {
+ fin->complete(-ECANCELED);
+ }
+ }
+
+ void start_one() {
+ std::lock_guard l(lock);
+ ++shards;
+ }
+ void finish_one();
+ };
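+
+  // Illustrative sketch of a minimal Job subclass (not part of this header;
+  // CountingJob is a hypothetical name used only as an example):
+  //
+  //   struct CountingJob : public ParallelPGMapper::Job {
+  //     std::atomic<uint64_t> count{0};
+  //     explicit CountingJob(const OSDMap *om) : Job(om) {}
+  //     void process(const std::vector<pg_t>& pgs) override {
+  //       count += pgs.size();
+  //     }
+  //     void process(int64_t, unsigned ps_begin, unsigned ps_end) override {
+  //       count += ps_end - ps_begin;
+  //     }
+  //     void complete() override {}
+  //   };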
+
+protected:
+ CephContext *cct;
+
+ struct Item {
+ Job *job;
+ int64_t pool;
+ unsigned begin, end;
+ std::vector<pg_t> pgs;
+
+ Item(Job *j, std::vector<pg_t> pgs) : job(j), pgs(pgs) {}
+ Item(Job *j, int64_t p, unsigned b, unsigned e)
+ : job(j),
+ pool(p),
+ begin(b),
+ end(e) {}
+ };
+ std::deque<Item*> q;
+
+ struct WQ : public ThreadPool::WorkQueue<Item> {
+ ParallelPGMapper *m;
+
+ WQ(ParallelPGMapper *m_, ThreadPool *tp)
+ : ThreadPool::WorkQueue<Item>(
+ "ParallelPGMapper::WQ",
+ ceph::make_timespan(m_->cct->_conf->threadpool_default_timeout),
+ ceph::timespan::zero(),
+ tp),
+ m(m_) {}
+
+ bool _enqueue(Item *i) override {
+ m->q.push_back(i);
+ return true;
+ }
+ void _dequeue(Item *i) override {
+ ceph_abort();
+ }
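+    // items belonging to an aborted job are dropped here (and counted as
+    // finished) rather than handed out for processing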
+ Item *_dequeue() override {
+ while (!m->q.empty()) {
+ Item *i = m->q.front();
+ m->q.pop_front();
+ if (i->job->aborted) {
+ i->job->finish_one();
+ delete i;
+ } else {
+ return i;
+ }
+ }
+ return nullptr;
+ }
+
+ void _process(Item *i, ThreadPool::TPHandle &h) override;
+
+ void _clear() override {
+ ceph_assert(_empty());
+ }
+
+ bool _empty() override {
+ return m->q.empty();
+ }
+ } wq;
+
+public:
+ ParallelPGMapper(CephContext *cct, ThreadPool *tp)
+ : cct(cct),
+ wq(this, tp) {}
+
+ void queue(
+ Job *job,
+ unsigned pgs_per_item,
+ const std::vector<pg_t>& input_pgs);
+
+ void drain() {
+ wq.drain();
+ }
+};
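+
+// Typical (illustrative) flow, assuming a CephContext *cct, a started
+// ThreadPool tp, and a Job subclass such as the sketch above; these names
+// are placeholders, not part of this header:
+//
+//   ParallelPGMapper mapper(cct, &tp);
+//   mapper.queue(job, pgs_per_item, pgs);  // split into Items of ~pgs_per_item PGs
+//   job->wait();                           // or job->set_finish_event(ctx)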
+
+
+/// a precalculated mapping of every PG for a given OSDMap
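+/// For each pool this keeps a flat table holding, per PG, the up and acting
+/// sets and their primaries, plus a per-OSD reverse map of the PGs for which
+/// that OSD is in the acting set.  The tables are (re)built in parallel via
+/// MappingJob on a ParallelPGMapper, one shard per range of placement seeds.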
+class OSDMapMapping {
+public:
+ MEMPOOL_CLASS_HELPERS();
+private:
+
+ struct PoolMapping {
+ MEMPOOL_CLASS_HELPERS();
+
+ unsigned size = 0;
+ unsigned pg_num = 0;
+ bool erasure = false;
+ mempool::osdmap_mapping::vector<int32_t> table;
+
+ size_t row_size() const {
+ return
+ 1 + // acting_primary
+ 1 + // up_primary
+ 1 + // num acting
+ 1 + // num up
+ size + // acting
+ size; // up
+ }
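+
+    // per-PG row layout within `table`, as used by get()/set() below:
+    //   row[0]                    acting_primary
+    //   row[1]                    up_primary
+    //   row[2]                    number of acting entries
+    //   row[3]                    number of up entries
+    //   row[4 .. 4+size)          acting
+    //   row[4+size .. 4+2*size)   up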
+
+ PoolMapping(int s, int p, bool e)
+ : size(s),
+ pg_num(p),
+ erasure(e),
+ table(pg_num * row_size()) {
+ }
+
+ void get(size_t ps,
+ std::vector<int> *up,
+ int *up_primary,
+ std::vector<int> *acting,
+ int *acting_primary) const {
+ const int32_t *row = &table[row_size() * ps];
+ if (acting_primary) {
+ *acting_primary = row[0];
+ }
+ if (up_primary) {
+ *up_primary = row[1];
+ }
+ if (acting) {
+ acting->resize(row[2]);
+ for (int i = 0; i < row[2]; ++i) {
+ (*acting)[i] = row[4 + i];
+ }
+ }
+ if (up) {
+ up->resize(row[3]);
+ for (int i = 0; i < row[3]; ++i) {
+ (*up)[i] = row[4 + size + i];
+ }
+ }
+ }
+
+ void set(size_t ps,
+ const std::vector<int>& up,
+ int up_primary,
+ const std::vector<int>& acting,
+ int acting_primary) {
+ int32_t *row = &table[row_size() * ps];
+ row[0] = acting_primary;
+ row[1] = up_primary;
+ // these should always be <= the pool size, but just in case, avoid
+ // blowing out the array. Note that our mapping is not completely
+ // accurate in this case--this is just to avoid crashing.
+ row[2] = std::min<int32_t>(acting.size(), size);
+ row[3] = std::min<int32_t>(up.size(), size);
+ for (int i = 0; i < row[2]; ++i) {
+ row[4 + i] = acting[i];
+ }
+ for (int i = 0; i < row[3]; ++i) {
+ row[4 + size + i] = up[i];
+ }
+ }
+ };
+
+ mempool::osdmap_mapping::map<int64_t,PoolMapping> pools;
+ mempool::osdmap_mapping::vector<
+ mempool::osdmap_mapping::vector<pg_t>> acting_rmap; // osd -> pg
+ //unused: mempool::osdmap_mapping::vector<std::vector<pg_t>> up_rmap; // osd -> pg
+ epoch_t epoch = 0;
+ uint64_t num_pgs = 0;
+
+ void _init_mappings(const OSDMap& osdmap);
+ void _update_range(
+ const OSDMap& map,
+ int64_t pool,
+ unsigned pg_begin, unsigned pg_end);
+
+ void _build_rmap(const OSDMap& osdmap);
+
+ void _start(const OSDMap& osdmap) {
+ _init_mappings(osdmap);
+ }
+ void _finish(const OSDMap& osdmap);
+
+ void _dump();
+
+ friend class ParallelPGMapper;
+
+ struct MappingJob : public ParallelPGMapper::Job {
+ OSDMapMapping *mapping;
+ MappingJob(const OSDMap *osdmap, OSDMapMapping *m)
+ : Job(osdmap), mapping(m) {
+ mapping->_start(*osdmap);
+ }
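+    // start_update() queues this job with an empty pg list, so work arrives
+    // as per-pool ps ranges and the pg-list form of process() stays unused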
+ void process(const std::vector<pg_t>& pgs) override {}
+ void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
+ mapping->_update_range(*osdmap, pool, ps_begin, ps_end);
+ }
+ void complete() override {
+ mapping->_finish(*osdmap);
+ }
+ };
+ friend class OSDMapTest;
+ // for testing only
+ void update(const OSDMap& map);
+
+public:
+ void get(pg_t pgid,
+ std::vector<int> *up,
+ int *up_primary,
+ std::vector<int> *acting,
+ int *acting_primary) const {
+ auto p = pools.find(pgid.pool());
+ ceph_assert(p != pools.end());
+ ceph_assert(pgid.ps() < p->second.pg_num);
+ p->second.get(pgid.ps(), up, up_primary, acting, acting_primary);
+ }
+
+ bool get_primary_and_shard(pg_t pgid,
+ int *acting_primary,
+ spg_t *spgid) {
+ auto p = pools.find(pgid.pool());
+ ceph_assert(p != pools.end());
+ ceph_assert(pgid.ps() < p->second.pg_num);
+ std::vector<int> acting;
+ p->second.get(pgid.ps(), nullptr, nullptr, &acting, acting_primary);
+ if (p->second.erasure) {
+ for (uint8_t i = 0; i < acting.size(); ++i) {
+ if (acting[i] == *acting_primary) {
+ *spgid = spg_t(pgid, shard_id_t(i));
+ return true;
+ }
+ }
+ return false;
+ } else {
+ *spgid = spg_t(pgid);
+ return true;
+ }
+ }
+
+ const mempool::osdmap_mapping::vector<pg_t>& get_osd_acting_pgs(unsigned osd) {
+ ceph_assert(osd < acting_rmap.size());
+ return acting_rmap[osd];
+ }
+
+ void update(const OSDMap& map, pg_t pgid);
+
+ std::unique_ptr<MappingJob> start_update(
+ const OSDMap& map,
+ ParallelPGMapper& mapper,
+ unsigned pgs_per_item) {
+ std::unique_ptr<MappingJob> job(new MappingJob(&map, this));
+ mapper.queue(job.get(), pgs_per_item, {});
+ return job;
+ }
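+
+  // Illustrative use of start_update(), assuming a ParallelPGMapper `mapper`
+  // built on a started ThreadPool; names outside this header are placeholders:
+  //
+  //   OSDMapMapping mapping;
+  //   auto job = mapping.start_update(osdmap, mapper, pgs_per_item);
+  //   job->wait();  // or job->set_finish_event(fin)
+  //   mapping.get(pgid, &up, &up_primary, &acting, &acting_primary);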
+
+ epoch_t get_epoch() const {
+ return epoch;
+ }
+
+ uint64_t get_num_pgs() const {
+ return num_pgs;
+ }
+};
+
+
+#endif