diff options
Diffstat (limited to 'third_party/libwebrtc/rtc_base/task_utils')
5 files changed, 954 insertions, 0 deletions
diff --git a/third_party/libwebrtc/rtc_base/task_utils/BUILD.gn b/third_party/libwebrtc/rtc_base/task_utils/BUILD.gn new file mode 100644 index 0000000000..5fcf25ef0b --- /dev/null +++ b/third_party/libwebrtc/rtc_base/task_utils/BUILD.gn @@ -0,0 +1,47 @@ +# Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. +# +# Use of this source code is governed by a BSD-style license +# that can be found in the LICENSE file in the root of the source +# tree. An additional intellectual property rights grant can be found +# in the file PATENTS. All contributing project authors may +# be found in the AUTHORS file in the root of the source tree. + +import("../../webrtc.gni") + +rtc_library("repeating_task") { + sources = [ + "repeating_task.cc", + "repeating_task.h", + ] + deps = [ + "..:logging", + "..:timeutils", + "../../api:sequence_checker", + "../../api/task_queue", + "../../api/task_queue:pending_task_safety_flag", + "../../api/units:time_delta", + "../../api/units:timestamp", + "../../system_wrappers:system_wrappers", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/functional:any_invocable" ] +} + +if (rtc_include_tests) { + rtc_library("repeating_task_unittests") { + testonly = true + sources = [ "repeating_task_unittest.cc" ] + deps = [ + ":repeating_task", + "..:rtc_event", + "..:rtc_task_queue", + "..:task_queue_for_test", + "../../api/task_queue", + "../../api/task_queue/test:mock_task_queue_base", + "../../api/units:time_delta", + "../../api/units:timestamp", + "../../system_wrappers:system_wrappers", + "../../test:test_support", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/functional:any_invocable" ] + } +} diff --git a/third_party/libwebrtc/rtc_base/task_utils/repeating_task.cc b/third_party/libwebrtc/rtc_base/task_utils/repeating_task.cc new file mode 100644 index 0000000000..6b76d7673a --- /dev/null +++ b/third_party/libwebrtc/rtc_base/task_utils/repeating_task.cc @@ -0,0 +1,140 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "rtc_base/task_utils/repeating_task.h" + +#include "absl/functional/any_invocable.h" +#include "api/task_queue/pending_task_safety_flag.h" +#include "rtc_base/logging.h" + +namespace webrtc { +namespace { + +class RepeatingTask { + public: + RepeatingTask(TaskQueueBase* task_queue, + TaskQueueBase::DelayPrecision precision, + TimeDelta first_delay, + absl::AnyInvocable<TimeDelta()> task, + Clock* clock, + rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag, + const Location& location); + RepeatingTask(RepeatingTask&&) = default; + RepeatingTask& operator=(RepeatingTask&&) = delete; + ~RepeatingTask() = default; + + void operator()() &&; + + private: + TaskQueueBase* const task_queue_; + const TaskQueueBase::DelayPrecision precision_; + Clock* const clock_; + const Location location_; + absl::AnyInvocable<TimeDelta()> task_; + // This is always finite. + Timestamp next_run_time_ RTC_GUARDED_BY(task_queue_); + rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag_ + RTC_GUARDED_BY(task_queue_); +}; + +RepeatingTask::RepeatingTask( + TaskQueueBase* task_queue, + TaskQueueBase::DelayPrecision precision, + TimeDelta first_delay, + absl::AnyInvocable<TimeDelta()> task, + Clock* clock, + rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag, + const Location& location) + : task_queue_(task_queue), + precision_(precision), + clock_(clock), + location_(location), + task_(std::move(task)), + next_run_time_(clock_->CurrentTime() + first_delay), + alive_flag_(std::move(alive_flag)) {} + +void RepeatingTask::operator()() && { + RTC_DCHECK_RUN_ON(task_queue_); + if (!alive_flag_->alive()) + return; + + webrtc_repeating_task_impl::RepeatingTaskImplDTraceProbeRun(); + TimeDelta delay = task_(); + RTC_DCHECK_GE(delay, TimeDelta::Zero()); + + // A delay of +infinity means that the task should not be run again. + // Alternatively, the closure might have stopped this task. + if (delay.IsPlusInfinity() || !alive_flag_->alive()) + return; + + TimeDelta lost_time = clock_->CurrentTime() - next_run_time_; + next_run_time_ += delay; + delay -= lost_time; + delay = std::max(delay, TimeDelta::Zero()); + + task_queue_->PostDelayedTaskWithPrecision(precision_, std::move(*this), delay, + location_); +} + +} // namespace + +RepeatingTaskHandle RepeatingTaskHandle::Start( + TaskQueueBase* task_queue, + absl::AnyInvocable<TimeDelta()> closure, + TaskQueueBase::DelayPrecision precision, + Clock* clock, + const Location& location) { + auto alive_flag = PendingTaskSafetyFlag::CreateDetached(); + webrtc_repeating_task_impl::RepeatingTaskHandleDTraceProbeStart(); + task_queue->PostTask( + RepeatingTask(task_queue, precision, TimeDelta::Zero(), + std::move(closure), clock, alive_flag, location), + location); + return RepeatingTaskHandle(std::move(alive_flag)); +} + +// DelayedStart is equivalent to Start except that the first invocation of the +// closure will be delayed by the given amount. +RepeatingTaskHandle RepeatingTaskHandle::DelayedStart( + TaskQueueBase* task_queue, + TimeDelta first_delay, + absl::AnyInvocable<TimeDelta()> closure, + TaskQueueBase::DelayPrecision precision, + Clock* clock, + const Location& location) { + auto alive_flag = PendingTaskSafetyFlag::CreateDetached(); + webrtc_repeating_task_impl::RepeatingTaskHandleDTraceProbeDelayedStart(); + task_queue->PostDelayedTaskWithPrecision( + precision, + RepeatingTask(task_queue, precision, first_delay, std::move(closure), + clock, alive_flag, location), + first_delay, location); + return RepeatingTaskHandle(std::move(alive_flag)); +} + +void RepeatingTaskHandle::Stop() { + if (repeating_task_) { + repeating_task_->SetNotAlive(); + repeating_task_ = nullptr; + } +} + +bool RepeatingTaskHandle::Running() const { + return repeating_task_ != nullptr; +} + +namespace webrtc_repeating_task_impl { +// These methods are empty, but can be externally equipped with actions using +// dtrace. +void RepeatingTaskHandleDTraceProbeStart() {} +void RepeatingTaskHandleDTraceProbeDelayedStart() {} +void RepeatingTaskImplDTraceProbeRun() {} +} // namespace webrtc_repeating_task_impl +} // namespace webrtc diff --git a/third_party/libwebrtc/rtc_base/task_utils/repeating_task.h b/third_party/libwebrtc/rtc_base/task_utils/repeating_task.h new file mode 100644 index 0000000000..28c691c3de --- /dev/null +++ b/third_party/libwebrtc/rtc_base/task_utils/repeating_task.h @@ -0,0 +1,92 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef RTC_BASE_TASK_UTILS_REPEATING_TASK_H_ +#define RTC_BASE_TASK_UTILS_REPEATING_TASK_H_ + +#include <memory> +#include <type_traits> +#include <utility> + +#include "absl/functional/any_invocable.h" +#include "api/task_queue/pending_task_safety_flag.h" +#include "api/task_queue/task_queue_base.h" +#include "api/units/time_delta.h" +#include "system_wrappers/include/clock.h" + +namespace webrtc { + +namespace webrtc_repeating_task_impl { + +// Methods simplifying external tracing of RepeatingTaskHandle operations. +void RepeatingTaskHandleDTraceProbeStart(); +void RepeatingTaskHandleDTraceProbeDelayedStart(); +void RepeatingTaskImplDTraceProbeRun(); + +} // namespace webrtc_repeating_task_impl + +// Allows starting tasks that repeat themselves on a TaskQueue indefinately +// until they are stopped or the TaskQueue is destroyed. It allows starting and +// stopping multiple times, but you must stop one task before starting another +// and it can only be stopped when in the running state. The public interface is +// not thread safe. +class RepeatingTaskHandle { + public: + RepeatingTaskHandle() = default; + ~RepeatingTaskHandle() = default; + RepeatingTaskHandle(RepeatingTaskHandle&& other) = default; + RepeatingTaskHandle& operator=(RepeatingTaskHandle&& other) = default; + RepeatingTaskHandle(const RepeatingTaskHandle&) = delete; + RepeatingTaskHandle& operator=(const RepeatingTaskHandle&) = delete; + + // Start can be used to start a task that will be reposted with a delay + // determined by the return value of the provided closure. The actual task is + // owned by the TaskQueue and will live until it has been stopped or the + // TaskQueue deletes it. It's perfectly fine to destroy the handle while the + // task is running, since the repeated task is owned by the TaskQueue. + // The tasks are scheduled onto the task queue using the specified precision. + static RepeatingTaskHandle Start( + TaskQueueBase* task_queue, + absl::AnyInvocable<TimeDelta()> closure, + TaskQueueBase::DelayPrecision precision = + TaskQueueBase::DelayPrecision::kLow, + Clock* clock = Clock::GetRealTimeClockRaw(), + const Location& location = Location::Current()); + + // DelayedStart is equivalent to Start except that the first invocation of the + // closure will be delayed by the given amount. + static RepeatingTaskHandle DelayedStart( + TaskQueueBase* task_queue, + TimeDelta first_delay, + absl::AnyInvocable<TimeDelta()> closure, + TaskQueueBase::DelayPrecision precision = + TaskQueueBase::DelayPrecision::kLow, + Clock* clock = Clock::GetRealTimeClockRaw(), + const Location& location = Location::Current()); + + // Stops future invocations of the repeating task closure. Can only be called + // from the TaskQueue where the task is running. The closure is guaranteed to + // not be running after Stop() returns unless Stop() is called from the + // closure itself. + void Stop(); + + // Returns true until Stop() was called. + // Can only be called from the TaskQueue where the task is running. + bool Running() const; + + private: + explicit RepeatingTaskHandle( + rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag) + : repeating_task_(std::move(alive_flag)) {} + rtc::scoped_refptr<PendingTaskSafetyFlag> repeating_task_; +}; + +} // namespace webrtc +#endif // RTC_BASE_TASK_UTILS_REPEATING_TASK_H_ diff --git a/third_party/libwebrtc/rtc_base/task_utils/repeating_task_gn/moz.build b/third_party/libwebrtc/rtc_base/task_utils/repeating_task_gn/moz.build new file mode 100644 index 0000000000..224f776828 --- /dev/null +++ b/third_party/libwebrtc/rtc_base/task_utils/repeating_task_gn/moz.build @@ -0,0 +1,236 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + + ### This moz.build was AUTOMATICALLY GENERATED from a GN config, ### + ### DO NOT edit it by hand. ### + +COMPILE_FLAGS["OS_INCLUDES"] = [] +AllowCompilerWarnings() + +DEFINES["ABSL_ALLOCATOR_NOTHROW"] = "1" +DEFINES["RTC_DAV1D_IN_INTERNAL_DECODER_FACTORY"] = True +DEFINES["RTC_ENABLE_VP9"] = True +DEFINES["WEBRTC_ENABLE_PROTOBUF"] = "0" +DEFINES["WEBRTC_LIBRARY_IMPL"] = True +DEFINES["WEBRTC_MOZILLA_BUILD"] = True +DEFINES["WEBRTC_NON_STATIC_TRACE_EVENT_HANDLERS"] = "0" +DEFINES["WEBRTC_STRICT_FIELD_TRIALS"] = "0" + +FINAL_LIBRARY = "webrtc" + + +LOCAL_INCLUDES += [ + "!/ipc/ipdl/_ipdlheaders", + "!/third_party/libwebrtc/gen", + "/ipc/chromium/src", + "/third_party/libwebrtc/", + "/third_party/libwebrtc/third_party/abseil-cpp/", + "/tools/profiler/public" +] + +UNIFIED_SOURCES += [ + "/third_party/libwebrtc/rtc_base/task_utils/repeating_task.cc" +] + +if not CONFIG["MOZ_DEBUG"]: + + DEFINES["DYNAMIC_ANNOTATIONS_ENABLED"] = "0" + DEFINES["NDEBUG"] = True + DEFINES["NVALGRIND"] = True + +if CONFIG["MOZ_DEBUG"] == "1": + + DEFINES["DYNAMIC_ANNOTATIONS_ENABLED"] = "1" + +if CONFIG["OS_TARGET"] == "Android": + + DEFINES["ANDROID"] = True + DEFINES["ANDROID_NDK_VERSION_ROLL"] = "r22_1" + DEFINES["HAVE_SYS_UIO_H"] = True + DEFINES["WEBRTC_ANDROID"] = True + DEFINES["WEBRTC_ANDROID_OPENSLES"] = True + DEFINES["WEBRTC_ENABLE_LIBEVENT"] = True + DEFINES["WEBRTC_LINUX"] = True + DEFINES["WEBRTC_POSIX"] = True + DEFINES["_GNU_SOURCE"] = True + DEFINES["__STDC_CONSTANT_MACROS"] = True + DEFINES["__STDC_FORMAT_MACROS"] = True + + OS_LIBS += [ + "log" + ] + +if CONFIG["OS_TARGET"] == "Darwin": + + DEFINES["WEBRTC_MAC"] = True + DEFINES["WEBRTC_POSIX"] = True + DEFINES["_LIBCPP_HAS_NO_ALIGNED_ALLOCATION"] = True + DEFINES["__ASSERT_MACROS_DEFINE_VERSIONS_WITHOUT_UNDERSCORES"] = "0" + DEFINES["__STDC_CONSTANT_MACROS"] = True + DEFINES["__STDC_FORMAT_MACROS"] = True + +if CONFIG["OS_TARGET"] == "Linux": + + DEFINES["USE_AURA"] = "1" + DEFINES["USE_GLIB"] = "1" + DEFINES["USE_NSS_CERTS"] = "1" + DEFINES["USE_OZONE"] = "1" + DEFINES["USE_UDEV"] = True + DEFINES["WEBRTC_ENABLE_LIBEVENT"] = True + DEFINES["WEBRTC_LINUX"] = True + DEFINES["WEBRTC_POSIX"] = True + DEFINES["_FILE_OFFSET_BITS"] = "64" + DEFINES["_LARGEFILE64_SOURCE"] = True + DEFINES["_LARGEFILE_SOURCE"] = True + DEFINES["__STDC_CONSTANT_MACROS"] = True + DEFINES["__STDC_FORMAT_MACROS"] = True + + OS_LIBS += [ + "rt" + ] + +if CONFIG["OS_TARGET"] == "OpenBSD": + + DEFINES["USE_GLIB"] = "1" + DEFINES["USE_OZONE"] = "1" + DEFINES["USE_X11"] = "1" + DEFINES["WEBRTC_BSD"] = True + DEFINES["WEBRTC_ENABLE_LIBEVENT"] = True + DEFINES["WEBRTC_POSIX"] = True + DEFINES["_FILE_OFFSET_BITS"] = "64" + DEFINES["_LARGEFILE64_SOURCE"] = True + DEFINES["_LARGEFILE_SOURCE"] = True + DEFINES["__STDC_CONSTANT_MACROS"] = True + DEFINES["__STDC_FORMAT_MACROS"] = True + +if CONFIG["OS_TARGET"] == "WINNT": + + DEFINES["CERT_CHAIN_PARA_HAS_EXTRA_FIELDS"] = True + DEFINES["NOMINMAX"] = True + DEFINES["NTDDI_VERSION"] = "0x0A000000" + DEFINES["PSAPI_VERSION"] = "2" + DEFINES["RTC_ENABLE_WIN_WGC"] = True + DEFINES["UNICODE"] = True + DEFINES["USE_AURA"] = "1" + DEFINES["WEBRTC_WIN"] = True + DEFINES["WIN32"] = True + DEFINES["WIN32_LEAN_AND_MEAN"] = True + DEFINES["WINAPI_FAMILY"] = "WINAPI_FAMILY_DESKTOP_APP" + DEFINES["WINVER"] = "0x0A00" + DEFINES["_ATL_NO_OPENGL"] = True + DEFINES["_CRT_RAND_S"] = True + DEFINES["_CRT_SECURE_NO_DEPRECATE"] = True + DEFINES["_ENABLE_EXTENDED_ALIGNED_STORAGE"] = True + DEFINES["_HAS_EXCEPTIONS"] = "0" + DEFINES["_HAS_NODISCARD"] = True + DEFINES["_SCL_SECURE_NO_DEPRECATE"] = True + DEFINES["_SECURE_ATL"] = True + DEFINES["_UNICODE"] = True + DEFINES["_WIN32_WINNT"] = "0x0A00" + DEFINES["_WINDOWS"] = True + DEFINES["__STD_C"] = True + + OS_LIBS += [ + "crypt32", + "iphlpapi", + "secur32", + "winmm" + ] + +if CONFIG["TARGET_CPU"] == "aarch64": + + DEFINES["WEBRTC_ARCH_ARM64"] = True + DEFINES["WEBRTC_HAS_NEON"] = True + +if CONFIG["TARGET_CPU"] == "arm": + + CXXFLAGS += [ + "-mfpu=neon" + ] + + DEFINES["WEBRTC_ARCH_ARM"] = True + DEFINES["WEBRTC_ARCH_ARM_V7"] = True + DEFINES["WEBRTC_HAS_NEON"] = True + +if CONFIG["TARGET_CPU"] == "mips32": + + DEFINES["MIPS32_LE"] = True + DEFINES["MIPS_FPU_LE"] = True + DEFINES["_GNU_SOURCE"] = True + +if CONFIG["TARGET_CPU"] == "mips64": + + DEFINES["_GNU_SOURCE"] = True + +if CONFIG["TARGET_CPU"] == "x86": + + DEFINES["WEBRTC_ENABLE_AVX2"] = True + +if CONFIG["TARGET_CPU"] == "x86_64": + + DEFINES["WEBRTC_ENABLE_AVX2"] = True + +if CONFIG["MOZ_DEBUG"] == "1" and CONFIG["OS_TARGET"] == "Android": + + DEFINES["_DEBUG"] = True + +if CONFIG["MOZ_DEBUG"] == "1" and CONFIG["OS_TARGET"] == "Darwin": + + DEFINES["_DEBUG"] = True + +if CONFIG["MOZ_DEBUG"] == "1" and CONFIG["OS_TARGET"] == "Linux": + + DEFINES["_DEBUG"] = True + +if CONFIG["MOZ_DEBUG"] == "1" and CONFIG["OS_TARGET"] == "OpenBSD": + + DEFINES["_DEBUG"] = True + +if CONFIG["MOZ_DEBUG"] == "1" and CONFIG["OS_TARGET"] == "WINNT": + + DEFINES["_HAS_ITERATOR_DEBUGGING"] = "0" + +if CONFIG["MOZ_X11"] == "1" and CONFIG["OS_TARGET"] == "Linux": + + DEFINES["USE_X11"] = "1" + +if CONFIG["OS_TARGET"] == "Android" and CONFIG["TARGET_CPU"] == "arm": + + OS_LIBS += [ + "android_support", + "unwind" + ] + +if CONFIG["OS_TARGET"] == "Android" and CONFIG["TARGET_CPU"] == "x86": + + CXXFLAGS += [ + "-msse2" + ] + + OS_LIBS += [ + "android_support" + ] + +if CONFIG["OS_TARGET"] == "Linux" and CONFIG["TARGET_CPU"] == "aarch64": + + DEFINES["_GNU_SOURCE"] = True + +if CONFIG["OS_TARGET"] == "Linux" and CONFIG["TARGET_CPU"] == "arm": + + DEFINES["_GNU_SOURCE"] = True + +if CONFIG["OS_TARGET"] == "Linux" and CONFIG["TARGET_CPU"] == "x86": + + CXXFLAGS += [ + "-msse2" + ] + + DEFINES["_GNU_SOURCE"] = True + +if CONFIG["OS_TARGET"] == "Linux" and CONFIG["TARGET_CPU"] == "x86_64": + + DEFINES["_GNU_SOURCE"] = True + +Library("repeating_task_gn") diff --git a/third_party/libwebrtc/rtc_base/task_utils/repeating_task_unittest.cc b/third_party/libwebrtc/rtc_base/task_utils/repeating_task_unittest.cc new file mode 100644 index 0000000000..2c269b43bc --- /dev/null +++ b/third_party/libwebrtc/rtc_base/task_utils/repeating_task_unittest.cc @@ -0,0 +1,439 @@ +/* + * Copyright 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "rtc_base/task_utils/repeating_task.h" + +#include <atomic> +#include <memory> + +#include "absl/functional/any_invocable.h" +#include "api/task_queue/task_queue_base.h" +#include "api/task_queue/test/mock_task_queue_base.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" +#include "rtc_base/event.h" +#include "rtc_base/task_queue_for_test.h" +#include "system_wrappers/include/clock.h" +#include "test/gmock.h" +#include "test/gtest.h" + +// NOTE: Since these tests rely on real time behavior, they will be flaky +// if run on heavily loaded systems. +namespace webrtc { +namespace { +using ::testing::AtLeast; +using ::testing::Invoke; +using ::testing::MockFunction; +using ::testing::NiceMock; +using ::testing::Return; +using ::testing::WithArg; + +constexpr TimeDelta kTimeout = TimeDelta::Millis(1000); + +class MockClosure { + public: + MOCK_METHOD(TimeDelta, Call, ()); + MOCK_METHOD(void, Delete, ()); +}; + +class MockTaskQueue : public MockTaskQueueBase { + public: + MockTaskQueue() : task_queue_setter_(this) {} + + private: + CurrentTaskQueueSetter task_queue_setter_; +}; + +class FakeTaskQueue : public TaskQueueBase { + public: + explicit FakeTaskQueue(SimulatedClock* clock) + : task_queue_setter_(this), clock_(clock) {} + + void Delete() override {} + + void PostTaskImpl(absl::AnyInvocable<void() &&> task, + const PostTaskTraits& /*traits*/, + const Location& /*location*/) override { + last_task_ = std::move(task); + last_precision_ = absl::nullopt; + last_delay_ = TimeDelta::Zero(); + } + + void PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task, + TimeDelta delay, + const PostDelayedTaskTraits& traits, + const Location& /*location*/) override { + last_task_ = std::move(task); + last_precision_ = traits.high_precision + ? TaskQueueBase::DelayPrecision::kHigh + : TaskQueueBase::DelayPrecision::kLow; + last_delay_ = delay; + } + + bool AdvanceTimeAndRunLastTask() { + EXPECT_TRUE(last_task_); + EXPECT_TRUE(last_delay_.IsFinite()); + clock_->AdvanceTime(last_delay_); + last_delay_ = TimeDelta::MinusInfinity(); + auto task = std::move(last_task_); + std::move(task)(); + return last_task_ == nullptr; + } + + bool IsTaskQueued() { return !!last_task_; } + + TimeDelta last_delay() const { + EXPECT_TRUE(last_delay_.IsFinite()); + return last_delay_; + } + + absl::optional<TaskQueueBase::DelayPrecision> last_precision() const { + return last_precision_; + } + + private: + CurrentTaskQueueSetter task_queue_setter_; + SimulatedClock* clock_; + absl::AnyInvocable<void() &&> last_task_; + TimeDelta last_delay_ = TimeDelta::MinusInfinity(); + absl::optional<TaskQueueBase::DelayPrecision> last_precision_; +}; + +// NOTE: Since this utility class holds a raw pointer to a variable that likely +// lives on the stack, it's important that any repeating tasks that use this +// class be explicitly stopped when the test criteria have been met. If the +// task is not stopped, an instance of this class can be deleted when the +// pointed-to MockClosure has been deleted and we end up trying to call a +// virtual method on a deleted object in the dtor. +class MoveOnlyClosure { + public: + explicit MoveOnlyClosure(MockClosure* mock) : mock_(mock) {} + MoveOnlyClosure(const MoveOnlyClosure&) = delete; + MoveOnlyClosure(MoveOnlyClosure&& other) : mock_(other.mock_) { + other.mock_ = nullptr; + } + ~MoveOnlyClosure() { + if (mock_) + mock_->Delete(); + } + TimeDelta operator()() { return mock_->Call(); } + + private: + MockClosure* mock_; +}; +} // namespace + +TEST(RepeatingTaskTest, TaskIsStoppedOnStop) { + const TimeDelta kShortInterval = TimeDelta::Millis(50); + + SimulatedClock clock(Timestamp::Zero()); + FakeTaskQueue task_queue(&clock); + std::atomic_int counter(0); + auto handle = RepeatingTaskHandle::Start( + &task_queue, + [&] { + counter++; + return kShortInterval; + }, + TaskQueueBase::DelayPrecision::kLow, &clock); + EXPECT_EQ(task_queue.last_delay(), TimeDelta::Zero()); + EXPECT_FALSE(task_queue.AdvanceTimeAndRunLastTask()); + EXPECT_EQ(counter.load(), 1); + + // The handle reposted at the short interval. + EXPECT_EQ(task_queue.last_delay(), kShortInterval); + + // Stop the handle. This prevernts the counter from incrementing. + handle.Stop(); + EXPECT_TRUE(task_queue.AdvanceTimeAndRunLastTask()); + EXPECT_EQ(counter.load(), 1); +} + +TEST(RepeatingTaskTest, CompensatesForLongRunTime) { + const TimeDelta kRepeatInterval = TimeDelta::Millis(2); + // Sleeping inside the task for longer than the repeat interval once, should + // be compensated for by repeating the task faster to catch up. + const TimeDelta kSleepDuration = TimeDelta::Millis(20); + + std::atomic_int counter(0); + SimulatedClock clock(Timestamp::Zero()); + FakeTaskQueue task_queue(&clock); + RepeatingTaskHandle::Start( + &task_queue, + [&] { + ++counter; + // Task takes longer than the repeat duration. + clock.AdvanceTime(kSleepDuration); + return kRepeatInterval; + }, + TaskQueueBase::DelayPrecision::kLow, &clock); + + EXPECT_EQ(task_queue.last_delay(), TimeDelta::Zero()); + EXPECT_FALSE(task_queue.AdvanceTimeAndRunLastTask()); + + // Task is posted right away since it took longer to run then the repeat + // interval. + EXPECT_EQ(task_queue.last_delay(), TimeDelta::Zero()); + EXPECT_EQ(counter.load(), 1); +} + +TEST(RepeatingTaskTest, CompensatesForShortRunTime) { + SimulatedClock clock(Timestamp::Zero()); + FakeTaskQueue task_queue(&clock); + std::atomic_int counter(0); + RepeatingTaskHandle::Start( + &task_queue, + [&] { + // Simulate the task taking 100ms, which should be compensated for. + counter++; + clock.AdvanceTime(TimeDelta::Millis(100)); + return TimeDelta::Millis(300); + }, + TaskQueueBase::DelayPrecision::kLow, &clock); + + // Expect instant post task. + EXPECT_EQ(task_queue.last_delay(), TimeDelta::Zero()); + // Task should be retained by the handler since it is not cancelled. + EXPECT_FALSE(task_queue.AdvanceTimeAndRunLastTask()); + // New delay should be 200ms since repeat delay was 300ms but task took 100ms. + EXPECT_EQ(task_queue.last_delay(), TimeDelta::Millis(200)); +} + +TEST(RepeatingTaskTest, CancelDelayedTaskBeforeItRuns) { + rtc::Event done; + MockClosure mock; + EXPECT_CALL(mock, Call).Times(0); + EXPECT_CALL(mock, Delete).WillOnce(Invoke([&done] { done.Set(); })); + TaskQueueForTest task_queue("queue"); + auto handle = RepeatingTaskHandle::DelayedStart( + task_queue.Get(), TimeDelta::Millis(100), MoveOnlyClosure(&mock)); + task_queue.PostTask( + [handle = std::move(handle)]() mutable { handle.Stop(); }); + EXPECT_TRUE(done.Wait(kTimeout)); +} + +TEST(RepeatingTaskTest, CancelTaskAfterItRuns) { + rtc::Event done; + MockClosure mock; + EXPECT_CALL(mock, Call).WillOnce(Return(TimeDelta::Millis(100))); + EXPECT_CALL(mock, Delete).WillOnce(Invoke([&done] { done.Set(); })); + TaskQueueForTest task_queue("queue"); + auto handle = + RepeatingTaskHandle::Start(task_queue.Get(), MoveOnlyClosure(&mock)); + task_queue.PostTask( + [handle = std::move(handle)]() mutable { handle.Stop(); }); + EXPECT_TRUE(done.Wait(kTimeout)); +} + +TEST(RepeatingTaskTest, TaskCanStopItself) { + std::atomic_int counter(0); + SimulatedClock clock(Timestamp::Zero()); + FakeTaskQueue task_queue(&clock); + RepeatingTaskHandle handle = RepeatingTaskHandle::Start(&task_queue, [&] { + ++counter; + handle.Stop(); + return TimeDelta::Millis(2); + }); + EXPECT_EQ(task_queue.last_delay(), TimeDelta::Zero()); + // Task cancelled itself so wants to be released. + EXPECT_TRUE(task_queue.AdvanceTimeAndRunLastTask()); + EXPECT_EQ(counter.load(), 1); +} + +TEST(RepeatingTaskTest, TaskCanStopItselfByReturningInfinity) { + std::atomic_int counter(0); + SimulatedClock clock(Timestamp::Zero()); + FakeTaskQueue task_queue(&clock); + RepeatingTaskHandle handle = RepeatingTaskHandle::Start(&task_queue, [&] { + ++counter; + return TimeDelta::PlusInfinity(); + }); + EXPECT_EQ(task_queue.last_delay(), TimeDelta::Zero()); + // Task cancelled itself so wants to be released. + EXPECT_TRUE(task_queue.AdvanceTimeAndRunLastTask()); + EXPECT_EQ(counter.load(), 1); +} + +TEST(RepeatingTaskTest, ZeroReturnValueRepostsTheTask) { + NiceMock<MockClosure> closure; + rtc::Event done; + EXPECT_CALL(closure, Call()) + .WillOnce(Return(TimeDelta::Zero())) + .WillOnce(Invoke([&] { + done.Set(); + return TimeDelta::PlusInfinity(); + })); + TaskQueueForTest task_queue("queue"); + RepeatingTaskHandle::Start(task_queue.Get(), MoveOnlyClosure(&closure)); + EXPECT_TRUE(done.Wait(kTimeout)); +} + +TEST(RepeatingTaskTest, StartPeriodicTask) { + MockFunction<TimeDelta()> closure; + rtc::Event done; + EXPECT_CALL(closure, Call()) + .WillOnce(Return(TimeDelta::Millis(20))) + .WillOnce(Return(TimeDelta::Millis(20))) + .WillOnce(Invoke([&] { + done.Set(); + return TimeDelta::PlusInfinity(); + })); + TaskQueueForTest task_queue("queue"); + RepeatingTaskHandle::Start(task_queue.Get(), closure.AsStdFunction()); + EXPECT_TRUE(done.Wait(kTimeout)); +} + +TEST(RepeatingTaskTest, Example) { + class ObjectOnTaskQueue { + public: + void DoPeriodicTask() {} + TimeDelta TimeUntilNextRun() { return TimeDelta::Millis(100); } + void StartPeriodicTask(RepeatingTaskHandle* handle, + TaskQueueBase* task_queue) { + *handle = RepeatingTaskHandle::Start(task_queue, [this] { + DoPeriodicTask(); + return TimeUntilNextRun(); + }); + } + }; + TaskQueueForTest task_queue("queue"); + auto object = std::make_unique<ObjectOnTaskQueue>(); + // Create and start the periodic task. + RepeatingTaskHandle handle; + object->StartPeriodicTask(&handle, task_queue.Get()); + // Restart the task + task_queue.PostTask( + [handle = std::move(handle)]() mutable { handle.Stop(); }); + object->StartPeriodicTask(&handle, task_queue.Get()); + task_queue.PostTask( + [handle = std::move(handle)]() mutable { handle.Stop(); }); + struct Destructor { + void operator()() { object.reset(); } + std::unique_ptr<ObjectOnTaskQueue> object; + }; + task_queue.PostTask(Destructor{std::move(object)}); + // Do not wait for the destructor closure in order to create a race between + // task queue destruction and running the desctructor closure. +} + +TEST(RepeatingTaskTest, ClockIntegration) { + absl::AnyInvocable<void() &&> delayed_task; + TimeDelta expected_delay = TimeDelta::Zero(); + SimulatedClock clock(Timestamp::Zero()); + + NiceMock<MockTaskQueue> task_queue; + ON_CALL(task_queue, PostDelayedTaskImpl) + .WillByDefault([&](absl::AnyInvocable<void() &&> task, TimeDelta delay, + const MockTaskQueue::PostDelayedTaskTraits&, + const Location&) { + EXPECT_EQ(delay, expected_delay); + delayed_task = std::move(task); + }); + + expected_delay = TimeDelta::Millis(100); + RepeatingTaskHandle handle = RepeatingTaskHandle::DelayedStart( + &task_queue, TimeDelta::Millis(100), + [&clock]() { + EXPECT_EQ(Timestamp::Millis(100), clock.CurrentTime()); + // Simulate work happening for 10ms. + clock.AdvanceTimeMilliseconds(10); + return TimeDelta::Millis(100); + }, + TaskQueueBase::DelayPrecision::kLow, &clock); + + clock.AdvanceTimeMilliseconds(100); + absl::AnyInvocable<void()&&> task_to_run = std::move(delayed_task); + expected_delay = TimeDelta::Millis(90); + std::move(task_to_run)(); + EXPECT_NE(delayed_task, nullptr); + handle.Stop(); +} + +TEST(RepeatingTaskTest, CanBeStoppedAfterTaskQueueDeletedTheRepeatingTask) { + absl::AnyInvocable<void() &&> repeating_task; + + MockTaskQueue task_queue; + EXPECT_CALL(task_queue, PostDelayedTaskImpl) + .WillOnce(WithArg<0>([&](absl::AnyInvocable<void() &&> task) { + repeating_task = std::move(task); + })); + + RepeatingTaskHandle handle = + RepeatingTaskHandle::DelayedStart(&task_queue, TimeDelta::Millis(100), + [] { return TimeDelta::Millis(100); }); + + // shutdown task queue: delete all pending tasks and run 'regular' task. + repeating_task = nullptr; + handle.Stop(); +} + +TEST(RepeatingTaskTest, DefaultPrecisionIsLow) { + SimulatedClock clock(Timestamp::Zero()); + FakeTaskQueue task_queue(&clock); + // Closure that repeats twice. + MockFunction<TimeDelta()> closure; + EXPECT_CALL(closure, Call()) + .WillOnce(Return(TimeDelta::Millis(1))) + .WillOnce(Return(TimeDelta::PlusInfinity())); + RepeatingTaskHandle::Start(&task_queue, closure.AsStdFunction()); + // Initial task is a PostTask(). + EXPECT_FALSE(task_queue.last_precision().has_value()); + EXPECT_FALSE(task_queue.AdvanceTimeAndRunLastTask()); + // Repeated task is a delayed task with the default precision: low. + EXPECT_TRUE(task_queue.last_precision().has_value()); + EXPECT_EQ(task_queue.last_precision().value(), + TaskQueueBase::DelayPrecision::kLow); + // No more tasks. + EXPECT_TRUE(task_queue.AdvanceTimeAndRunLastTask()); +} + +TEST(RepeatingTaskTest, CanSpecifyToPostTasksWithLowPrecision) { + SimulatedClock clock(Timestamp::Zero()); + FakeTaskQueue task_queue(&clock); + // Closure that repeats twice. + MockFunction<TimeDelta()> closure; + EXPECT_CALL(closure, Call()) + .WillOnce(Return(TimeDelta::Millis(1))) + .WillOnce(Return(TimeDelta::PlusInfinity())); + RepeatingTaskHandle::Start(&task_queue, closure.AsStdFunction(), + TaskQueueBase::DelayPrecision::kLow); + // Initial task is a PostTask(). + EXPECT_FALSE(task_queue.last_precision().has_value()); + EXPECT_FALSE(task_queue.AdvanceTimeAndRunLastTask()); + // Repeated task is a delayed task with the specified precision. + EXPECT_TRUE(task_queue.last_precision().has_value()); + EXPECT_EQ(task_queue.last_precision().value(), + TaskQueueBase::DelayPrecision::kLow); + // No more tasks. + EXPECT_TRUE(task_queue.AdvanceTimeAndRunLastTask()); +} + +TEST(RepeatingTaskTest, CanSpecifyToPostTasksWithHighPrecision) { + SimulatedClock clock(Timestamp::Zero()); + FakeTaskQueue task_queue(&clock); + // Closure that repeats twice. + MockFunction<TimeDelta()> closure; + EXPECT_CALL(closure, Call()) + .WillOnce(Return(TimeDelta::Millis(1))) + .WillOnce(Return(TimeDelta::PlusInfinity())); + RepeatingTaskHandle::Start(&task_queue, closure.AsStdFunction(), + TaskQueueBase::DelayPrecision::kHigh); + // Initial task is a PostTask(). + EXPECT_FALSE(task_queue.last_precision().has_value()); + EXPECT_FALSE(task_queue.AdvanceTimeAndRunLastTask()); + // Repeated task is a delayed task with the specified precision. + EXPECT_TRUE(task_queue.last_precision().has_value()); + EXPECT_EQ(task_queue.last_precision().value(), + TaskQueueBase::DelayPrecision::kHigh); + // No more tasks. + EXPECT_TRUE(task_queue.AdvanceTimeAndRunLastTask()); +} + +} // namespace webrtc |