extern crate futures; use futures::prelude::*; use futures::future; use futures::unsync::oneshot::{channel, Canceled, spawn}; mod support; use support::local_executor; #[test] fn smoke() { let (tx, rx) = channel(); tx.send(33).unwrap(); assert_eq!(rx.wait().unwrap(), 33); } #[test] fn canceled() { let (_, rx) = channel::<()>(); assert_eq!(rx.wait().unwrap_err(), Canceled); } #[test] fn poll_cancel() { let (mut tx, _) = channel::<()>(); assert!(tx.poll_cancel().unwrap().is_ready()); } #[test] fn tx_complete_rx_unparked() { let (tx, rx) = channel(); let res = rx.join(future::lazy(move || { tx.send(55).unwrap(); Ok(11) })); assert_eq!(res.wait().unwrap(), (55, 11)); } #[test] fn tx_dropped_rx_unparked() { let (tx, rx) = channel::(); let res = rx.join(future::lazy(move || { let _tx = tx; Ok(11) })); assert_eq!(res.wait().unwrap_err(), Canceled); } #[test] fn is_canceled() { let (tx, rx) = channel::(); assert!(!tx.is_canceled()); drop(rx); assert!(tx.is_canceled()); } #[test] fn spawn_sends_items() { let core = local_executor::Core::new(); let future = future::ok::<_, ()>(1); let rx = spawn(future, &core); assert_eq!(core.run(rx).unwrap(), 1); } #[test] fn spawn_kill_dead_stream() { use std::thread; use std::time::Duration; use futures::future::Either; use futures::sync::oneshot; // a future which never returns anything (forever accepting incoming // connections), but dropping it leads to observable side effects // (like closing listening sockets, releasing limited resources, // ...) #[derive(Debug)] struct Dead { // when dropped you should get Err(oneshot::Canceled) on the // receiving end done: oneshot::Sender<()>, } impl Future for Dead { type Item = (); type Error = (); fn poll(&mut self) -> Poll { Ok(Async::NotReady) } } // need to implement a timeout for the test, as it would hang // forever right now let (timeout_tx, timeout_rx) = oneshot::channel(); thread::spawn(move || { thread::sleep(Duration::from_millis(1000)); let _ = timeout_tx.send(()); }); let core = local_executor::Core::new(); let (done_tx, done_rx) = oneshot::channel(); let future = Dead{done: done_tx}; let rx = spawn(future, &core); let res = core.run( Ok::<_, ()>(()) .into_future() .then(move |_| { // now drop the spawned future: maybe some timeout exceeded, // or some connection on this end was closed by the remote // end. drop(rx); // and wait for the spawned future to release its resources done_rx }) .select2(timeout_rx) ); match res { Err(Either::A((oneshot::Canceled, _))) => (), Ok(Either::B(((), _))) => { panic!("dead future wasn't canceled (timeout)"); }, _ => { panic!("dead future wasn't canceled (unexpected result)"); }, } } #[test] fn spawn_dont_kill_forgot_dead_stream() { use std::thread; use std::time::Duration; use futures::future::Either; use futures::sync::oneshot; // a future which never returns anything (forever accepting incoming // connections), but dropping it leads to observable side effects // (like closing listening sockets, releasing limited resources, // ...) #[derive(Debug)] struct Dead { // when dropped you should get Err(oneshot::Canceled) on the // receiving end done: oneshot::Sender<()>, } impl Future for Dead { type Item = (); type Error = (); fn poll(&mut self) -> Poll { Ok(Async::NotReady) } } // need to implement a timeout for the test, as it would hang // forever right now let (timeout_tx, timeout_rx) = oneshot::channel(); thread::spawn(move || { thread::sleep(Duration::from_millis(1000)); let _ = timeout_tx.send(()); }); let core = local_executor::Core::new(); let (done_tx, done_rx) = oneshot::channel(); let future = Dead{done: done_tx}; let rx = spawn(future, &core); let res = core.run( Ok::<_, ()>(()) .into_future() .then(move |_| { // forget the spawned future: should keep running, i.e. hit // the timeout below. rx.forget(); // and wait for the spawned future to release its resources done_rx }) .select2(timeout_rx) ); match res { Err(Either::A((oneshot::Canceled, _))) => { panic!("forgotten dead future was canceled"); }, Ok(Either::B(((), _))) => (), // reached timeout _ => { panic!("forgotten dead future was canceled (unexpected result)"); }, } }