summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/periodic_task_scheduler.cc
blob: 2024510dd6062c155ebd5260e4d1b700bbaf9540 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
//  Copyright (c) Meta Platforms, Inc. and affiliates.
//
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "db/periodic_task_scheduler.h"

#include "rocksdb/system_clock.h"

#ifndef ROCKSDB_LITE
namespace ROCKSDB_NAMESPACE {

// `timer_mutex` is a global mutex that currently serves three purposes:
// (1) to ensure calls to `Start()` and `Shutdown()` are serialized, as
//     they are currently not implemented in a thread-safe way;
// (2) to ensure the `Timer::Add()`s and `Timer::Start()` run atomically, and
//     the `Timer::Cancel()`s and `Timer::Shutdown()` run atomically; and
// (3) to protect `tasks_map_` in PeriodicTaskScheduler.
// Note: A static global mutex is not efficient, but for
// PeriodicTaskScheduler it should be okay, as the operations are called
// infrequently.
static port::Mutex timer_mutex;

// Default repeat period, in seconds, for each periodic task type. It is
// consulted by the two-argument `Register()` overload below.
// A type mapped to `kInvalidPeriodSec` has no usable default: `Register()`
// rejects that value with InvalidArgument, so such tasks must be registered
// through the overload that takes an explicit period.
static const std::map<PeriodicTaskType, uint64_t> kDefaultPeriodSeconds = {
    {PeriodicTaskType::kDumpStats, kInvalidPeriodSec},
    {PeriodicTaskType::kPersistStats, kInvalidPeriodSec},
    {PeriodicTaskType::kFlushInfoLog, 10},
    {PeriodicTaskType::kRecordSeqnoTime, kInvalidPeriodSec},
};

// Short name for each periodic task type. `Register()` uses the name as the
// prefix of the unique task id it hands to the timer (name + running counter),
// which makes the ids easier to recognize while debugging.
static const std::map<PeriodicTaskType, std::string> kPeriodicTaskTypeNames = {
    {PeriodicTaskType::kDumpStats, "dump_st"},
    {PeriodicTaskType::kPersistStats, "pst_st"},
    {PeriodicTaskType::kFlushInfoLog, "flush_info_log"},
    {PeriodicTaskType::kRecordSeqnoTime, "record_seq_time"},
};

// Registers `fn` as the periodic task for `task_type`, using the default
// repeat period listed for that type in `kDefaultPeriodSeconds`.
//
// Returns InvalidArgument if `task_type` has no entry in the defaults table
// (instead of letting `std::map::at()` throw `std::out_of_range`); otherwise
// returns whatever the three-argument overload returns. Note that a default
// of `kInvalidPeriodSec` is itself rejected by that overload.
Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type,
                                       const PeriodicTaskFunc& fn) {
  const auto default_period = kDefaultPeriodSeconds.find(task_type);
  if (default_period == kDefaultPeriodSeconds.end()) {
    return Status::InvalidArgument("Unknown periodic task type");
  }
  return Register(task_type, fn, default_period->second);
}

// Registers (or re-registers) the periodic task for `task_type` so that `fn`
// is handed to the timer with a repeat period of `repeat_period_seconds`.
//
// Returns:
//   - InvalidArgument if the period equals `kInvalidPeriodSec`, or if
//     `task_type` has no entry in `kPeriodicTaskTypeNames`;
//   - OK, without touching the timer, if the type is already registered with
//     the same period (`fn` is not replaced in that case);
//   - Aborted if the timer rejects the task or the bookkeeping insert fails;
//   - OK otherwise.
Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type,
                                       const PeriodicTaskFunc& fn,
                                       uint64_t repeat_period_seconds) {
  // Shared by every scheduler instance; bumped once per successful pass
  // through the timer registration below.
  static std::atomic<uint64_t> initial_delay(0);

  // Validate the arguments up front, before taking the global mutex and
  // before any existing task is cancelled, so a bad call leaves the scheduler
  // exactly as it was.
  // NOTE(review): the period is used as a modulus below; this relies on
  // `kInvalidPeriodSec` covering zero -- confirm against the header.
  if (repeat_period_seconds == kInvalidPeriodSec) {
    return Status::InvalidArgument("Invalid task repeat period");
  }
  const auto name_it = kPeriodicTaskTypeNames.find(task_type);
  if (name_it == kPeriodicTaskTypeNames.end()) {
    return Status::InvalidArgument("Unknown periodic task type");
  }

  MutexLock l(&timer_mutex);

  auto it = tasks_map_.find(task_type);
  if (it != tasks_map_.end()) {
    // the task already exists and it's the same, no update needed
    if (it->second.repeat_every_sec == repeat_period_seconds) {
      return Status::OK();
    }
    // cancel the existing one before registering the new one
    timer_->Cancel(it->second.name);
    tasks_map_.erase(it);
  }

  timer_->Start();
  // put task type name as prefix, for easy debug
  std::string unique_id = name_it->second + std::to_string(id_++);

  // The first run is delayed by (counter % period) seconds; presumably this
  // staggers tasks so they do not all fire at the same instant.
  const uint64_t initial_delay_seconds =
      initial_delay.fetch_add(1) % repeat_period_seconds;
  bool succeeded =
      timer_->Add(fn, unique_id, initial_delay_seconds * kMicrosInSecond,
                  repeat_period_seconds * kMicrosInSecond);
  if (!succeeded) {
    return Status::Aborted("Failed to register periodic task");
  }

  auto result = tasks_map_.try_emplace(
      task_type, TaskInfo{unique_id, repeat_period_seconds});
  if (!result.second) {
    // Do not leave a task running on the timer that `tasks_map_` has no
    // record of: it could never be cancelled through Unregister().
    timer_->Cancel(unique_id);
    return Status::Aborted("Failed to add periodic task");
  }
  return Status::OK();
}

// Removes the periodic task registered for `task_type`, if there is one, and
// shuts the timer down once it reports no pending task. Always returns OK,
// including when nothing was registered for `task_type`.
Status PeriodicTaskScheduler::Unregister(PeriodicTaskType task_type) {
  MutexLock guard(&timer_mutex);

  if (auto entry = tasks_map_.find(task_type); entry != tasks_map_.end()) {
    // Cancel on the timer first, then drop our own bookkeeping record.
    timer_->Cancel(entry->second.name);
    tasks_map_.erase(entry);
  }

  // The pending-task check runs whether or not an entry was removed above.
  if (timer_->HasPendingTask()) {
    return Status::OK();
  }
  timer_->Shutdown();
  return Status::OK();
}

// Returns a pointer to a single Timer shared by all callers. The timer is a
// function-local static, so it is constructed on the first call (with the
// clock from `SystemClock::Default()`) and the same object is returned on
// every later call.
Timer* PeriodicTaskScheduler::Default() {
  static Timer default_timer(SystemClock::Default().get());
  return &default_timer;
}

#ifndef NDEBUG
// Test-only hook: points this scheduler at a dedicated timer driven by
// `clock` instead of the timer it was using before.
void PeriodicTaskScheduler::TEST_OverrideTimer(SystemClock* clock) {
  // The static is constructed only on the first call, with that call's
  // `clock`; the explicit override below is issued on every call.
  // NOTE(review): presumably Timer::TEST_OverrideTimer re-points the timer at
  // the given clock -- confirm against the Timer implementation.
  static Timer override_timer(clock);
  override_timer.TEST_OverrideTimer(clock);

  // `timer_` is only swapped while holding the global timer mutex.
  MutexLock guard(&timer_mutex);
  timer_ = &override_timer;
}
#endif  // NDEBUG

}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_LITE