summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/util/timer.h
blob: db71cefaf8dc9d7cb111b6a206971aa7642221bb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//

#pragma once

#include <functional>
#include <memory>
#include <queue>
#include <unordered_map>
#include <utility>
#include <vector>

#include "monitoring/instrumented_mutex.h"
#include "rocksdb/system_clock.h"
#include "test_util/sync_point.h"
#include "util/mutexlock.h"

namespace ROCKSDB_NAMESPACE {

// A Timer class to handle repeated work.
//
// `Start()` and `Shutdown()` are currently not thread-safe. The client must
// serialize calls to these two member functions.
//
// A single timer instance can handle multiple functions via a single thread.
// It is better to leave long running work to a dedicated thread pool.
//
// Timer can be started by calling `Start()`, and ended by calling `Shutdown()`.
// Work (in terms of a `void function`) can be scheduled by calling `Add` with
// a unique function name and de-scheduled by calling `Cancel`.
// Many functions can be added.
//
// Impl Details:
// A heap is used to keep track of when the next timer goes off.
// A map from a function name to the function keeps track of all the functions.
class Timer {
 public:
  explicit Timer(SystemClock* clock)
      : clock_(clock),
        mutex_(clock),
        cond_var_(&mutex_),
        running_(false),
        executing_task_(false) {}

  ~Timer() { Shutdown(); }

  // Add a new function to run.
  // fn_name has to be identical, otherwise it will fail to add and return false
  // start_after_us is the initial delay.
  // repeat_every_us is the interval between ending time of the last call and
  // starting time of the next call. For example, repeat_every_us = 2000 and
  // the function takes 1000us to run. If it starts at time [now]us, then it
  // finishes at [now]+1000us, 2nd run starting time will be at [now]+3000us.
  // repeat_every_us == 0 means do not repeat.
  bool Add(std::function<void()> fn, const std::string& fn_name,
           uint64_t start_after_us, uint64_t repeat_every_us) {
    auto fn_info = std::make_unique<FunctionInfo>(std::move(fn), fn_name, 0,
                                                  repeat_every_us);
    InstrumentedMutexLock l(&mutex_);
    // Assign time within mutex to make sure the next_run_time is larger than
    // the current running one
    fn_info->next_run_time_us = clock_->NowMicros() + start_after_us;
    // the new task start time should never before the current task executing
    // time, as the executing task can only be running if it's next_run_time_us
    // is due (<= clock_->NowMicros()).
    if (executing_task_ &&
        fn_info->next_run_time_us < heap_.top()->next_run_time_us) {
      return false;
    }
    auto it = map_.find(fn_name);
    if (it == map_.end()) {
      heap_.push(fn_info.get());
      map_.try_emplace(fn_name, std::move(fn_info));
    } else {
      // timer doesn't support duplicated function name
      return false;
    }
    cond_var_.SignalAll();
    return true;
  }

  void Cancel(const std::string& fn_name) {
    InstrumentedMutexLock l(&mutex_);

    // Mark the function with fn_name as invalid so that it will not be
    // requeued.
    auto it = map_.find(fn_name);
    if (it != map_.end() && it->second) {
      it->second->Cancel();
    }

    // If the currently running function is fn_name, then we need to wait
    // until it finishes before returning to caller.
    while (!heap_.empty() && executing_task_) {
      FunctionInfo* func_info = heap_.top();
      assert(func_info);
      if (func_info->name == fn_name) {
        WaitForTaskCompleteIfNecessary();
      } else {
        break;
      }
    }
  }

  void CancelAll() {
    InstrumentedMutexLock l(&mutex_);
    CancelAllWithLock();
  }

  // Start the Timer
  bool Start() {
    InstrumentedMutexLock l(&mutex_);
    if (running_) {
      return false;
    }

    running_ = true;
    thread_ = std::make_unique<port::Thread>(&Timer::Run, this);
    return true;
  }

  // Shutdown the Timer
  bool Shutdown() {
    {
      InstrumentedMutexLock l(&mutex_);
      if (!running_) {
        return false;
      }
      running_ = false;
      CancelAllWithLock();
      cond_var_.SignalAll();
    }

    if (thread_) {
      thread_->join();
    }
    return true;
  }

  bool HasPendingTask() const {
    InstrumentedMutexLock l(&mutex_);
    for (const auto& fn_info : map_) {
      if (fn_info.second->IsValid()) {
        return true;
      }
    }
    return false;
  }

#ifndef NDEBUG
  // Wait until Timer starting waiting, call the optional callback, then wait
  // for Timer waiting again.
  // Tests can provide a custom Clock object to mock time, and use the callback
  // here to bump current time and trigger Timer. See timer_test for example.
  //
  // Note: only support one caller of this method.
  void TEST_WaitForRun(const std::function<void()>& callback = nullptr) {
    InstrumentedMutexLock l(&mutex_);
    // It act as a spin lock
    while (executing_task_ ||
           (!heap_.empty() &&
            heap_.top()->next_run_time_us <= clock_->NowMicros())) {
      cond_var_.TimedWait(clock_->NowMicros() + 1000);
    }
    if (callback != nullptr) {
      callback();
    }
    cond_var_.SignalAll();
    do {
      cond_var_.TimedWait(clock_->NowMicros() + 1000);
    } while (executing_task_ ||
             (!heap_.empty() &&
              heap_.top()->next_run_time_us <= clock_->NowMicros()));
  }

