summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/util/async_file_reader.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/util/async_file_reader.h144
1 files changed, 144 insertions, 0 deletions
diff --git a/src/rocksdb/util/async_file_reader.h b/src/rocksdb/util/async_file_reader.h
new file mode 100644
index 000000000..df69a840e
--- /dev/null
+++ b/src/rocksdb/util/async_file_reader.h
@@ -0,0 +1,144 @@
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+//
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).#pragma once
+#pragma once
+
+#if USE_COROUTINES
+#include "file/random_access_file_reader.h"
+#include "folly/experimental/coro/ViaIfAsync.h"
+#include "port/port.h"
+#include "rocksdb/file_system.h"
+#include "rocksdb/statistics.h"
+#include "util/autovector.h"
+#include "util/stop_watch.h"
+
+namespace ROCKSDB_NAMESPACE {
+class SingleThreadExecutor;
+
+// AsyncFileReader implements the Awaitable concept, which allows calling
+// coroutines to co_await it. When the AsyncFileReader Awaitable is
+// resumed, it initiates the fie reads requested by the awaiting caller
+// by calling RandomAccessFileReader's ReadAsync. It then suspends the
+// awaiting coroutine. The suspended awaiter is later resumed by Wait().
+class AsyncFileReader {
+ class ReadAwaiter;
+ template <typename Awaiter>
+ class ReadOperation;
+
+ public:
+ AsyncFileReader(FileSystem* fs, Statistics* stats) : fs_(fs), stats_(stats) {}
+
+ ~AsyncFileReader() {}
+
+ ReadOperation<ReadAwaiter> MultiReadAsync(RandomAccessFileReader* file,
+ const IOOptions& opts,
+ FSReadRequest* read_reqs,
+ size_t num_reqs,
+ AlignedBuf* aligned_buf) noexcept {
+ return ReadOperation<ReadAwaiter>{*this, file, opts,
+ read_reqs, num_reqs, aligned_buf};
+ }
+
+ private:
+ friend SingleThreadExecutor;
+
+ // Implementation of the Awaitable concept
+ class ReadAwaiter {
+ public:
+ explicit ReadAwaiter(AsyncFileReader& reader, RandomAccessFileReader* file,
+ const IOOptions& opts, FSReadRequest* read_reqs,
+ size_t num_reqs, AlignedBuf* /*aligned_buf*/) noexcept
+ : reader_(reader),
+ file_(file),
+ opts_(opts),
+ read_reqs_(read_reqs),
+ num_reqs_(num_reqs),
+ next_(nullptr) {}
+
+ bool await_ready() noexcept { return false; }
+
+ // A return value of true means suspend the awaiter (calling coroutine). The
+ // awaiting_coro parameter is the handle of the awaiter. The handle can be
+ // resumed later, so we cache it here.
+ bool await_suspend(
+ folly::coro::impl::coroutine_handle<> awaiting_coro) noexcept {
+ awaiting_coro_ = awaiting_coro;
+ // MultiReadAsyncImpl always returns true, so caller will be suspended
+ return reader_.MultiReadAsyncImpl(this);
+ }
+
+ void await_resume() noexcept {}
+
+ private:
+ friend AsyncFileReader;
+
+ // The parameters passed to MultiReadAsync are cached here when the caller
+ // calls MultiReadAsync. Later, when the execution of this awaitable is
+ // started, these are used to do the actual IO
+ AsyncFileReader& reader_;
+ RandomAccessFileReader* file_;
+ const IOOptions& opts_;
+ FSReadRequest* read_reqs_;
+ size_t num_reqs_;
+ autovector<void*, 32> io_handle_;
+ autovector<IOHandleDeleter, 32> del_fn_;
+ folly::coro::impl::coroutine_handle<> awaiting_coro_;
+ // Use this to link to the next ReadAwaiter in the suspended coroutine
+ // list. The head and tail of the list are tracked by AsyncFileReader.
+ // We use this approach rather than an STL container in order to avoid
+ // extra memory allocations. The coroutine call already allocates a
+ // ReadAwaiter object.
+ ReadAwaiter* next_;
+ };
+
+ // An instance of ReadOperation is returned to the caller of MultiGetAsync.
+ // This represents an awaitable that can be started later.
+ template <typename Awaiter>
+ class ReadOperation {
+ public:
+ explicit ReadOperation(AsyncFileReader& reader,
+ RandomAccessFileReader* file, const IOOptions& opts,
+ FSReadRequest* read_reqs, size_t num_reqs,
+ AlignedBuf* aligned_buf) noexcept
+ : reader_(reader),
+ file_(file),
+ opts_(opts),
+ read_reqs_(read_reqs),
+ num_reqs_(num_reqs),
+ aligned_buf_(aligned_buf) {}
+
+ auto viaIfAsync(folly::Executor::KeepAlive<> executor) const {
+ return folly::coro::co_viaIfAsync(
+ std::move(executor),
+ Awaiter{reader_, file_, opts_, read_reqs_, num_reqs_, aligned_buf_});
+ }
+
+ private:
+ AsyncFileReader& reader_;
+ RandomAccessFileReader* file_;
+ const IOOptions& opts_;
+ FSReadRequest* read_reqs_;
+ size_t num_reqs_;
+ AlignedBuf* aligned_buf_;
+ };
+
+ // This function does the actual work when this awaitable starts execution
+ bool MultiReadAsyncImpl(ReadAwaiter* awaiter);
+
+ // Called by the SingleThreadExecutor to poll for async IO completion.
+ // This also resumes the awaiting coroutines.
+ void Wait();
+
+ // Head of the queue of awaiters waiting for async IO completion
+ ReadAwaiter* head_ = nullptr;
+ // Tail of the awaiter queue
+ ReadAwaiter* tail_ = nullptr;
+ // Total number of pending async IOs
+ size_t num_reqs_ = 0;
+ FileSystem* fs_;
+ Statistics* stats_;
+};
+} // namespace ROCKSDB_NAMESPACE
+#endif // USE_COROUTINES