summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/util/single_thread_executor.h
blob: c69f2a2921327a363b7f2c175de239da9880bf8a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
//  Copyright (c) Meta Platforms, Inc. and affiliates.
//
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
#pragma once

#if USE_COROUTINES
#include <atomic>
#include <queue>
#include <utility>

#include "folly/CPortability.h"
#include "folly/CppAttributes.h"
#include "folly/Executor.h"
#include "util/async_file_reader.h"

namespace ROCKSDB_NAMESPACE {
// Implements a simple executor that runs callback functions in the same
// thread, unlike CPUThreadExecutor which may schedule the callback on
// another thread. Runs in a tight loop calling the queued callbacks,
// and polls for async IO completions when idle. The completions will
// resume suspended coroutines and they get added to the queue, which
// will get picked up by this loop.
// Any possibility of deadlock is precluded because the file system
// guarantees that async IO completion callbacks will not be scheduled
// to run in this thread or this executor.
class SingleThreadExecutor : public folly::Executor {
 public:
  // Binds the executor to `reader`. Only a reference is stored, so `reader`
  // has to stay alive for as long as add() may be called on this object.
  explicit SingleThreadExecutor(AsyncFileReader& reader) : reader_(reader) {}

  // Queues `callback` and, unless a drain loop is already in progress,
  // runs queued callbacks in FIFO order on the calling thread until the
  // queue is still empty after a call to reader_.Wait().
  //
  // This function is re-entrant by design: callbacks (and Wait()) may call
  // add() again, in which case the nested call only enqueues and returns,
  // leaving execution to the outer loop. No locking is performed here.
  void add(folly::Func callback) override {
    q_.push(std::move(callback));

    // Just enqueue and leave if someone further up the stack is already
    // draining:
    //  - busy_ is set while we are inside reader_.Wait();
    //  - a size other than 1 means the queue was non-empty before this
    //    push, i.e. a callback is executing or entries are waiting for the
    //    outer loop to resume.
    if (busy_ || q_.size() != 1) {
      return;
    }

    // The queue holds exactly one entry here, so the body runs at least
    // once.
    do {
      // Invoke the callback in place and pop it only afterwards. Keeping it
      // at the head while it runs is what makes a nested add() observe
      // size() > 1 and skip starting a second drain loop.
      q_.front()();
      q_.pop();

      if (q_.empty()) {
        // Nothing left to run: let the reader wait for async IO. Wait() may
        // call add() to queue resumed coroutines; busy_ keeps those calls
        // from recursing into this loop. Whatever they queued is picked up
        // by the loop condition below.
        busy_ = true;
        reader_.Wait();
        busy_ = false;
      }
    } while (!q_.empty());
  }

 private:
  // Pending callbacks, executed front to back.
  std::queue<folly::Func> q_;
  // Reader whose Wait() is called whenever the queue runs dry.
  AsyncFileReader& reader_;
  // True only for the duration of the reader_.Wait() call inside add().
  bool busy_ = false;
};
}  // namespace ROCKSDB_NAMESPACE
#endif  // USE_COROUTINES