  size_t TEST_GetPendingTaskNum() const {
    InstrumentedMutexLock l(&mutex_);
    size_t ret = 0;
    for (const auto& fn_info : map_) {
      if (fn_info.second->IsValid()) {
        ret++;
      }
    }
    return ret;
  }

  void TEST_OverrideTimer(SystemClock* clock) {
    InstrumentedMutexLock l(&mutex_);
    clock_ = clock;
  }
#endif  // NDEBUG

 private:
  void Run() {
    InstrumentedMutexLock l(&mutex_);

    while (running_) {
      if (heap_.empty()) {
        // wait
        TEST_SYNC_POINT("Timer::Run::Waiting");
        cond_var_.Wait();
        continue;
      }

      FunctionInfo* current_fn = heap_.top();
      assert(current_fn);

      if (!current_fn->IsValid()) {
        heap_.pop();
        map_.erase(current_fn->name);
        continue;
      }

      if (current_fn->next_run_time_us <= clock_->NowMicros()) {
        // make a copy of the function so it won't be changed after
        // mutex_.unlock.
        std::function<void()> fn = current_fn->fn;
        executing_task_ = true;
        mutex_.Unlock();
        // Execute the work
        fn();
        mutex_.Lock();
        executing_task_ = false;
        cond_var_.SignalAll();

        // Remove the work from the heap once it is done executing, make sure
        // it's the same function after executing the work while mutex is
        // released.
        // Note that we are just removing the pointer from the heap. Its
        // memory is still managed in the map (as it holds a unique ptr).
        // So current_fn is still a valid ptr.
        assert(heap_.top() == current_fn);
        heap_.pop();

        // current_fn may be cancelled already.
        if (current_fn->IsValid() && current_fn->repeat_every_us > 0) {
          assert(running_);
          current_fn->next_run_time_us =
              clock_->NowMicros() + current_fn->repeat_every_us;

          // Schedule new work into the heap with new time.
          heap_.push(current_fn);
        } else {
          // if current_fn is cancelled or no need to repeat, remove it from the
          // map to avoid leak.
          map_.erase(current_fn->name);
        }
      } else {
        cond_var_.TimedWait(current_fn->next_run_time_us);
      }
    }
  }

  void CancelAllWithLock() {
    mutex_.AssertHeld();
    if (map_.empty() && heap_.empty()) {
      return;
    }

    // With mutex_ held, set all tasks to invalid so that they will not be
    // re-queued.
    for (auto& elem : map_) {
      auto& func_info = elem.second;
      assert(func_info);
      func_info->Cancel();
    }

    // WaitForTaskCompleteIfNecessary() may release mutex_
    WaitForTaskCompleteIfNecessary();

    while (!heap_.empty()) {
      heap_.pop();
    }
    map_.clear();
  }

  // A wrapper around std::function to keep track when it should run next
  // and at what frequency.
  struct FunctionInfo {
    // the actual work
    std::function<void()> fn;
    // name of the function
    std::string name;
    // when the function should run next
    uint64_t next_run_time_us;
    // repeat interval
    uint64_t repeat_every_us;
    // controls whether this function is valid.
    // A function is valid upon construction and until someone explicitly
    // calls `Cancel()`.
    bool valid;

    FunctionInfo(std::function<void()>&& _fn, std::string _name,
                 const uint64_t _next_run_time_us, uint64_t _repeat_every_us)
        : fn(std::move(_fn)),
          name(std::move(_name)),
          next_run_time_us(_next_run_time_us),
          repeat_every_us(_repeat_every_us),
          valid(true) {}

    void Cancel() { valid = false; }

    bool IsValid() const { return valid; }
  };

  void WaitForTaskCompleteIfNecessary() {
    mutex_.AssertHeld();
    while (executing_task_) {
      TEST_SYNC_POINT("Timer::WaitForTaskCompleteIfNecessary:TaskExecuting");
      cond_var_.Wait();
    }
  }

  struct RunTimeOrder {
    bool operator()(const FunctionInfo* f1, const FunctionInfo* f2) {
      return f1->next_run_time_us > f2->next_run_time_us;
    }
  };

  SystemClock* clock_;
  // This mutex controls both the heap_ and the map_. It needs to be held for
  // making any changes in them.
  mutable InstrumentedMutex mutex_;
  InstrumentedCondVar cond_var_;
  std::unique_ptr<port::Thread> thread_;
  bool running_;
  bool executing_task_;

  std::priority_queue<FunctionInfo*, std::vector<FunctionInfo*>, RunTimeOrder>
      heap_;

  // In addition to providing a mapping from a function name to a function,
  // it is also responsible for memory management.
  std::unordered_map<std::string, std::unique_ptr<FunctionInfo>> map_;
};

}  // namespace ROCKSDB_NAMESPACE