diff options
Diffstat (limited to 'third_party/rust/tokio/tests/stream_stream_map.rs')
-rw-r--r-- | third_party/rust/tokio/tests/stream_stream_map.rs | 374 |
1 files changed, 374 insertions, 0 deletions
diff --git a/third_party/rust/tokio/tests/stream_stream_map.rs b/third_party/rust/tokio/tests/stream_stream_map.rs new file mode 100644 index 0000000000..6b49803234 --- /dev/null +++ b/third_party/rust/tokio/tests/stream_stream_map.rs @@ -0,0 +1,374 @@ +use tokio::stream::{self, pending, Stream, StreamExt, StreamMap}; +use tokio::sync::mpsc; +use tokio_test::{assert_ok, assert_pending, assert_ready, task}; + +use std::pin::Pin; + +macro_rules! assert_ready_some { + ($($t:tt)*) => { + match assert_ready!($($t)*) { + Some(v) => v, + None => panic!("expected `Some`, got `None`"), + } + }; +} + +macro_rules! assert_ready_none { + ($($t:tt)*) => { + match assert_ready!($($t)*) { + None => {} + Some(v) => panic!("expected `None`, got `Some({:?})`", v), + } + }; +} + +#[tokio::test] +async fn empty() { + let mut map = StreamMap::<&str, stream::Pending<()>>::new(); + + assert_eq!(map.len(), 0); + assert!(map.is_empty()); + + assert!(map.next().await.is_none()); + assert!(map.next().await.is_none()); + + assert!(map.remove("foo").is_none()); +} + +#[tokio::test] +async fn single_entry() { + let mut map = task::spawn(StreamMap::new()); + let (tx, rx) = mpsc::unbounded_channel(); + + assert_ready_none!(map.poll_next()); + + assert!(map.insert("foo", rx).is_none()); + assert!(map.contains_key("foo")); + assert!(!map.contains_key("bar")); + + assert_eq!(map.len(), 1); + assert!(!map.is_empty()); + + assert_pending!(map.poll_next()); + + assert_ok!(tx.send(1)); + + assert!(map.is_woken()); + let (k, v) = assert_ready_some!(map.poll_next()); + assert_eq!(k, "foo"); + assert_eq!(v, 1); + + assert_pending!(map.poll_next()); + + assert_ok!(tx.send(2)); + + assert!(map.is_woken()); + let (k, v) = assert_ready_some!(map.poll_next()); + assert_eq!(k, "foo"); + assert_eq!(v, 2); + + assert_pending!(map.poll_next()); + drop(tx); + assert!(map.is_woken()); + assert_ready_none!(map.poll_next()); +} + +#[tokio::test] +async fn multiple_entries() { + let mut map = task::spawn(StreamMap::new()); + let (tx1, rx1) = mpsc::unbounded_channel(); + let (tx2, rx2) = mpsc::unbounded_channel(); + + map.insert("foo", rx1); + map.insert("bar", rx2); + + assert_pending!(map.poll_next()); + + assert_ok!(tx1.send(1)); + + assert!(map.is_woken()); + let (k, v) = assert_ready_some!(map.poll_next()); + assert_eq!(k, "foo"); + assert_eq!(v, 1); + + assert_pending!(map.poll_next()); + + assert_ok!(tx2.send(2)); + + assert!(map.is_woken()); + let (k, v) = assert_ready_some!(map.poll_next()); + assert_eq!(k, "bar"); + assert_eq!(v, 2); + + assert_pending!(map.poll_next()); + + assert_ok!(tx1.send(3)); + assert_ok!(tx2.send(4)); + + assert!(map.is_woken()); + + // Given the randomization, there is no guarantee what order the values will + // be received in. + let mut v = (0..2) + .map(|_| assert_ready_some!(map.poll_next())) + .collect::<Vec<_>>(); + + assert_pending!(map.poll_next()); + + v.sort(); + assert_eq!(v[0].0, "bar"); + assert_eq!(v[0].1, 4); + assert_eq!(v[1].0, "foo"); + assert_eq!(v[1].1, 3); + + drop(tx1); + assert!(map.is_woken()); + assert_pending!(map.poll_next()); + drop(tx2); + + assert_ready_none!(map.poll_next()); +} + +#[tokio::test] +async fn insert_remove() { + let mut map = task::spawn(StreamMap::new()); + let (tx, rx) = mpsc::unbounded_channel(); + + assert_ready_none!(map.poll_next()); + + assert!(map.insert("foo", rx).is_none()); + let rx = map.remove("foo").unwrap(); + + assert_ok!(tx.send(1)); + + assert!(!map.is_woken()); + assert_ready_none!(map.poll_next()); + + assert!(map.insert("bar", rx).is_none()); + + let v = assert_ready_some!(map.poll_next()); + assert_eq!(v.0, "bar"); + assert_eq!(v.1, 1); + + assert!(map.remove("bar").is_some()); + assert_ready_none!(map.poll_next()); + + assert!(map.is_empty()); + assert_eq!(0, map.len()); +} + +#[tokio::test] +async fn replace() { + let mut map = task::spawn(StreamMap::new()); + let (tx1, rx1) = mpsc::unbounded_channel(); + let (tx2, rx2) = mpsc::unbounded_channel(); + + assert!(map.insert("foo", rx1).is_none()); + + assert_pending!(map.poll_next()); + + let _rx1 = map.insert("foo", rx2).unwrap(); + + assert_pending!(map.poll_next()); + + tx1.send(1).unwrap(); + assert_pending!(map.poll_next()); + + tx2.send(2).unwrap(); + assert!(map.is_woken()); + let v = assert_ready_some!(map.poll_next()); + assert_eq!(v.0, "foo"); + assert_eq!(v.1, 2); +} + +#[test] +fn size_hint_with_upper() { + let mut map = StreamMap::new(); + + map.insert("a", stream::iter(vec![1])); + map.insert("b", stream::iter(vec![1, 2])); + map.insert("c", stream::iter(vec![1, 2, 3])); + + assert_eq!(3, map.len()); + assert!(!map.is_empty()); + + let size_hint = map.size_hint(); + assert_eq!(size_hint, (6, Some(6))); +} + +#[test] +fn size_hint_without_upper() { + let mut map = StreamMap::new(); + + map.insert("a", pin_box(stream::iter(vec![1]))); + map.insert("b", pin_box(stream::iter(vec![1, 2]))); + map.insert("c", pin_box(pending())); + + let size_hint = map.size_hint(); + assert_eq!(size_hint, (3, None)); +} + +#[test] +fn new_capacity_zero() { + let map = StreamMap::<&str, stream::Pending<()>>::new(); + assert_eq!(0, map.capacity()); + + let keys = map.keys().collect::<Vec<_>>(); + assert!(keys.is_empty()); +} + +#[test] +fn with_capacity() { + let map = StreamMap::<&str, stream::Pending<()>>::with_capacity(10); + assert!(10 <= map.capacity()); + + let keys = map.keys().collect::<Vec<_>>(); + assert!(keys.is_empty()); +} + +#[test] +fn iter_keys() { + let mut map = StreamMap::new(); + + map.insert("a", pending::<i32>()); + map.insert("b", pending()); + map.insert("c", pending()); + + let mut keys = map.keys().collect::<Vec<_>>(); + keys.sort(); + + assert_eq!(&keys[..], &[&"a", &"b", &"c"]); +} + +#[test] +fn iter_values() { + let mut map = StreamMap::new(); + + map.insert("a", stream::iter(vec![1])); + map.insert("b", stream::iter(vec![1, 2])); + map.insert("c", stream::iter(vec![1, 2, 3])); + + let mut size_hints = map.values().map(|s| s.size_hint().0).collect::<Vec<_>>(); + + size_hints.sort(); + + assert_eq!(&size_hints[..], &[1, 2, 3]); +} + +#[test] +fn iter_values_mut() { + let mut map = StreamMap::new(); + + map.insert("a", stream::iter(vec![1])); + map.insert("b", stream::iter(vec![1, 2])); + map.insert("c", stream::iter(vec![1, 2, 3])); + + let mut size_hints = map + .values_mut() + .map(|s: &mut _| s.size_hint().0) + .collect::<Vec<_>>(); + + size_hints.sort(); + + assert_eq!(&size_hints[..], &[1, 2, 3]); +} + +#[test] +fn clear() { + let mut map = task::spawn(StreamMap::new()); + + map.insert("a", stream::iter(vec![1])); + map.insert("b", stream::iter(vec![1, 2])); + map.insert("c", stream::iter(vec![1, 2, 3])); + + assert_ready_some!(map.poll_next()); + + map.clear(); + + assert_ready_none!(map.poll_next()); + assert!(map.is_empty()); +} + +#[test] +fn contains_key_borrow() { + let mut map = StreamMap::new(); + map.insert("foo".to_string(), pending::<()>()); + + assert!(map.contains_key("foo")); +} + +#[test] +fn one_ready_many_none() { + // Run a few times because of randomness + for _ in 0..100 { + let mut map = task::spawn(StreamMap::new()); + + map.insert(0, pin_box(stream::empty())); + map.insert(1, pin_box(stream::empty())); + map.insert(2, pin_box(stream::once("hello"))); + map.insert(3, pin_box(stream::pending())); + + let v = assert_ready_some!(map.poll_next()); + assert_eq!(v, (2, "hello")); + } +} + +proptest::proptest! { + #[test] + fn fuzz_pending_complete_mix(kinds: Vec<bool>) { + use std::task::{Context, Poll}; + + struct DidPoll<T> { + did_poll: bool, + inner: T, + } + + impl<T: Stream + Unpin> Stream for DidPoll<T> { + type Item = T::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) + -> Poll<Option<T::Item>> + { + self.did_poll = true; + Pin::new(&mut self.inner).poll_next(cx) + } + } + + for _ in 0..10 { + let mut map = task::spawn(StreamMap::new()); + let mut expect = 0; + + for (i, &is_empty) in kinds.iter().enumerate() { + let inner = if is_empty { + pin_box(stream::empty::<()>()) + } else { + expect += 1; + pin_box(stream::pending::<()>()) + }; + + let stream = DidPoll { + did_poll: false, + inner, + }; + + map.insert(i, stream); + } + + if expect == 0 { + assert_ready_none!(map.poll_next()); + } else { + assert_pending!(map.poll_next()); + + assert_eq!(expect, map.values().count()); + + for stream in map.values() { + assert!(stream.did_poll); + } + } + } + } +} + +fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> { + Box::pin(s) +} |