From 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 27 Apr 2024 20:24:20 +0200 Subject: Adding upstream version 14.2.21. Signed-off-by: Daniel Baumann --- src/common/PrioritizedQueue.h | 348 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 348 insertions(+) create mode 100644 src/common/PrioritizedQueue.h (limited to 'src/common/PrioritizedQueue.h') diff --git a/src/common/PrioritizedQueue.h b/src/common/PrioritizedQueue.h new file mode 100644 index 00000000..6d7de129 --- /dev/null +++ b/src/common/PrioritizedQueue.h @@ -0,0 +1,348 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef PRIORITY_QUEUE_H +#define PRIORITY_QUEUE_H + +#include "include/ceph_assert.h" + +#include "common/Formatter.h" +#include "common/OpQueue.h" + +/** + * Manages queue for normal and strict priority items + * + * On dequeue, the queue will select the lowest priority queue + * such that the q has bucket > cost of front queue item. + * + * If there is no such queue, we choose the next queue item for + * the highest priority queue. + * + * Before returning a dequeued item, we place into each bucket + * cost * (priority/total_priority) tokens. + * + * enqueue_strict and enqueue_strict_front queue items into queues + * which are serviced in strict priority order before items queued + * with enqueue and enqueue_front + * + * Within a priority class, we schedule round robin based on the class + * of type K used to enqueue items. e.g. you could use entity_inst_t + * to provide fairness for different clients. + */ +template +class PrioritizedQueue : public OpQueue { + int64_t total_priority; + int64_t max_tokens_per_subqueue; + int64_t min_cost; + + typedef std::list > ListPairs; + + struct SubQueue { + private: + typedef std::map Classes; + Classes q; + unsigned tokens, max_tokens; + int64_t size; + typename Classes::iterator cur; + public: + SubQueue(const SubQueue &other) + : q(other.q), + tokens(other.tokens), + max_tokens(other.max_tokens), + size(other.size), + cur(q.begin()) {} + SubQueue() + : tokens(0), + max_tokens(0), + size(0), cur(q.begin()) {} + void set_max_tokens(unsigned mt) { + max_tokens = mt; + } + unsigned get_max_tokens() const { + return max_tokens; + } + unsigned num_tokens() const { + return tokens; + } + void put_tokens(unsigned t) { + tokens += t; + if (tokens > max_tokens) { + tokens = max_tokens; + } + } + void take_tokens(unsigned t) { + if (tokens > t) { + tokens -= t; + } else { + tokens = 0; + } + } + void enqueue(K cl, unsigned cost, T &&item) { + q[cl].push_back(std::make_pair(cost, std::move(item))); + if (cur == q.end()) + cur = q.begin(); + size++; + } + void enqueue_front(K cl, unsigned cost, T &&item) { + q[cl].push_front(std::make_pair(cost, std::move(item))); + if (cur == q.end()) + cur = q.begin(); + size++; + } + std::pair &front() const { + ceph_assert(!(q.empty())); + ceph_assert(cur != q.end()); + return cur->second.front(); + } + T pop_front() { + ceph_assert(!(q.empty())); + ceph_assert(cur != q.end()); + T ret = std::move(cur->second.front().second); + cur->second.pop_front(); + if (cur->second.empty()) { + q.erase(cur++); + } else { + ++cur; + } + if (cur == q.end()) { + cur = q.begin(); + } + size--; + return ret; + } + unsigned length() const { + ceph_assert(size >= 0); + return (unsigned)size; + } + bool empty() const { + return q.empty(); + } + void remove_by_class(K k, std::list *out) { + typename Classes::iterator i = q.find(k); + if (i == q.end()) { + return; + } + size -= i->second.size(); + if (i == cur) { + ++cur; + } + if (out) { + for (typename ListPairs::reverse_iterator j = + i->second.rbegin(); + j != i->second.rend(); + ++j) { + out->push_front(std::move(j->second)); + } + } + q.erase(i); + if (cur == q.end()) { + cur = q.begin(); + } + } + + void dump(ceph::Formatter *f) const { + f->dump_int("tokens", tokens); + f->dump_int("max_tokens", max_tokens); + f->dump_int("size", size); + f->dump_int("num_keys", q.size()); + if (!empty()) { + f->dump_int("first_item_cost", front().first); + } + } + }; + + typedef std::map SubQueues; + SubQueues high_queue; + SubQueues queue; + + SubQueue *create_queue(unsigned priority) { + typename SubQueues::iterator p = queue.find(priority); + if (p != queue.end()) { + return &p->second; + } + total_priority += priority; + SubQueue *sq = &queue[priority]; + sq->set_max_tokens(max_tokens_per_subqueue); + return sq; + } + + void remove_queue(unsigned priority) { + ceph_assert(queue.count(priority)); + queue.erase(priority); + total_priority -= priority; + ceph_assert(total_priority >= 0); + } + + void distribute_tokens(unsigned cost) { + if (total_priority == 0) { + return; + } + for (typename SubQueues::iterator i = queue.begin(); + i != queue.end(); + ++i) { + i->second.put_tokens(((i->first * cost) / total_priority) + 1); + } + } + +public: + PrioritizedQueue(unsigned max_per, unsigned min_c) + : total_priority(0), + max_tokens_per_subqueue(max_per), + min_cost(min_c) + {} + + unsigned length() const { + unsigned total = 0; + for (typename SubQueues::const_iterator i = queue.begin(); + i != queue.end(); + ++i) { + ceph_assert(i->second.length()); + total += i->second.length(); + } + for (typename SubQueues::const_iterator i = high_queue.begin(); + i != high_queue.end(); + ++i) { + ceph_assert(i->second.length()); + total += i->second.length(); + } + return total; + } + + void remove_by_class(K k, std::list *out = 0) final { + for (typename SubQueues::iterator i = queue.begin(); + i != queue.end(); + ) { + i->second.remove_by_class(k, out); + if (i->second.empty()) { + unsigned priority = i->first; + ++i; + remove_queue(priority); + } else { + ++i; + } + } + for (typename SubQueues::iterator i = high_queue.begin(); + i != high_queue.end(); + ) { + i->second.remove_by_class(k, out); + if (i->second.empty()) { + high_queue.erase(i++); + } else { + ++i; + } + } + } + + void enqueue_strict(K cl, unsigned priority, T&& item) final { + high_queue[priority].enqueue(cl, 0, std::move(item)); + } + + void enqueue_strict_front(K cl, unsigned priority, T&& item) final { + high_queue[priority].enqueue_front(cl, 0, std::move(item)); + } + + void enqueue(K cl, unsigned priority, unsigned cost, T&& item) final { + if (cost < min_cost) + cost = min_cost; + if (cost > max_tokens_per_subqueue) + cost = max_tokens_per_subqueue; + create_queue(priority)->enqueue(cl, cost, std::move(item)); + } + + void enqueue_front(K cl, unsigned priority, unsigned cost, T&& item) final { + if (cost < min_cost) + cost = min_cost; + if (cost > max_tokens_per_subqueue) + cost = max_tokens_per_subqueue; + create_queue(priority)->enqueue_front(cl, cost, std::move(item)); + } + + bool empty() const final { + ceph_assert(total_priority >= 0); + ceph_assert((total_priority == 0) || !(queue.empty())); + return queue.empty() && high_queue.empty(); + } + + T dequeue() final { + ceph_assert(!empty()); + + if (!(high_queue.empty())) { + T ret = std::move(high_queue.rbegin()->second.front().second); + high_queue.rbegin()->second.pop_front(); + if (high_queue.rbegin()->second.empty()) { + high_queue.erase(high_queue.rbegin()->first); + } + return ret; + } + + // if there are multiple buckets/subqueues with sufficient tokens, + // we behave like a strict priority queue among all subqueues that + // are eligible to run. + for (typename SubQueues::iterator i = queue.begin(); + i != queue.end(); + ++i) { + ceph_assert(!(i->second.empty())); + if (i->second.front().first < i->second.num_tokens()) { + unsigned cost = i->second.front().first; + i->second.take_tokens(cost); + T ret = std::move(i->second.front().second); + i->second.pop_front(); + if (i->second.empty()) { + remove_queue(i->first); + } + distribute_tokens(cost); + return ret; + } + } + + // if no subqueues have sufficient tokens, we behave like a strict + // priority queue. + unsigned cost = queue.rbegin()->second.front().first; + T ret = std::move(queue.rbegin()->second.front().second); + queue.rbegin()->second.pop_front(); + if (queue.rbegin()->second.empty()) { + remove_queue(queue.rbegin()->first); + } + distribute_tokens(cost); + return ret; + } + + void dump(ceph::Formatter *f) const final { + f->dump_int("total_priority", total_priority); + f->dump_int("max_tokens_per_subqueue", max_tokens_per_subqueue); + f->dump_int("min_cost", min_cost); + f->open_array_section("high_queues"); + for (typename SubQueues::const_iterator p = high_queue.begin(); + p != high_queue.end(); + ++p) { + f->open_object_section("subqueue"); + f->dump_int("priority", p->first); + p->second.dump(f); + f->close_section(); + } + f->close_section(); + f->open_array_section("queues"); + for (typename SubQueues::const_iterator p = queue.begin(); + p != queue.end(); + ++p) { + f->open_object_section("subqueue"); + f->dump_int("priority", p->first); + p->second.dump(f); + f->close_section(); + } + f->close_section(); + } +}; + +#endif -- cgit v1.2.3