From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/rocksdb/util/single_thread_executor.h | 56 +++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 src/rocksdb/util/single_thread_executor.h (limited to 'src/rocksdb/util/single_thread_executor.h') diff --git a/src/rocksdb/util/single_thread_executor.h b/src/rocksdb/util/single_thread_executor.h new file mode 100644 index 000000000..c69f2a292 --- /dev/null +++ b/src/rocksdb/util/single_thread_executor.h @@ -0,0 +1,56 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once + +#if USE_COROUTINES +#include + +#include "folly/CPortability.h" +#include "folly/CppAttributes.h" +#include "folly/Executor.h" +#include "util/async_file_reader.h" + +namespace ROCKSDB_NAMESPACE { +// Implements a simple executor that runs callback functions in the same +// thread, unlike CPUThreadExecutor which may schedule the callback on +// another thread. Runs in a tight loop calling the queued callbacks, +// and polls for async IO completions when idle. The completions will +// resume suspended coroutines and they get added to the queue, which +// will get picked up by this loop. +// Any possibility of deadlock is precluded because the file system +// guarantees that async IO completion callbacks will not be scheduled +// to run in this thread or this executor. +class SingleThreadExecutor : public folly::Executor { + public: + explicit SingleThreadExecutor(AsyncFileReader& reader) + : reader_(reader), busy_(false) {} + + void add(folly::Func callback) override { + auto& q = q_; + q.push(std::move(callback)); + if (q.size() == 1 && !busy_) { + while (!q.empty()) { + q.front()(); + q.pop(); + + if (q.empty()) { + // Prevent recursion, as the Wait may queue resumed coroutines + busy_ = true; + reader_.Wait(); + busy_ = false; + } + } + } + } + + private: + std::queue q_; + AsyncFileReader& reader_; + bool busy_; +}; +} // namespace ROCKSDB_NAMESPACE +#endif // USE_COROUTINES -- cgit v1.2.3