diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/util/async_file_reader.h | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/util/async_file_reader.h')
-rw-r--r-- | src/rocksdb/util/async_file_reader.h | 144 |
1 files changed, 144 insertions, 0 deletions
diff --git a/src/rocksdb/util/async_file_reader.h b/src/rocksdb/util/async_file_reader.h new file mode 100644 index 000000000..df69a840e --- /dev/null +++ b/src/rocksdb/util/async_file_reader.h @@ -0,0 +1,144 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory).#pragma once +#pragma once + +#if USE_COROUTINES +#include "file/random_access_file_reader.h" +#include "folly/experimental/coro/ViaIfAsync.h" +#include "port/port.h" +#include "rocksdb/file_system.h" +#include "rocksdb/statistics.h" +#include "util/autovector.h" +#include "util/stop_watch.h" + +namespace ROCKSDB_NAMESPACE { +class SingleThreadExecutor; + +// AsyncFileReader implements the Awaitable concept, which allows calling +// coroutines to co_await it. When the AsyncFileReader Awaitable is +// resumed, it initiates the fie reads requested by the awaiting caller +// by calling RandomAccessFileReader's ReadAsync. It then suspends the +// awaiting coroutine. The suspended awaiter is later resumed by Wait(). +class AsyncFileReader { + class ReadAwaiter; + template <typename Awaiter> + class ReadOperation; + + public: + AsyncFileReader(FileSystem* fs, Statistics* stats) : fs_(fs), stats_(stats) {} + + ~AsyncFileReader() {} + + ReadOperation<ReadAwaiter> MultiReadAsync(RandomAccessFileReader* file, + const IOOptions& opts, + FSReadRequest* read_reqs, + size_t num_reqs, + AlignedBuf* aligned_buf) noexcept { + return ReadOperation<ReadAwaiter>{*this, file, opts, + read_reqs, num_reqs, aligned_buf}; + } + + private: + friend SingleThreadExecutor; + + // Implementation of the Awaitable concept + class ReadAwaiter { + public: + explicit ReadAwaiter(AsyncFileReader& reader, RandomAccessFileReader* file, + const IOOptions& opts, FSReadRequest* read_reqs, + size_t num_reqs, AlignedBuf* /*aligned_buf*/) noexcept + : reader_(reader), + file_(file), + opts_(opts), + read_reqs_(read_reqs), + num_reqs_(num_reqs), + next_(nullptr) {} + + bool await_ready() noexcept { return false; } + + // A return value of true means suspend the awaiter (calling coroutine). The + // awaiting_coro parameter is the handle of the awaiter. The handle can be + // resumed later, so we cache it here. + bool await_suspend( + folly::coro::impl::coroutine_handle<> awaiting_coro) noexcept { + awaiting_coro_ = awaiting_coro; + // MultiReadAsyncImpl always returns true, so caller will be suspended + return reader_.MultiReadAsyncImpl(this); + } + + void await_resume() noexcept {} + + private: + friend AsyncFileReader; + + // The parameters passed to MultiReadAsync are cached here when the caller + // calls MultiReadAsync. Later, when the execution of this awaitable is + // started, these are used to do the actual IO + AsyncFileReader& reader_; + RandomAccessFileReader* file_; + const IOOptions& opts_; + FSReadRequest* read_reqs_; + size_t num_reqs_; + autovector<void*, 32> io_handle_; + autovector<IOHandleDeleter, 32> del_fn_; + folly::coro::impl::coroutine_handle<> awaiting_coro_; + // Use this to link to the next ReadAwaiter in the suspended coroutine + // list. The head and tail of the list are tracked by AsyncFileReader. + // We use this approach rather than an STL container in order to avoid + // extra memory allocations. The coroutine call already allocates a + // ReadAwaiter object. + ReadAwaiter* next_; + }; + + // An instance of ReadOperation is returned to the caller of MultiGetAsync. + // This represents an awaitable that can be started later. + template <typename Awaiter> + class ReadOperation { + public: + explicit ReadOperation(AsyncFileReader& reader, + RandomAccessFileReader* file, const IOOptions& opts, + FSReadRequest* read_reqs, size_t num_reqs, + AlignedBuf* aligned_buf) noexcept + : reader_(reader), + file_(file), + opts_(opts), + read_reqs_(read_reqs), + num_reqs_(num_reqs), + aligned_buf_(aligned_buf) {} + + auto viaIfAsync(folly::Executor::KeepAlive<> executor) const { + return folly::coro::co_viaIfAsync( + std::move(executor), + Awaiter{reader_, file_, opts_, read_reqs_, num_reqs_, aligned_buf_}); + } + + private: + AsyncFileReader& reader_; + RandomAccessFileReader* file_; + const IOOptions& opts_; + FSReadRequest* read_reqs_; + size_t num_reqs_; + AlignedBuf* aligned_buf_; + }; + + // This function does the actual work when this awaitable starts execution + bool MultiReadAsyncImpl(ReadAwaiter* awaiter); + + // Called by the SingleThreadExecutor to poll for async IO completion. + // This also resumes the awaiting coroutines. + void Wait(); + + // Head of the queue of awaiters waiting for async IO completion + ReadAwaiter* head_ = nullptr; + // Tail of the awaiter queue + ReadAwaiter* tail_ = nullptr; + // Total number of pending async IOs + size_t num_reqs_ = 0; + FileSystem* fs_; + Statistics* stats_; +}; +} // namespace ROCKSDB_NAMESPACE +#endif // USE_COROUTINES |