summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/rtc_base/task_queue_libevent.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 00:47:55 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 00:47:55 +0000
commit26a029d407be480d791972afb5975cf62c9360a6 (patch)
treef435a8308119effd964b339f76abb83a57c29483 /third_party/libwebrtc/rtc_base/task_queue_libevent.cc
parentInitial commit. (diff)
downloadfirefox-26a029d407be480d791972afb5975cf62c9360a6.tar.xz
firefox-26a029d407be480d791972afb5975cf62c9360a6.zip
Adding upstream version 124.0.1.upstream/124.0.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/libwebrtc/rtc_base/task_queue_libevent.cc')
-rw-r--r--third_party/libwebrtc/rtc_base/task_queue_libevent.cc336
1 files changed, 336 insertions, 0 deletions
diff --git a/third_party/libwebrtc/rtc_base/task_queue_libevent.cc b/third_party/libwebrtc/rtc_base/task_queue_libevent.cc
new file mode 100644
index 0000000000..e27c6eff9f
--- /dev/null
+++ b/third_party/libwebrtc/rtc_base/task_queue_libevent.cc
@@ -0,0 +1,336 @@
+/*
+ * Copyright 2016 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "rtc_base/task_queue_libevent.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <signal.h>
+#include <stdint.h>
+#include <time.h>
+#include <unistd.h>
+
+#include <list>
+#include <memory>
+#include <type_traits>
+#include <utility>
+
+#include "absl/container/inlined_vector.h"
+#include "absl/functional/any_invocable.h"
+#include "absl/strings/string_view.h"
+#include "api/task_queue/task_queue_base.h"
+#include "api/units/time_delta.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/logging.h"
+#include "rtc_base/numerics/safe_conversions.h"
+#include "rtc_base/platform_thread.h"
+#include "rtc_base/platform_thread_types.h"
+#include "rtc_base/synchronization/mutex.h"
+#include "rtc_base/thread_annotations.h"
+#include "rtc_base/time_utils.h"
+#include "third_party/libevent/event.h"
+
+namespace webrtc {
+namespace {
+constexpr char kQuit = 1;
+constexpr char kRunTasks = 2;
+
+using Priority = TaskQueueFactory::Priority;
+
+// This ignores the SIGPIPE signal on the calling thread.
+// This signal can be fired when trying to write() to a pipe that's being
+// closed or while closing a pipe that's being written to.
+// We can run into that situation so we ignore this signal and continue as
+// normal.
+// As a side note for this implementation, it would be great if we could safely
+// restore the sigmask, but unfortunately the operation of restoring it, can
+// itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS)
+// The SIGPIPE signal by default causes the process to be terminated, so we
+// don't want to risk that.
+// An alternative to this approach is to ignore the signal for the whole
+// process:
+// signal(SIGPIPE, SIG_IGN);
+void IgnoreSigPipeSignalOnCurrentThread() {
+ sigset_t sigpipe_mask;
+ sigemptyset(&sigpipe_mask);
+ sigaddset(&sigpipe_mask, SIGPIPE);
+ pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr);
+}
+
+bool SetNonBlocking(int fd) {
+ const int flags = fcntl(fd, F_GETFL);
+ RTC_CHECK(flags != -1);
+ return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
+}
+
+// TODO(tommi): This is a hack to support two versions of libevent that we're
+// compatible with. The method we really want to call is event_assign(),
+// since event_set() has been marked as deprecated (and doesn't accept
+// passing event_base__ as a parameter). However, the version of libevent
+// that we have in Chromium, doesn't have event_assign(), so we need to call
+// event_set() there.
+void EventAssign(struct event* ev,
+ struct event_base* base,
+ int fd,
+ short events,
+ void (*callback)(int, short, void*),
+ void* arg) {
+#if defined(_EVENT2_EVENT_H_)
+ RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
+#else
+ event_set(ev, fd, events, callback, arg);
+ RTC_CHECK_EQ(0, event_base_set(base, ev));
+#endif
+}
+
+rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
+ switch (priority) {
+ case Priority::HIGH:
+ return rtc::ThreadPriority::kRealtime;
+ case Priority::LOW:
+ return rtc::ThreadPriority::kLow;
+ case Priority::NORMAL:
+ return rtc::ThreadPriority::kNormal;
+ }
+}
+
+class TaskQueueLibevent final : public TaskQueueBase {
+ public:
+ TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);
+
+ void Delete() override;
+
+ protected:
+ void PostTaskImpl(absl::AnyInvocable<void() &&> task,
+ const PostTaskTraits& traits,
+ const Location& location) override;
+ void PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task,
+ TimeDelta delay,
+ const PostDelayedTaskTraits& traits,
+ const Location& location) override;
+
+ private:
+ struct TimerEvent;
+
+ void PostDelayedTaskOnTaskQueue(absl::AnyInvocable<void() &&> task,
+ TimeDelta delay);
+
+ ~TaskQueueLibevent() override = default;
+
+ static void OnWakeup(int socket, short flags, void* context); // NOLINT
+ static void RunTimer(int fd, short flags, void* context); // NOLINT
+
+ bool is_active_ = true;
+ int wakeup_pipe_in_ = -1;
+ int wakeup_pipe_out_ = -1;
+ event_base* event_base_;
+ event wakeup_event_;
+ rtc::PlatformThread thread_;
+ Mutex pending_lock_;
+ absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> pending_
+ RTC_GUARDED_BY(pending_lock_);
+ // Holds a list of events pending timers for cleanup when the loop exits.
+ std::list<TimerEvent*> pending_timers_;
+};
+
+struct TaskQueueLibevent::TimerEvent {
+ TimerEvent(TaskQueueLibevent* task_queue, absl::AnyInvocable<void() &&> task)
+ : task_queue(task_queue), task(std::move(task)) {}
+ ~TimerEvent() { event_del(&ev); }
+
+ event ev;
+ TaskQueueLibevent* task_queue;
+ absl::AnyInvocable<void() &&> task;
+};
+
+TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
+ rtc::ThreadPriority priority)
+ : event_base_(event_base_new()) {
+ int fds[2];
+ RTC_CHECK(pipe(fds) == 0);
+ SetNonBlocking(fds[0]);
+ SetNonBlocking(fds[1]);
+ wakeup_pipe_out_ = fds[0];
+ wakeup_pipe_in_ = fds[1];
+
+ EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,
+ EV_READ | EV_PERSIST, OnWakeup, this);
+ event_add(&wakeup_event_, 0);
+ thread_ = rtc::PlatformThread::SpawnJoinable(
+ [this] {
+ {
+ CurrentTaskQueueSetter set_current(this);
+ while (is_active_)
+ event_base_loop(event_base_, 0);
+
+ // Ensure remaining deleted tasks are destroyed with Current() set up
+ // to this task queue.
+ absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> pending;
+ MutexLock lock(&pending_lock_);
+ pending_.swap(pending);
+ }
+ for (TimerEvent* timer : pending_timers_)
+ delete timer;
+
+#if RTC_DCHECK_IS_ON
+ MutexLock lock(&pending_lock_);
+ RTC_DCHECK(pending_.empty());
+#endif
+ },
+ queue_name, rtc::ThreadAttributes().SetPriority(priority));
+}
+
+void TaskQueueLibevent::Delete() {
+ RTC_DCHECK(!IsCurrent());
+ struct timespec ts;
+ char message = kQuit;
+ while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
+ // The queue is full, so we have no choice but to wait and retry.
+ RTC_CHECK_EQ(EAGAIN, errno);
+ ts.tv_sec = 0;
+ ts.tv_nsec = 1000000;
+ nanosleep(&ts, nullptr);
+ }
+
+ thread_.Finalize();
+
+ event_del(&wakeup_event_);
+
+ IgnoreSigPipeSignalOnCurrentThread();
+
+ close(wakeup_pipe_in_);
+ close(wakeup_pipe_out_);
+ wakeup_pipe_in_ = -1;
+ wakeup_pipe_out_ = -1;
+
+ event_base_free(event_base_);
+ delete this;
+}
+
+void TaskQueueLibevent::PostTaskImpl(absl::AnyInvocable<void() &&> task,
+ const PostTaskTraits& traits,
+ const Location& location) {
+ {
+ MutexLock lock(&pending_lock_);
+ bool had_pending_tasks = !pending_.empty();
+ pending_.push_back(std::move(task));
+
+ // Only write to the pipe if there were no pending tasks before this one
+ // since the thread could be sleeping. If there were already pending tasks
+ // then we know there's either a pending write in the pipe or the thread has
+ // not yet processed the pending tasks. In either case, the thread will
+ // eventually wake up and process all pending tasks including this one.
+ if (had_pending_tasks) {
+ return;
+ }
+ }
+
+ // Note: This behvior outlined above ensures we never fill up the pipe write
+ // buffer since there will only ever be 1 byte pending.
+ char message = kRunTasks;
+ RTC_CHECK_EQ(write(wakeup_pipe_in_, &message, sizeof(message)),
+ sizeof(message));
+}
+
+void TaskQueueLibevent::PostDelayedTaskOnTaskQueue(
+ absl::AnyInvocable<void() &&> task,
+ TimeDelta delay) {
+ // libevent api is not thread safe by default, thus event_add need to be
+ // called on the `thread_`.
+ RTC_DCHECK(IsCurrent());
+
+ TimerEvent* timer = new TimerEvent(this, std::move(task));
+ EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
+ timer);
+ pending_timers_.push_back(timer);
+ timeval tv = {.tv_sec = rtc::dchecked_cast<int>(delay.us() / 1'000'000),
+ .tv_usec = rtc::dchecked_cast<int>(delay.us() % 1'000'000)};
+ event_add(&timer->ev, &tv);
+}
+
+void TaskQueueLibevent::PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task,
+ TimeDelta delay,
+ const PostDelayedTaskTraits& traits,
+ const Location& location) {
+ if (IsCurrent()) {
+ PostDelayedTaskOnTaskQueue(std::move(task), delay);
+ } else {
+ int64_t posted_us = rtc::TimeMicros();
+ PostTask([posted_us, delay, task = std::move(task), this]() mutable {
+ // Compensate for the time that has passed since the posting.
+ TimeDelta post_time = TimeDelta::Micros(rtc::TimeMicros() - posted_us);
+ PostDelayedTaskOnTaskQueue(
+ std::move(task), std::max(delay - post_time, TimeDelta::Zero()));
+ });
+ }
+}
+
+// static
+void TaskQueueLibevent::OnWakeup(int socket,
+ short flags, // NOLINT
+ void* context) {
+ TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
+ RTC_DCHECK(me->wakeup_pipe_out_ == socket);
+ char buf;
+ RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
+ switch (buf) {
+ case kQuit:
+ me->is_active_ = false;
+ event_base_loopbreak(me->event_base_);
+ break;
+ case kRunTasks: {
+ absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> tasks;
+ {
+ MutexLock lock(&me->pending_lock_);
+ tasks.swap(me->pending_);
+ }
+ RTC_DCHECK(!tasks.empty());
+ for (auto& task : tasks) {
+ std::move(task)();
+ // Prefer to delete the `task` before running the next one.
+ task = nullptr;
+ }
+ break;
+ }
+ default:
+ RTC_DCHECK_NOTREACHED();
+ break;
+ }
+}
+
+// static
+void TaskQueueLibevent::RunTimer(int fd,
+ short flags, // NOLINT
+ void* context) {
+ TimerEvent* timer = static_cast<TimerEvent*>(context);
+ std::move(timer->task)();
+ timer->task_queue->pending_timers_.remove(timer);
+ delete timer;
+}
+
+class TaskQueueLibeventFactory final : public TaskQueueFactory {
+ public:
+ std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
+ absl::string_view name,
+ Priority priority) const override {
+ return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
+ new TaskQueueLibevent(name,
+ TaskQueuePriorityToThreadPriority(priority)));
+ }
+};
+
+} // namespace
+
+std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
+ return std::make_unique<TaskQueueLibeventFactory>();
+}
+
+} // namespace webrtc