summaryrefslogtreecommitdiffstats
path: root/src/common/WeightedPriorityQueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/WeightedPriorityQueue.h')
-rw-r--r--src/common/WeightedPriorityQueue.h353
1 files changed, 353 insertions, 0 deletions
diff --git a/src/common/WeightedPriorityQueue.h b/src/common/WeightedPriorityQueue.h
new file mode 100644
index 000000000..cf34709b9
--- /dev/null
+++ b/src/common/WeightedPriorityQueue.h
@@ -0,0 +1,353 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef WP_QUEUE_H
+#define WP_QUEUE_H
+
+#include "OpQueue.h"
+
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive/rbtree.hpp>
+#include <boost/intrusive/avl_set.hpp>
+
+#include "include/ceph_assert.h"
+
+namespace bi = boost::intrusive;
+
+template <typename T, typename S>
+class MapKey
+{
+ public:
+ bool operator()(const S i, const T &k) const
+ {
+ return i < k.key;
+ }
+ bool operator()(const T &k, const S i) const
+ {
+ return k.key < i;
+ }
+};
+
+template <typename T>
+class DelItem
+{
+ public:
+ void operator()(T* delete_this)
+ { delete delete_this; }
+};
+
+template <typename T, typename K>
+class WeightedPriorityQueue : public OpQueue <T, K>
+{
+ private:
+ class ListPair : public bi::list_base_hook<>
+ {
+ public:
+ unsigned cost;
+ T item;
+ ListPair(unsigned c, T&& i) :
+ cost(c),
+ item(std::move(i))
+ {}
+ };
+ class Klass : public bi::set_base_hook<>
+ {
+ typedef bi::list<ListPair> ListPairs;
+ typedef typename ListPairs::iterator Lit;
+ public:
+ K key; // klass
+ ListPairs lp;
+ Klass(K& k) :
+ key(k) {
+ }
+ ~Klass() {
+ lp.clear_and_dispose(DelItem<ListPair>());
+ }
+ friend bool operator< (const Klass &a, const Klass &b)
+ { return a.key < b.key; }
+ friend bool operator> (const Klass &a, const Klass &b)
+ { return a.key > b.key; }
+ friend bool operator== (const Klass &a, const Klass &b)
+ { return a.key == b.key; }
+ void insert(unsigned cost, T&& item, bool front) {
+ if (front) {
+ lp.push_front(*new ListPair(cost, std::move(item)));
+ } else {
+ lp.push_back(*new ListPair(cost, std::move(item)));
+ }
+ }
+ //Get the cost of the next item to dequeue
+ unsigned get_cost() const {
+ ceph_assert(!empty());
+ return lp.begin()->cost;
+ }
+ T pop() {
+ ceph_assert(!lp.empty());
+ T ret = std::move(lp.begin()->item);
+ lp.erase_and_dispose(lp.begin(), DelItem<ListPair>());
+ return ret;
+ }
+ bool empty() const {
+ return lp.empty();
+ }
+ unsigned get_size() const {
+ return lp.size();
+ }
+ void filter_class(std::list<T>* out) {
+ for (Lit i = --lp.end();; --i) {
+ if (out) {
+ out->push_front(std::move(i->item));
+ }
+ i = lp.erase_and_dispose(i, DelItem<ListPair>());
+ if (i == lp.begin()) {
+ break;
+ }
+ }
+ }
+ };
+ class SubQueue : public bi::set_base_hook<>
+ {
+ typedef bi::rbtree<Klass> Klasses;
+ typedef typename Klasses::iterator Kit;
+ void check_end() {
+ if (next == klasses.end()) {
+ next = klasses.begin();
+ }
+ }
+ public:
+ unsigned key; // priority
+ Klasses klasses;
+ Kit next;
+ SubQueue(unsigned& p) :
+ key(p),
+ next(klasses.begin()) {
+ }
+ ~SubQueue() {
+ klasses.clear_and_dispose(DelItem<Klass>());
+ }
+ friend bool operator< (const SubQueue &a, const SubQueue &b)
+ { return a.key < b.key; }
+ friend bool operator> (const SubQueue &a, const SubQueue &b)
+ { return a.key > b.key; }
+ friend bool operator== (const SubQueue &a, const SubQueue &b)
+ { return a.key == b.key; }
+ bool empty() const {
+ return klasses.empty();
+ }
+ void insert(K cl, unsigned cost, T&& item, bool front = false) {
+ typename Klasses::insert_commit_data insert_data;
+ std::pair<Kit, bool> ret =
+ klasses.insert_unique_check(cl, MapKey<Klass, K>(), insert_data);
+ if (ret.second) {
+ ret.first = klasses.insert_unique_commit(*new Klass(cl), insert_data);
+ check_end();
+ }
+ ret.first->insert(cost, std::move(item), front);
+ }
+ unsigned get_cost() const {
+ ceph_assert(!empty());
+ return next->get_cost();
+ }
+ T pop() {
+ T ret = next->pop();
+ if (next->empty()) {
+ next = klasses.erase_and_dispose(next, DelItem<Klass>());
+ } else {
+ ++next;
+ }
+ check_end();
+ return ret;
+ }
+ void filter_class(K& cl, std::list<T>* out) {
+ Kit i = klasses.find(cl, MapKey<Klass, K>());
+ if (i != klasses.end()) {
+ i->filter_class(out);
+ Kit tmp = klasses.erase_and_dispose(i, DelItem<Klass>());
+ if (next == i) {
+ next = tmp;
+ }
+ check_end();
+ }
+ }
+ // this is intended for unit tests and should be never used on hot paths
+ unsigned get_size_slow() const {
+ unsigned count = 0;
+ for (const auto& klass : klasses) {
+ count += klass.get_size();
+ }
+ return count;
+ }
+ void dump(ceph::Formatter *f) const {
+ f->dump_int("num_keys", next->get_size());
+ if (!empty()) {
+ f->dump_int("first_item_cost", next->get_cost());
+ }
+ }
+ };
+ class Queue {
+ typedef bi::rbtree<SubQueue> SubQueues;
+ typedef typename SubQueues::iterator Sit;
+ SubQueues queues;
+ unsigned total_prio;
+ unsigned max_cost;
+ public:
+ Queue() :
+ total_prio(0),
+ max_cost(0) {
+ }
+ ~Queue() {
+ queues.clear_and_dispose(DelItem<SubQueue>());
+ }
+ bool empty() const {
+ return queues.empty();
+ }
+ void insert(unsigned p, K cl, unsigned cost, T&& item, bool front = false) {
+ typename SubQueues::insert_commit_data insert_data;
+ std::pair<typename SubQueues::iterator, bool> ret =
+ queues.insert_unique_check(p, MapKey<SubQueue, unsigned>(), insert_data);
+ if (ret.second) {
+ ret.first = queues.insert_unique_commit(*new SubQueue(p), insert_data);
+ total_prio += p;
+ }
+ ret.first->insert(cl, cost, std::move(item), front);
+ if (cost > max_cost) {
+ max_cost = cost;
+ }
+ }
+ T pop(bool strict = false) {
+ Sit i = --queues.end();
+ if (strict) {
+ T ret = i->pop();
+ if (i->empty()) {
+ queues.erase_and_dispose(i, DelItem<SubQueue>());
+ }
+ return ret;
+ }
+ if (queues.size() > 1) {
+ while (true) {
+ // Pick a new priority out of the total priority.
+ unsigned prio = rand() % total_prio + 1;
+ unsigned tp = total_prio - i->key;
+ // Find the priority corresponding to the picked number.
+ // Subtract high priorities to low priorities until the picked number
+ // is more than the total and try to dequeue that priority.
+ // Reverse the direction from previous implementation because there is a higher
+ // chance of dequeuing a high priority op so spend less time spinning.
+ while (prio <= tp) {
+ --i;
+ tp -= i->key;
+ }
+ // Flip a coin to see if this priority gets to run based on cost.
+ // The next op's cost is multiplied by .9 and subtracted from the
+ // max cost seen. Ops with lower costs will have a larger value
+ // and allow them to be selected easier than ops with high costs.
+ if (max_cost == 0 || rand() % max_cost <=
+ (max_cost - ((i->get_cost() * 9) / 10))) {
+ break;
+ }
+ i = --queues.end();
+ }
+ }
+ T ret = i->pop();
+ if (i->empty()) {
+ total_prio -= i->key;
+ queues.erase_and_dispose(i, DelItem<SubQueue>());
+ }
+ return ret;
+ }
+ void filter_class(K& cl, std::list<T>* out) {
+ for (Sit i = queues.begin(); i != queues.end();) {
+ i->filter_class(cl, out);
+ if (i->empty()) {
+ total_prio -= i->key;
+ i = queues.erase_and_dispose(i, DelItem<SubQueue>());
+ } else {
+ ++i;
+ }
+ }
+ }
+ // this is intended for unit tests and should be never used on hot paths
+ unsigned get_size_slow() const {
+ unsigned count = 0;
+ for (const auto& queue : queues) {
+ count += queue.get_size_slow();
+ }
+ return count;
+ }
+ void dump(ceph::Formatter *f) const {
+ for (typename SubQueues::const_iterator i = queues.begin();
+ i != queues.end(); ++i) {
+ f->dump_int("total_priority", total_prio);
+ f->dump_int("max_cost", max_cost);
+ f->open_object_section("subqueue");
+ f->dump_int("priority", i->key);
+ i->dump(f);
+ f->close_section();
+ }
+ }
+ };
+
+ Queue strict;
+ Queue normal;
+ public:
+ WeightedPriorityQueue(unsigned max_per, unsigned min_c) :
+ strict(),
+ normal()
+ {
+ std::srand(time(0));
+ }
+ void remove_by_class(K cl, std::list<T>* removed = 0) final {
+ strict.filter_class(cl, removed);
+ normal.filter_class(cl, removed);
+ }
+ bool empty() const final {
+ return strict.empty() && normal.empty();
+ }
+ void enqueue_strict(K cl, unsigned p, T&& item) final {
+ strict.insert(p, cl, 0, std::move(item));
+ }
+ void enqueue_strict_front(K cl, unsigned p, T&& item) final {
+ strict.insert(p, cl, 0, std::move(item), true);
+ }
+ void enqueue(K cl, unsigned p, unsigned cost, T&& item) final {
+ normal.insert(p, cl, cost, std::move(item));
+ }
+ void enqueue_front(K cl, unsigned p, unsigned cost, T&& item) final {
+ normal.insert(p, cl, cost, std::move(item), true);
+ }
+ T dequeue() override {
+ ceph_assert(!empty());
+ if (!strict.empty()) {
+ return strict.pop(true);
+ }
+ return normal.pop();
+ }
+ unsigned get_size_slow() {
+ return strict.get_size_slow() + normal.get_size_slow();
+ }
+ void dump(ceph::Formatter *f) const override {
+ f->open_array_section("high_queues");
+ strict.dump(f);
+ f->close_section();
+ f->open_array_section("queues");
+ normal.dump(f);
+ f->close_section();
+ }
+
+ void print(std::ostream &ostream) const final {
+ ostream << "WeightedPriorityQueue";
+ }
+};
+
+#endif