//! Tests for the single-threaded (`unsync`) `mpsc` and `oneshot` channels.
//!
//! Futures are driven either by blocking with `wait()` or by the test-support
//! `Core` executor when several tasks have to make progress together.

#![cfg(feature = "use_std")]

extern crate futures;

mod support;

use futures::prelude::*;
use futures::unsync::oneshot;
use futures::unsync::mpsc::{self, SendError};
use futures::future::lazy;
use futures::stream::{iter_ok, unfold};

use support::local_executor::Core;

/// A value sent on the channel is received, after which the stream ends.
#[test]
fn mpsc_send_recv() {
    let (tx, rx) = mpsc::channel::<i32>(1);
    let mut rx = rx.wait();

    tx.send(42).wait().unwrap();

    assert_eq!(rx.next(), Some(Ok(42)));
    assert_eq!(rx.next(), None);
}

/// Polling an empty channel whose sender is still alive yields `NotReady`.
#[test]
fn mpsc_rx_notready() {
    // `_tx` keeps the sender alive for the duration of the test.
    let (_tx, mut rx) = mpsc::channel::<i32>(1);

    lazy(|| {
        assert_eq!(rx.poll().unwrap(), Async::NotReady);
        Ok(()) as Result<(), ()>
    }).wait().unwrap();
}

/// Polling a channel whose sender is already gone yields end-of-stream.
#[test]
fn mpsc_rx_end() {
    // The `_` pattern does not bind, so the sender is dropped right away.
    let (_, mut rx) = mpsc::channel::<i32>(1);

    lazy(|| {
        assert_eq!(rx.poll().unwrap(), Async::Ready(None));
        Ok(()) as Result<(), ()>
    }).wait().unwrap();
}

/// The receiver only observes end-of-stream once every sender clone is
/// dropped.
#[test]
fn mpsc_tx_clone_weak_rc() {
    let (tx, mut rx) = mpsc::channel::<i32>(1); // rc = 1

    let tx_clone = tx.clone(); // rc = 2
    lazy(|| {
        assert_eq!(rx.poll().unwrap(), Async::NotReady);
        Ok(()) as Result<(), ()>
    }).wait().unwrap();

    drop(tx); // rc = 1
    lazy(|| {
        assert_eq!(rx.poll().unwrap(), Async::NotReady);
        Ok(()) as Result<(), ()>
    }).wait().unwrap();

    drop(tx_clone); // rc = 0
    lazy(|| {
        assert_eq!(rx.poll().unwrap(), Async::Ready(None));
        Ok(()) as Result<(), ()>
    }).wait().unwrap();
}

/// After one completed send on a `channel(1)` that nobody reads from, a
/// second send polls as not ready.
#[test]
fn mpsc_tx_notready() {
    let (tx, _rx) = mpsc::channel::<i32>(1);
    let tx = tx.send(1).wait().unwrap();
    lazy(move || {
        assert!(tx.send(2).poll().unwrap().is_not_ready());
        Ok(()) as Result<(), ()>
    }).wait().unwrap();
}

/// Sending on a channel whose receiver is already gone polls to an error.
#[test]
fn mpsc_tx_err() {
    // The `_` pattern does not bind, so the receiver is dropped right away.
    let (tx, _) = mpsc::channel::<i32>(1);
    lazy(move || {
        assert!(tx.send(2).poll().is_err());
        Ok(()) as Result<(), ()>
    }).wait().unwrap();
}

/// Forwarding three items through a `channel(1)` delivers all of them, in
/// order, to a concurrently polled receiver.
#[test]
fn mpsc_backpressure() {
    let (tx, rx) = mpsc::channel::<i32>(1);
    lazy(move || {
        iter_ok(vec![1, 2, 3])
            .forward(tx)
            .map_err(|e: SendError<i32>| panic!("{}", e))
            .join(rx.take(3).collect().map(|xs| {
                assert_eq!(xs, [1, 2, 3]);
            }))
    }).wait().unwrap();
}

/// Same scenario as `mpsc_backpressure`, using an unbounded channel.
#[test]
fn mpsc_unbounded() {
    let (tx, rx) = mpsc::unbounded::<i32>();
    lazy(move || {
        iter_ok(vec![1, 2, 3])
            .forward(tx)
            .map_err(|e: SendError<i32>| panic!("{}", e))
            .join(rx.take(3).collect().map(|xs| {
                assert_eq!(xs, [1, 2, 3]);
            }))
    }).wait().unwrap();
}

/// A receiver task spawned before any sender runs still collects both
/// values that are sent afterwards.
#[test]
fn mpsc_recv_unpark() {
    let core = Core::new();
    let (tx, rx) = mpsc::channel::<i32>(1);
    let tx2 = tx.clone();
    core.spawn(rx.collect().map(|xs| assert_eq!(xs, [1, 2])));
    core.spawn(lazy(move || tx.send(1).map(|_| ()).map_err(|e| panic!("{}", e))));
    core.run(lazy(move || tx2.send(2))).unwrap();
}

