summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/util/timer_queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/util/timer_queue.h')
-rw-r--r--src/rocksdb/util/timer_queue.h231
1 files changed, 231 insertions, 0 deletions
diff --git a/src/rocksdb/util/timer_queue.h b/src/rocksdb/util/timer_queue.h
new file mode 100644
index 000000000..36a1744ac
--- /dev/null
+++ b/src/rocksdb/util/timer_queue.h
@@ -0,0 +1,231 @@
+// Portions Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Borrowed from
+// http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
+// Timer Queue
+//
+// License
+//
+// The source code in this article is licensed under the CC0 license, so feel
+// free to copy, modify, share, do whatever you want with it.
+// No attribution is required, but Ill be happy if you do.
+// CC0 license
+
+// The person who associated a work with this deed has dedicated the work to the
+// public domain by waiving all of his or her rights to the work worldwide
+// under copyright law, including all related and neighboring rights, to the
+// extent allowed by law. You can copy, modify, distribute and perform the
+// work, even for commercial purposes, all without asking permission.
+
+#pragma once
+
+#include <assert.h>
+
+#include <chrono>
+#include <condition_variable>
+#include <functional>
+#include <queue>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include "port/port.h"
+#include "test_util/sync_point.h"
+
+// Allows execution of handlers at a specified time in the future
+// Guarantees:
+// - All handlers are executed ONCE, even if cancelled (aborted parameter will
+// be set to true)
+// - If TimerQueue is destroyed, it will cancel all handlers.
+// - Handlers are ALWAYS executed in the Timer Queue worker thread.
+// - Handlers execution order is NOT guaranteed
+//
+////////////////////////////////////////////////////////////////////////////////
+// borrowed from
+// http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
+class TimerQueue {
+ public:
+ TimerQueue() : m_th(&TimerQueue::run, this) {}
+
+ ~TimerQueue() { shutdown(); }
+
+ // This function is not thread-safe.
+ void shutdown() {
+ if (closed_) {
+ return;
+ }
+ cancelAll();
+ // Abusing the timer queue to trigger the shutdown.
+ add(0, [this](bool) {
+ m_finish = true;
+ return std::make_pair(false, 0);
+ });
+ m_th.join();
+ closed_ = true;
+ }
+
+ // Adds a new timer
+ // \return
+ // Returns the ID of the new timer. You can use this ID to cancel the
+ // timer
+ uint64_t add(int64_t milliseconds,
+ std::function<std::pair<bool, int64_t>(bool)> handler) {
+ WorkItem item;
+ Clock::time_point tp = Clock::now();
+ item.end = tp + std::chrono::milliseconds(milliseconds);
+ TEST_SYNC_POINT_CALLBACK("TimeQueue::Add:item.end", &item.end);
+ item.period = milliseconds;
+ item.handler = std::move(handler);
+
+ std::unique_lock<std::mutex> lk(m_mtx);
+ uint64_t id = ++m_idcounter;
+ item.id = id;
+ m_items.push(std::move(item));
+
+ // Something changed, so wake up timer thread
+ m_checkWork.notify_one();
+ return id;
+ }
+
+ // Cancels the specified timer
+ // \return
+ // 1 if the timer was cancelled.
+ // 0 if you were too late to cancel (or the timer ID was never valid to
+ // start with)
+ size_t cancel(uint64_t id) {
+ // Instead of removing the item from the container (thus breaking the
+ // heap integrity), we set the item as having no handler, and put
+ // that handler on a new item at the top for immediate execution
+ // The timer thread will then ignore the original item, since it has no
+ // handler.
+ std::unique_lock<std::mutex> lk(m_mtx);
+ for (auto&& item : m_items.getContainer()) {
+ if (item.id == id && item.handler) {
+ WorkItem newItem;
+ // Zero time, so it stays at the top for immediate execution
+ newItem.end = Clock::time_point();
+ newItem.id = 0; // Means it is a canceled item
+ // Move the handler from item to newitem (thus clearing item)
+ newItem.handler = std::move(item.handler);
+ m_items.push(std::move(newItem));
+
+ // Something changed, so wake up timer thread
+ m_checkWork.notify_one();
+ return 1;
+ }
+ }
+ return 0;
+ }
+
+ // Cancels all timers
+ // \return
+ // The number of timers cancelled
+ size_t cancelAll() {
+ // Setting all "end" to 0 (for immediate execution) is ok,
+ // since it maintains the heap integrity
+ std::unique_lock<std::mutex> lk(m_mtx);
+ m_cancel = true;
+ for (auto&& item : m_items.getContainer()) {
+ if (item.id && item.handler) {
+ item.end = Clock::time_point();
+ item.id = 0;
+ }
+ }
+ auto ret = m_items.size();
+
+ m_checkWork.notify_one();
+ return ret;
+ }
+
+ private:
+ using Clock = std::chrono::steady_clock;
+ TimerQueue(const TimerQueue&) = delete;
+ TimerQueue& operator=(const TimerQueue&) = delete;
+
+ void run() {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ while (!m_finish) {
+ auto end = calcWaitTime_lock();
+ if (end.first) {
+ // Timers found, so wait until it expires (or something else
+ // changes)
+ m_checkWork.wait_until(lk, end.second);
+ } else {
+ // No timers exist, so wait forever until something changes
+ m_checkWork.wait(lk);
+ }
+
+ // Check and execute as much work as possible, such as, all expired
+ // timers
+ checkWork(&lk);
+ }
+
+ // If we are shutting down, we should not have any items left,
+ // since the shutdown cancels all items
+ assert(m_items.size() == 0);
+ }
+
+ std::pair<bool, Clock::time_point> calcWaitTime_lock() {
+ while (m_items.size()) {
+ if (m_items.top().handler) {
+ // Item present, so return the new wait time
+ return std::make_pair(true, m_items.top().end);
+ } else {
+ // Discard empty handlers (they were cancelled)
+ m_items.pop();
+ }
+ }
+
+ // No items found, so return no wait time (causes the thread to wait
+ // indefinitely)
+ return std::make_pair(false, Clock::time_point());
+ }
+
+ void checkWork(std::unique_lock<std::mutex>* lk) {
+ while (m_items.size() && m_items.top().end <= Clock::now()) {
+ WorkItem item(m_items.top());
+ m_items.pop();
+
+ if (item.handler) {
+ (*lk).unlock();
+ auto reschedule_pair = item.handler(item.id == 0);
+ (*lk).lock();
+ if (!m_cancel && reschedule_pair.first) {
+ int64_t new_period = (reschedule_pair.second == -1)
+ ? item.period
+ : reschedule_pair.second;
+
+ item.period = new_period;
+ item.end = Clock::now() + std::chrono::milliseconds(new_period);
+ m_items.push(std::move(item));
+ }
+ }
+ }
+ }
+
+ bool m_finish = false;
+ bool m_cancel = false;
+ uint64_t m_idcounter = 0;
+ std::condition_variable m_checkWork;
+
+ struct WorkItem {
+ Clock::time_point end;
+ int64_t period;
+ uint64_t id; // id==0 means it was cancelled
+ std::function<std::pair<bool, int64_t>(bool)> handler;
+ bool operator>(const WorkItem& other) const { return end > other.end; }
+ };
+
+ std::mutex m_mtx;
+ // Inheriting from priority_queue, so we can access the internal container
+ class Queue : public std::priority_queue<WorkItem, std::vector<WorkItem>,
+ std::greater<WorkItem>> {
+ public:
+ std::vector<WorkItem>& getContainer() { return this->c; }
+ } m_items;
+ ROCKSDB_NAMESPACE::port::Thread m_th;
+ bool closed_ = false;
+};