summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/util/repeatable_thread.h
blob: 967cc49945e4424c5c4f900d97801e5ee61f3395 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <functional>
#include <string>

#include "port/port.h"
#include "rocksdb/env.h"
#include "util/mock_time_env.h"
#include "util/mutexlock.h"

namespace rocksdb {

// Runs a callback repeatedly on a dedicated background thread, sleeping
// `delay_us` microseconds (as measured by the supplied Env) between runs.
// The thread is started by the constructor and is stopped and joined by
// cancel() or by the destructor.
class RepeatableThread {
 public:
  // Starts the background thread immediately.
  //
  //   function:         callback invoked on the background thread, once per
  //                     period, without mutex_ held.
  //   thread_name:      suffix appended to "rocksdb:" to form the thread name.
  //                     Where the name is applied via pthread_setname_np, the
  //                     full name is limited to 16 bytes including the
  //                     terminating NUL; a longer name makes the call fail
  //                     and trips the assert in thread() in debug builds.
  //   env:              clock source (NowMicros()) for wait deadlines; also
  //                     handed to the internal mutex.
  //   delay_us:         pause between consecutive runs; must be > 0
  //                     (asserted in thread()).
  //   initial_delay_us: pause before the first run; 0 means the first run
  //                     happens without waiting.
  RepeatableThread(std::function<void()> function,
                   const std::string& thread_name, Env* env, uint64_t delay_us,
                   uint64_t initial_delay_us = 0)
      : function_(function),
        thread_name_("rocksdb:" + thread_name),
        env_(env),
        delay_us_(delay_us),
        initial_delay_us_(initial_delay_us),
        mutex_(env),
        cond_var_(&mutex_),
        running_(true),
#ifndef NDEBUG
        waiting_(false),
        run_count_(0),
#endif
        // thread_ is declared last, so every member above is initialized
        // before the background thread starts executing thread().
        thread_([this] { thread(); }) {
  }

  // Asks the background thread to stop and blocks until it has exited.
  // Calls made after the thread was already cancelled return immediately.
  // Must not be called from within function_, because it joins the
  // background thread.
  void cancel() {
    {
      InstrumentedMutexLock l(&mutex_);
      if (!running_) {
        return;
      }
      running_ = false;
      // Wake the background thread if it is sleeping in wait().
      cond_var_.SignalAll();
    }
    // Join outside the lock so the background thread can take mutex_ while
    // it winds down.
    thread_.join();
  }

  // Returns true until cancel() has been called.
  //
  // running_ is written by cancel() under mutex_, so it is read under the
  // same mutex here to avoid a data race. As a consequence this method must
  // not be called by a thread that already holds mutex_ (for example from
  // the callback passed to TEST_WaitForRun); InstrumentedMutex is assumed to
  // be non-recursive.
  bool IsRunning() {
    InstrumentedMutexLock l(&mutex_);
    return running_;
  }

  ~RepeatableThread() { cancel(); }

#ifndef NDEBUG
  // Wait until RepeatableThread starts waiting, call the optional callback,
  // then wait for one run of RepeatableThread. Tests can provide a custom
  // env object to mock time, and use the callback here to bump the current
  // time and trigger RepeatableThread. See repeatable_thread_test for an
  // example.
  //
  // The callback is invoked with mutex_ held.
  //
  // Note: only support one caller of this method.
  void TEST_WaitForRun(std::function<void()> callback = nullptr) {
    InstrumentedMutexLock l(&mutex_);
    while (!waiting_) {
      cond_var_.Wait();
    }
    uint64_t prev_count = run_count_;
    if (callback != nullptr) {
      callback();
    }
    // Wake the background thread so that it re-reads the (possibly mocked)
    // clock and notices that its deadline has passed.
    cond_var_.SignalAll();
    while (!(run_count_ > prev_count)) {
      cond_var_.Wait();
    }
  }
#endif

 private:
  // Sleeps until `delay` microseconds have elapsed according to
  // env_->NowMicros(), or until cancel() clears running_. A zero delay does
  // not sleep at all. Returns the value of running_ on exit, i.e. false once
  // the thread has been cancelled.
  bool wait(uint64_t delay) {
    InstrumentedMutexLock l(&mutex_);
    if (running_ && delay > 0) {
      uint64_t wait_until = env_->NowMicros() + delay;
#ifndef NDEBUG
      // Let TEST_WaitForRun know the thread is now parked.
      waiting_ = true;
      cond_var_.SignalAll();
#endif
      while (running_) {
        cond_var_.TimedWait(wait_until);
        // The wake-up may be early (e.g. a signal from TEST_WaitForRun), so
        // only leave the loop once the clock has reached the deadline.
        if (env_->NowMicros() >= wait_until) {
          break;
        }
      }
#ifndef NDEBUG
      waiting_ = false;
#endif
    }
    return running_;
  }

  // Body of the background thread: optional initial wait, then run
  // function_ and wait delay_us_ in a loop until cancelled.
  void thread() {
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12)
    // Set thread name. Use pthread_self() rather than
    // thread_.native_handle(): this code runs on the new thread while the
    // constructor may still be initializing thread_, so reading thread_
    // here would race with its construction.
    int ret __attribute__((__unused__)) =
        pthread_setname_np(pthread_self(), thread_name_.c_str());
    assert(ret == 0);
#endif
#endif

    assert(delay_us_ > 0);
    if (!wait(initial_delay_us_)) {
      return;
    }
    do {
      function_();
#ifndef NDEBUG
      {
        // Publish the completed run to TEST_WaitForRun.
        InstrumentedMutexLock l(&mutex_);
        run_count_++;
        cond_var_.SignalAll();
      }
#endif
    } while (wait(delay_us_));
  }

  const std::function<void()> function_;
  const std::string thread_name_;
  Env* const env_;
  const uint64_t delay_us_;
  const uint64_t initial_delay_us_;

  // Mutex lock should be held when accessing running_, waiting_
  // and run_count_.
  InstrumentedMutex mutex_;
  InstrumentedCondVar cond_var_;
  bool running_;
#ifndef NDEBUG
  // RepeatableThread waiting for timeout.
  bool waiting_;
  // Times function_ had run.
  uint64_t run_count_;
#endif
  // Must remain the last member: the thread starts running as soon as it is
  // constructed and uses all of the members declared above.
  port::Thread thread_;
};

}  // namespace rocksdb