diff options
Diffstat (limited to '')
-rw-r--r-- | src/osd/OSDMapMapping.h | 352 |
1 files changed, 352 insertions, 0 deletions
diff --git a/src/osd/OSDMapMapping.h b/src/osd/OSDMapMapping.h new file mode 100644 index 00000000..b0965fc1 --- /dev/null +++ b/src/osd/OSDMapMapping.h @@ -0,0 +1,352 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + + +#ifndef CEPH_OSDMAPMAPPING_H +#define CEPH_OSDMAPMAPPING_H + +#include <vector> +#include <map> + +#include "osd/osd_types.h" +#include "common/WorkQueue.h" +#include "common/Cond.h" + +class OSDMap; + +/// work queue to perform work on batches of pgids on multiple CPUs +class ParallelPGMapper { +public: + struct Job { + utime_t start, finish; + unsigned shards = 0; + const OSDMap *osdmap; + bool aborted = false; + Context *onfinish = nullptr; + + Mutex lock = {"ParallelPGMapper::Job::lock"}; + Cond cond; + + Job(const OSDMap *om) : start(ceph_clock_now()), osdmap(om) {} + virtual ~Job() { + ceph_assert(shards == 0); + } + + // child must implement either form of process + virtual void process(const vector<pg_t>& pgs) = 0; + virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0; + virtual void complete() = 0; + + void set_finish_event(Context *fin) { + lock.Lock(); + if (shards == 0) { + // already done. + lock.Unlock(); + fin->complete(0); + } else { + // set finisher + onfinish = fin; + lock.Unlock(); + } + } + bool is_done() { + std::lock_guard l(lock); + return shards == 0; + } + utime_t get_duration() { + return finish - start; + } + void wait() { + std::lock_guard l(lock); + while (shards > 0) { + cond.Wait(lock); + } + } + bool wait_for(double duration) { + utime_t until = start; + until += duration; + std::lock_guard l(lock); + while (shards > 0) { + if (ceph_clock_now() >= until) { + return false; + } + cond.Wait(lock); + } + return true; + } + void abort() { + Context *fin = nullptr; + { + std::lock_guard l(lock); + aborted = true; + fin = onfinish; + onfinish = nullptr; + while (shards > 0) { + cond.Wait(lock); + } + } + if (fin) { + fin->complete(-ECANCELED); + } + } + + void start_one() { + std::lock_guard l(lock); + ++shards; + } + void finish_one(); + }; + +protected: + CephContext *cct; + + struct Item { + Job *job; + int64_t pool; + unsigned begin, end; + vector<pg_t> pgs; + + Item(Job *j, vector<pg_t> pgs) : job(j), pgs(pgs) {} + Item(Job *j, int64_t p, unsigned b, unsigned e) + : job(j), + pool(p), + begin(b), + end(e) {} + }; + std::deque<Item*> q; + + struct WQ : public ThreadPool::WorkQueue<Item> { + ParallelPGMapper *m; + + WQ(ParallelPGMapper *m_, ThreadPool *tp) + : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ", 0, 0, tp), + m(m_) {} + + bool _enqueue(Item *i) override { + m->q.push_back(i); + return true; + } + void _dequeue(Item *i) override { + ceph_abort(); + } + Item *_dequeue() override { + while (!m->q.empty()) { + Item *i = m->q.front(); + m->q.pop_front(); + if (i->job->aborted) { + i->job->finish_one(); + delete i; + } else { + return i; + } + } + return nullptr; + } + + void _process(Item *i, ThreadPool::TPHandle &h) override; + + void _clear() override { + ceph_assert(_empty()); + } + + bool _empty() override { + return m->q.empty(); + } + } wq; + +public: + ParallelPGMapper(CephContext *cct, ThreadPool *tp) + : cct(cct), + wq(this, tp) {} + + void queue( + Job *job, + unsigned pgs_per_item, + const vector<pg_t>& input_pgs); + + void drain() { + wq.drain(); + } +}; + + +/// a precalculated mapping of every PG for a given OSDMap +class OSDMapMapping { +public: + MEMPOOL_CLASS_HELPERS(); +private: + + struct PoolMapping { + MEMPOOL_CLASS_HELPERS(); + + unsigned size = 0; + unsigned pg_num = 0; + bool erasure = false; + mempool::osdmap_mapping::vector<int32_t> table; + + size_t row_size() const { + return + 1 + // acting_primary + 1 + // up_primary + 1 + // num acting + 1 + // num up + size + // acting + size; // up + } + + PoolMapping(int s, int p, bool e) + : size(s), + pg_num(p), + erasure(e), + table(pg_num * row_size()) { + } + + void get(size_t ps, + std::vector<int> *up, + int *up_primary, + std::vector<int> *acting, + int *acting_primary) const { + const int32_t *row = &table[row_size() * ps]; + if (acting_primary) { + *acting_primary = row[0]; + } + if (up_primary) { + *up_primary = row[1]; + } + if (acting) { + acting->resize(row[2]); + for (int i = 0; i < row[2]; ++i) { + (*acting)[i] = row[4 + i]; + } + } + if (up) { + up->resize(row[3]); + for (int i = 0; i < row[3]; ++i) { + (*up)[i] = row[4 + size + i]; + } + } + } + + void set(size_t ps, + const std::vector<int>& up, + int up_primary, + const std::vector<int>& acting, + int acting_primary) { + int32_t *row = &table[row_size() * ps]; + row[0] = acting_primary; + row[1] = up_primary; + // these should always be <= the pool size, but just in case, avoid + // blowing out the array. Note that our mapping is not completely + // accurate in this case--this is just to avoid crashing. + row[2] = std::min<int32_t>(acting.size(), size); + row[3] = std::min<int32_t>(up.size(), size); + for (int i = 0; i < row[2]; ++i) { + row[4 + i] = acting[i]; + } + for (int i = 0; i < row[3]; ++i) { + row[4 + size + i] = up[i]; + } + } + }; + + mempool::osdmap_mapping::map<int64_t,PoolMapping> pools; + mempool::osdmap_mapping::vector< + mempool::osdmap_mapping::vector<pg_t>> acting_rmap; // osd -> pg + //unused: mempool::osdmap_mapping::vector<std::vector<pg_t>> up_rmap; // osd -> pg + epoch_t epoch = 0; + uint64_t num_pgs = 0; + + void _init_mappings(const OSDMap& osdmap); + void _update_range( + const OSDMap& map, + int64_t pool, + unsigned pg_begin, unsigned pg_end); + + void _build_rmap(const OSDMap& osdmap); + + void _start(const OSDMap& osdmap) { + _init_mappings(osdmap); + } + void _finish(const OSDMap& osdmap); + + void _dump(); + + friend class ParallelPGMapper; + + struct MappingJob : public ParallelPGMapper::Job { + OSDMapMapping *mapping; + MappingJob(const OSDMap *osdmap, OSDMapMapping *m) + : Job(osdmap), mapping(m) { + mapping->_start(*osdmap); + } + void process(const vector<pg_t>& pgs) override {} + void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override { + mapping->_update_range(*osdmap, pool, ps_begin, ps_end); + } + void complete() override { + mapping->_finish(*osdmap); + } + }; + +public: + void get(pg_t pgid, + std::vector<int> *up, + int *up_primary, + std::vector<int> *acting, + int *acting_primary) const { + auto p = pools.find(pgid.pool()); + ceph_assert(p != pools.end()); + ceph_assert(pgid.ps() < p->second.pg_num); + p->second.get(pgid.ps(), up, up_primary, acting, acting_primary); + } + + bool get_primary_and_shard(pg_t pgid, + int *acting_primary, + spg_t *spgid) { + auto p = pools.find(pgid.pool()); + ceph_assert(p != pools.end()); + ceph_assert(pgid.ps() < p->second.pg_num); + vector<int> acting; + p->second.get(pgid.ps(), nullptr, nullptr, &acting, acting_primary); + if (p->second.erasure) { + for (uint8_t i = 0; i < acting.size(); ++i) { + if (acting[i] == *acting_primary) { + *spgid = spg_t(pgid, shard_id_t(i)); + return true; + } + } + return false; + } else { + *spgid = spg_t(pgid); + return true; + } + } + + const mempool::osdmap_mapping::vector<pg_t>& get_osd_acting_pgs(unsigned osd) { + ceph_assert(osd < acting_rmap.size()); + return acting_rmap[osd]; + } + + void update(const OSDMap& map); + void update(const OSDMap& map, pg_t pgid); + + std::unique_ptr<MappingJob> start_update( + const OSDMap& map, + ParallelPGMapper& mapper, + unsigned pgs_per_item) { + std::unique_ptr<MappingJob> job(new MappingJob(&map, this)); + mapper.queue(job.get(), pgs_per_item, {}); + return job; + } + + epoch_t get_epoch() const { + return epoch; + } + + uint64_t get_num_pgs() const { + return num_pgs; + } +}; + + +#endif |