use futures::channel::{mpsc, oneshot}; use futures::executor::block_on; use futures::future::{self, poll_fn, FutureExt}; use futures::sink::SinkExt; use futures::stream::StreamExt; use futures::task::{Context, Poll}; use futures::{ join, pending, pin_mut, poll, select, select_biased, stream, stream_select, try_join, }; use std::mem; #[test] fn poll_and_pending() { let pending_once = async { pending!() }; block_on(async { pin_mut!(pending_once); assert_eq!(Poll::Pending, poll!(&mut pending_once)); assert_eq!(Poll::Ready(()), poll!(&mut pending_once)); }); } #[test] fn join() { let (tx1, rx1) = oneshot::channel::(); let (tx2, rx2) = oneshot::channel::(); let fut = async { let res = join!(rx1, rx2); assert_eq!((Ok(1), Ok(2)), res); }; block_on(async { pin_mut!(fut); assert_eq!(Poll::Pending, poll!(&mut fut)); tx1.send(1).unwrap(); assert_eq!(Poll::Pending, poll!(&mut fut)); tx2.send(2).unwrap(); assert_eq!(Poll::Ready(()), poll!(&mut fut)); }); } #[test] fn select() { let (tx1, rx1) = oneshot::channel::(); let (_tx2, rx2) = oneshot::channel::(); tx1.send(1).unwrap(); let mut ran = false; block_on(async { select! { res = rx1.fuse() => { assert_eq!(Ok(1), res); ran = true; }, _ = rx2.fuse() => unreachable!(), } }); assert!(ran); } #[test] fn select_biased() { let (tx1, rx1) = oneshot::channel::(); let (_tx2, rx2) = oneshot::channel::(); tx1.send(1).unwrap(); let mut ran = false; block_on(async { select_biased! { res = rx1.fuse() => { assert_eq!(Ok(1), res); ran = true; }, _ = rx2.fuse() => unreachable!(), } }); assert!(ran); } #[test] fn select_streams() { let (mut tx1, rx1) = mpsc::channel::(1); let (mut tx2, rx2) = mpsc::channel::(1); let mut rx1 = rx1.fuse(); let mut rx2 = rx2.fuse(); let mut ran = false; let mut total = 0; block_on(async { let mut tx1_opt; let mut tx2_opt; select! { _ = rx1.next() => panic!(), _ = rx2.next() => panic!(), default => { tx1.send(2).await.unwrap(); tx2.send(3).await.unwrap(); tx1_opt = Some(tx1); tx2_opt = Some(tx2); } complete => panic!(), } loop { select! { // runs first and again after default x = rx1.next() => if let Some(x) = x { total += x; }, // runs second and again after default x = rx2.next() => if let Some(x) = x { total += x; }, // runs third default => { assert_eq!(total, 5); ran = true; drop(tx1_opt.take().unwrap()); drop(tx2_opt.take().unwrap()); }, // runs last complete => break, }; } }); assert!(ran); } #[test] fn select_can_move_uncompleted_futures() { let (tx1, rx1) = oneshot::channel::(); let (tx2, rx2) = oneshot::channel::(); tx1.send(1).unwrap(); tx2.send(2).unwrap(); let mut ran = false; let mut rx1 = rx1.fuse(); let mut rx2 = rx2.fuse(); block_on(async { select! { res = rx1 => { assert_eq!(Ok(1), res); assert_eq!(Ok(2), rx2.await); ran = true; }, res = rx2 => { assert_eq!(Ok(2), res); assert_eq!(Ok(1), rx1.await); ran = true; }, } }); assert!(ran); } #[test] fn select_nested() { let mut outer_fut = future::ready(1); let mut inner_fut = future::ready(2); let res = block_on(async { select! { x = outer_fut => { select! { y = inner_fut => x + y, } } } }); assert_eq!(res, 3); } #[cfg_attr(not(target_pointer_width = "64"), ignore)] #[test] fn select_size() { let fut = async { let mut ready = future::ready(0i32); select! { _ = ready => {}, } }; assert_eq!(mem::size_of_val(&fut), 24); let fut = async { let mut ready1 = future::ready(0i32); let mut ready2 = future::ready(0i32); select! { _ = ready1 => {}, _ = ready2 => {}, } }; assert_eq!(mem::size_of_val(&fut), 40); } #[test] fn select_on_non_unpin_expressions() { // The returned Future is !Unpin let make_non_unpin_fut = || async { 5 }; let res = block_on(async { let select_res; select! { value_1 = make_non_unpin_fut().fuse() => select_res = value_1, value_2 = make_non_unpin_fut().fuse() => select_res = value_2, }; select_res }); assert_eq!(res, 5); } #[test] fn select_on_non_unpin_expressions_with_default() { // The returned Future is !Unpin let make_non_unpin_fut = || async { 5 }; let res = block_on(async { let select_res; select! { value_1 = make_non_unpin_fut().fuse() => select_res = value_1, value_2 = make_non_unpin_fut().fuse() => select_res = value_2, default => select_res = 7, }; select_res }); assert_eq!(res, 5); } #[cfg_attr(not(target_pointer_width = "64"), ignore)] #[test] fn select_on_non_unpin_size() { // The returned Future is !Unpin let make_non_unpin_fut = || async { 5 }; let fut = async { let select_res; select! { value_1 = make_non_unpin_fut().fuse() => select_res = value_1, value_2 = make_non_unpin_fut().fuse() => select_res = value_2, }; select_res }; assert_eq!(32, mem::size_of_val(&fut)); } #[test] fn select_can_be_used_as_expression() { block_on(async { let res = select! { x = future::ready(7) => x, y = future::ready(3) => y + 1, }; assert!(res == 7 || res == 4); }); } #[test] fn select_with_default_can_be_used_as_expression() { fn poll_always_pending(_cx: &mut Context<'_>) -> Poll { Poll::Pending } block_on(async { let res = select! { x = poll_fn(poll_always_pending::).fuse() => x, y = poll_fn(poll_always_pending::).fuse() => y + 1, default => 99, }; assert_eq!(res, 99); }); } #[test] fn select_with_complete_can_be_used_as_expression() { block_on(async { let res = select! { x = future::pending::() => x, y = future::pending::() => y + 1, default => 99, complete => 237, }; assert_eq!(res, 237); }); } #[test] #[allow(unused_assignments)] fn select_on_mutable_borrowing_future_with_same_borrow_in_block() { async fn require_mutable(_: &mut i32) {} async fn async_noop() {} block_on(async { let mut value = 234; select! { _ = require_mutable(&mut value).fuse() => { }, _ = async_noop().fuse() => { value += 5; }, } }); } #[test] #[allow(unused_assignments)] fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() { async fn require_mutable(_: &mut i32) {} async fn async_noop() {} block_on(async { let mut value = 234; select! { _ = require_mutable(&mut value).fuse() => { }, _ = async_noop().fuse() => { value += 5; }, default => { value += 27; }, } }); } #[test] #[allow(unused_assignments)] fn stream_select() { // stream_select! macro block_on(async { let endless_ints = |i| stream::iter(vec![i].into_iter().cycle()); let mut endless_ones = stream_select!(endless_ints(1i32), stream::pending()); assert_eq!(endless_ones.next().await, Some(1)); assert_eq!(endless_ones.next().await, Some(1)); let mut finite_list = stream_select!(stream::iter(vec![1].into_iter()), stream::iter(vec![1].into_iter())); assert_eq!(finite_list.next().await, Some(1)); assert_eq!(finite_list.next().await, Some(1)); assert_eq!(finite_list.next().await, None); let endless_mixed = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3)); // Take 1000, and assert a somewhat even distribution of values. // The fairness is randomized, but over 1000 samples we should be pretty close to even. // This test may be a bit flaky. Feel free to adjust the margins as you see fit. let mut count = 0; let results = endless_mixed .take_while(move |_| { count += 1; let ret = count < 1000; async move { ret } }) .collect::>() .await; assert!(results.iter().filter(|x| **x == 1).count() >= 299); assert!(results.iter().filter(|x| **x == 2).count() >= 299); assert!(results.iter().filter(|x| **x == 3).count() >= 299); }); } #[cfg_attr(not(target_pointer_width = "64"), ignore)] #[test] fn join_size() { let fut = async { let ready = future::ready(0i32); join!(ready) }; assert_eq!(mem::size_of_val(&fut), 24); let fut = async { let ready1 = future::ready(0i32); let ready2 = future::ready(0i32); join!(ready1, ready2) }; assert_eq!(mem::size_of_val(&fut), 40); } #[cfg_attr(not(target_pointer_width = "64"), ignore)] #[test] fn try_join_size() { let fut = async { let ready = future::ready(Ok::(0)); try_join!(ready) }; assert_eq!(mem::size_of_val(&fut), 24); let fut = async { let ready1 = future::ready(Ok::(0)); let ready2 = future::ready(Ok::(0)); try_join!(ready1, ready2) }; assert_eq!(mem::size_of_val(&fut), 48); } #[allow(clippy::let_underscore_future)] #[test] fn join_doesnt_require_unpin() { let _ = async { join!(async {}, async {}) }; } #[allow(clippy::let_underscore_future)] #[test] fn try_join_doesnt_require_unpin() { let _ = async { try_join!(async { Ok::<(), ()>(()) }, async { Ok::<(), ()>(()) },) }; }