author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/util/threadpool_imp.cc
parent | Initial commit. (diff)
download | ceph-b26c4052f3542036551aa9dec9caa4226e456195.tar.xz, ceph-b26c4052f3542036551aa9dec9caa4226e456195.zip
Adding upstream version 18.2.2. (upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat
-rw-r--r-- | src/rocksdb/util/threadpool_imp.cc | 551
1 file changed, 551 insertions(+), 0 deletions(-)
diff --git a/src/rocksdb/util/threadpool_imp.cc b/src/rocksdb/util/threadpool_imp.cc
new file mode 100644
index 000000000..09706cac5
--- /dev/null
+++ b/src/rocksdb/util/threadpool_imp.cc
@@ -0,0 +1,551 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "util/threadpool_imp.h"
+
+#ifndef OS_WIN
+#include <unistd.h>
+#endif
+
+#ifdef OS_LINUX
+#include <sys/resource.h>
+#include <sys/syscall.h>
+#endif
+
+#include <stdlib.h>
+
+#include <algorithm>
+#include <atomic>
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <vector>
+
+#include "monitoring/thread_status_util.h"
+#include "port/port.h"
+#include "test_util/sync_point.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+void ThreadPoolImpl::PthreadCall(const char* label, int result) {
+  if (result != 0) {
+    fprintf(stderr, "pthread %s: %s\n", label, errnoStr(result).c_str());
+    abort();
+  }
+}
+
+struct ThreadPoolImpl::Impl {
+  Impl();
+  ~Impl();
+
+  void JoinThreads(bool wait_for_jobs_to_complete);
+
+  void SetBackgroundThreadsInternal(int num, bool allow_reduce);
+  int GetBackgroundThreads();
+
+  unsigned int GetQueueLen() const {
+    return queue_len_.load(std::memory_order_relaxed);
+  }
+
+  void LowerIOPriority();
+
+  void LowerCPUPriority(CpuPriority pri);
+
+  void WakeUpAllThreads() { bgsignal_.notify_all(); }
+
+  void BGThread(size_t thread_id);
+
+  void StartBGThreads();
+
+  void Submit(std::function<void()>&& schedule,
+              std::function<void()>&& unschedule, void* tag);
+
+  int UnSchedule(void* arg);
+
+  void SetHostEnv(Env* env) { env_ = env; }
+
+  Env* GetHostEnv() const { return env_; }
+
+  bool HasExcessiveThread() const {
+    return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
+  }
+
+  // Return true iff the current thread is the excessive thread to terminate.
+  // Always terminate the running thread that was added last, even if there
+  // is more than one thread to terminate.
+  bool IsLastExcessiveThread(size_t thread_id) const {
+    return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
+  }
+
+  bool IsExcessiveThread(size_t thread_id) const {
+    return static_cast<int>(thread_id) >= total_threads_limit_;
+  }
+
+  // Return the thread priority.
+  // This would allow its member-thread to know its priority.
+  Env::Priority GetThreadPriority() const { return priority_; }
+
+  // Set the thread priority.
+  void SetThreadPriority(Env::Priority priority) { priority_ = priority; }
+
+  int ReserveThreads(int threads_to_be_reserved) {
+    std::unique_lock<std::mutex> lock(mu_);
+    // We can reserve at most num_waiting_threads_ in total, so the number of
+    // threads that can be reserved might be fewer than desired. In rare
+    // cases, num_waiting_threads_ could be less than reserved_threads_ due
+    // to SetBackgroundThreadsInternal or the last excessive threads. If that
+    // happens, we cannot reserve any other threads.
+    int reserved_threads_in_success =
+        std::min(std::max(num_waiting_threads_ - reserved_threads_, 0),
+                 threads_to_be_reserved);
+    reserved_threads_ += reserved_threads_in_success;
+    return reserved_threads_in_success;
+  }
+
+  int ReleaseThreads(int threads_to_be_released) {
+    std::unique_lock<std::mutex> lock(mu_);
+    // We cannot release more than reserved_threads_
+    int released_threads_in_success =
+        std::min(reserved_threads_, threads_to_be_released);
+    reserved_threads_ -= released_threads_in_success;
+    WakeUpAllThreads();
+    return released_threads_in_success;
+  }
+
+ private:
+  static void BGThreadWrapper(void* arg);
+
+  bool low_io_priority_;
+  CpuPriority cpu_priority_;
+  Env::Priority priority_;
+  Env* env_;
+
+  int total_threads_limit_;
+  std::atomic_uint queue_len_;  // Queue length. Used for stats reporting
+  // Number of reserved threads, managed by ReserveThreads() and
+  // ReleaseThreads(). If num_waiting_threads_ is no larger than
+  // reserved_threads_, a waiting thread stays blocked to enforce the
+  // reservation mechanism.
+  int reserved_threads_;
+  // Number of waiting threads (the maximum number of threads that can be
+  // reserved). In rare cases, num_waiting_threads_ could be less than
+  // reserved_threads_ due to SetBackgroundThreadsInternal or the last
+  // excessive threads.
+  int num_waiting_threads_;
+  bool exit_all_threads_;
+  bool wait_for_jobs_to_complete_;
+
+  // Entry per Schedule()/Submit() call
+  struct BGItem {
+    void* tag = nullptr;
+    std::function<void()> function;
+    std::function<void()> unschedFunction;
+  };
+
+  using BGQueue = std::deque<BGItem>;
+  BGQueue queue_;
+
+  std::mutex mu_;
+  std::condition_variable bgsignal_;
+  std::vector<port::Thread> bgthreads_;
+};
+
+inline ThreadPoolImpl::Impl::Impl()
+    : low_io_priority_(false),
+      cpu_priority_(CpuPriority::kNormal),
+      priority_(Env::LOW),
+      env_(nullptr),
+      total_threads_limit_(0),
+      queue_len_(),
+      reserved_threads_(0),
+      num_waiting_threads_(0),
+      exit_all_threads_(false),
+      wait_for_jobs_to_complete_(false),
+      queue_(),
+      mu_(),
+      bgsignal_(),
+      bgthreads_() {}
+
+inline ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); }
+
+void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) {
+  std::unique_lock<std::mutex> lock(mu_);
+  assert(!exit_all_threads_);
+
+  wait_for_jobs_to_complete_ = wait_for_jobs_to_complete;
+  exit_all_threads_ = true;
+  // prevent threads from being recreated right after they're joined, in case
+  // the user is concurrently submitting jobs.
+  total_threads_limit_ = 0;
+  reserved_threads_ = 0;
+  num_waiting_threads_ = 0;
+
+  lock.unlock();
+
+  bgsignal_.notify_all();
+
+  for (auto& th : bgthreads_) {
+    th.join();
+  }
+
+  bgthreads_.clear();
+
+  exit_all_threads_ = false;
+  wait_for_jobs_to_complete_ = false;
+}
+
+inline void ThreadPoolImpl::Impl::LowerIOPriority() {
+  std::lock_guard<std::mutex> lock(mu_);
+  low_io_priority_ = true;
+}
+
+inline void ThreadPoolImpl::Impl::LowerCPUPriority(CpuPriority pri) {
+  std::lock_guard<std::mutex> lock(mu_);
+  cpu_priority_ = pri;
+}
+
+void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
+  bool low_io_priority = false;
+  CpuPriority current_cpu_priority = CpuPriority::kNormal;
+
+  while (true) {
+    // Wait until there is an item that is ready to run
+    std::unique_lock<std::mutex> lock(mu_);
+    // Stop waiting if the thread needs to do work or needs to terminate.
+    // Increase num_waiting_threads_ once this task has started waiting
+    num_waiting_threads_++;
+
+    TEST_SYNC_POINT("ThreadPoolImpl::BGThread::WaitingThreadsInc");
+    TEST_IDX_SYNC_POINT("ThreadPoolImpl::BGThread::Start:th", thread_id);
+    // While exit_all_threads_ is false and the current thread is not the
+    // last excessive thread, it may be blocked for 3 reasons:
+    // 1) the queue is empty
+    // 2) it is an excessive thread (but not the last one)
+    // 3) the number of waiting threads is not greater than the reserved
+    //    threads (i.e., no threads are available due to full reservation)
+    while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
+           (queue_.empty() || IsExcessiveThread(thread_id) ||
+            num_waiting_threads_ <= reserved_threads_)) {
+      bgsignal_.wait(lock);
+    }
+    // Decrease num_waiting_threads_ once the thread is not waiting
+    num_waiting_threads_--;
+
+    if (exit_all_threads_) {  // mechanism to let BG threads exit safely
+
+      if (!wait_for_jobs_to_complete_ || queue_.empty()) {
+        break;
+      }
+    } else if (IsLastExcessiveThread(thread_id)) {
+      // Current thread is the last generated one and is excessive.
+      // We always terminate excessive threads in the reverse order of
+      // generation time. But not when `exit_all_threads_ == true`,
+      // otherwise `JoinThreads()` could try to `join()` a `detach()`ed
+      // thread.
+      auto& terminating_thread = bgthreads_.back();
+      terminating_thread.detach();
+      bgthreads_.pop_back();
+      if (HasExcessiveThread()) {
+        // There is still at least one more excessive thread to terminate.
+        WakeUpAllThreads();
+      }
+      TEST_IDX_SYNC_POINT("ThreadPoolImpl::BGThread::Termination:th",
+                          thread_id);
+      TEST_SYNC_POINT("ThreadPoolImpl::BGThread::Termination");
+      break;
+    }
+
+    auto func = std::move(queue_.front().function);
+    queue_.pop_front();
+
+    queue_len_.store(static_cast<unsigned int>(queue_.size()),
+                     std::memory_order_relaxed);
+
+    bool decrease_io_priority = (low_io_priority != low_io_priority_);
+    CpuPriority cpu_priority = cpu_priority_;
+    lock.unlock();
+
+    if (cpu_priority < current_cpu_priority) {
+      TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::BeforeSetCpuPriority",
+                               &current_cpu_priority);
+      // 0 means current thread.
+      port::SetCpuPriority(0, cpu_priority);
+      current_cpu_priority = cpu_priority;
+      TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::AfterSetCpuPriority",
+                               &current_cpu_priority);
+    }
+
+#ifdef OS_LINUX
+    if (decrease_io_priority) {
+#define IOPRIO_CLASS_SHIFT (13)
+#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
+      // Put this thread's I/O into the IOPRIO_CLASS_IDLE class (lowest).
+      // These system calls only have an effect when used in conjunction
+      // with an I/O scheduler that supports I/O priorities. As of
+      // kernel 2.6.17 the only such scheduler is the Completely
+      // Fair Queuing (CFQ) I/O scheduler.
+      // To change the scheduler:
+      //   echo cfq > /sys/block/<device_name>/queue/scheduler
+      // Tunables to consider:
+      //   /sys/block/<device_name>/queue/slice_idle
+      //   /sys/block/<device_name>/queue/slice_sync
+      syscall(SYS_ioprio_set, 1,  // IOPRIO_WHO_PROCESS
+              0,                  // current thread
+              IOPRIO_PRIO_VALUE(3, 0));
+      low_io_priority = true;
+    }
+#else
+    (void)decrease_io_priority;  // avoid 'unused variable' error
+#endif
+
+    TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::Impl::BGThread:BeforeRun",
+                             &priority_);
+
+    func();
+  }
+}
+
+// Helper struct for passing arguments when creating threads.
+struct BGThreadMetadata {
+  ThreadPoolImpl::Impl* thread_pool_;
+  size_t thread_id_;  // Index of this thread within the pool.
+  BGThreadMetadata(ThreadPoolImpl::Impl* thread_pool, size_t thread_id)
+      : thread_pool_(thread_pool), thread_id_(thread_id) {}
+};
+
+void ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) {
+  BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
+  size_t thread_id = meta->thread_id_;
+  ThreadPoolImpl::Impl* tp = meta->thread_pool_;
+#ifdef ROCKSDB_USING_THREAD_STATUS
+  // initialize it because compiler isn't good enough to see we don't use it
+  // uninitialized
+  ThreadStatus::ThreadType thread_type = ThreadStatus::NUM_THREAD_TYPES;
+  switch (tp->GetThreadPriority()) {
+    case Env::Priority::HIGH:
+      thread_type = ThreadStatus::HIGH_PRIORITY;
+      break;
+    case Env::Priority::LOW:
+      thread_type = ThreadStatus::LOW_PRIORITY;
+      break;
+    case Env::Priority::BOTTOM:
+      thread_type = ThreadStatus::BOTTOM_PRIORITY;
+      break;
+    case Env::Priority::USER:
+      thread_type = ThreadStatus::USER;
+      break;
+    case Env::Priority::TOTAL:
+      assert(false);
+      return;
+  }
+  assert(thread_type != ThreadStatus::NUM_THREAD_TYPES);
+  ThreadStatusUtil::RegisterThread(tp->GetHostEnv(), thread_type);
+#endif
+  delete meta;
+  tp->BGThread(thread_id);
+#ifdef ROCKSDB_USING_THREAD_STATUS
+  ThreadStatusUtil::UnregisterThread();
+#endif
+  return;
+}
+
+void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num,
+                                                        bool allow_reduce) {
+  std::lock_guard<std::mutex> lock(mu_);
+  if (exit_all_threads_) {
+    return;
+  }
+  if (num > total_threads_limit_ ||
+      (num < total_threads_limit_ && allow_reduce)) {
+    total_threads_limit_ = std::max(0, num);
+    WakeUpAllThreads();
+    StartBGThreads();
+  }
+}
+
+int ThreadPoolImpl::Impl::GetBackgroundThreads() {
+  std::unique_lock<std::mutex> lock(mu_);
+  return total_threads_limit_;
+}
+
+void ThreadPoolImpl::Impl::StartBGThreads() {
+  // Start background threads if necessary
+  while ((int)bgthreads_.size() < total_threads_limit_) {
+    port::Thread p_t(&BGThreadWrapper,
+                     new BGThreadMetadata(this, bgthreads_.size()));
+
+// Set the thread name to aid debugging
+#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
+#if __GLIBC_PREREQ(2, 12)
+    auto th_handle = p_t.native_handle();
+    std::string thread_priority = Env::PriorityToString(GetThreadPriority());
+    std::ostringstream thread_name_stream;
+    thread_name_stream << "rocksdb:";
+    for (char c : thread_priority) {
+      thread_name_stream << static_cast<char>(tolower(c));
+    }
+    pthread_setname_np(th_handle, thread_name_stream.str().c_str());
+#endif
+#endif
+    bgthreads_.push_back(std::move(p_t));
+  }
+}
+
+void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule,
+                                  std::function<void()>&& unschedule,
+                                  void* tag) {
+  std::lock_guard<std::mutex> lock(mu_);
+
+  if (exit_all_threads_) {
+    return;
+  }
+
+  StartBGThreads();
+
+  // Add to the queue
+  queue_.push_back(BGItem());
+  TEST_SYNC_POINT("ThreadPoolImpl::Submit::Enqueue");
+  auto& item = queue_.back();
+  item.tag = tag;
+  item.function = std::move(schedule);
+  item.unschedFunction = std::move(unschedule);
+
+  queue_len_.store(static_cast<unsigned int>(queue_.size()),
+                   std::memory_order_relaxed);
+
+  if (!HasExcessiveThread()) {
+    // Wake up at least one waiting thread.
+    bgsignal_.notify_one();
+  } else {
+    // Need to wake up all threads to make sure the one woken
+    // up is not the one to terminate.
+    WakeUpAllThreads();
+  }
+}
+
+int ThreadPoolImpl::Impl::UnSchedule(void* arg) {
+  int count = 0;
+
+  std::vector<std::function<void()>> candidates;
+  {
+    std::lock_guard<std::mutex> lock(mu_);
+
+    // Remove from the queue
+    BGQueue::iterator it = queue_.begin();
+    while (it != queue_.end()) {
+      if (arg == (*it).tag) {
+        if (it->unschedFunction) {
+          candidates.push_back(std::move(it->unschedFunction));
+        }
+        it = queue_.erase(it);
+        count++;
+      } else {
+        ++it;
+      }
+    }
+    queue_len_.store(static_cast<unsigned int>(queue_.size()),
+                     std::memory_order_relaxed);
+  }
+
+  // Run unschedule functions outside the mutex
+  for (auto& f : candidates) {
+    f();
+  }
+
+  return count;
+}
+
+ThreadPoolImpl::ThreadPoolImpl() : impl_(new Impl()) {}
+
+ThreadPoolImpl::~ThreadPoolImpl() {}
+
+void ThreadPoolImpl::JoinAllThreads() { impl_->JoinThreads(false); }
+
+void ThreadPoolImpl::SetBackgroundThreads(int num) {
+  impl_->SetBackgroundThreadsInternal(num, true);
+}
+
+int ThreadPoolImpl::GetBackgroundThreads() {
+  return impl_->GetBackgroundThreads();
+}
+
+unsigned int ThreadPoolImpl::GetQueueLen() const {
+  return impl_->GetQueueLen();
+}
+
+void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() {
+  impl_->JoinThreads(true);
+}
+
+void ThreadPoolImpl::LowerIOPriority() { impl_->LowerIOPriority(); }
+
+void ThreadPoolImpl::LowerCPUPriority(CpuPriority pri) {
+  impl_->LowerCPUPriority(pri);
+}
+
+void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
+  impl_->SetBackgroundThreadsInternal(num, false);
+}
+
+void ThreadPoolImpl::SubmitJob(const std::function<void()>& job) {
+  auto copy(job);
+  impl_->Submit(std::move(copy), std::function<void()>(), nullptr);
+}
+
+void ThreadPoolImpl::SubmitJob(std::function<void()>&& job) {
+  impl_->Submit(std::move(job), std::function<void()>(), nullptr);
+}
+
+void ThreadPoolImpl::Schedule(void (*function)(void* arg1), void* arg,
+                              void* tag, void (*unschedFunction)(void* arg)) {
+  if (unschedFunction == nullptr) {
+    impl_->Submit(std::bind(function, arg), std::function<void()>(), tag);
+  } else {
+    impl_->Submit(std::bind(function, arg), std::bind(unschedFunction, arg),
+                  tag);
+  }
+}
+
+int ThreadPoolImpl::UnSchedule(void* arg) { return impl_->UnSchedule(arg); }
+
+void ThreadPoolImpl::SetHostEnv(Env* env) { impl_->SetHostEnv(env); }
+
+Env* ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); }
+
+// Return the thread priority.
+// This would allow its member-thread to know its priority.
+Env::Priority ThreadPoolImpl::GetThreadPriority() const {
+  return impl_->GetThreadPriority();
+}
+
+// Set the thread priority.
+void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) {
+  impl_->SetThreadPriority(priority);
+}
+
+// Reserve a specific number of threads, preventing them from running other
+// functions. The number of threads actually reserved may be fewer than
+// requested.
+int ThreadPoolImpl::ReserveThreads(int threads_to_be_reserved) {
+  return impl_->ReserveThreads(threads_to_be_reserved);
+}
+
+// Release a specific number of reserved threads
+int ThreadPoolImpl::ReleaseThreads(int threads_to_be_released) {
+  return impl_->ReleaseThreads(threads_to_be_released);
+}
+
+ThreadPool* NewThreadPool(int num_threads) {
+  ThreadPoolImpl* thread_pool = new ThreadPoolImpl();
+  thread_pool->SetBackgroundThreads(num_threads);
+  return thread_pool;
+}
+
+}  // namespace ROCKSDB_NAMESPACE
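
For orientation, below is a minimal usage sketch of the public API that the file above implements. It is not part of this commit: the `rocksdb/threadpool.h` include path, the thread count, and the lambda jobs are illustrative assumptions based on RocksDB's public headers, where the `ThreadPool` interface and `NewThreadPool()` are declared.

```cpp
// Sketch only -- assumes a working RocksDB build; not part of this diff.
#include <atomic>
#include <cstdio>

#include "rocksdb/threadpool.h"  // declares ThreadPool and NewThreadPool()

int main() {
  // NewThreadPool() constructs a ThreadPoolImpl and sizes it via
  // SetBackgroundThreads(), as shown at the bottom of the diff above.
  ROCKSDB_NAMESPACE::ThreadPool* pool = ROCKSDB_NAMESPACE::NewThreadPool(4);

  std::atomic<int> done{0};
  for (int i = 0; i < 8; ++i) {
    // SubmitJob() enqueues a std::function<void()>; an idle background
    // thread dequeues and runs it in Impl::BGThread().
    pool->SubmitJob([&done] { done.fetch_add(1); });
  }

  // Drains the queue, then joins all background threads
  // (Impl::JoinThreads(true) in the file above).
  pool->WaitForJobsAndJoinAllThreads();
  std::printf("jobs completed: %d\n", done.load());

  delete pool;
  return 0;
}
```

Two less obvious behaviors are visible in the implementation: shrinking the pool with `SetBackgroundThreads()` makes excess threads detach and exit newest-first via the `IsLastExcessiveThread()` path in `BGThread()`, and `ReserveThreads()` can only reserve threads that are currently waiting, so the number actually reserved may be fewer than requested.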