diff options
Diffstat (limited to 'third_party/rust/futures-0.1.31/tests/mpsc.rs')
-rw-r--r-- | third_party/rust/futures-0.1.31/tests/mpsc.rs | 567 |
1 files changed, 567 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/tests/mpsc.rs b/third_party/rust/futures-0.1.31/tests/mpsc.rs new file mode 100644 index 0000000000..9cb83e5952 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/mpsc.rs @@ -0,0 +1,567 @@ +#![cfg(feature = "use_std")] +#![allow(bare_trait_objects, unknown_lints)] + +#[macro_use] +extern crate futures; + +use futures::prelude::*; +use futures::future::{lazy, ok}; +use futures::stream::unfold; +use futures::sync::mpsc; +use futures::sync::oneshot; + +use std::thread; +use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicUsize, Ordering}; + +mod support; +use support::*; + + +trait AssertSend: Send {} +impl AssertSend for mpsc::Sender<i32> {} +impl AssertSend for mpsc::Receiver<i32> {} + +#[test] +fn send_recv() { + let (tx, rx) = mpsc::channel::<i32>(16); + let mut rx = rx.wait(); + + tx.send(1).wait().unwrap(); + + assert_eq!(rx.next().unwrap(), Ok(1)); +} + +#[test] +fn send_recv_no_buffer() { + let (mut tx, mut rx) = mpsc::channel::<i32>(0); + + // Run on a task context + lazy(move || { + assert!(tx.poll_complete().unwrap().is_ready()); + assert!(tx.poll_ready().unwrap().is_ready()); + + // Send first message + let res = tx.start_send(1).unwrap(); + assert!(is_ready(&res)); + assert!(tx.poll_ready().unwrap().is_not_ready()); + + // Send second message + let res = tx.start_send(2).unwrap(); + assert!(!is_ready(&res)); + + // Take the value + assert_eq!(rx.poll().unwrap(), Async::Ready(Some(1))); + assert!(tx.poll_ready().unwrap().is_ready()); + + let res = tx.start_send(2).unwrap(); + assert!(is_ready(&res)); + assert!(tx.poll_ready().unwrap().is_not_ready()); + + // Take the value + assert_eq!(rx.poll().unwrap(), Async::Ready(Some(2))); + assert!(tx.poll_ready().unwrap().is_ready()); + + Ok::<(), ()>(()) + }).wait().unwrap(); +} + +#[test] +fn send_shared_recv() { + let (tx1, rx) = mpsc::channel::<i32>(16); + let tx2 = tx1.clone(); + let mut rx = rx.wait(); + + tx1.send(1).wait().unwrap(); + assert_eq!(rx.next().unwrap(), Ok(1)); + + tx2.send(2).wait().unwrap(); + assert_eq!(rx.next().unwrap(), Ok(2)); +} + +#[test] +fn send_recv_threads() { + let (tx, rx) = mpsc::channel::<i32>(16); + let mut rx = rx.wait(); + + thread::spawn(move|| { + tx.send(1).wait().unwrap(); + }); + + assert_eq!(rx.next().unwrap(), Ok(1)); +} + +#[test] +fn send_recv_threads_no_capacity() { + let (tx, rx) = mpsc::channel::<i32>(0); + let mut rx = rx.wait(); + + let (readytx, readyrx) = mpsc::channel::<()>(2); + let mut readyrx = readyrx.wait(); + let t = thread::spawn(move|| { + let readytx = readytx.sink_map_err(|_| panic!()); + let (a, b) = tx.send(1).join(readytx.send(())).wait().unwrap(); + a.send(2).join(b.send(())).wait().unwrap(); + }); + + drop(readyrx.next().unwrap()); + assert_eq!(rx.next().unwrap(), Ok(1)); + drop(readyrx.next().unwrap()); + assert_eq!(rx.next().unwrap(), Ok(2)); + + t.join().unwrap(); +} + +#[test] +fn recv_close_gets_none() { + let (mut tx, mut rx) = mpsc::channel::<i32>(10); + + // Run on a task context + lazy(move || { + rx.close(); + + assert_eq!(rx.poll(), Ok(Async::Ready(None))); + assert!(tx.poll_ready().is_err()); + + drop(tx); + + Ok::<(), ()>(()) + }).wait().unwrap(); +} + + +#[test] +fn tx_close_gets_none() { + let (_, mut rx) = mpsc::channel::<i32>(10); + + // Run on a task context + lazy(move || { + assert_eq!(rx.poll(), Ok(Async::Ready(None))); + assert_eq!(rx.poll(), Ok(Async::Ready(None))); + + Ok::<(), ()>(()) + }).wait().unwrap(); +} + +#[test] +fn spawn_sends_items() { + let core = local_executor::Core::new(); + let stream = unfold(0, |i| Some(ok::<_,u8>((i, i + 1)))); + let rx = mpsc::spawn(stream, &core, 1); + assert_eq!(core.run(rx.take(4).collect()).unwrap(), + [0, 1, 2, 3]); +} + +#[test] +fn spawn_kill_dead_stream() { + use std::thread; + use std::time::Duration; + use futures::future::Either; + use futures::sync::oneshot; + + // a stream which never returns anything (maybe a remote end isn't + // responding), but dropping it leads to observable side effects + // (like closing connections, releasing limited resources, ...) + #[derive(Debug)] + struct Dead { + // when dropped you should get Err(oneshot::Canceled) on the + // receiving end + done: oneshot::Sender<()>, + } + impl Stream for Dead { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + Ok(Async::NotReady) + } + } + + // need to implement a timeout for the test, as it would hang + // forever right now + let (timeout_tx, timeout_rx) = oneshot::channel(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(1000)); + let _ = timeout_tx.send(()); + }); + + let core = local_executor::Core::new(); + let (done_tx, done_rx) = oneshot::channel(); + let stream = Dead{done: done_tx}; + let rx = mpsc::spawn(stream, &core, 1); + let res = core.run( + Ok::<_, ()>(()) + .into_future() + .then(move |_| { + // now drop the spawned stream: maybe some timeout exceeded, + // or some connection on this end was closed by the remote + // end. + drop(rx); + // and wait for the spawned stream to release its resources + done_rx + }) + .select2(timeout_rx) + ); + match res { + Err(Either::A((oneshot::Canceled, _))) => (), + _ => { + panic!("dead stream wasn't canceled"); + }, + } +} + +#[test] +fn stress_shared_unbounded() { + const AMT: u32 = 10000; + const NTHREADS: u32 = 8; + let (tx, rx) = mpsc::unbounded::<i32>(); + let mut rx = rx.wait(); + + let t = thread::spawn(move|| { + for _ in 0..AMT * NTHREADS { + assert_eq!(rx.next().unwrap(), Ok(1)); + } + + if rx.next().is_some() { + panic!(); + } + }); + + for _ in 0..NTHREADS { + let tx = tx.clone(); + + thread::spawn(move|| { + for _ in 0..AMT { + tx.unbounded_send(1).unwrap(); + } + }); + } + + drop(tx); + + t.join().ok().unwrap(); +} + +#[test] +fn stress_shared_bounded_hard() { + const AMT: u32 = 10000; + const NTHREADS: u32 = 8; + let (tx, rx) = mpsc::channel::<i32>(0); + let mut rx = rx.wait(); + + let t = thread::spawn(move|| { + for _ in 0..AMT * NTHREADS { + assert_eq!(rx.next().unwrap(), Ok(1)); + } + + if rx.next().is_some() { + panic!(); + } + }); + + for _ in 0..NTHREADS { + let mut tx = tx.clone(); + + thread::spawn(move|| { + for _ in 0..AMT { + tx = tx.send(1).wait().unwrap(); + } + }); + } + + drop(tx); + + t.join().ok().unwrap(); +} + +#[test] +fn stress_receiver_multi_task_bounded_hard() { + const AMT: usize = 10_000; + const NTHREADS: u32 = 2; + + let (mut tx, rx) = mpsc::channel::<usize>(0); + let rx = Arc::new(Mutex::new(Some(rx))); + let n = Arc::new(AtomicUsize::new(0)); + + let mut th = vec![]; + + for _ in 0..NTHREADS { + let rx = rx.clone(); + let n = n.clone(); + + let t = thread::spawn(move || { + let mut i = 0; + + loop { + i += 1; + let mut lock = rx.lock().ok().unwrap(); + + match lock.take() { + Some(mut rx) => { + if i % 5 == 0 { + let (item, rest) = rx.into_future().wait().ok().unwrap(); + + if item.is_none() { + break; + } + + n.fetch_add(1, Ordering::Relaxed); + *lock = Some(rest); + } else { + // Just poll + let n = n.clone(); + let r = lazy(move || { + let r = match rx.poll().unwrap() { + Async::Ready(Some(_)) => { + n.fetch_add(1, Ordering::Relaxed); + *lock = Some(rx); + false + } + Async::Ready(None) => { + true + } + Async::NotReady => { + *lock = Some(rx); + false + } + }; + + Ok::<bool, ()>(r) + }).wait().unwrap(); + + if r { + break; + } + } + } + None => break, + } + } + }); + + th.push(t); + } + + for i in 0..AMT { + tx = tx.send(i).wait().unwrap(); + } + + drop(tx); + + for t in th { + t.join().unwrap(); + } + + assert_eq!(AMT, n.load(Ordering::Relaxed)); +} + +/// Stress test that receiver properly receives all the messages +/// after sender dropped. +#[test] +fn stress_drop_sender() { + fn list() -> Box<Stream<Item=i32, Error=u32>> { + let (tx, rx) = mpsc::channel(1); + tx.send(Ok(1)) + .and_then(|tx| tx.send(Ok(2))) + .and_then(|tx| tx.send(Ok(3))) + .forget(); + Box::new(rx.then(|r| r.unwrap())) + } + + for _ in 0..10000 { + assert_eq!(list().wait().collect::<Result<Vec<_>, _>>(), + Ok(vec![1, 2, 3])); + } +} + +/// Stress test that after receiver dropped, +/// no messages are lost. +fn stress_close_receiver_iter() { + let (tx, rx) = mpsc::unbounded(); + let (unwritten_tx, unwritten_rx) = std::sync::mpsc::channel(); + let th = thread::spawn(move || { + for i in 1.. { + if let Err(_) = tx.unbounded_send(i) { + unwritten_tx.send(i).expect("unwritten_tx"); + return; + } + } + }); + + let mut rx = rx.wait(); + + // Read one message to make sure thread effectively started + assert_eq!(Some(Ok(1)), rx.next()); + + rx.get_mut().close(); + + for i in 2.. { + match rx.next() { + Some(Ok(r)) => assert!(i == r), + Some(Err(_)) => unreachable!(), + None => { + let unwritten = unwritten_rx.recv().expect("unwritten_rx"); + assert_eq!(unwritten, i); + th.join().unwrap(); + return; + } + } + } +} + +#[test] +fn stress_close_receiver() { + for _ in 0..10000 { + stress_close_receiver_iter(); + } +} + +/// Tests that after `poll_ready` indicates capacity a channel can always send without waiting. +#[test] +fn stress_poll_ready() { + // A task which checks channel capacity using poll_ready, and pushes items onto the channel when + // ready. + struct SenderTask { + sender: mpsc::Sender<u32>, + count: u32, + } + impl Future for SenderTask { + type Item = (); + type Error = (); + fn poll(&mut self) -> Poll<(), ()> { + // In a loop, check if the channel is ready. If so, push an item onto the channel + // (asserting that it doesn't attempt to block). + while self.count > 0 { + try_ready!(self.sender.poll_ready().map_err(|_| ())); + assert!(self.sender.start_send(self.count).unwrap().is_ready()); + self.count -= 1; + } + Ok(Async::Ready(())) + } + } + + const AMT: u32 = 1000; + const NTHREADS: u32 = 8; + + /// Run a stress test using the specified channel capacity. + fn stress(capacity: usize) { + let (tx, rx) = mpsc::channel(capacity); + let mut threads = Vec::new(); + for _ in 0..NTHREADS { + let sender = tx.clone(); + threads.push(thread::spawn(move || { + SenderTask { + sender: sender, + count: AMT, + }.wait() + })); + } + drop(tx); + + let mut rx = rx.wait(); + for _ in 0..AMT * NTHREADS { + assert!(rx.next().is_some()); + } + + assert!(rx.next().is_none()); + + for thread in threads { + thread.join().unwrap().unwrap(); + } + } + + stress(0); + stress(1); + stress(8); + stress(16); +} + +fn is_ready<T>(res: &AsyncSink<T>) -> bool { + match *res { + AsyncSink::Ready => true, + _ => false, + } +} + +#[test] +fn try_send_1() { + const N: usize = 3000; + let (mut tx, rx) = mpsc::channel(0); + + let t = thread::spawn(move || { + for i in 0..N { + loop { + if tx.try_send(i).is_ok() { + break + } + } + } + }); + for (i, j) in rx.wait().enumerate() { + assert_eq!(i, j.unwrap()); + } + t.join().unwrap(); +} + +#[test] +fn try_send_2() { + let (mut tx, rx) = mpsc::channel(0); + + tx.try_send("hello").unwrap(); + + let (readytx, readyrx) = oneshot::channel::<()>(); + + let th = thread::spawn(|| { + lazy(|| { + assert!(tx.start_send("fail").unwrap().is_not_ready()); + Ok::<_, ()>(()) + }).wait().unwrap(); + + drop(readytx); + tx.send("goodbye").wait().unwrap(); + }); + + let mut rx = rx.wait(); + + drop(readyrx.wait()); + assert_eq!(rx.next(), Some(Ok("hello"))); + assert_eq!(rx.next(), Some(Ok("goodbye"))); + assert!(rx.next().is_none()); + + th.join().unwrap(); +} + +#[test] +fn try_send_fail() { + let (mut tx, rx) = mpsc::channel(0); + let mut rx = rx.wait(); + + tx.try_send("hello").unwrap(); + + // This should fail + assert!(tx.try_send("fail").is_err()); + + assert_eq!(rx.next(), Some(Ok("hello"))); + + tx.try_send("goodbye").unwrap(); + drop(tx); + + assert_eq!(rx.next(), Some(Ok("goodbye"))); + assert!(rx.next().is_none()); +} + +#[test] +fn bounded_is_really_bounded() { + use futures::Async::*; + let (mut tx, mut rx) = mpsc::channel(0); + lazy(|| { + assert!(tx.start_send(1).unwrap().is_ready()); + // Not ready until we receive + assert!(!tx.poll_complete().unwrap().is_ready()); + // Receive the value + assert_eq!(rx.poll().unwrap(), Ready(Some(1))); + // Now the sender is ready + assert!(tx.poll_complete().unwrap().is_ready()); + Ok::<_, ()>(()) + }).wait().unwrap(); +} |