diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/libwebrtc/rtc_base/task_queue_stdlib.cc | |
parent | Initial commit. (diff) | |
download | firefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/libwebrtc/rtc_base/task_queue_stdlib.cc')
-rw-r--r-- | third_party/libwebrtc/rtc_base/task_queue_stdlib.cc | 311 |
1 files changed, 311 insertions, 0 deletions
diff --git a/third_party/libwebrtc/rtc_base/task_queue_stdlib.cc b/third_party/libwebrtc/rtc_base/task_queue_stdlib.cc new file mode 100644 index 0000000000..f712cfa8c7 --- /dev/null +++ b/third_party/libwebrtc/rtc_base/task_queue_stdlib.cc @@ -0,0 +1,311 @@ +/* + * Copyright 2018 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "rtc_base/task_queue_stdlib.h" + +#include <string.h> + +#include <algorithm> +#include <map> +#include <memory> +#include <queue> +#include <utility> + +#include "absl/functional/any_invocable.h" +#include "absl/strings/string_view.h" +#include "api/task_queue/task_queue_base.h" +#include "api/units/time_delta.h" +#include "rtc_base/checks.h" +#include "rtc_base/event.h" +#include "rtc_base/logging.h" +#include "rtc_base/numerics/divide_round.h" +#include "rtc_base/platform_thread.h" +#include "rtc_base/synchronization/mutex.h" +#include "rtc_base/thread_annotations.h" +#include "rtc_base/time_utils.h" + +namespace webrtc { +namespace { + +rtc::ThreadPriority TaskQueuePriorityToThreadPriority( + TaskQueueFactory::Priority priority) { + switch (priority) { + case TaskQueueFactory::Priority::HIGH: + return rtc::ThreadPriority::kRealtime; + case TaskQueueFactory::Priority::LOW: + return rtc::ThreadPriority::kLow; + case TaskQueueFactory::Priority::NORMAL: + return rtc::ThreadPriority::kNormal; + } +} + +class TaskQueueStdlib final : public TaskQueueBase { + public: + TaskQueueStdlib(absl::string_view queue_name, rtc::ThreadPriority priority); + ~TaskQueueStdlib() override = default; + + void Delete() override; + void PostTask(absl::AnyInvocable<void() &&> task) override; + void PostDelayedTask(absl::AnyInvocable<void() &&> task, + TimeDelta delay) override; + void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task, + TimeDelta delay) override; + + private: + using OrderId = uint64_t; + + struct DelayedEntryTimeout { + int64_t next_fire_at_us{}; + OrderId order{}; + + bool operator<(const DelayedEntryTimeout& o) const { + return std::tie(next_fire_at_us, order) < + std::tie(o.next_fire_at_us, o.order); + } + }; + + struct NextTask { + bool final_task = false; + absl::AnyInvocable<void() &&> run_task; + // TODO(bugs.webrtc.org/14366): While transitioning to TimeDelta, WebRTC and + // Chromium has a different idea about what type rtc::Event::kForever is. + // Code can't assume rtc::Event::kForever is the same type as timed wait + // arguments. + // Change `sleep_time_ms` to be explicit type, default value + // `rtc::Event::kForever` once transition is complete. + absl::optional<int64_t> sleep_time_ms; + }; + + static rtc::PlatformThread InitializeThread(TaskQueueStdlib* me, + absl::string_view queue_name, + rtc::ThreadPriority priority); + + NextTask GetNextTask(); + + void ProcessTasks(); + + void NotifyWake(); + + // Signaled whenever a new task is pending. + rtc::Event flag_notify_; + + Mutex pending_lock_; + + // Indicates if the worker thread needs to shutdown now. + bool thread_should_quit_ RTC_GUARDED_BY(pending_lock_) = false; + + // Holds the next order to use for the next task to be + // put into one of the pending queues. + OrderId thread_posting_order_ RTC_GUARDED_BY(pending_lock_) = 0; + + // The list of all pending tasks that need to be processed in the + // FIFO queue ordering on the worker thread. + std::queue<std::pair<OrderId, absl::AnyInvocable<void() &&>>> pending_queue_ + RTC_GUARDED_BY(pending_lock_); + + // The list of all pending tasks that need to be processed at a future + // time based upon a delay. On the off change the delayed task should + // happen at exactly the same time interval as another task then the + // task is processed based on FIFO ordering. std::priority_queue was + // considered but rejected due to its inability to extract the + // move-only value out of the queue without the presence of a hack. + std::map<DelayedEntryTimeout, absl::AnyInvocable<void() &&>> delayed_queue_ + RTC_GUARDED_BY(pending_lock_); + + // Contains the active worker thread assigned to processing + // tasks (including delayed tasks). + // Placing this last ensures the thread doesn't touch uninitialized attributes + // throughout it's lifetime. + rtc::PlatformThread thread_; +}; + +TaskQueueStdlib::TaskQueueStdlib(absl::string_view queue_name, + rtc::ThreadPriority priority) + : flag_notify_(/*manual_reset=*/false, /*initially_signaled=*/false), + thread_(InitializeThread(this, queue_name, priority)) {} + +// static +rtc::PlatformThread TaskQueueStdlib::InitializeThread( + TaskQueueStdlib* me, + absl::string_view queue_name, + rtc::ThreadPriority priority) { + rtc::Event started; + auto thread = rtc::PlatformThread::SpawnJoinable( + [&started, me] { + CurrentTaskQueueSetter set_current(me); + started.Set(); + me->ProcessTasks(); + }, + queue_name, rtc::ThreadAttributes().SetPriority(priority)); + started.Wait(rtc::Event::kForever); + return thread; +} + +void TaskQueueStdlib::Delete() { + RTC_DCHECK(!IsCurrent()); + + { + MutexLock lock(&pending_lock_); + thread_should_quit_ = true; + } + + NotifyWake(); + + delete this; +} + +void TaskQueueStdlib::PostTask(absl::AnyInvocable<void() &&> task) { + { + MutexLock lock(&pending_lock_); + pending_queue_.push( + std::make_pair(++thread_posting_order_, std::move(task))); + } + + NotifyWake(); +} + +void TaskQueueStdlib::PostDelayedTask(absl::AnyInvocable<void() &&> task, + TimeDelta delay) { + DelayedEntryTimeout delayed_entry; + delayed_entry.next_fire_at_us = rtc::TimeMicros() + delay.us(); + + { + MutexLock lock(&pending_lock_); + delayed_entry.order = ++thread_posting_order_; + delayed_queue_[delayed_entry] = std::move(task); + } + + NotifyWake(); +} + +void TaskQueueStdlib::PostDelayedHighPrecisionTask( + absl::AnyInvocable<void() &&> task, + TimeDelta delay) { + PostDelayedTask(std::move(task), delay); +} + +TaskQueueStdlib::NextTask TaskQueueStdlib::GetNextTask() { + NextTask result; + + const int64_t tick_us = rtc::TimeMicros(); + + MutexLock lock(&pending_lock_); + + if (thread_should_quit_) { + result.final_task = true; + return result; + } + + if (delayed_queue_.size() > 0) { + auto delayed_entry = delayed_queue_.begin(); + const auto& delay_info = delayed_entry->first; + auto& delay_run = delayed_entry->second; + if (tick_us >= delay_info.next_fire_at_us) { + if (pending_queue_.size() > 0) { + auto& entry = pending_queue_.front(); + auto& entry_order = entry.first; + auto& entry_run = entry.second; + if (entry_order < delay_info.order) { + result.run_task = std::move(entry_run); + pending_queue_.pop(); + return result; + } + } + + result.run_task = std::move(delay_run); + delayed_queue_.erase(delayed_entry); + return result; + } + + result.sleep_time_ms = + DivideRoundUp(delay_info.next_fire_at_us - tick_us, 1'000); + } + + if (pending_queue_.size() > 0) { + auto& entry = pending_queue_.front(); + result.run_task = std::move(entry.second); + pending_queue_.pop(); + } + + return result; +} + +void TaskQueueStdlib::ProcessTasks() { + while (true) { + auto task = GetNextTask(); + + if (task.final_task) + break; + + if (task.run_task) { + // process entry immediately then try again + std::move(task.run_task)(); + + // Attempt to run more tasks before going to sleep. + continue; + } + + // TODO(bugs.webrtc.org/14366): While transitioning to TimeDelta, WebRTC and + // Chromium has a different idea about what type rtc::Event::kForever is. + // Code can't assume rtc::Event::kForever is the same type as timed wait + // arguments. + // Simplify after transitioning is complete. + if (task.sleep_time_ms.has_value()) + flag_notify_.Wait(task.sleep_time_ms.value()); + else + flag_notify_.Wait(rtc::Event::kForever); + } +} + +void TaskQueueStdlib::NotifyWake() { + // The queue holds pending tasks to complete. Either tasks are to be + // executed immediately or tasks are to be run at some future delayed time. + // For immediate tasks the task queue's thread is busy running the task and + // the thread will not be waiting on the flag_notify_ event. If no immediate + // tasks are available but a delayed task is pending then the thread will be + // waiting on flag_notify_ with a delayed time-out of the nearest timed task + // to run. If no immediate or pending tasks are available, the thread will + // wait on flag_notify_ until signaled that a task has been added (or the + // thread to be told to shutdown). + + // In all cases, when a new immediate task, delayed task, or request to + // shutdown the thread is added the flag_notify_ is signaled after. If the + // thread was waiting then the thread will wake up immediately and re-assess + // what task needs to be run next (i.e. run a task now, wait for the nearest + // timed delayed task, or shutdown the thread). If the thread was not waiting + // then the thread will remained signaled to wake up the next time any + // attempt to wait on the flag_notify_ event occurs. + + // Any immediate or delayed pending task (or request to shutdown the thread) + // must always be added to the queue prior to signaling flag_notify_ to wake + // up the possibly sleeping thread. This prevents a race condition where the + // thread is notified to wake up but the task queue's thread finds nothing to + // do so it waits once again to be signaled where such a signal may never + // happen. + flag_notify_.Set(); +} + +class TaskQueueStdlibFactory final : public TaskQueueFactory { + public: + std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue( + absl::string_view name, + Priority priority) const override { + return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>( + new TaskQueueStdlib(name, TaskQueuePriorityToThreadPriority(priority))); + } +}; + +} // namespace + +std::unique_ptr<TaskQueueFactory> CreateTaskQueueStdlibFactory() { + return std::make_unique<TaskQueueStdlibFactory>(); +} + +} // namespace webrtc |