/// A forwarding sender finishes with an error once another task drops the
/// receiver; completion is signalled through a oneshot channel.
#[test]
fn mpsc_send_unpark() {
    let core = Core::new();
    let (tx, rx) = mpsc::channel::<i32>(1);
    let (donetx, donerx) = oneshot::channel();
    core.spawn(iter_ok(vec![1, 2]).forward(tx)
        .then(|x: Result<_, SendError<i32>>| {
            assert!(x.is_err());
            donetx.send(()).unwrap();
            Ok(())
        }));
    // Moving `rx` into this task and discarding it drops the receiver.
    core.spawn(lazy(move || { let _ = rx; Ok(()) }));
    core.run(donerx).unwrap();
}

/// `mpsc::spawn` yields the items produced by the spawned stream.
#[test]
fn spawn_sends_items() {
    let core = Core::new();
    let stream = unfold(0, |i| Some(Ok::<_, u8>((i, i + 1))));
    let rx = mpsc::spawn(stream, &core, 1);
    assert_eq!(core.run(rx.take(4).collect()).unwrap(),
               [0, 1, 2, 3]);
}

/// Dropping the receiving end returned by `mpsc::spawn` must drop the
/// spawned stream, even if that stream never becomes ready.
#[test]
fn spawn_kill_dead_stream() {
    use std::thread;
    use std::time::Duration;
    use futures::future::Either;

    // a stream which never returns anything (maybe a remote end isn't
    // responding), but dropping it leads to observable side effects
    // (like closing connections, releasing limited resources, ...)
    #[derive(Debug)]
    struct Dead {
        // when dropped you should get Err(oneshot::Canceled) on the
        // receiving end
        done: oneshot::Sender<()>,
    }
    impl Stream for Dead {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
            Ok(Async::NotReady)
        }
    }

    // need to implement a timeout for the test, as it would hang
    // forever right now
    let (timeout_tx, timeout_rx) = futures::sync::oneshot::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(1000));
        let _ = timeout_tx.send(());
    });

    let core = Core::new();
    let (done_tx, done_rx) = oneshot::channel();
    let stream = Dead { done: done_tx };
    let rx = mpsc::spawn(stream, &core, 1);
    let res = core.run(
        Ok::<_, ()>(())
            .into_future()
            .then(move |_| {
                // now drop the spawned stream: maybe some timeout exceeded,
                // or some connection on this end was closed by the remote
                // end.
                drop(rx);
                // and wait for the spawned stream to release its resources
                done_rx
            })
            .select2(timeout_rx)
    );
    // Only cancellation of `done_rx` (the `A` side of the select) counts as
    // success; the timeout firing or any other outcome is a failure.
    match res {
        Err(Either::A((oneshot::Canceled, _))) => (),
        _ => {
            panic!("dead stream wasn't canceled");
        },
    }
}

/// Test case for PR #768 (issue #766).
/// The issue was:
/// Given that an empty channel is polled by the Receiver, and the only Sender
/// gets dropped without sending anything, then the Receiver would get stuck.
#[test]
fn dropped_sender_of_unused_channel_notifies_receiver() {
    let core = Core::new();
    type BoxedFuture = Box<Future<Item = u8, Error = ()>>;

    // Constructs the channel which we want to test, and two futures which
    // act on that channel.
    let pair = |reverse| -> Vec<BoxedFuture> {
        // This is the channel which we want to test.
        let (tx, rx) = mpsc::channel::<u8>(1);
        let mut futures: Vec<BoxedFuture> = vec![
            // Sender side: forwards an empty stream and resolves to 42.
            Box::new(futures::stream::iter_ok(vec![])
                .forward(tx)
                .map_err(|_: mpsc::SendError<u8>| ())
                .map(|_| 42)
            ),
            // Receiver side: drains the channel and resolves to 24.
            Box::new(rx.fold((), |_, _| Ok(()))
                .map(|_| 24)
            ),
        ];
        if reverse {
            futures.reverse();
        }
        futures
    };

    let make_test_future = |reverse| -> Box<Future<Item = Vec<u8>, Error = ()>> {
        let f = futures::future::join_all(pair(reverse));

        // Use a timeout. This is not meant to test the `sync::oneshot` but
        // merely uses it to implement this timeout.
        let (timeout_tx, timeout_rx) = futures::sync::oneshot::channel::<Vec<u8>>();
        std::thread::spawn(move || {
            std::thread::sleep(std::time::Duration::from_millis(1000));
            let x = timeout_tx.send(vec![0]);
            assert!(x.is_err(), "Test timed out.");
        });

        // Whichever side of the select finishes first provides the result;
        // the still-pending other future is discarded.
        Box::new(f.select(timeout_rx.map_err(|_| ()))
            .map_err(|x| x.0)
            .map(|x| x.0)
        )
    };

    // The order of the tested futures is important to test fix of PR #768.
    // We want future_2 to poll on the Receiver before the Sender is dropped.
    let result = core.run(make_test_future(false));
    assert!(result.is_ok());
    assert_eq!(vec![42, 24], result.unwrap());

    // Test also the other ordering:
    let result = core.run(make_test_future(true));
    assert!(result.is_ok());
    assert_eq!(vec![24, 42], result.unwrap());
}