diff options
Diffstat (limited to 'third_party/rust/tokio/tests/task_abort.rs')
-rw-r--r-- | third_party/rust/tokio/tests/task_abort.rs | 224 |
1 files changed, 224 insertions, 0 deletions
diff --git a/third_party/rust/tokio/tests/task_abort.rs b/third_party/rust/tokio/tests/task_abort.rs new file mode 100644 index 0000000000..fe6b50cd46 --- /dev/null +++ b/third_party/rust/tokio/tests/task_abort.rs @@ -0,0 +1,224 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use std::sync::Arc; +use std::thread::sleep; +use tokio::time::Duration; + +use tokio::runtime::Builder; + +struct PanicOnDrop; + +impl Drop for PanicOnDrop { + fn drop(&mut self) { + panic!("Well what did you expect would happen..."); + } +} + +/// Checks that a suspended task can be aborted without panicking as reported in +/// issue #3157: <https://github.com/tokio-rs/tokio/issues/3157>. +#[test] +fn test_abort_without_panic_3157() { + let rt = Builder::new_multi_thread() + .enable_time() + .worker_threads(1) + .build() + .unwrap(); + + rt.block_on(async move { + let handle = tokio::spawn(async move { + println!("task started"); + tokio::time::sleep(Duration::new(100, 0)).await + }); + + // wait for task to sleep. + tokio::time::sleep(Duration::from_millis(10)).await; + + handle.abort(); + let _ = handle.await; + }); +} + +/// Checks that a suspended task can be aborted inside of a current_thread +/// executor without panicking as reported in issue #3662: +/// <https://github.com/tokio-rs/tokio/issues/3662>. +#[test] +fn test_abort_without_panic_3662() { + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + + struct DropCheck(Arc<AtomicBool>); + + impl Drop for DropCheck { + fn drop(&mut self) { + self.0.store(true, Ordering::SeqCst); + } + } + + let rt = Builder::new_current_thread().build().unwrap(); + + rt.block_on(async move { + let drop_flag = Arc::new(AtomicBool::new(false)); + let drop_check = DropCheck(drop_flag.clone()); + + let j = tokio::spawn(async move { + // NB: just grab the drop check here so that it becomes part of the + // task. + let _drop_check = drop_check; + futures::future::pending::<()>().await; + }); + + let drop_flag2 = drop_flag.clone(); + + let task = std::thread::spawn(move || { + // This runs in a separate thread so it doesn't have immediate + // thread-local access to the executor. It does however transition + // the underlying task to be completed, which will cause it to be + // dropped (but not in this thread). + assert!(!drop_flag2.load(Ordering::SeqCst)); + j.abort(); + j + }) + .join() + .unwrap(); + + let result = task.await; + assert!(drop_flag.load(Ordering::SeqCst)); + assert!(result.unwrap_err().is_cancelled()); + + // Note: We do the following to trigger a deferred task cleanup. + // + // The relevant piece of code you want to look at is in: + // `Inner::block_on` of `basic_scheduler.rs`. + // + // We cause the cleanup to happen by having a poll return Pending once + // so that the scheduler can go into the "auxiliary tasks" mode, at + // which point the task is removed from the scheduler. + let i = tokio::spawn(async move { + tokio::task::yield_now().await; + }); + + i.await.unwrap(); + }); +} + +/// Checks that a suspended LocalSet task can be aborted from a remote thread +/// without panicking and without running the tasks destructor on the wrong thread. +/// <https://github.com/tokio-rs/tokio/issues/3929> +#[test] +fn remote_abort_local_set_3929() { + struct DropCheck { + created_on: std::thread::ThreadId, + not_send: std::marker::PhantomData<*const ()>, + } + + impl DropCheck { + fn new() -> Self { + Self { + created_on: std::thread::current().id(), + not_send: std::marker::PhantomData, + } + } + } + impl Drop for DropCheck { + fn drop(&mut self) { + if std::thread::current().id() != self.created_on { + panic!("non-Send value dropped in another thread!"); + } + } + } + + let rt = Builder::new_current_thread().build().unwrap(); + let local = tokio::task::LocalSet::new(); + + let check = DropCheck::new(); + let jh = local.spawn_local(async move { + futures::future::pending::<()>().await; + drop(check); + }); + + let jh2 = std::thread::spawn(move || { + sleep(Duration::from_millis(10)); + jh.abort(); + }); + + rt.block_on(local); + jh2.join().unwrap(); +} + +/// Checks that a suspended task can be aborted even if the `JoinHandle` is immediately dropped. +/// issue #3964: <https://github.com/tokio-rs/tokio/issues/3964>. +#[test] +fn test_abort_wakes_task_3964() { + let rt = Builder::new_current_thread().enable_time().build().unwrap(); + + rt.block_on(async move { + let notify_dropped = Arc::new(()); + let weak_notify_dropped = Arc::downgrade(¬ify_dropped); + + let handle = tokio::spawn(async move { + // Make sure the Arc is moved into the task + let _notify_dropped = notify_dropped; + println!("task started"); + tokio::time::sleep(Duration::new(100, 0)).await + }); + + // wait for task to sleep. + tokio::time::sleep(Duration::from_millis(10)).await; + + handle.abort(); + drop(handle); + + // wait for task to abort. + tokio::time::sleep(Duration::from_millis(10)).await; + + // Check that the Arc has been dropped. + assert!(weak_notify_dropped.upgrade().is_none()); + }); +} + +/// Checks that aborting a task whose destructor panics does not allow the +/// panic to escape the task. +#[test] +fn test_abort_task_that_panics_on_drop_contained() { + let rt = Builder::new_current_thread().enable_time().build().unwrap(); + + rt.block_on(async move { + let handle = tokio::spawn(async move { + // Make sure the Arc is moved into the task + let _panic_dropped = PanicOnDrop; + println!("task started"); + tokio::time::sleep(Duration::new(100, 0)).await + }); + + // wait for task to sleep. + tokio::time::sleep(Duration::from_millis(10)).await; + + handle.abort(); + drop(handle); + + // wait for task to abort. + tokio::time::sleep(Duration::from_millis(10)).await; + }); +} + +/// Checks that aborting a task whose destructor panics has the expected result. +#[test] +fn test_abort_task_that_panics_on_drop_returned() { + let rt = Builder::new_current_thread().enable_time().build().unwrap(); + + rt.block_on(async move { + let handle = tokio::spawn(async move { + // Make sure the Arc is moved into the task + let _panic_dropped = PanicOnDrop; + println!("task started"); + tokio::time::sleep(Duration::new(100, 0)).await + }); + + // wait for task to sleep. + tokio::time::sleep(Duration::from_millis(10)).await; + + handle.abort(); + assert!(handle.await.unwrap_err().is_panic()); + }); +} |