/* * Copyright 2019 The WebRTC Project Authors. All rights reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ #include "rtc_base/operations_chain.h" #include #include #include #include #include #include "rtc_base/event.h" #include "rtc_base/gunit.h" #include "rtc_base/thread.h" #include "test/gmock.h" #include "test/gtest.h" namespace rtc { using ::testing::ElementsAre; namespace { constexpr int kDefaultTimeout = 3000; } // namespace class OperationTracker { public: OperationTracker() : background_thread_(Thread::Create()) { background_thread_->Start(); } // The caller is responsible for ensuring that no operations are pending. ~OperationTracker() {} // Creates a binding for the synchronous operation (see // StartSynchronousOperation() below). std::function)> BindSynchronousOperation( Event* operation_complete_event) { return [this, operation_complete_event](std::function callback) { StartSynchronousOperation(operation_complete_event, std::move(callback)); }; } // Creates a binding for the asynchronous operation (see // StartAsynchronousOperation() below). std::function)> BindAsynchronousOperation( Event* unblock_operation_event, Event* operation_complete_event) { return [this, unblock_operation_event, operation_complete_event](std::function callback) { StartAsynchronousOperation(unblock_operation_event, operation_complete_event, std::move(callback)); }; } // When an operation is completed, its associated Event* is added to this // list, in chronological order. This allows you to verify the order that // operations are executed. const std::vector& completed_operation_events() const { return completed_operation_events_; } private: // This operation is completed synchronously; the callback is invoked before // the function returns. void StartSynchronousOperation(Event* operation_complete_event, std::function callback) { completed_operation_events_.push_back(operation_complete_event); operation_complete_event->Set(); callback(); } // This operation is completed asynchronously; it pings `background_thread_`, // blocking that thread until `unblock_operation_event` is signaled and then // completes upon posting back to the thread that the operation started on. // Note that this requires the starting thread to be executing tasks (handle // messages), i.e. must not be blocked. void StartAsynchronousOperation(Event* unblock_operation_event, Event* operation_complete_event, std::function callback) { Thread* current_thread = Thread::Current(); background_thread_->PostTask([this, current_thread, unblock_operation_event, operation_complete_event, callback]() { unblock_operation_event->Wait(Event::kForever); current_thread->PostTask([this, operation_complete_event, callback]() { completed_operation_events_.push_back(operation_complete_event); operation_complete_event->Set(); callback(); }); }); } std::unique_ptr background_thread_; std::vector completed_operation_events_; }; // The OperationTrackerProxy ensures all operations are chained on a separate // thread. This allows tests to block while chained operations are posting // between threads. class OperationTrackerProxy { public: OperationTrackerProxy() : operations_chain_thread_(Thread::Create()), operation_tracker_(nullptr), operations_chain_(nullptr) { operations_chain_thread_->Start(); } std::unique_ptr Initialize() { std::unique_ptr event = std::make_unique(); operations_chain_thread_->PostTask([this, event_ptr = event.get()]() { operation_tracker_ = std::make_unique(); operations_chain_ = OperationsChain::Create(); event_ptr->Set(); }); return event; } void SetOnChainEmptyCallback(std::function on_chain_empty_callback) { Event event; operations_chain_thread_->PostTask( [this, &event, on_chain_empty_callback = std::move(on_chain_empty_callback)]() { operations_chain_->SetOnChainEmptyCallback( std::move(on_chain_empty_callback)); event.Set(); }); event.Wait(Event::kForever); } bool IsEmpty() { Event event; bool is_empty = false; operations_chain_thread_->PostTask([this, &event, &is_empty]() { is_empty = operations_chain_->IsEmpty(); event.Set(); }); event.Wait(Event::kForever); return is_empty; } std::unique_ptr ReleaseOperationChain() { std::unique_ptr event = std::make_unique(); operations_chain_thread_->PostTask([this, event_ptr = event.get()]() { operations_chain_ = nullptr; event_ptr->Set(); }); return event; } // Chains a synchronous operation on the operation chain's thread. std::unique_ptr PostSynchronousOperation() { std::unique_ptr operation_complete_event = std::make_unique(); operations_chain_thread_->PostTask( [this, operation_complete_event_ptr = operation_complete_event.get()]() { operations_chain_->ChainOperation( operation_tracker_->BindSynchronousOperation( operation_complete_event_ptr)); }); return operation_complete_event; } // Chains an asynchronous operation on the operation chain's thread. This // involves the operation chain thread and an additional background thread. std::unique_ptr PostAsynchronousOperation( Event* unblock_operation_event) { std::unique_ptr operation_complete_event = std::make_unique(); operations_chain_thread_->PostTask( [this, unblock_operation_event, operation_complete_event_ptr = operation_complete_event.get()]() { operations_chain_->ChainOperation( operation_tracker_->BindAsynchronousOperation( unblock_operation_event, operation_complete_event_ptr)); }); return operation_complete_event; } // The order of completed events. Touches the `operation_tracker_` on the // calling thread, this is only thread safe if all chained operations have // completed. const std::vector& completed_operation_events() const { return operation_tracker_->completed_operation_events(); } private: std::unique_ptr operations_chain_thread_; std::unique_ptr operation_tracker_; scoped_refptr operations_chain_; }; // On destruction, sets a boolean flag to true. class SignalOnDestruction final { public: SignalOnDestruction(bool* destructor_called) : destructor_called_(destructor_called) { RTC_DCHECK(destructor_called_); } ~SignalOnDestruction() { // Moved objects will have `destructor_called_` set to null. Destroying a // moved SignalOnDestruction should not signal. if (destructor_called_) { *destructor_called_ = true; } } SignalOnDestruction(const SignalOnDestruction&) = delete; SignalOnDestruction& operator=(const SignalOnDestruction&) = delete; // Move operators. SignalOnDestruction(SignalOnDestruction&& other) : SignalOnDestruction(other.destructor_called_) { other.destructor_called_ = nullptr; } SignalOnDestruction& operator=(SignalOnDestruction&& other) { destructor_called_ = other.destructor_called_; other.destructor_called_ = nullptr; return *this; } private: bool* destructor_called_; }; TEST(OperationsChainTest, SynchronousOperation) { OperationTrackerProxy operation_tracker_proxy; operation_tracker_proxy.Initialize()->Wait(Event::kForever); operation_tracker_proxy.PostSynchronousOperation()->Wait(Event::kForever); } TEST(OperationsChainTest, AsynchronousOperation) { OperationTrackerProxy operation_tracker_proxy; operation_tracker_proxy.Initialize()->Wait(Event::kForever); Event unblock_async_operation_event; auto async_operation_completed_event = operation_tracker_proxy.PostAsynchronousOperation( &unblock_async_operation_event); // This should not be signaled until we unblock the operation. EXPECT_FALSE( async_operation_completed_event->Wait(webrtc::TimeDelta::Zero())); // Unblock the operation and wait for it to complete. unblock_async_operation_event.Set(); async_operation_completed_event->Wait(Event::kForever); } TEST(OperationsChainTest, SynchronousOperationsAreExecutedImmediatelyWhenChainIsEmpty) { // Testing synchonicity must be done without the OperationTrackerProxy to // ensure messages are not processed in parallel. This test has no background // threads. scoped_refptr operations_chain = OperationsChain::Create(); OperationTracker operation_tracker; Event event0; operations_chain->ChainOperation( operation_tracker.BindSynchronousOperation(&event0)); // This should already be signaled. (If it wasn't, waiting wouldn't help, // because we'd be blocking the only thread that exists.) EXPECT_TRUE(event0.Wait(webrtc::TimeDelta::Zero())); // Chaining another operation should also execute immediately because the // chain should already be empty. Event event1; operations_chain->ChainOperation( operation_tracker.BindSynchronousOperation(&event1)); EXPECT_TRUE(event1.Wait(webrtc::TimeDelta::Zero())); } TEST(OperationsChainTest, AsynchronousOperationBlocksSynchronousOperation) { OperationTrackerProxy operation_tracker_proxy; operation_tracker_proxy.Initialize()->Wait(Event::kForever); Event unblock_async_operation_event; auto async_operation_completed_event = operation_tracker_proxy.PostAsynchronousOperation( &unblock_async_operation_event); auto sync_operation_completed_event = operation_tracker_proxy.PostSynchronousOperation(); unblock_async_operation_event.Set(); sync_operation_completed_event->Wait(Event::kForever); // The asynchronous avent should have blocked the synchronous event, meaning // this should already be signaled. EXPECT_TRUE(async_operation_completed_event->Wait(webrtc::TimeDelta::Zero())); } TEST(OperationsChainTest, OperationsAreExecutedInOrder) { OperationTrackerProxy operation_tracker_proxy; operation_tracker_proxy.Initialize()->Wait(Event::kForever); // Chain a mix of asynchronous and synchronous operations. Event operation0_unblock_event; auto operation0_completed_event = operation_tracker_proxy.PostAsynchronousOperation( &operation0_unblock_event); Event operation1_unblock_event; auto operation1_completed_event = operation_tracker_proxy.PostAsynchronousOperation( &operation1_unblock_event); auto operation2_completed_event = operation_tracker_proxy.PostSynchronousOperation(); auto operation3_completed_event = operation_tracker_proxy.PostSynchronousOperation(); Event operation4_unblock_event; auto operation4_completed_event = operation_tracker_proxy.PostAsynchronousOperation( &operation4_unblock_event); auto operation5_completed_event = operation_tracker_proxy.PostSynchronousOperation(); Event operation6_unblock_event; auto operation6_completed_event = operation_tracker_proxy.PostAsynchronousOperation( &operation6_unblock_event); // Unblock events in reverse order. Operations 5, 3 and 2 are synchronous and // don't need to be unblocked. operation6_unblock_event.Set(); operation4_unblock_event.Set(); operation1_unblock_event.Set(); operation0_unblock_event.Set(); // Await all operations. The await-order shouldn't matter since they all get // executed eventually. operation0_completed_event->Wait(Event::kForever); operation1_completed_event->Wait(Event::kForever); operation2_completed_event->Wait(Event::kForever); operation3_completed_event->Wait(Event::kForever); operation4_completed_event->Wait(Event::kForever); operation5_completed_event->Wait(Event::kForever); operation6_completed_event->Wait(Event::kForever); EXPECT_THAT( operation_tracker_proxy.completed_operation_events(), ElementsAre( operation0_completed_event.get(), operation1_completed_event.get(), operation2_completed_event.get(), operation3_completed_event.get(), operation4_completed_event.get(), operation5_completed_event.get(), operation6_completed_event.get())); } TEST(OperationsChainTest, IsEmpty) { OperationTrackerProxy operation_tracker_proxy; operation_tracker_proxy.Initialize()->Wait(Event::kForever); // The chain is initially empty. EXPECT_TRUE(operation_tracker_proxy.IsEmpty()); // Chain a single event. Event unblock_async_operation_event0; auto async_operation_completed_event0 = operation_tracker_proxy.PostAsynchronousOperation( &unblock_async_operation_event0); // The chain is not empty while an event is pending. EXPECT_FALSE(operation_tracker_proxy.IsEmpty()); // Completing the operation empties the chain. unblock_async_operation_event0.Set(); async_operation_completed_event0->Wait(Event::kForever); EXPECT_TRUE(operation_tracker_proxy.IsEmpty()); // Chain multiple events. Event unblock_async_operation_event1; auto async_operation_completed_event1 = operation_tracker_proxy.PostAsynchronousOperation( &unblock_async_operation_event1); Event unblock_async_operation_event2; auto async_operation_completed_event2 = operation_tracker_proxy.PostAsynchronousOperation( &unblock_async_operation_event2); // Again, the chain is not empty while an event is pending. EXPECT_FALSE(operation_tracker_proxy.IsEmpty()); // Upon completing the first event, the chain is still not empty. unblock_async_operation_event1.Set(); async_operation_completed_event1->Wait(Event::kForever); EXPECT_FALSE(operation_tracker_proxy.IsEmpty()); // Completing the last evenet empties the chain. unblock_async_operation_event2.Set(); async_operation_completed_event2->Wait(Event::kForever); EXPECT_TRUE(operation_tracker_proxy.IsEmpty()); } TEST(OperationsChainTest, OnChainEmptyCallback) { rtc::AutoThread main_thread; OperationTrackerProxy operation_tracker_proxy; operation_tracker_proxy.Initialize()->Wait(Event::kForever); std::atomic on_empty_callback_counter(0u); operation_tracker_proxy.SetOnChainEmptyCallback( [&on_empty_callback_counter] { ++on_empty_callback_counter; }); // Chain a single event. Event unblock_async_operation_event0; auto async_operation_completed_event0 = operation_tracker_proxy.PostAsynchronousOperation( &unblock_async_operation_event0); // The callback is not invoked until the operation has completed. EXPECT_EQ(0u, on_empty_callback_counter); // Completing the operation empties the chain, invoking the callback. unblock_async_operation_event0.Set(); async_operation_completed_event0->Wait(Event::kForever); EXPECT_TRUE_WAIT(1u == on_empty_callback_counter, kDefaultTimeout); // Chain multiple events. Event unblock_async_operation_event1; auto async_operation_completed_event1 = operation_tracker_proxy.PostAsynchronousOperation( &unblock_async_operation_event1); Event unblock_async_operation_event2; auto async_operation_completed_event2 = operation_tracker_proxy.PostAsynchronousOperation( &unblock_async_operation_event2); // Again, the callback is not invoked until the operation has completed. EXPECT_TRUE_WAIT(1u == on_empty_callback_counter, kDefaultTimeout); // Upon completing the first event, the chain is still not empty, so the // callback must not be invoked yet. unblock_async_operation_event1.Set(); async_operation_completed_event1->Wait(Event::kForever); EXPECT_TRUE_WAIT(1u == on_empty_callback_counter, kDefaultTimeout); // Completing the last evenet empties the chain, invoking the callback. unblock_async_operation_event2.Set(); async_operation_completed_event2->Wait(Event::kForever); EXPECT_TRUE_WAIT(2u == on_empty_callback_counter, kDefaultTimeout); } TEST(OperationsChainTest, SafeToReleaseReferenceToOperationChainWhileOperationIsPending) { OperationTrackerProxy operation_tracker_proxy; operation_tracker_proxy.Initialize()->Wait(Event::kForever); Event unblock_async_operation_event; auto async_operation_completed_event = operation_tracker_proxy.PostAsynchronousOperation( &unblock_async_operation_event); // Pending operations keep the OperationChain alive, making it safe for the // test to release any references before unblocking the async operation. operation_tracker_proxy.ReleaseOperationChain()->Wait(Event::kForever); unblock_async_operation_event.Set(); async_operation_completed_event->Wait(Event::kForever); } TEST(OperationsChainTest, FunctorIsNotDestroyedWhileExecuting) { scoped_refptr operations_chain = OperationsChain::Create(); bool destructor_called = false; SignalOnDestruction signal_on_destruction(&destructor_called); operations_chain->ChainOperation( [signal_on_destruction = std::move(signal_on_destruction), &destructor_called](std::function callback) { EXPECT_FALSE(destructor_called); // Invoking the callback marks the operation as complete, popping the // Operation object from the OperationsChain internal queue. callback(); // Even though the internal Operation object has been destroyed, // variables captured by this lambda expression must still be valid (the // associated functor must not be deleted while executing). EXPECT_FALSE(destructor_called); }); // The lambda having executed synchronously and completed, its captured // variables should now have been deleted. EXPECT_TRUE(destructor_called); } #if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID) TEST(OperationsChainDeathTest, OperationNotInvokingCallbackShouldCrash) { scoped_refptr operations_chain = OperationsChain::Create(); EXPECT_DEATH( operations_chain->ChainOperation([](std::function callback) {}), ""); } TEST(OperationsChainDeathTest, OperationInvokingCallbackMultipleTimesShouldCrash) { scoped_refptr operations_chain = OperationsChain::Create(); EXPECT_DEATH( operations_chain->ChainOperation([](std::function callback) { // Signal that the operation has completed multiple times. callback(); callback(); }), ""); } #endif // RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID) } // namespace rtc