use futures::channel::oneshot; use futures::executor::{block_on, LocalPool}; use futures::future::{self, FutureExt, LocalFutureObj, TryFutureExt}; use futures::task::LocalSpawn; use std::cell::{Cell, RefCell}; use std::panic::AssertUnwindSafe; use std::rc::Rc; use std::task::Poll; use std::thread; struct CountClone(Rc>); impl Clone for CountClone { fn clone(&self) -> Self { self.0.set(self.0.get() + 1); Self(self.0.clone()) } } fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) { let (tx, rx) = oneshot::channel::(); let f = rx.shared(); let join_handles = (0..threads_number) .map(|_| { let cloned_future = f.clone(); thread::spawn(move || { assert_eq!(block_on(cloned_future).unwrap(), 6); }) }) .collect::>(); tx.send(6).unwrap(); assert_eq!(block_on(f).unwrap(), 6); for join_handle in join_handles { join_handle.join().unwrap(); } } #[test] fn one_thread() { send_shared_oneshot_and_wait_on_multiple_threads(1); } #[test] fn two_threads() { send_shared_oneshot_and_wait_on_multiple_threads(2); } #[test] fn many_threads() { send_shared_oneshot_and_wait_on_multiple_threads(1000); } #[test] fn drop_on_one_task_ok() { let (tx, rx) = oneshot::channel::(); let f1 = rx.shared(); let f2 = f1.clone(); let (tx2, rx2) = oneshot::channel::(); let t1 = thread::spawn(|| { let f = future::try_select(f1.map_err(|_| ()), rx2.map_err(|_| ())); drop(block_on(f)); }); let (tx3, rx3) = oneshot::channel::(); let t2 = thread::spawn(|| { let _ = block_on(f2.map_ok(|x| tx3.send(x).unwrap()).map_err(|_| ())); }); tx2.send(11).unwrap(); // cancel `f1` t1.join().unwrap(); tx.send(42).unwrap(); // Should cause `f2` and then `rx3` to get resolved. let result = block_on(rx3).unwrap(); assert_eq!(result, 42); t2.join().unwrap(); } #[test] fn drop_in_poll() { let slot1 = Rc::new(RefCell::new(None)); let slot2 = slot1.clone(); let future1 = future::lazy(move |_| { slot2.replace(None); // Drop future 1 }) .shared(); let future2 = LocalFutureObj::new(Box::new(future1.clone())); slot1.replace(Some(future2)); assert_eq!(block_on(future1), 1); } #[test] fn peek() { let mut local_pool = LocalPool::new(); let spawn = &mut local_pool.spawner(); let (tx0, rx0) = oneshot::channel::(); let f1 = rx0.shared(); let f2 = f1.clone(); // Repeated calls on the original or clone do not change the outcome. for _ in 0..2 { assert!(f1.peek().is_none()); assert!(f2.peek().is_none()); } // Completing the underlying future has no effect, because the value has not been `poll`ed in. tx0.send(42).unwrap(); for _ in 0..2 { assert!(f1.peek().is_none()); assert!(f2.peek().is_none()); } // Once the Shared has been polled, the value is peekable on the clone. spawn.spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ())))).unwrap(); local_pool.run(); for _ in 0..2 { assert_eq!(*f2.peek().unwrap(), Ok(42)); } } #[test] fn downgrade() { let (tx, rx) = oneshot::channel::(); let shared = rx.shared(); // Since there are outstanding `Shared`s, we can get a `WeakShared`. let weak = shared.downgrade().unwrap(); // It should upgrade fine right now. let mut shared2 = weak.upgrade().unwrap(); tx.send(42).unwrap(); assert_eq!(block_on(shared).unwrap(), 42); // We should still be able to get a new `WeakShared` and upgrade it // because `shared2` is outstanding. assert!(shared2.downgrade().is_some()); assert!(weak.upgrade().is_some()); assert_eq!(block_on(&mut shared2).unwrap(), 42); // Now that all `Shared`s have been exhausted, we should not be able // to get a new `WeakShared` or upgrade an existing one. assert!(weak.upgrade().is_none()); assert!(shared2.downgrade().is_none()); } #[test] fn dont_clone_in_single_owner_shared_future() { let counter = CountClone(Rc::new(Cell::new(0))); let (tx, rx) = oneshot::channel(); let rx = rx.shared(); tx.send(counter).ok().unwrap(); assert_eq!(block_on(rx).unwrap().0.get(), 0); } #[test] fn dont_do_unnecessary_clones_on_output() { let counter = CountClone(Rc::new(Cell::new(0))); let (tx, rx) = oneshot::channel(); let rx = rx.shared(); tx.send(counter).ok().unwrap(); assert_eq!(block_on(rx.clone()).unwrap().0.get(), 1); assert_eq!(block_on(rx.clone()).unwrap().0.get(), 2); assert_eq!(block_on(rx).unwrap().0.get(), 2); } #[test] fn shared_future_that_wakes_itself_until_pending_is_returned() { let proceed = Cell::new(false); let fut = futures::future::poll_fn(|cx| { if proceed.get() { Poll::Ready(()) } else { cx.waker().wake_by_ref(); Poll::Pending } }) .shared(); // The join future can only complete if the second future gets a chance to run after the first // has returned pending assert_eq!(block_on(futures::future::join(fut, async { proceed.set(true) })), ((), ())); } #[test] #[should_panic(expected = "inner future panicked during poll")] fn panic_while_poll() { let fut = futures::future::poll_fn::(|_cx| panic!("test")).shared(); let fut_captured = fut.clone(); std::panic::catch_unwind(AssertUnwindSafe(|| { block_on(fut_captured); })) .unwrap_err(); block_on(fut); } #[test] #[should_panic(expected = "test_marker")] fn poll_while_panic() { struct S; impl Drop for S { fn drop(&mut self) { let fut = futures::future::ready(1).shared(); assert_eq!(block_on(fut.clone()), 1); assert_eq!(block_on(fut), 1); } } let _s = S {}; panic!("test_marker"); }