diff options
Diffstat (limited to 'src/common/ceph_timer.h')
-rw-r--r-- | src/common/ceph_timer.h | 312 |
1 files changed, 312 insertions, 0 deletions
diff --git a/src/common/ceph_timer.h b/src/common/ceph_timer.h new file mode 100644 index 000000000..2be077834 --- /dev/null +++ b/src/common/ceph_timer.h @@ -0,0 +1,312 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef COMMON_CEPH_TIMER_H +#define COMMON_CEPH_TIMER_H + +#include <cassert> +#include <condition_variable> +#include <cstdint> +#include <functional> +#include <memory> +#include <mutex> +#include <thread> +#include <boost/intrusive/set.hpp> + +#include "include/function2.hpp" +#include "include/compat.h" + +#include "common/detail/construct_suspended.h" + +namespace bi = boost::intrusive; +namespace ceph { + +// Compared to the SafeTimer this does fewer allocations (you +// don't have to allocate a new Context every time you +// want to cue the next tick.) +// +// It also does not share a lock with the caller. If you call +// cancel event, it either cancels the event (and returns true) or +// you missed it. If this does not work for you, you can set up a +// flag and mutex of your own. +// +// You get to pick your clock. I like mono_clock, since I usually +// want to wait FOR a given duration. real_clock is worthwhile if +// you want to wait UNTIL a specific moment of wallclock time. If +// you want you can set up a timer that executes a function after +// you use up ten seconds of CPU time. + +template<typename TC> +class timer { + using sh = bi::set_member_hook<bi::link_mode<bi::normal_link>>; + + struct event { + typename TC::time_point t = typename TC::time_point::min(); + std::uint64_t id = 0; + fu2::unique_function<void()> f; + + sh schedule_link; + sh event_link; + + event() = default; + event(typename TC::time_point t, std::uint64_t id, + fu2::unique_function<void()> f) : t(t), id(id), f(std::move(f)) {} + + event(const event&) = delete; + event& operator =(const event&) = delete; + + event(event&&) = delete; + event& operator =(event&&) = delete; + + bool operator <(const event& e) const noexcept { + return t == e.t ? id < e.id : t < e.t; + } + }; + struct id_key { + using type = std::uint64_t; + const type& operator ()(const event& e) const noexcept { + return e.id; + } + }; + + bi::set<event, bi::member_hook<event, sh, &event::schedule_link>, + bi::constant_time_size<false>> schedule; + + bi::set<event, bi::member_hook<event, sh, &event::event_link>, + bi::constant_time_size<false>, + bi::key_of_value<id_key>> events; + + std::mutex lock; + std::condition_variable cond; + + event* running = nullptr; + std::uint64_t next_id = 0; + + bool suspended; + std::thread thread; + + void timer_thread() { + std::unique_lock l(lock); + while (!suspended) { + auto now = TC::now(); + + while (!schedule.empty()) { + auto p = schedule.begin(); + // Should we wait for the future? + if (p->t > now) + break; + + auto& e = *p; + schedule.erase(e); + events.erase(e.id); + + // Since we have only one thread it is impossible to have more + // than one running event + running = &e; + + l.unlock(); + p->f(); + l.lock(); + + if (running) { + running = nullptr; + delete &e; + } // Otherwise the event requeued itself + } + + if (suspended) + break; + if (schedule.empty()) { + cond.wait(l); + } else { + // Since wait_until takes its parameter by reference, passing + // the time /in the event/ is unsafe, as it might be canceled + // while we wait. + const auto t = schedule.begin()->t; + cond.wait_until(l, t); + } + } + } + +public: + timer() : suspended(false) { + thread = std::thread(&timer::timer_thread, this); + ceph_pthread_setname(thread.native_handle(), "ceph_timer"); + } + + // Create a suspended timer, jobs will be executed in order when + // it is resumed. + timer(construct_suspended_t) : suspended(true) {} + + timer(const timer&) = delete; + timer& operator =(const timer&) = delete; + + ~timer() { + suspend(); + cancel_all_events(); + } + + // Suspend operation of the timer (and let its thread die). + void suspend() { + std::unique_lock l(lock); + if (suspended) + return; + + suspended = true; + cond.notify_one(); + l.unlock(); + thread.join(); + } + + // Resume operation of the timer. (Must have been previously + // suspended.) + void resume() { + std::unique_lock l(lock); + if (!suspended) + return; + + suspended = false; + assert(!thread.joinable()); + thread = std::thread(&timer::timer_thread, this); + } + + // Schedule an event in the relative future + template<typename Callable, typename... Args> + std::uint64_t add_event(typename TC::duration duration, + Callable&& f, Args&&... args) { + return add_event(TC::now() + duration, + std::forward<Callable>(f), + std::forward<Args>(args)...); + } + + // Schedule an event in the absolute future + template<typename Callable, typename... Args> + std::uint64_t add_event(typename TC::time_point when, + Callable&& f, Args&&... args) { + std::lock_guard l(lock); + auto e = std::make_unique<event>(when, ++next_id, + std::bind(std::forward<Callable>(f), + std::forward<Args>(args)...)); + auto id = e->id; + auto i = schedule.insert(*e); + events.insert(*(e.release())); + + /* If the event we have just inserted comes before everything + * else, we need to adjust our timeout. */ + if (i.first == schedule.begin()) + cond.notify_one(); + + // Previously each event was a context, identified by a + // pointer, and each context to be called only once. Since you + // can queue the same function pointer, member function, + // lambda, or functor up multiple times, identifying things by + // function for the purposes of cancellation is no longer + // suitable. Thus: + return id; + } + + // Adjust the timeout of a currently-scheduled event (relative) + bool adjust_event(std::uint64_t id, typename TC::duration duration) { + return adjust_event(id, TC::now() + duration); + } + + // Adjust the timeout of a currently-scheduled event (absolute) + bool adjust_event(std::uint64_t id, typename TC::time_point when) { + std::lock_guard l(lock); + + auto it = events.find(id); + + if (it == events.end()) + return false; + + auto& e = *it; + + schedule.erase(e); + e.t = when; + schedule.insert(e); + + return true; + } + + // Cancel an event. If the event has already come and gone (or you + // never submitted it) you will receive false. Otherwise you will + // receive true and it is guaranteed the event will not execute. + bool cancel_event(const std::uint64_t id) { + std::lock_guard l(lock); + auto p = events.find(id); + if (p == events.end()) { + return false; + } + + auto& e = *p; + events.erase(e.id); + schedule.erase(e); + delete &e; + + return true; + } + + // Reschedules a currently running event in the relative + // future. Must be called only from an event executed by this + // timer. If you have a function that can be called either from + // this timer or some other way, it is your responsibility to make + // sure it can tell the difference only does not call + // reschedule_me in the non-timer case. + // + // Returns an event id. If you had an event_id from the first + // scheduling, replace it with this return value. + std::uint64_t reschedule_me(typename TC::duration duration) { + return reschedule_me(TC::now() + duration); + } + + // Reschedules a currently running event in the absolute + // future. Must be called only from an event executed by this + // timer. if you have a function that can be called either from + // this timer or some other way, it is your responsibility to make + // sure it can tell the difference only does not call + // reschedule_me in the non-timer case. + // + // Returns an event id. If you had an event_id from the first + // scheduling, replace it with this return value. + std::uint64_t reschedule_me(typename TC::time_point when) { + assert(std::this_thread::get_id() == thread.get_id()); + std::lock_guard l(lock); + running->t = when; + std::uint64_t id = ++next_id; + running->id = id; + schedule.insert(*running); + events.insert(*running); + + // Hacky, but keeps us from being deleted + running = nullptr; + + // Same function, but you get a new ID. + return id; + } + + // Remove all events from the queue. + void cancel_all_events() { + std::lock_guard l(lock); + while (!events.empty()) { + auto p = events.begin(); + event& e = *p; + schedule.erase(e); + events.erase(e.id); + delete &e; + } + } +}; // timer +} // namespace ceph + +#endif |