extern crate futures; mod support; use support::*; use std::sync::mpsc; use std::thread; use futures::prelude::*; use futures::future::{ok, err}; use futures::sync::oneshot; #[test] fn and_then1() { let (tx, rx) = mpsc::channel(); let tx2 = tx.clone(); let p1 = ok::<_, i32>("a").then(move |t| { tx2.send("first").unwrap(); t }); let tx2 = tx.clone(); let p2 = ok("b").then(move |t| { tx2.send("second").unwrap(); t }); let f = p1.and_then(|_| p2); assert!(rx.try_recv().is_err()); f.map(move |s| tx.send(s).unwrap()).forget(); assert_eq!(rx.recv(), Ok("first")); assert_eq!(rx.recv(), Ok("second")); assert_eq!(rx.recv(), Ok("b")); assert!(rx.recv().is_err()); } #[test] fn and_then2() { let (tx, rx) = mpsc::channel(); let tx2 = tx.clone(); let p1 = err::(2).then(move |t| { tx2.send("first").unwrap(); t }); let tx2 = tx.clone(); let p2 = ok("b").then(move |t| { tx2.send("second").unwrap(); t }); let f = p1.and_then(|_| p2); assert!(rx.try_recv().is_err()); f.map_err(|_| drop(tx)).forget(); assert_eq!(rx.recv(), Ok("first")); assert!(rx.recv().is_err()); } #[test] fn oneshot1() { let (c, p) = oneshot::channel::(); let t = thread::spawn(|| c.send(1).unwrap()); let (tx, rx) = mpsc::channel(); p.map(move |e| tx.send(e).unwrap()).forget(); assert_eq!(rx.recv(), Ok(1)); t.join().unwrap(); } #[test] fn oneshot2() { let (c, p) = oneshot::channel::(); let t = thread::spawn(|| c.send(1).unwrap()); t.join().unwrap(); let (tx, rx) = mpsc::channel(); p.map(move |e| tx.send(e).unwrap()).forget(); assert_eq!(rx.recv(), Ok(1)); } #[test] fn oneshot3() { let (c, p) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); p.map(move |e| tx.send(e).unwrap()).forget(); let t = thread::spawn(|| c.send(1).unwrap()); t.join().unwrap(); assert_eq!(rx.recv(), Ok(1)); } #[test] fn oneshot4() { let (c, p) = oneshot::channel::(); drop(c); let (tx, rx) = mpsc::channel(); p.map(move |e| tx.send(e).unwrap()).forget(); assert!(rx.recv().is_err()); } #[test] fn oneshot5() { let (c, p) = oneshot::channel::(); let t = thread::spawn(|| drop(c)); let (tx, rx) = mpsc::channel(); p.map(move |t| tx.send(t).unwrap()).forget(); t.join().unwrap(); assert!(rx.recv().is_err()); } #[test] fn oneshot6() { let (c, p) = oneshot::channel::(); drop(p); c.send(2).unwrap_err(); } #[test] fn cancel1() { let (c, p) = oneshot::channel::(); drop(c); p.map(|_| panic!()).forget(); } #[test] fn map_err1() { ok::(1).map_err(|_| panic!()).forget(); } #[test] fn map_err2() { let (tx, rx) = mpsc::channel(); err::(1).map_err(move |v| tx.send(v).unwrap()).forget(); assert_eq!(rx.recv(), Ok(1)); assert!(rx.recv().is_err()); } #[test] fn map_err3() { let (c, p) = oneshot::channel::(); p.map_err(|_| {}).forget(); drop(c); } #[test] fn or_else1() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); let tx2 = tx.clone(); let p1 = p1.map_err(move |i| { tx2.send(2).unwrap(); i }); let tx2 = tx.clone(); let p2 = p2.map(move |i| { tx2.send(i).unwrap(); i }); assert!(rx.try_recv().is_err()); drop(c1); c2.send(3).unwrap(); p1.or_else(|_| p2).map(move |v| tx.send(v).unwrap()).forget(); assert_eq!(rx.recv(), Ok(2)); assert_eq!(rx.recv(), Ok(3)); assert_eq!(rx.recv(), Ok(3)); assert!(rx.recv().is_err()); } #[test] fn or_else2() { let (c1, p1) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); p1.or_else(move |_| { tx.send(()).unwrap(); ok::(1) }).forget(); c1.send(2).unwrap(); assert!(rx.recv().is_err()); } #[test] fn join1() { let (tx, rx) = mpsc::channel(); ok::(1).join(ok(2)) .map(move |v| tx.send(v).unwrap()) .forget(); assert_eq!(rx.recv(), Ok((1, 2))); assert!(rx.recv().is_err()); } #[test] fn join2() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); p1.join(p2).map(move |v| tx.send(v).unwrap()).forget(); assert!(rx.try_recv().is_err()); c1.send(1).unwrap(); assert!(rx.try_recv().is_err()); c2.send(2).unwrap(); assert_eq!(rx.recv(), Ok((1, 2))); assert!(rx.recv().is_err()); } #[test] fn join3() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); p1.join(p2).map_err(move |_v| tx.send(1).unwrap()).forget(); assert!(rx.try_recv().is_err()); drop(c1); assert_eq!(rx.recv(), Ok(1)); assert!(rx.recv().is_err()); drop(c2); } #[test] fn join4() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); p1.join(p2).map_err(move |v| tx.send(v).unwrap()).forget(); assert!(rx.try_recv().is_err()); drop(c1); assert!(rx.recv().is_ok()); drop(c2); assert!(rx.recv().is_err()); } #[test] fn join5() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (c3, p3) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); p1.join(p2).join(p3).map(move |v| tx.send(v).unwrap()).forget(); assert!(rx.try_recv().is_err()); c1.send(1).unwrap(); assert!(rx.try_recv().is_err()); c2.send(2).unwrap(); assert!(rx.try_recv().is_err()); c3.send(3).unwrap(); assert_eq!(rx.recv(), Ok(((1, 2), 3))); assert!(rx.recv().is_err()); } #[test] fn select1() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); p1.select(p2).map(move |v| tx.send(v).unwrap()).forget(); assert!(rx.try_recv().is_err()); c1.send(1).unwrap(); let (v, p2) = rx.recv().unwrap(); assert_eq!(v, 1); assert!(rx.recv().is_err()); let (tx, rx) = mpsc::channel(); p2.map(move |v| tx.send(v).unwrap()).forget(); c2.send(2).unwrap(); assert_eq!(rx.recv(), Ok(2)); assert!(rx.recv().is_err()); } #[test] fn select2() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); p1.select(p2).map_err(move |v| tx.send((1, v.1)).unwrap()).forget(); assert!(rx.try_recv().is_err()); drop(c1); let (v, p2) = rx.recv().unwrap(); assert_eq!(v, 1); assert!(rx.recv().is_err()); let (tx, rx) = mpsc::channel(); p2.map(move |v| tx.send(v).unwrap()).forget(); c2.send(2).unwrap(); assert_eq!(rx.recv(), Ok(2)); assert!(rx.recv().is_err()); } #[test] fn select3() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); p1.select(p2).map_err(move |v| tx.send((1, v.1)).unwrap()).forget(); assert!(rx.try_recv().is_err()); drop(c1); let (v, p2) = rx.recv().unwrap(); assert_eq!(v, 1); assert!(rx.recv().is_err()); let (tx, rx) = mpsc::channel(); p2.map_err(move |_v| tx.send(2).unwrap()).forget(); drop(c2); assert_eq!(rx.recv(), Ok(2)); assert!(rx.recv().is_err()); } #[test] fn select4() { let (tx, rx) = mpsc::channel::>(); let t = thread::spawn(move || { for c in rx { c.send(1).unwrap(); } }); let (tx2, rx2) = mpsc::channel(); for _ in 0..10000 { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let tx3 = tx2.clone(); p1.select(p2).map(move |_| tx3.send(()).unwrap()).forget(); tx.send(c1).unwrap(); rx2.recv().unwrap(); drop(c2); } drop(tx); t.join().unwrap(); }