diff options
Diffstat (limited to 'src/rocksdb/util/timer_queue.h')
-rw-r--r-- | src/rocksdb/util/timer_queue.h | 230 |
1 files changed, 230 insertions, 0 deletions
diff --git a/src/rocksdb/util/timer_queue.h b/src/rocksdb/util/timer_queue.h new file mode 100644 index 00000000..bd8a4f85 --- /dev/null +++ b/src/rocksdb/util/timer_queue.h @@ -0,0 +1,230 @@ +// Portions Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Borrowed from +// http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/ +// Timer Queue +// +// License +// +// The source code in this article is licensed under the CC0 license, so feel +// free to copy, modify, share, do whatever you want with it. +// No attribution is required, but Ill be happy if you do. +// CC0 license + +// The person who associated a work with this deed has dedicated the work to the +// public domain by waiving all of his or her rights to the work worldwide +// under copyright law, including all related and neighboring rights, to the +// extent allowed by law. You can copy, modify, distribute and perform the +// work, even for commercial purposes, all without asking permission. + +#pragma once + +#include <assert.h> +#include <chrono> +#include <condition_variable> +#include <functional> +#include <queue> +#include <thread> +#include <utility> +#include <vector> + +#include "port/port.h" +#include "util/sync_point.h" + +// Allows execution of handlers at a specified time in the future +// Guarantees: +// - All handlers are executed ONCE, even if cancelled (aborted parameter will +// be set to true) +// - If TimerQueue is destroyed, it will cancel all handlers. +// - Handlers are ALWAYS executed in the Timer Queue worker thread. +// - Handlers execution order is NOT guaranteed +// +//////////////////////////////////////////////////////////////////////////////// +// borrowed from +// http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/ +class TimerQueue { + public: + TimerQueue() : m_th(&TimerQueue::run, this) {} + + ~TimerQueue() { shutdown(); } + + // This function is not thread-safe. + void shutdown() { + if (closed_) { + return; + } + cancelAll(); + // Abusing the timer queue to trigger the shutdown. + add(0, [this](bool) { + m_finish = true; + return std::make_pair(false, 0); + }); + m_th.join(); + closed_ = true; + } + + // Adds a new timer + // \return + // Returns the ID of the new timer. You can use this ID to cancel the + // timer + uint64_t add(int64_t milliseconds, + std::function<std::pair<bool, int64_t>(bool)> handler) { + WorkItem item; + Clock::time_point tp = Clock::now(); + item.end = tp + std::chrono::milliseconds(milliseconds); + TEST_SYNC_POINT_CALLBACK("TimeQueue::Add:item.end", &item.end); + item.period = milliseconds; + item.handler = std::move(handler); + + std::unique_lock<std::mutex> lk(m_mtx); + uint64_t id = ++m_idcounter; + item.id = id; + m_items.push(std::move(item)); + + // Something changed, so wake up timer thread + m_checkWork.notify_one(); + return id; + } + + // Cancels the specified timer + // \return + // 1 if the timer was cancelled. + // 0 if you were too late to cancel (or the timer ID was never valid to + // start with) + size_t cancel(uint64_t id) { + // Instead of removing the item from the container (thus breaking the + // heap integrity), we set the item as having no handler, and put + // that handler on a new item at the top for immediate execution + // The timer thread will then ignore the original item, since it has no + // handler. + std::unique_lock<std::mutex> lk(m_mtx); + for (auto&& item : m_items.getContainer()) { + if (item.id == id && item.handler) { + WorkItem newItem; + // Zero time, so it stays at the top for immediate execution + newItem.end = Clock::time_point(); + newItem.id = 0; // Means it is a canceled item + // Move the handler from item to newitem (thus clearing item) + newItem.handler = std::move(item.handler); + m_items.push(std::move(newItem)); + + // Something changed, so wake up timer thread + m_checkWork.notify_one(); + return 1; + } + } + return 0; + } + + // Cancels all timers + // \return + // The number of timers cancelled + size_t cancelAll() { + // Setting all "end" to 0 (for immediate execution) is ok, + // since it maintains the heap integrity + std::unique_lock<std::mutex> lk(m_mtx); + m_cancel = true; + for (auto&& item : m_items.getContainer()) { + if (item.id && item.handler) { + item.end = Clock::time_point(); + item.id = 0; + } + } + auto ret = m_items.size(); + + m_checkWork.notify_one(); + return ret; + } + + private: + using Clock = std::chrono::steady_clock; + TimerQueue(const TimerQueue&) = delete; + TimerQueue& operator=(const TimerQueue&) = delete; + + void run() { + std::unique_lock<std::mutex> lk(m_mtx); + while (!m_finish) { + auto end = calcWaitTime_lock(); + if (end.first) { + // Timers found, so wait until it expires (or something else + // changes) + m_checkWork.wait_until(lk, end.second); + } else { + // No timers exist, so wait forever until something changes + m_checkWork.wait(lk); + } + + // Check and execute as much work as possible, such as, all expired + // timers + checkWork(&lk); + } + + // If we are shutting down, we should not have any items left, + // since the shutdown cancels all items + assert(m_items.size() == 0); + } + + std::pair<bool, Clock::time_point> calcWaitTime_lock() { + while (m_items.size()) { + if (m_items.top().handler) { + // Item present, so return the new wait time + return std::make_pair(true, m_items.top().end); + } else { + // Discard empty handlers (they were cancelled) + m_items.pop(); + } + } + + // No items found, so return no wait time (causes the thread to wait + // indefinitely) + return std::make_pair(false, Clock::time_point()); + } + + void checkWork(std::unique_lock<std::mutex>* lk) { + while (m_items.size() && m_items.top().end <= Clock::now()) { + WorkItem item(m_items.top()); + m_items.pop(); + + if (item.handler) { + (*lk).unlock(); + auto reschedule_pair = item.handler(item.id == 0); + (*lk).lock(); + if (!m_cancel && reschedule_pair.first) { + int64_t new_period = (reschedule_pair.second == -1) + ? item.period + : reschedule_pair.second; + + item.period = new_period; + item.end = Clock::now() + std::chrono::milliseconds(new_period); + m_items.push(std::move(item)); + } + } + } + } + + bool m_finish = false; + bool m_cancel = false; + uint64_t m_idcounter = 0; + std::condition_variable m_checkWork; + + struct WorkItem { + Clock::time_point end; + int64_t period; + uint64_t id; // id==0 means it was cancelled + std::function<std::pair<bool, int64_t>(bool)> handler; + bool operator>(const WorkItem& other) const { return end > other.end; } + }; + + std::mutex m_mtx; + // Inheriting from priority_queue, so we can access the internal container + class Queue : public std::priority_queue<WorkItem, std::vector<WorkItem>, + std::greater<WorkItem>> { + public: + std::vector<WorkItem>& getContainer() { return this->c; } + } m_items; + rocksdb::port::Thread m_th; + bool closed_ = false; +}; |