summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-stream/tests/chunks_timeout.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
commit36d22d82aa202bb199967e9512281e9a53db42c9 (patch)
tree105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/tokio-stream/tests/chunks_timeout.rs
parentInitial commit. (diff)
downloadfirefox-esr-upstream.tar.xz
firefox-esr-upstream.zip
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-stream/tests/chunks_timeout.rs')
-rw-r--r--third_party/rust/tokio-stream/tests/chunks_timeout.rs84
1 files changed, 84 insertions, 0 deletions
diff --git a/third_party/rust/tokio-stream/tests/chunks_timeout.rs b/third_party/rust/tokio-stream/tests/chunks_timeout.rs
new file mode 100644
index 0000000000..ffc7deadd7
--- /dev/null
+++ b/third_party/rust/tokio-stream/tests/chunks_timeout.rs
@@ -0,0 +1,84 @@
+#![warn(rust_2018_idioms)]
+#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
+
+use tokio::time;
+use tokio_stream::{self as stream, StreamExt};
+use tokio_test::assert_pending;
+use tokio_test::task;
+
+use futures::FutureExt;
+use std::time::Duration;
+
+#[tokio::test(start_paused = true)]
+async fn usage() {
+ let iter = vec![1, 2, 3].into_iter();
+ let stream0 = stream::iter(iter);
+
+ let iter = vec![4].into_iter();
+ let stream1 =
+ stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));
+
+ let chunk_stream = stream0
+ .chain(stream1)
+ .chunks_timeout(4, Duration::from_secs(2));
+
+ let mut chunk_stream = task::spawn(chunk_stream);
+
+ assert_pending!(chunk_stream.poll_next());
+ time::advance(Duration::from_secs(2)).await;
+ assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
+
+ assert_pending!(chunk_stream.poll_next());
+ time::advance(Duration::from_secs(2)).await;
+ assert_eq!(chunk_stream.next().await, Some(vec![4]));
+}
+
+#[tokio::test(start_paused = true)]
+async fn full_chunk_with_timeout() {
+ let iter = vec![1, 2].into_iter();
+ let stream0 = stream::iter(iter);
+
+ let iter = vec![3].into_iter();
+ let stream1 =
+ stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(1)).map(move |_| n));
+
+ let iter = vec![4].into_iter();
+ let stream2 =
+ stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));
+
+ let chunk_stream = stream0
+ .chain(stream1)
+ .chain(stream2)
+ .chunks_timeout(3, Duration::from_secs(2));
+
+ let mut chunk_stream = task::spawn(chunk_stream);
+
+ assert_pending!(chunk_stream.poll_next());
+ time::advance(Duration::from_secs(2)).await;
+ assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
+
+ assert_pending!(chunk_stream.poll_next());
+ time::advance(Duration::from_secs(2)).await;
+ assert_eq!(chunk_stream.next().await, Some(vec![4]));
+}
+
+#[tokio::test]
+#[ignore]
+async fn real_time() {
+ let iter = vec![1, 2, 3, 4].into_iter();
+ let stream0 = stream::iter(iter);
+
+ let iter = vec![5].into_iter();
+ let stream1 =
+ stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
+
+ let chunk_stream = stream0
+ .chain(stream1)
+ .chunks_timeout(3, Duration::from_secs(2));
+
+ let mut chunk_stream = task::spawn(chunk_stream);
+
+ assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
+ assert_eq!(chunk_stream.next().await, Some(vec![4]));
+ assert_eq!(chunk_stream.next().await, Some(vec![5]));
+}