Diffstat:
-rw-r--r--  src/rocksdb/util/threadpool_imp.cc  551
1 file changed, 551 insertions(+), 0 deletions(-)
diff --git a/src/rocksdb/util/threadpool_imp.cc b/src/rocksdb/util/threadpool_imp.cc
new file mode 100644
index 000000000..09706cac5
--- /dev/null
+++ b/src/rocksdb/util/threadpool_imp.cc
@@ -0,0 +1,551 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "util/threadpool_imp.h"
+
+#ifndef OS_WIN
+#include <unistd.h>
+#endif
+
+#ifdef OS_LINUX
+#include <sys/resource.h>
+#include <sys/syscall.h>
+#endif
+
+#include <stdlib.h>
+
+#include <algorithm>
+#include <atomic>
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <vector>
+
+#include "monitoring/thread_status_util.h"
+#include "port/port.h"
+#include "test_util/sync_point.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+void ThreadPoolImpl::PthreadCall(const char* label, int result) {
+ if (result != 0) {
+ fprintf(stderr, "pthread %s: %s\n", label, errnoStr(result).c_str());
+ abort();
+ }
+}
+
+struct ThreadPoolImpl::Impl {
+ Impl();
+ ~Impl();
+
+ void JoinThreads(bool wait_for_jobs_to_complete);
+
+ void SetBackgroundThreadsInternal(int num, bool allow_reduce);
+ int GetBackgroundThreads();
+
+ unsigned int GetQueueLen() const {
+ return queue_len_.load(std::memory_order_relaxed);
+ }
+
+ void LowerIOPriority();
+
+ void LowerCPUPriority(CpuPriority pri);
+
+ void WakeUpAllThreads() { bgsignal_.notify_all(); }
+
+ void BGThread(size_t thread_id);
+
+ void StartBGThreads();
+
+ void Submit(std::function<void()>&& schedule,
+ std::function<void()>&& unschedule, void* tag);
+
+ int UnSchedule(void* arg);
+
+ void SetHostEnv(Env* env) { env_ = env; }
+
+ Env* GetHostEnv() const { return env_; }
+
+ bool HasExcessiveThread() const {
+ return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
+ }
+
+ // Return true iff the current thread is the excessive thread to terminate.
+ // Always terminate the running thread that was added last, even if there is
+ // more than one thread to terminate.
+ bool IsLastExcessiveThread(size_t thread_id) const {
+ return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
+ }
+
+ bool IsExcessiveThread(size_t thread_id) const {
+ return static_cast<int>(thread_id) >= total_threads_limit_;
+ }
+
+ // Return the thread priority of the pool.
+ // This allows member threads to know which priority pool they belong to.
+ Env::Priority GetThreadPriority() const { return priority_; }
+
+ // Set the thread priority.
+ void SetThreadPriority(Env::Priority priority) { priority_ = priority; }
+
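+ // Try to reserve `threads_to_be_reserved` currently waiting threads so that
+ // they do not pick up queued work until ReleaseThreads() is called. Returns
+ // the number actually reserved, which may be fewer than requested.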
+ int ReserveThreads(int threads_to_be_reserved) {
+ std::unique_lock<std::mutex> lock(mu_);
+ // We can reserve at most num_waiting_threads_ in total, so the number of
+ // threads actually reserved may be fewer than requested. In rare cases,
+ // num_waiting_threads_ can drop below reserved_threads_, e.g. due to
+ // SetBackgroundThreadsInternal() or the last excessive thread exiting; if
+ // that happens, no further threads can be reserved.
+ int reserved_threads_in_success =
+ std::min(std::max(num_waiting_threads_ - reserved_threads_, 0),
+ threads_to_be_reserved);
+ reserved_threads_ += reserved_threads_in_success;
+ return reserved_threads_in_success;
+ }
+
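+ // Release up to `threads_to_be_released` previously reserved threads and
+ // wake all waiters so they can pick up queued work again. Returns the
+ // number actually released.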
+ int ReleaseThreads(int threads_to_be_released) {
+ std::unique_lock<std::mutex> lock(mu_);
+ // We cannot release more than reserved_threads_
+ int released_threads_in_success =
+ std::min(reserved_threads_, threads_to_be_released);
+ reserved_threads_ -= released_threads_in_success;
+ WakeUpAllThreads();
+ return released_threads_in_success;
+ }
+
+ private:
+ static void BGThreadWrapper(void* arg);
+
+ bool low_io_priority_;
+ CpuPriority cpu_priority_;
+ Env::Priority priority_;
+ Env* env_;
+
+ int total_threads_limit_;
+ std::atomic_uint queue_len_; // Queue length. Used for stats reporting
+ // Number of reserved threads, managed by ReserveThreads() and
+ // ReleaseThreads(). While num_waiting_threads_ is no larger than
+ // reserved_threads_, waiting threads stay blocked so that the
+ // reservation is honored.
+ int reserved_threads_;
+ // Number of waiting threads (the maximum number of threads that can be
+ // reserved). In rare cases num_waiting_threads_ can be less than
+ // reserved_threads_, e.g. due to SetBackgroundThreadsInternal() or the
+ // last excessive thread exiting.
+ int num_waiting_threads_;
+ bool exit_all_threads_;
+ bool wait_for_jobs_to_complete_;
+
+ // Entry per Schedule()/Submit() call
+ struct BGItem {
+ void* tag = nullptr;
+ std::function<void()> function;
+ std::function<void()> unschedFunction;
+ };
+
+ using BGQueue = std::deque<BGItem>;
+ BGQueue queue_;
+
+ std::mutex mu_;
+ std::condition_variable bgsignal_;
+ std::vector<port::Thread> bgthreads_;
+};
+
+inline ThreadPoolImpl::Impl::Impl()
+ : low_io_priority_(false),
+ cpu_priority_(CpuPriority::kNormal),
+ priority_(Env::LOW),
+ env_(nullptr),
+ total_threads_limit_(0),
+ queue_len_(),
+ reserved_threads_(0),
+ num_waiting_threads_(0),
+ exit_all_threads_(false),
+ wait_for_jobs_to_complete_(false),
+ queue_(),
+ mu_(),
+ bgsignal_(),
+ bgthreads_() {}
+
+inline ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); }
+
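+// Join all background threads. If wait_for_jobs_to_complete is true, threads
+// keep draining the queue before exiting; otherwise they exit as soon as they
+// observe exit_all_threads_.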
+void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) {
+ std::unique_lock<std::mutex> lock(mu_);
+ assert(!exit_all_threads_);
+
+ wait_for_jobs_to_complete_ = wait_for_jobs_to_complete;
+ exit_all_threads_ = true;
+ // prevent threads from being recreated right after they're joined, in case
+ // the user is concurrently submitting jobs.
+ total_threads_limit_ = 0;
+ reserved_threads_ = 0;
+ num_waiting_threads_ = 0;
+
+ lock.unlock();
+
+ bgsignal_.notify_all();
+
+ for (auto& th : bgthreads_) {
+ th.join();
+ }
+
+ bgthreads_.clear();
+
+ exit_all_threads_ = false;
+ wait_for_jobs_to_complete_ = false;
+}
+
+inline void ThreadPoolImpl::Impl::LowerIOPriority() {
+ std::lock_guard<std::mutex> lock(mu_);
+ low_io_priority_ = true;
+}
+
+inline void ThreadPoolImpl::Impl::LowerCPUPriority(CpuPriority pri) {
+ std::lock_guard<std::mutex> lock(mu_);
+ cpu_priority_ = pri;
+}
+
+void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
+ bool low_io_priority = false;
+ CpuPriority current_cpu_priority = CpuPriority::kNormal;
+
+ while (true) {
+ // Wait until there is an item that is ready to run
+ std::unique_lock<std::mutex> lock(mu_);
+ // Stop waiting if the thread needs to do work or needs to terminate.
+ // Increase num_waiting_threads_ once this task has started waiting
+ num_waiting_threads_++;
+
+ TEST_SYNC_POINT("ThreadPoolImpl::BGThread::WaitingThreadsInc");
+ TEST_IDX_SYNC_POINT("ThreadPoolImpl::BGThread::Start:th", thread_id);
+ // When exit_all_threads_ is not set and the current thread is not the last
+ // excessive thread, it may block for 3 reasons:
+ // 1) the queue is empty
+ // 2) it is an excessive thread (but not the last one)
+ // 3) the number of waiting threads is not greater than the reserved threads
+ //    (i.e., no threads are available due to full reservation)
+ while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
+ (queue_.empty() || IsExcessiveThread(thread_id) ||
+ num_waiting_threads_ <= reserved_threads_)) {
+ bgsignal_.wait(lock);
+ }
+ // Decrease num_waiting_threads_ once the thread is not waiting
+ num_waiting_threads_--;
+
+ if (exit_all_threads_) { // mechanism to let BG threads exit safely
+
+ if (!wait_for_jobs_to_complete_ || queue_.empty()) {
+ break;
+ }
+ } else if (IsLastExcessiveThread(thread_id)) {
+ // Current thread is the last generated one and is excessive.
+ // We always terminate excessive thread in the reverse order of
+ // generation time. But not when `exit_all_threads_ == true`,
+ // otherwise `JoinThreads()` could try to `join()` a `detach()`ed
+ // thread.
+ auto& terminating_thread = bgthreads_.back();
+ terminating_thread.detach();
+ bgthreads_.pop_back();
+ if (HasExcessiveThread()) {
+ // There is still at least one more excessive thread to terminate.
+ WakeUpAllThreads();
+ }
+ TEST_IDX_SYNC_POINT("ThreadPoolImpl::BGThread::Termination:th",
+ thread_id);
+ TEST_SYNC_POINT("ThreadPoolImpl::BGThread::Termination");
+ break;
+ }
+
+ auto func = std::move(queue_.front().function);
+ queue_.pop_front();
+
+ queue_len_.store(static_cast<unsigned int>(queue_.size()),
+ std::memory_order_relaxed);
+
+ bool decrease_io_priority = (low_io_priority != low_io_priority_);
+ CpuPriority cpu_priority = cpu_priority_;
+ lock.unlock();
+
+ if (cpu_priority < current_cpu_priority) {
+ TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::BeforeSetCpuPriority",
+ &current_cpu_priority);
+ // 0 means current thread.
+ port::SetCpuPriority(0, cpu_priority);
+ current_cpu_priority = cpu_priority;
+ TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::AfterSetCpuPriority",
+ &current_cpu_priority);
+ }
+
+#ifdef OS_LINUX
+ if (decrease_io_priority) {
+#define IOPRIO_CLASS_SHIFT (13)
+#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
+ // Put the thread into the IOPRIO_CLASS_IDLE class (lowest).
+ // These system calls only have an effect when used in conjunction
+ // with an I/O scheduler that supports I/O priorities. As of
+ // kernel 2.6.17 the only such scheduler is the Completely
+ // Fair Queuing (CFQ) I/O scheduler.
+ // To change the scheduler:
+ // echo cfq > /sys/block/<device_name>/queue/scheduler
+ // Tunables to consider:
+ // /sys/block/<device_name>/queue/slice_idle
+ // /sys/block/<device_name>/queue/slice_sync
+ syscall(SYS_ioprio_set, 1, // IOPRIO_WHO_PROCESS
+ 0, // current thread
+ IOPRIO_PRIO_VALUE(3, 0));
+ low_io_priority = true;
+ }
+#else
+ (void)decrease_io_priority; // avoid 'unused variable' error
+#endif
+
+ TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::Impl::BGThread:BeforeRun",
+ &priority_);
+
+ func();
+ }
+}
+
+// Helper struct for passing arguments when creating threads.
+struct BGThreadMetadata {
+ ThreadPoolImpl::Impl* thread_pool_;
+ size_t thread_id_; // Ordinal of this thread within the thread pool.
+ BGThreadMetadata(ThreadPoolImpl::Impl* thread_pool, size_t thread_id)
+ : thread_pool_(thread_pool), thread_id_(thread_id) {}
+};
+
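+// Entry point for every background thread. Takes ownership of the
+// heap-allocated BGThreadMetadata passed in by StartBGThreads() and deletes
+// it after extracting the thread id and pool pointer.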
+void ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) {
+ BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
+ size_t thread_id = meta->thread_id_;
+ ThreadPoolImpl::Impl* tp = meta->thread_pool_;
+#ifdef ROCKSDB_USING_THREAD_STATUS
+ // Initialize it because the compiler cannot see that it is never used
+ // uninitialized.
+ ThreadStatus::ThreadType thread_type = ThreadStatus::NUM_THREAD_TYPES;
+ switch (tp->GetThreadPriority()) {
+ case Env::Priority::HIGH:
+ thread_type = ThreadStatus::HIGH_PRIORITY;
+ break;
+ case Env::Priority::LOW:
+ thread_type = ThreadStatus::LOW_PRIORITY;
+ break;
+ case Env::Priority::BOTTOM:
+ thread_type = ThreadStatus::BOTTOM_PRIORITY;
+ break;
+ case Env::Priority::USER:
+ thread_type = ThreadStatus::USER;
+ break;
+ case Env::Priority::TOTAL:
+ assert(false);
+ return;
+ }
+ assert(thread_type != ThreadStatus::NUM_THREAD_TYPES);
+ ThreadStatusUtil::RegisterThread(tp->GetHostEnv(), thread_type);
+#endif
+ delete meta;
+ tp->BGThread(thread_id);
+#ifdef ROCKSDB_USING_THREAD_STATUS
+ ThreadStatusUtil::UnregisterThread();
+#endif
+ return;
+}
+
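+// Adjust the background thread limit. Increases always take effect; decreases
+// only when allow_reduce is true. Excess threads terminate themselves in
+// BGThread() in reverse order of creation.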
+void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num,
+ bool allow_reduce) {
+ std::lock_guard<std::mutex> lock(mu_);
+ if (exit_all_threads_) {
+ return;
+ }
+ if (num > total_threads_limit_ ||
+ (num < total_threads_limit_ && allow_reduce)) {
+ total_threads_limit_ = std::max(0, num);
+ WakeUpAllThreads();
+ StartBGThreads();
+ }
+}
+
+int ThreadPoolImpl::Impl::GetBackgroundThreads() {
+ std::unique_lock<std::mutex> lock(mu_);
+ return total_threads_limit_;
+}
+
+void ThreadPoolImpl::Impl::StartBGThreads() {
+ // Start background thread if necessary
+ while ((int)bgthreads_.size() < total_threads_limit_) {
+ port::Thread p_t(&BGThreadWrapper,
+ new BGThreadMetadata(this, bgthreads_.size()));
+
+// Set the thread name to aid debugging
+#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
+#if __GLIBC_PREREQ(2, 12)
+ auto th_handle = p_t.native_handle();
+ std::string thread_priority = Env::PriorityToString(GetThreadPriority());
+ std::ostringstream thread_name_stream;
+ thread_name_stream << "rocksdb:";
+ for (char c : thread_priority) {
+ thread_name_stream << static_cast<char>(tolower(c));
+ }
+ pthread_setname_np(th_handle, thread_name_stream.str().c_str());
+#endif
+#endif
+ bgthreads_.push_back(std::move(p_t));
+ }
+}
+
+void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule,
+ std::function<void()>&& unschedule,
+ void* tag) {
+ std::lock_guard<std::mutex> lock(mu_);
+
+ if (exit_all_threads_) {
+ return;
+ }
+
+ StartBGThreads();
+
+ // Add to the job queue
+ queue_.push_back(BGItem());
+ TEST_SYNC_POINT("ThreadPoolImpl::Submit::Enqueue");
+ auto& item = queue_.back();
+ item.tag = tag;
+ item.function = std::move(schedule);
+ item.unschedFunction = std::move(unschedule);
+
+ queue_len_.store(static_cast<unsigned int>(queue_.size()),
+ std::memory_order_relaxed);
+
+ if (!HasExcessiveThread()) {
+ // Wake up at least one waiting thread.
+ bgsignal_.notify_one();
+ } else {
+ // Need to wake up all threads to make sure the one woken
+ // up is not the one to terminate.
+ WakeUpAllThreads();
+ }
+}
+
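+// Remove all queued (not yet started) jobs whose tag matches `arg`, running
+// their unschedule callbacks outside the mutex. Jobs already picked up by a
+// thread are not affected. Returns the number of jobs removed.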
+int ThreadPoolImpl::Impl::UnSchedule(void* arg) {
+ int count = 0;
+
+ std::vector<std::function<void()>> candidates;
+ {
+ std::lock_guard<std::mutex> lock(mu_);
+
+ // Remove from the job queue
+ BGQueue::iterator it = queue_.begin();
+ while (it != queue_.end()) {
+ if (arg == (*it).tag) {
+ if (it->unschedFunction) {
+ candidates.push_back(std::move(it->unschedFunction));
+ }
+ it = queue_.erase(it);
+ count++;
+ } else {
+ ++it;
+ }
+ }
+ queue_len_.store(static_cast<unsigned int>(queue_.size()),
+ std::memory_order_relaxed);
+ }
+
+ // Run unschedule functions outside the mutex
+ for (auto& f : candidates) {
+ f();
+ }
+
+ return count;
+}
+
+ThreadPoolImpl::ThreadPoolImpl() : impl_(new Impl()) {}
+
+ThreadPoolImpl::~ThreadPoolImpl() {}
+
+void ThreadPoolImpl::JoinAllThreads() { impl_->JoinThreads(false); }
+
+void ThreadPoolImpl::SetBackgroundThreads(int num) {
+ impl_->SetBackgroundThreadsInternal(num, true);
+}
+
+int ThreadPoolImpl::GetBackgroundThreads() {
+ return impl_->GetBackgroundThreads();
+}
+
+unsigned int ThreadPoolImpl::GetQueueLen() const {
+ return impl_->GetQueueLen();
+}
+
+void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() {
+ impl_->JoinThreads(true);
+}
+
+void ThreadPoolImpl::LowerIOPriority() { impl_->LowerIOPriority(); }
+
+void ThreadPoolImpl::LowerCPUPriority(CpuPriority pri) {
+ impl_->LowerCPUPriority(pri);
+}
+
+void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
+ impl_->SetBackgroundThreadsInternal(num, false);
+}
+
+void ThreadPoolImpl::SubmitJob(const std::function<void()>& job) {
+ auto copy(job);
+ impl_->Submit(std::move(copy), std::function<void()>(), nullptr);
+}
+
+void ThreadPoolImpl::SubmitJob(std::function<void()>&& job) {
+ impl_->Submit(std::move(job), std::function<void()>(), nullptr);
+}
+
+void ThreadPoolImpl::Schedule(void (*function)(void* arg1), void* arg,
+ void* tag, void (*unschedFunction)(void* arg)) {
+ if (unschedFunction == nullptr) {
+ impl_->Submit(std::bind(function, arg), std::function<void()>(), tag);
+ } else {
+ impl_->Submit(std::bind(function, arg), std::bind(unschedFunction, arg),
+ tag);
+ }
+}
+
+int ThreadPoolImpl::UnSchedule(void* arg) { return impl_->UnSchedule(arg); }
+
+void ThreadPoolImpl::SetHostEnv(Env* env) { impl_->SetHostEnv(env); }
+
+Env* ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); }
+
+// Return the thread priority of the pool.
+// This allows member threads to know which priority pool they belong to.
+Env::Priority ThreadPoolImpl::GetThreadPriority() const {
+ return impl_->GetThreadPriority();
+}
+
+// Set the thread priority.
+void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) {
+ impl_->SetThreadPriority(priority);
+}
+
+// Reserve a specific number of threads, preventing them from running other
+// functions. The number of threads actually reserved may be fewer than requested.
+int ThreadPoolImpl::ReserveThreads(int threads_to_be_reserved) {
+ return impl_->ReserveThreads(threads_to_be_reserved);
+}
+
+// Release a specific number of threads
+int ThreadPoolImpl::ReleaseThreads(int threads_to_be_released) {
+ return impl_->ReleaseThreads(threads_to_be_released);
+}
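+
+// Illustrative usage of the reservation API (sketch only, not part of the
+// original source; assumes `pool` is a ThreadPoolImpl owned by the caller):
+//   int got = pool.ReserveThreads(2);  // `got` may be fewer than 2
+//   /* reserved threads will not pick up queued jobs until released */
+//   pool.ReleaseThreads(got);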
+
+ThreadPool* NewThreadPool(int num_threads) {
+ ThreadPoolImpl* thread_pool = new ThreadPoolImpl();
+ thread_pool->SetBackgroundThreads(num_threads);
+ return thread_pool;
+}
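+
+// Illustrative usage of the pool itself (sketch only, not part of the
+// original source):
+//   ThreadPoolImpl pool;
+//   pool.SetBackgroundThreads(2);
+//   pool.SubmitJob([] { /* background work */ });
+//   pool.WaitForJobsAndJoinAllThreads();  // join before destruction; ~Impl()
+//                                         // asserts that no threads remain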
+
+} // namespace ROCKSDB_NAMESPACE