#![warn(rust_2018_idioms)] #![cfg(feature = "full")] use std::sync::Arc; use std::thread::sleep; use tokio::time::Duration; use tokio::runtime::Builder; struct PanicOnDrop; impl Drop for PanicOnDrop { fn drop(&mut self) { panic!("Well what did you expect would happen..."); } } /// Checks that a suspended task can be aborted without panicking as reported in /// issue #3157: . #[test] fn test_abort_without_panic_3157() { let rt = Builder::new_multi_thread() .enable_time() .worker_threads(1) .build() .unwrap(); rt.block_on(async move { let handle = tokio::spawn(async move { println!("task started"); tokio::time::sleep(Duration::new(100, 0)).await }); // wait for task to sleep. tokio::time::sleep(Duration::from_millis(10)).await; handle.abort(); let _ = handle.await; }); } /// Checks that a suspended task can be aborted inside of a current_thread /// executor without panicking as reported in issue #3662: /// . #[test] fn test_abort_without_panic_3662() { use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; struct DropCheck(Arc); impl Drop for DropCheck { fn drop(&mut self) { self.0.store(true, Ordering::SeqCst); } } let rt = Builder::new_current_thread().build().unwrap(); rt.block_on(async move { let drop_flag = Arc::new(AtomicBool::new(false)); let drop_check = DropCheck(drop_flag.clone()); let j = tokio::spawn(async move { // NB: just grab the drop check here so that it becomes part of the // task. let _drop_check = drop_check; futures::future::pending::<()>().await; }); let drop_flag2 = drop_flag.clone(); let task = std::thread::spawn(move || { // This runs in a separate thread so it doesn't have immediate // thread-local access to the executor. It does however transition // the underlying task to be completed, which will cause it to be // dropped (but not in this thread). 
assert!(!drop_flag2.load(Ordering::SeqCst)); j.abort(); j }) .join() .unwrap(); let result = task.await; assert!(drop_flag.load(Ordering::SeqCst)); assert!(result.unwrap_err().is_cancelled()); // Note: We do the following to trigger a deferred task cleanup. // // The relevant piece of code you want to look at is in: // `Inner::block_on` of `basic_scheduler.rs`. // // We cause the cleanup to happen by having a poll return Pending once // so that the scheduler can go into the "auxiliary tasks" mode, at // which point the task is removed from the scheduler. let i = tokio::spawn(async move { tokio::task::yield_now().await; }); i.await.unwrap(); }); } /// Checks that a suspended LocalSet task can be aborted from a remote thread /// without panicking and without running the tasks destructor on the wrong thread. /// #[test] fn remote_abort_local_set_3929() { struct DropCheck { created_on: std::thread::ThreadId, not_send: std::marker::PhantomData<*const ()>, } impl DropCheck { fn new() -> Self { Self { created_on: std::thread::current().id(), not_send: std::marker::PhantomData, } } } impl Drop for DropCheck { fn drop(&mut self) { if std::thread::current().id() != self.created_on { panic!("non-Send value dropped in another thread!"); } } } let rt = Builder::new_current_thread().build().unwrap(); let local = tokio::task::LocalSet::new(); let check = DropCheck::new(); let jh = local.spawn_local(async move { futures::future::pending::<()>().await; drop(check); }); let jh2 = std::thread::spawn(move || { sleep(Duration::from_millis(10)); jh.abort(); }); rt.block_on(local); jh2.join().unwrap(); } /// Checks that a suspended task can be aborted even if the `JoinHandle` is immediately dropped. /// issue #3964: . 
#[test]
fn test_abort_wakes_task_3964() {
    let rt = Builder::new_current_thread().enable_time().build().unwrap();

    rt.block_on(async move {
        let notify_dropped = Arc::new(());
        // A weak handle lets the test observe when the task releases the
        // only strong reference.
        let weak_notify_dropped = Arc::downgrade(&notify_dropped);

        let handle = tokio::spawn(async move {
            // Make sure the Arc is moved into the task
            let _notify_dropped = notify_dropped;
            println!("task started");
            tokio::time::sleep(Duration::new(100, 0)).await
        });

        // wait for task to sleep.
        tokio::time::sleep(Duration::from_millis(10)).await;

        handle.abort();
        drop(handle);

        // wait for task to abort.
        tokio::time::sleep(Duration::from_millis(10)).await;

        // Check that the Arc has been dropped.
        assert!(weak_notify_dropped.upgrade().is_none());
    });
}

/// Checks that aborting a task whose destructor panics does not allow the
/// panic to escape the task.
#[test]
fn test_abort_task_that_panics_on_drop_contained() {
    let rt = Builder::new_current_thread().enable_time().build().unwrap();

    rt.block_on(async move {
        let handle = tokio::spawn(async move {
            // Make sure the panicking guard is moved into the task
            let _panic_dropped = PanicOnDrop;
            println!("task started");
            tokio::time::sleep(Duration::new(100, 0)).await
        });

        // wait for task to sleep.
        tokio::time::sleep(Duration::from_millis(10)).await;

        handle.abort();
        drop(handle);

        // wait for task to abort.
        tokio::time::sleep(Duration::from_millis(10)).await;
    });
}

/// Checks that aborting a task whose destructor panics surfaces that panic
/// through the `JoinHandle`: awaiting it yields an error for which
/// `is_panic()` is true.
#[test]
fn test_abort_task_that_panics_on_drop_returned() {
    let rt = Builder::new_current_thread().enable_time().build().unwrap();

    rt.block_on(async move {
        let handle = tokio::spawn(async move {
            // Make sure the panicking guard is moved into the task
            let _panic_dropped = PanicOnDrop;
            println!("task started");
            tokio::time::sleep(Duration::new(100, 0)).await
        });

        // wait for task to sleep.
        tokio::time::sleep(Duration::from_millis(10)).await;

        handle.abort();
        assert!(handle.await.unwrap_err().is_panic());
    });
}