diff options
Diffstat (limited to 'third_party/rust/tokio-stream/tests/stream_timeout.rs')
-rw-r--r-- | third_party/rust/tokio-stream/tests/stream_timeout.rs | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/third_party/rust/tokio-stream/tests/stream_timeout.rs b/third_party/rust/tokio-stream/tests/stream_timeout.rs new file mode 100644 index 0000000000..2338f83358 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_timeout.rs @@ -0,0 +1,109 @@ +#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))] + +use tokio::time::{self, sleep, Duration}; +use tokio_stream::{self, StreamExt}; +use tokio_test::*; + +use futures::stream; + +async fn maybe_sleep(idx: i32) -> i32 { + if idx % 2 == 0 { + sleep(ms(200)).await; + } + idx +} + +fn ms(n: u64) -> Duration { + Duration::from_millis(n) +} + +#[tokio::test] +async fn basic_usage() { + time::pause(); + + // Items 2 and 4 time out. If we run the stream until it completes, + // we end up with the following items: + // + // [Ok(1), Err(Elapsed), Ok(2), Ok(3), Err(Elapsed), Ok(4)] + + let stream = stream::iter(1..=4).then(maybe_sleep).timeout(ms(100)); + let mut stream = task::spawn(stream); + + // First item completes immediately + assert_ready_eq!(stream.poll_next(), Some(Ok(1))); + + // Second item is delayed 200ms, times out after 100ms + assert_pending!(stream.poll_next()); + + time::advance(ms(150)).await; + let v = assert_ready!(stream.poll_next()); + assert!(v.unwrap().is_err()); + + assert_pending!(stream.poll_next()); + + time::advance(ms(100)).await; + assert_ready_eq!(stream.poll_next(), Some(Ok(2))); + + // Third item is ready immediately + assert_ready_eq!(stream.poll_next(), Some(Ok(3))); + + // Fourth item is delayed 200ms, times out after 100ms + assert_pending!(stream.poll_next()); + + time::advance(ms(60)).await; + assert_pending!(stream.poll_next()); // nothing ready yet + + time::advance(ms(60)).await; + let v = assert_ready!(stream.poll_next()); + assert!(v.unwrap().is_err()); // timeout! + + time::advance(ms(120)).await; + assert_ready_eq!(stream.poll_next(), Some(Ok(4))); + + // Done. + assert_ready_eq!(stream.poll_next(), None); +} + +#[tokio::test] +async fn return_elapsed_errors_only_once() { + time::pause(); + + let stream = stream::iter(1..=3).then(maybe_sleep).timeout(ms(50)); + let mut stream = task::spawn(stream); + + // First item completes immediately + assert_ready_eq!(stream.poll_next(), Some(Ok(1))); + + // Second item is delayed 200ms, times out after 50ms. Only one `Elapsed` + // error is returned. + assert_pending!(stream.poll_next()); + // + time::advance(ms(51)).await; + let v = assert_ready!(stream.poll_next()); + assert!(v.unwrap().is_err()); // timeout! + + // deadline elapses again, but no error is returned + time::advance(ms(50)).await; + assert_pending!(stream.poll_next()); + + time::advance(ms(100)).await; + assert_ready_eq!(stream.poll_next(), Some(Ok(2))); + assert_ready_eq!(stream.poll_next(), Some(Ok(3))); + + // Done + assert_ready_eq!(stream.poll_next(), None); +} + +#[tokio::test] +async fn no_timeouts() { + let stream = stream::iter(vec![1, 3, 5]) + .then(maybe_sleep) + .timeout(ms(100)); + + let mut stream = task::spawn(stream); + + assert_ready_eq!(stream.poll_next(), Some(Ok(1))); + assert_ready_eq!(stream.poll_next(), Some(Ok(3))); + assert_ready_eq!(stream.poll_next(), Some(Ok(5))); + assert_ready_eq!(stream.poll_next(), None); +} |