diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/tokio-stream | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esr
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
62 files changed, 6041 insertions, 0 deletions
diff --git a/third_party/rust/tokio-stream/.cargo-checksum.json b/third_party/rust/tokio-stream/.cargo-checksum.json new file mode 100644 index 0000000000..bd01ea1b6c --- /dev/null +++ b/third_party/rust/tokio-stream/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"CHANGELOG.md":"d9417fd92e48b94d1b8ad262d01b0f7fb28bd4f4984806bdaebf118dfdd66f15","Cargo.toml":"00d1318802c7867352ebfb31e313a4d749d21b392aeadb63a470ab138fd4348e","LICENSE":"1a594f153f129c2de7b15f3262394bdca3dcc2da40058e3ea435c8473eb1f3a0","src/empty.rs":"292148fe9fe821b7a40200b87f3be63e54e881fa031fd1705ce7c2e264457f6b","src/iter.rs":"b2db4e9075cc5254aed52d7b9a93d56adb0e4a717fe5819372314b5fb1999ff4","src/lib.rs":"f3d5f710742a1d6bd9267e8609682749d4c710899d2ce937175b73355493dcb1","src/macros.rs":"8073292109c2f923906966cdeddc6efb56c356a3427deeeabe8205624850b6c4","src/once.rs":"d3bc03c25c20de945355302b6fb8c116e6dd54d532e5c1fdb4aa848736bb6b41","src/pending.rs":"daeac87ee04243da5ab4996d783a116f5bc9028c6a7cadc66ebf05155fece26a","src/stream_ext.rs":"36d3dd9f9ab1ea624c24dd3dcab259b1aa03a99b26e5b514c6f4a345e617a3f8","src/stream_ext/all.rs":"de7b9800ee6c6a3927d7efc3519e56a509b77351fa4c5d7bc3d4b87137088a46","src/stream_ext/any.rs":"4a0e55b517b2fa36f1366d4e98a42bb25b5f861488e6a1a93cf4936df7a0161d","src/stream_ext/chain.rs":"a697b878f50a566c62e6bcf7f2d75c332bec39aef042f8a797a6c307d6057193","src/stream_ext/chunks_timeout.rs":"2071d5ecb8c91b6132ea1d98f86be083c388fa6de10893717986385ed355ef03","src/stream_ext/collect.rs":"054eebab0156cd6ba69edee8d3d967df712d34805f9ba8eabfe6227effb82a41","src/stream_ext/filter.rs":"1972b359807eed2958954221a0fde46822c15caed1cc2dcbd51d364dc91143ec","src/stream_ext/filter_map.rs":"08236304228bf1747cc378ed36bc3e5ca9c98cfe7b2bfeebed01d6301d9dbc8a","src/stream_ext/fold.rs":"5bdece90730309d44b5092a9114afa5399a5f52d1c3c5895396b971ec30a8504","src/stream_ext/fuse.rs":"b62ea9d293db4373b7a97c78cfd070a7ab691e31d597a7005c5e274d913be625","src/stream_ext/map.rs":"ce3f1a17665d9524c599aa8aa94109e4077941b75d5836495def653db5f64f33","src/stream_ext/map_while.rs":"c89d19b0267c2b15c69812b1720df935585be5d49edf08600e9beb4f697d7916","src/stream_ext/merge.rs":"4cc0edcb30b9cc0752709c444c17bf84255b9695d5989ec50159eade5d3ab942","src/stream_ext/next.rs":"0cf1be87372999f4bc710fe06fa9c0f6742e4bc2c1f781e8f5dd065fe795fcec","src/stream_ext/skip.rs":"88cea5a1c314db260abf48cd4ac8e4e4aea23e46bec4cdb82413803c03962686","src/stream_ext/skip_while.rs":"bc571cc68406ab2bb51ef83359274fa5cb0356ca60f24fe5a17a01bc04c38632","src/stream_ext/take.rs":"c7dd38ca72d4162481869e9cd8226594fe3deefd21dee4d0befa1818337bbc3a","src/stream_ext/take_while.rs":"3a3122116a8743f66199a71f733f7c3bfce77d084a752c73b4cc54a3d26365d8","src/stream_ext/then.rs":"3f2e4272ee442afd9e04ab8476785076b62f7e5652241db893669a5d4cce26bf","src/stream_ext/throttle.rs":"4916050a86d334cb5a9ef515156c3e722540fcb19605bbc9cffb66a09d855701","src/stream_ext/timeout.rs":"89d5a2aa395dd67fd0b5ff71d823b0674231563c987e00061cc7f3973f1e4e42","src/stream_ext/try_next.rs":"3258a10910e84452571686427db3d0a477db60466c1bc96ff012530ad469520b","src/stream_map.rs":"73527d3da35aad74f3b9913b39f09ac41ce7e02ebc7baf4312c500459c2542c5","src/wrappers.rs":"c779121d9fc729c9e9e6275de506ce41e9405cd82627589b54e6e28d911676d1","src/wrappers/broadcast.rs":"a78da659df744fc17e9a82d44fed27a82cbcf117e6a9ce730e24bba4acca74db","src/wrappers/interval.rs":"f9abd6ef4abb0bd61064eac2141c2f65b31351dd14b8403625d4471bb7d86040","src/wrappers/lines.rs":"75bd75aa5d22b9eba49c35c7d4dde7e6c5546be0246523a6fa75bd81a5dd2d6c","src/wrappers/mpsc_bounded.rs":"a56b727831246075689124256f9da3c311d362fd39b5f5350dfd123f45ab600e","src/wrappers/mpsc_unbounded.rs":"92d595d15ed1d21710a6c15aee95a7889c3437534cdfcda08325f26c19c6a621","src/wrappers/read_dir.rs":"e2556f80104b424331e045e28417abcc09b9724444c812244516d5213f367aa8","src/wrappers/signal_unix.rs":"44862e50828bb22c7bdfe11fa39e46589e67e27f36de11a8be0a813c8296a4ca","src/wrappers/signal_windows.rs":"9facd73b3692cfd2e7d439be8576351498dbe1789e54eb1d14f71ccdb28da198","src/wrappers/split.rs":"c0db118b774697d1a84a02c53f2ef1e1e597399aa8afda6a4c9008408aa946d1","src/wrappers/tcp_listener.rs":"0a420994f716d62ea5536904adb5ab5f30fd322359651fcc86ae7368e745d98a","src/wrappers/unix_listener.rs":"e206aa40af1c5df68475f045d3cfce7b3f9922db8628b0530aa71ced991e0261","src/wrappers/watch.rs":"a828a0baf8a0c1aca9aeb61f6cae8305ebca731c81720caf28921d4ad4904a5d","tests/async_send_sync.rs":"1fbbf36e28058ce249f5c1d95a9b387f8b2beed6914d0f51f212e9498076ba14","tests/chunks_timeout.rs":"455b441751aa85ccc29e908d66cdfcfddd896bbf9d40cabc43744e21859db67e","tests/stream_chain.rs":"79415d868cac8737dc4da095cf8a7aaf11e628f0daa26fe9322c692db0a3e118","tests/stream_collect.rs":"672597e3a00df5283702ab1c7b98979bb2db397472f639e677d4e83c67ef1050","tests/stream_empty.rs":"4a79c50ccf8c95d3f41fc8eb02720318f3bdd84372530a826b6edfd4b94b953b","tests/stream_fuse.rs":"8975326b01eb0eabf017be3cbd683388970a60f6d9d0d6e7fdc9ae657b2f2ac7","tests/stream_iter.rs":"664fed872dba90449acd992c9840e45cda110273836b700f1f11bf1210d6b0a4","tests/stream_merge.rs":"e7ba9ac03fecd05061aa48ca17c50472ec358693e5d8e07e478e3c5d676af0bd","tests/stream_once.rs":"4ece414df7a39dfe09336cfb2802ea85e41cf49d42f23e35bd3646907be05b0a","tests/stream_panic.rs":"a497857c5ed2bbfd54537b40b9eb081489b9aa873c58f469251d6882a410f9f0","tests/stream_pending.rs":"c43d970af93c79ddf11657ff067bcb3a221e533109a03afe1cccd1bb4b3a1ffe","tests/stream_stream_map.rs":"2ce3cc79520423d6138cda0d74ef760ab654a38a326447146f1c6cd76219316c","tests/stream_timeout.rs":"b8846a24056eb39dd55808f976ccf4c1085a7cb59acf97cf070a465684b3bb82","tests/support/mpsc.rs":"2ac4d35619b5f418fa4c7ae4f5120137e1c8541937204d027f63088410a44ca7","tests/time_throttle.rs":"82f500fb2dba420b1c7b3da1899a92209a4832262b9bf450c7cef79fdaa89754","tests/watch.rs":"573e42a54e73a304b49476c7c75edb5a5901a325998ad9d302825c308147ea9f"},"package":"8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313"}
\ No newline at end of file diff --git a/third_party/rust/tokio-stream/CHANGELOG.md b/third_party/rust/tokio-stream/CHANGELOG.md new file mode 100644 index 0000000000..c475c7c398 --- /dev/null +++ b/third_party/rust/tokio-stream/CHANGELOG.md @@ -0,0 +1,131 @@ +# 0.1.12 (January 20, 2022) + +- time: remove `Unpin` bound on `Throttle` methods ([#5105]) +- time: document that `throttle` operates on ms granularity ([#5101]) + +[#5105]: https://github.com/tokio-rs/tokio/pull/5105 +[#5101]: https://github.com/tokio-rs/tokio/pull/5101 + +# 0.1.11 (October 11, 2022) + +- time: allow `StreamExt::chunks_timeout` outside of a runtime ([#5036]) + +[#5036]: https://github.com/tokio-rs/tokio/pull/5036 + +# 0.1.10 (Sept 18, 2022) + +- time: add `StreamExt::chunks_timeout` ([#4695]) +- stream: add track_caller to public APIs ([#4786]) + +[#4695]: https://github.com/tokio-rs/tokio/pull/4695 +[#4786]: https://github.com/tokio-rs/tokio/pull/4786 + +# 0.1.9 (June 4, 2022) + +- deps: upgrade `tokio-util` dependency to `0.7.x` ([#3762]) +- stream: add `StreamExt::map_while` ([#4351]) +- stream: add `StreamExt::then` ([#4355]) +- stream: add cancel-safety docs to `StreamExt::next` and `try_next` ([#4715]) +- stream: expose `Elapsed` error ([#4502]) +- stream: expose `Timeout` ([#4601]) +- stream: implement `Extend` for `StreamMap` ([#4272]) +- sync: add `Clone` to `RecvError` types ([#4560]) + +[#3762]: https://github.com/tokio-rs/tokio/pull/3762 +[#4272]: https://github.com/tokio-rs/tokio/pull/4272 +[#4351]: https://github.com/tokio-rs/tokio/pull/4351 +[#4355]: https://github.com/tokio-rs/tokio/pull/4355 +[#4502]: https://github.com/tokio-rs/tokio/pull/4502 +[#4560]: https://github.com/tokio-rs/tokio/pull/4560 +[#4601]: https://github.com/tokio-rs/tokio/pull/4601 +[#4715]: https://github.com/tokio-rs/tokio/pull/4715 + +# 0.1.8 (October 29, 2021) + +- stream: add `From<Receiver<T>>` impl for receiver streams ([#4080]) +- stream: impl `FromIterator` for `StreamMap` ([#4052]) +- signal: make windows docs for signal module show up on unix builds ([#3770]) + +[#3770]: https://github.com/tokio-rs/tokio/pull/3770 +[#4052]: https://github.com/tokio-rs/tokio/pull/4052 +[#4080]: https://github.com/tokio-rs/tokio/pull/4080 + +# 0.1.7 (July 7, 2021) + +### Fixed + +- sync: fix watch wrapper ([#3914]) +- time: fix `Timeout::size_hint` ([#3902]) + +[#3902]: https://github.com/tokio-rs/tokio/pull/3902 +[#3914]: https://github.com/tokio-rs/tokio/pull/3914 + +# 0.1.6 (May 14, 2021) + +### Added + +- stream: implement `Error` and `Display` for `BroadcastStreamRecvError` ([#3745]) + +### Fixed + +- stream: avoid yielding in `AllFuture` and `AnyFuture` ([#3625]) + +[#3745]: https://github.com/tokio-rs/tokio/pull/3745 +[#3625]: https://github.com/tokio-rs/tokio/pull/3625 + +# 0.1.5 (March 20, 2021) + +### Fixed + +- stream: documentation note for throttle `Unpin` ([#3600]) + +[#3600]: https://github.com/tokio-rs/tokio/pull/3600 + +# 0.1.4 (March 9, 2021) + +Added + +- signal: add `Signal` wrapper ([#3510]) + +Fixed + +- stream: remove duplicate `doc_cfg` declaration ([#3561]) +- sync: yield initial value in `WatchStream` ([#3576]) + +[#3510]: https://github.com/tokio-rs/tokio/pull/3510 +[#3561]: https://github.com/tokio-rs/tokio/pull/3561 +[#3576]: https://github.com/tokio-rs/tokio/pull/3576 + +# 0.1.3 (February 5, 2021) + +Added + + - sync: add wrapper for broadcast and watch ([#3384], [#3504]) + +[#3384]: https://github.com/tokio-rs/tokio/pull/3384 +[#3504]: https://github.com/tokio-rs/tokio/pull/3504 + +# 0.1.2 (January 12, 2021) + +Fixed + + - docs: fix some wrappers missing in documentation ([#3378]) + +[#3378]: https://github.com/tokio-rs/tokio/pull/3378 + +# 0.1.1 (January 4, 2021) + +Added + + - add `Stream` wrappers ([#3343]) + +Fixed + + - move `async-stream` to `dev-dependencies` ([#3366]) + +[#3366]: https://github.com/tokio-rs/tokio/pull/3366 +[#3343]: https://github.com/tokio-rs/tokio/pull/3343 + +# 0.1.0 (December 23, 2020) + + - Initial release diff --git a/third_party/rust/tokio-stream/Cargo.toml b/third_party/rust/tokio-stream/Cargo.toml new file mode 100644 index 0000000000..5a3542e8d4 --- /dev/null +++ b/third_party/rust/tokio-stream/Cargo.toml @@ -0,0 +1,78 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. +# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +edition = "2018" +rust-version = "1.49" +name = "tokio-stream" +version = "0.1.12" +authors = ["Tokio Contributors <team@tokio.rs>"] +description = """ +Utilities to work with `Stream` and `tokio`. +""" +homepage = "https://tokio.rs" +categories = ["asynchronous"] +license = "MIT" +repository = "https://github.com/tokio-rs/tokio" + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = [ + "--cfg", + "docsrs", +] +rustc-args = [ + "--cfg", + "docsrs", +] + +[dependencies.futures-core] +version = "0.3.0" + +[dependencies.pin-project-lite] +version = "0.2.0" + +[dependencies.tokio] +version = "1.8.0" +features = ["sync"] + +[dependencies.tokio-util] +version = "0.7.0" +optional = true + +[dev-dependencies.async-stream] +version = "0.3" + +[dev-dependencies.futures] +version = "0.3" +default-features = false + +[dev-dependencies.parking_lot] +version = "0.12.0" + +[dev-dependencies.tokio] +version = "1.2.0" +features = [ + "full", + "test-util", +] + +[features] +default = ["time"] +fs = ["tokio/fs"] +io-util = ["tokio/io-util"] +net = ["tokio/net"] +signal = ["tokio/signal"] +sync = [ + "tokio/sync", + "tokio-util", +] +time = ["tokio/time"] diff --git a/third_party/rust/tokio-stream/LICENSE b/third_party/rust/tokio-stream/LICENSE new file mode 100644 index 0000000000..8bdf6bd60d --- /dev/null +++ b/third_party/rust/tokio-stream/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2023 Tokio Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/third_party/rust/tokio-stream/src/empty.rs b/third_party/rust/tokio-stream/src/empty.rs new file mode 100644 index 0000000000..965dcf5da7 --- /dev/null +++ b/third_party/rust/tokio-stream/src/empty.rs @@ -0,0 +1,50 @@ +use crate::Stream; + +use core::marker::PhantomData; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Stream for the [`empty`](fn@empty) function. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Empty<T>(PhantomData<T>); + +impl<T> Unpin for Empty<T> {} +unsafe impl<T> Send for Empty<T> {} +unsafe impl<T> Sync for Empty<T> {} + +/// Creates a stream that yields nothing. +/// +/// The returned stream is immediately ready and returns `None`. Use +/// [`stream::pending()`](super::pending()) to obtain a stream that is never +/// ready. +/// +/// # Examples +/// +/// Basic usage: +/// +/// ``` +/// use tokio_stream::{self as stream, StreamExt}; +/// +/// #[tokio::main] +/// async fn main() { +/// let mut none = stream::empty::<i32>(); +/// +/// assert_eq!(None, none.next().await); +/// } +/// ``` +pub const fn empty<T>() -> Empty<T> { + Empty(PhantomData) +} + +impl<T> Stream for Empty<T> { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> { + Poll::Ready(None) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (0, Some(0)) + } +} diff --git a/third_party/rust/tokio-stream/src/iter.rs b/third_party/rust/tokio-stream/src/iter.rs new file mode 100644 index 0000000000..128be616fc --- /dev/null +++ b/third_party/rust/tokio-stream/src/iter.rs @@ -0,0 +1,67 @@ +use crate::Stream; + +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Stream for the [`iter`](fn@iter) function. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Iter<I> { + iter: I, + yield_amt: usize, +} + +impl<I> Unpin for Iter<I> {} + +/// Converts an `Iterator` into a `Stream` which is always ready +/// to yield the next value. +/// +/// Iterators in Rust don't express the ability to block, so this adapter +/// simply always calls `iter.next()` and returns that. +/// +/// ``` +/// # async fn dox() { +/// use tokio_stream::{self as stream, StreamExt}; +/// +/// let mut stream = stream::iter(vec![17, 19]); +/// +/// assert_eq!(stream.next().await, Some(17)); +/// assert_eq!(stream.next().await, Some(19)); +/// assert_eq!(stream.next().await, None); +/// # } +/// ``` +pub fn iter<I>(i: I) -> Iter<I::IntoIter> +where + I: IntoIterator, +{ + Iter { + iter: i.into_iter(), + yield_amt: 0, + } +} + +impl<I> Stream for Iter<I> +where + I: Iterator, +{ + type Item = I::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<I::Item>> { + // TODO: add coop back + if self.yield_amt >= 32 { + self.yield_amt = 0; + + cx.waker().wake_by_ref(); + + Poll::Pending + } else { + self.yield_amt += 1; + + Poll::Ready(self.iter.next()) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.iter.size_hint() + } +} diff --git a/third_party/rust/tokio-stream/src/lib.rs b/third_party/rust/tokio-stream/src/lib.rs new file mode 100644 index 0000000000..bbd4cef03e --- /dev/null +++ b/third_party/rust/tokio-stream/src/lib.rs @@ -0,0 +1,100 @@ +#![allow( + clippy::cognitive_complexity, + clippy::large_enum_variant, + clippy::needless_doctest_main +)] +#![warn( + missing_debug_implementations, + missing_docs, + rust_2018_idioms, + unreachable_pub +)] +#![cfg_attr(docsrs, feature(doc_cfg))] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] + +//! Stream utilities for Tokio. +//! +//! A `Stream` is an asynchronous sequence of values. It can be thought of as +//! an asynchronous version of the standard library's `Iterator` trait. +//! +//! This crate provides helpers to work with them. For examples of usage and a more in-depth +//! description of streams you can also refer to the [streams +//! tutorial](https://tokio.rs/tokio/tutorial/streams) on the tokio website. +//! +//! # Iterating over a Stream +//! +//! Due to similarities with the standard library's `Iterator` trait, some new +//! users may assume that they can use `for in` syntax to iterate over a +//! `Stream`, but this is unfortunately not possible. Instead, you can use a +//! `while let` loop as follows: +//! +//! ```rust +//! use tokio_stream::{self as stream, StreamExt}; +//! +//! #[tokio::main] +//! async fn main() { +//! let mut stream = stream::iter(vec![0, 1, 2]); +//! +//! while let Some(value) = stream.next().await { +//! println!("Got {}", value); +//! } +//! } +//! ``` +//! +//! # Returning a Stream from a function +//! +//! A common way to stream values from a function is to pass in the sender +//! half of a channel and use the receiver as the stream. This requires awaiting +//! both futures to ensure progress is made. Another alternative is the +//! [async-stream] crate, which contains macros that provide a `yield` keyword +//! and allow you to return an `impl Stream`. +//! +//! [async-stream]: https://docs.rs/async-stream +//! +//! # Conversion to and from AsyncRead/AsyncWrite +//! +//! It is often desirable to convert a `Stream` into an [`AsyncRead`], +//! especially when dealing with plaintext formats streamed over the network. +//! The opposite conversion from an [`AsyncRead`] into a `Stream` is also +//! another commonly required feature. To enable these conversions, +//! [`tokio-util`] provides the [`StreamReader`] and [`ReaderStream`] +//! types when the io feature is enabled. +//! +//! [`tokio-util`]: https://docs.rs/tokio-util/0.4/tokio_util/codec/index.html +//! [`tokio::io`]: https://docs.rs/tokio/1.0/tokio/io/index.html +//! [`AsyncRead`]: https://docs.rs/tokio/1.0/tokio/io/trait.AsyncRead.html +//! [`AsyncWrite`]: https://docs.rs/tokio/1.0/tokio/io/trait.AsyncWrite.html +//! [`ReaderStream`]: https://docs.rs/tokio-util/0.4/tokio_util/io/struct.ReaderStream.html +//! [`StreamReader`]: https://docs.rs/tokio-util/0.4/tokio_util/io/struct.StreamReader.html + +#[macro_use] +mod macros; + +pub mod wrappers; + +mod stream_ext; +pub use stream_ext::{collect::FromStream, StreamExt}; +cfg_time! { + pub use stream_ext::timeout::{Elapsed, Timeout}; +} + +mod empty; +pub use empty::{empty, Empty}; + +mod iter; +pub use iter::{iter, Iter}; + +mod once; +pub use once::{once, Once}; + +mod pending; +pub use pending::{pending, Pending}; + +mod stream_map; +pub use stream_map::StreamMap; + +#[doc(no_inline)] +pub use futures_core::Stream; diff --git a/third_party/rust/tokio-stream/src/macros.rs b/third_party/rust/tokio-stream/src/macros.rs new file mode 100644 index 0000000000..1e3b61bac7 --- /dev/null +++ b/third_party/rust/tokio-stream/src/macros.rs @@ -0,0 +1,68 @@ +macro_rules! cfg_fs { + ($($item:item)*) => { + $( + #[cfg(feature = "fs")] + #[cfg_attr(docsrs, doc(cfg(feature = "fs")))] + $item + )* + } +} + +macro_rules! cfg_io_util { + ($($item:item)*) => { + $( + #[cfg(feature = "io-util")] + #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] + $item + )* + } +} + +macro_rules! cfg_net { + ($($item:item)*) => { + $( + #[cfg(feature = "net")] + #[cfg_attr(docsrs, doc(cfg(feature = "net")))] + $item + )* + } +} + +macro_rules! cfg_time { + ($($item:item)*) => { + $( + #[cfg(feature = "time")] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + $item + )* + } +} + +macro_rules! cfg_sync { + ($($item:item)*) => { + $( + #[cfg(feature = "sync")] + #[cfg_attr(docsrs, doc(cfg(feature = "sync")))] + $item + )* + } +} + +macro_rules! cfg_signal { + ($($item:item)*) => { + $( + #[cfg(feature = "signal")] + #[cfg_attr(docsrs, doc(cfg(feature = "signal")))] + $item + )* + } +} + +macro_rules! ready { + ($e:expr $(,)?) => { + match $e { + std::task::Poll::Ready(t) => t, + std::task::Poll::Pending => return std::task::Poll::Pending, + } + }; +} diff --git a/third_party/rust/tokio-stream/src/once.rs b/third_party/rust/tokio-stream/src/once.rs new file mode 100644 index 0000000000..04b4c052b8 --- /dev/null +++ b/third_party/rust/tokio-stream/src/once.rs @@ -0,0 +1,52 @@ +use crate::{Iter, Stream}; + +use core::option; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Stream for the [`once`](fn@once) function. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Once<T> { + iter: Iter<option::IntoIter<T>>, +} + +impl<I> Unpin for Once<I> {} + +/// Creates a stream that emits an element exactly once. +/// +/// The returned stream is immediately ready and emits the provided value once. +/// +/// # Examples +/// +/// ``` +/// use tokio_stream::{self as stream, StreamExt}; +/// +/// #[tokio::main] +/// async fn main() { +/// // one is the loneliest number +/// let mut one = stream::once(1); +/// +/// assert_eq!(Some(1), one.next().await); +/// +/// // just one, that's all we get +/// assert_eq!(None, one.next().await); +/// } +/// ``` +pub fn once<T>(value: T) -> Once<T> { + Once { + iter: crate::iter(Some(value).into_iter()), + } +} + +impl<T> Stream for Once<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + Pin::new(&mut self.iter).poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.iter.size_hint() + } +} diff --git a/third_party/rust/tokio-stream/src/pending.rs b/third_party/rust/tokio-stream/src/pending.rs new file mode 100644 index 0000000000..b50fd33354 --- /dev/null +++ b/third_party/rust/tokio-stream/src/pending.rs @@ -0,0 +1,54 @@ +use crate::Stream; + +use core::marker::PhantomData; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Stream for the [`pending`](fn@pending) function. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Pending<T>(PhantomData<T>); + +impl<T> Unpin for Pending<T> {} +unsafe impl<T> Send for Pending<T> {} +unsafe impl<T> Sync for Pending<T> {} + +/// Creates a stream that is never ready +/// +/// The returned stream is never ready. Attempting to call +/// [`next()`](crate::StreamExt::next) will never complete. Use +/// [`stream::empty()`](super::empty()) to obtain a stream that is is +/// immediately empty but returns no values. +/// +/// # Examples +/// +/// Basic usage: +/// +/// ```no_run +/// use tokio_stream::{self as stream, StreamExt}; +/// +/// #[tokio::main] +/// async fn main() { +/// let mut never = stream::pending::<i32>(); +/// +/// // This will never complete +/// never.next().await; +/// +/// unreachable!(); +/// } +/// ``` +pub const fn pending<T>() -> Pending<T> { + Pending(PhantomData) +} + +impl<T> Stream for Pending<T> { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> { + Poll::Pending + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (0, None) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext.rs b/third_party/rust/tokio-stream/src/stream_ext.rs new file mode 100644 index 0000000000..52d32024ff --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext.rs @@ -0,0 +1,1084 @@ +use core::future::Future; +use futures_core::Stream; + +mod all; +use all::AllFuture; + +mod any; +use any::AnyFuture; + +mod chain; +use chain::Chain; + +pub(crate) mod collect; +use collect::{Collect, FromStream}; + +mod filter; +use filter::Filter; + +mod filter_map; +use filter_map::FilterMap; + +mod fold; +use fold::FoldFuture; + +mod fuse; +use fuse::Fuse; + +mod map; +use map::Map; + +mod map_while; +use map_while::MapWhile; + +mod merge; +use merge::Merge; + +mod next; +use next::Next; + +mod skip; +use skip::Skip; + +mod skip_while; +use skip_while::SkipWhile; + +mod take; +use take::Take; + +mod take_while; +use take_while::TakeWhile; + +mod then; +use then::Then; + +mod try_next; +use try_next::TryNext; + +cfg_time! { + pub(crate) mod timeout; + use timeout::Timeout; + use tokio::time::Duration; + mod throttle; + use throttle::{throttle, Throttle}; + mod chunks_timeout; + use chunks_timeout::ChunksTimeout; +} + +/// An extension trait for the [`Stream`] trait that provides a variety of +/// convenient combinator functions. +/// +/// Be aware that the `Stream` trait in Tokio is a re-export of the trait found +/// in the [futures] crate, however both Tokio and futures provide separate +/// `StreamExt` utility traits, and some utilities are only available on one of +/// these traits. Click [here][futures-StreamExt] to see the other `StreamExt` +/// trait in the futures crate. +/// +/// If you need utilities from both `StreamExt` traits, you should prefer to +/// import one of them, and use the other through the fully qualified call +/// syntax. For example: +/// ``` +/// // import one of the traits: +/// use futures::stream::StreamExt; +/// # #[tokio::main(flavor = "current_thread")] +/// # async fn main() { +/// +/// let a = tokio_stream::iter(vec![1, 3, 5]); +/// let b = tokio_stream::iter(vec![2, 4, 6]); +/// +/// // use the fully qualified call syntax for the other trait: +/// let merged = tokio_stream::StreamExt::merge(a, b); +/// +/// // use normal call notation for futures::stream::StreamExt::collect +/// let output: Vec<_> = merged.collect().await; +/// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]); +/// # } +/// ``` +/// +/// [`Stream`]: crate::Stream +/// [futures]: https://docs.rs/futures +/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html +pub trait StreamExt: Stream { + /// Consumes and returns the next value in the stream or `None` if the + /// stream is finished. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn next(&mut self) -> Option<Self::Item>; + /// ``` + /// + /// Note that because `next` doesn't take ownership over the stream, + /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a + /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can + /// be done by boxing the stream using [`Box::pin`] or + /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` + /// crate. + /// + /// # Cancel safety + /// + /// This method is cancel safe. The returned future only + /// holds onto a reference to the underlying stream, + /// so dropping it will never lose a value. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(1..=3); + /// + /// assert_eq!(stream.next().await, Some(1)); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, Some(3)); + /// assert_eq!(stream.next().await, None); + /// # } + /// ``` + fn next(&mut self) -> Next<'_, Self> + where + Self: Unpin, + { + Next::new(self) + } + + /// Consumes and returns the next item in the stream. If an error is + /// encountered before the next item, the error is returned instead. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn try_next(&mut self) -> Result<Option<T>, E>; + /// ``` + /// + /// This is similar to the [`next`](StreamExt::next) combinator, + /// but returns a [`Result<Option<T>, E>`](Result) rather than + /// an [`Option<Result<T, E>>`](Option), making for easy use + /// with the [`?`](std::ops::Try) operator. + /// + /// # Cancel safety + /// + /// This method is cancel safe. The returned future only + /// holds onto a reference to the underlying stream, + /// so dropping it will never lose a value. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]); + /// + /// assert_eq!(stream.try_next().await, Ok(Some(1))); + /// assert_eq!(stream.try_next().await, Ok(Some(2))); + /// assert_eq!(stream.try_next().await, Err("nope")); + /// # } + /// ``` + fn try_next<T, E>(&mut self) -> TryNext<'_, Self> + where + Self: Stream<Item = Result<T, E>> + Unpin, + { + TryNext::new(self) + } + + /// Maps this stream's items to a different type, returning a new stream of + /// the resulting type. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available. It is executed inline with calls to + /// [`poll_next`](Stream::poll_next). + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the existing `map` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let stream = stream::iter(1..=3); + /// let mut stream = stream.map(|x| x + 3); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// # } + /// ``` + fn map<T, F>(self, f: F) -> Map<Self, F> + where + F: FnMut(Self::Item) -> T, + Self: Sized, + { + Map::new(self, f) + } + + /// Map this stream's items to a different type for as long as determined by + /// the provided closure. A stream of the target type will be returned, + /// which will yield elements until the closure returns `None`. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available, until it returns `None`. It is executed inline + /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned, + /// the underlying stream will not be polled again. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the [`Iterator::map_while`] method in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let stream = stream::iter(1..=10); + /// let mut stream = stream.map_while(|x| { + /// if x < 4 { + /// Some(x + 3) + /// } else { + /// None + /// } + /// }); + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// assert_eq!(stream.next().await, None); + /// # } + /// ``` + fn map_while<T, F>(self, f: F) -> MapWhile<Self, F> + where + F: FnMut(Self::Item) -> Option<T>, + Self: Sized, + { + MapWhile::new(self, f) + } + + /// Maps this stream's items asynchronously to a different type, returning a + /// new stream of the resulting type. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available, and the returned future is executed. Only one + /// future is executed at the time. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the existing `then` methods in the + /// standard library. + /// + /// Be aware that if the future is not `Unpin`, then neither is the `Stream` + /// returned by this method. To handle this, you can use `tokio::pin!` as in + /// the example below or put the stream in a `Box` with `Box::pin(stream)`. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// async fn do_async_work(value: i32) -> i32 { + /// value + 3 + /// } + /// + /// let stream = stream::iter(1..=3); + /// let stream = stream.then(do_async_work); + /// + /// tokio::pin!(stream); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// # } + /// ``` + fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F> + where + F: FnMut(Self::Item) -> Fut, + Fut: Future, + Self: Sized, + { + Then::new(self, f) + } + + /// Combine two streams into one by interleaving the output of both as it + /// is produced. + /// + /// Values are produced from the merged stream in the order they arrive from + /// the two source streams. If both source streams provide values + /// simultaneously, the merge stream alternates between them. This provides + /// some level of fairness. You should not chain calls to `merge`, as this + /// will break the fairness of the merging. + /// + /// The merged stream completes once **both** source streams complete. When + /// one source stream completes before the other, the merge stream + /// exclusively polls the remaining stream. + /// + /// For merging multiple streams, consider using [`StreamMap`] instead. + /// + /// [`StreamMap`]: crate::StreamMap + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamExt, Stream}; + /// use tokio::sync::mpsc; + /// use tokio::time; + /// + /// use std::time::Duration; + /// use std::pin::Pin; + /// + /// # /* + /// #[tokio::main] + /// # */ + /// # #[tokio::main(flavor = "current_thread")] + /// async fn main() { + /// # time::pause(); + /// let (tx1, mut rx1) = mpsc::channel::<usize>(10); + /// let (tx2, mut rx2) = mpsc::channel::<usize>(10); + /// + /// // Convert the channels to a `Stream`. + /// let rx1 = Box::pin(async_stream::stream! { + /// while let Some(item) = rx1.recv().await { + /// yield item; + /// } + /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; + /// + /// let rx2 = Box::pin(async_stream::stream! { + /// while let Some(item) = rx2.recv().await { + /// yield item; + /// } + /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; + /// + /// let mut rx = rx1.merge(rx2); + /// + /// tokio::spawn(async move { + /// // Send some values immediately + /// tx1.send(1).await.unwrap(); + /// tx1.send(2).await.unwrap(); + /// + /// // Let the other task send values + /// time::sleep(Duration::from_millis(20)).await; + /// + /// tx1.send(4).await.unwrap(); + /// }); + /// + /// tokio::spawn(async move { + /// // Wait for the first task to send values + /// time::sleep(Duration::from_millis(5)).await; + /// + /// tx2.send(3).await.unwrap(); + /// + /// time::sleep(Duration::from_millis(25)).await; + /// + /// // Send the final value + /// tx2.send(5).await.unwrap(); + /// }); + /// + /// assert_eq!(1, rx.next().await.unwrap()); + /// assert_eq!(2, rx.next().await.unwrap()); + /// assert_eq!(3, rx.next().await.unwrap()); + /// assert_eq!(4, rx.next().await.unwrap()); + /// assert_eq!(5, rx.next().await.unwrap()); + /// + /// // The merged stream is consumed + /// assert!(rx.next().await.is_none()); + /// } + /// ``` + fn merge<U>(self, other: U) -> Merge<Self, U> + where + U: Stream<Item = Self::Item>, + Self: Sized, + { + Merge::new(self, other) + } + + /// Filters the values produced by this stream according to the provided + /// predicate. + /// + /// As values of this stream are made available, the provided predicate `f` + /// will be run against them. If the predicate + /// resolves to `true`, then the stream will yield the value, but if the + /// predicate resolves to `false`, then the value + /// will be discarded and the next value will be produced. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to [`Iterator::filter`] method in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let stream = stream::iter(1..=8); + /// let mut evens = stream.filter(|x| x % 2 == 0); + /// + /// assert_eq!(Some(2), evens.next().await); + /// assert_eq!(Some(4), evens.next().await); + /// assert_eq!(Some(6), evens.next().await); + /// assert_eq!(Some(8), evens.next().await); + /// assert_eq!(None, evens.next().await); + /// # } + /// ``` + fn filter<F>(self, f: F) -> Filter<Self, F> + where + F: FnMut(&Self::Item) -> bool, + Self: Sized, + { + Filter::new(self, f) + } + + /// Filters the values produced by this stream while simultaneously mapping + /// them to a different type according to the provided closure. + /// + /// As values of this stream are made available, the provided function will + /// be run on them. If the predicate `f` resolves to + /// [`Some(item)`](Some) then the stream will yield the value `item`, but if + /// it resolves to [`None`], then the value will be skipped. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to [`Iterator::filter_map`] method in the + /// standard library. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let stream = stream::iter(1..=8); + /// let mut evens = stream.filter_map(|x| { + /// if x % 2 == 0 { Some(x + 1) } else { None } + /// }); + /// + /// assert_eq!(Some(3), evens.next().await); + /// assert_eq!(Some(5), evens.next().await); + /// assert_eq!(Some(7), evens.next().await); + /// assert_eq!(Some(9), evens.next().await); + /// assert_eq!(None, evens.next().await); + /// # } + /// ``` + fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F> + where + F: FnMut(Self::Item) -> Option<T>, + Self: Sized, + { + FilterMap::new(self, f) + } + + /// Creates a stream which ends after the first `None`. + /// + /// After a stream returns `None`, behavior is undefined. Future calls to + /// `poll_next` may or may not return `Some(T)` again or they may panic. + /// `fuse()` adapts a stream, ensuring that after `None` is given, it will + /// return `None` forever. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{Stream, StreamExt}; + /// + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// + /// // a stream which alternates between Some and None + /// struct Alternate { + /// state: i32, + /// } + /// + /// impl Stream for Alternate { + /// type Item = i32; + /// + /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> { + /// let val = self.state; + /// self.state = self.state + 1; + /// + /// // if it's even, Some(i32), else None + /// if val % 2 == 0 { + /// Poll::Ready(Some(val)) + /// } else { + /// Poll::Ready(None) + /// } + /// } + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// let mut stream = Alternate { state: 0 }; + /// + /// // the stream goes back and forth + /// assert_eq!(stream.next().await, Some(0)); + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, None); + /// + /// // however, once it is fused + /// let mut stream = stream.fuse(); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, None); + /// + /// // it will always return `None` after the first time. + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, None); + /// } + /// ``` + fn fuse(self) -> Fuse<Self> + where + Self: Sized, + { + Fuse::new(self) + } + + /// Creates a new stream of at most `n` items of the underlying stream. + /// + /// Once `n` items have been yielded from this stream then it will always + /// return that the stream is done. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(1..=10).take(3); + /// + /// assert_eq!(Some(1), stream.next().await); + /// assert_eq!(Some(2), stream.next().await); + /// assert_eq!(Some(3), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn take(self, n: usize) -> Take<Self> + where + Self: Sized, + { + Take::new(self, n) + } + + /// Take elements from this stream while the provided predicate + /// resolves to `true`. + /// + /// This function, like `Iterator::take_while`, will take elements from the + /// stream until the predicate `f` resolves to `false`. Once one element + /// returns false it will always return that the stream is done. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3); + /// + /// assert_eq!(Some(1), stream.next().await); + /// assert_eq!(Some(2), stream.next().await); + /// assert_eq!(Some(3), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn take_while<F>(self, f: F) -> TakeWhile<Self, F> + where + F: FnMut(&Self::Item) -> bool, + Self: Sized, + { + TakeWhile::new(self, f) + } + + /// Creates a new stream that will skip the `n` first items of the + /// underlying stream. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(1..=10).skip(7); + /// + /// assert_eq!(Some(8), stream.next().await); + /// assert_eq!(Some(9), stream.next().await); + /// assert_eq!(Some(10), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn skip(self, n: usize) -> Skip<Self> + where + Self: Sized, + { + Skip::new(self, n) + } + + /// Skip elements from the underlying stream while the provided predicate + /// resolves to `true`. + /// + /// This function, like [`Iterator::skip_while`], will ignore elements from the + /// stream until the predicate `f` resolves to `false`. Once one element + /// returns false, the rest of the elements will be yielded. + /// + /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while() + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3); + /// + /// assert_eq!(Some(3), stream.next().await); + /// assert_eq!(Some(4), stream.next().await); + /// assert_eq!(Some(1), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn skip_while<F>(self, f: F) -> SkipWhile<Self, F> + where + F: FnMut(&Self::Item) -> bool, + Self: Sized, + { + SkipWhile::new(self, f) + } + + /// Tests if every element of the stream matches a predicate. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn all<F>(&mut self, f: F) -> bool; + /// ``` + /// + /// `all()` takes a closure that returns `true` or `false`. It applies + /// this closure to each element of the stream, and if they all return + /// `true`, then so does `all`. If any of them return `false`, it + /// returns `false`. An empty stream returns `true`. + /// + /// `all()` is short-circuiting; in other words, it will stop processing + /// as soon as it finds a `false`, given that no matter what else happens, + /// the result will also be `false`. + /// + /// An empty stream returns `true`. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// assert!(stream::iter(&a).all(|&x| x > 0).await); + /// + /// assert!(!stream::iter(&a).all(|&x| x > 2).await); + /// # } + /// ``` + /// + /// Stopping at the first `false`: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// let mut iter = stream::iter(&a); + /// + /// assert!(!iter.all(|&x| x != 2).await); + /// + /// // we can still use `iter`, as there are more elements. + /// assert_eq!(iter.next().await, Some(&3)); + /// # } + /// ``` + fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F> + where + Self: Unpin, + F: FnMut(Self::Item) -> bool, + { + AllFuture::new(self, f) + } + + /// Tests if any element of the stream matches a predicate. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn any<F>(&mut self, f: F) -> bool; + /// ``` + /// + /// `any()` takes a closure that returns `true` or `false`. It applies + /// this closure to each element of the stream, and if any of them return + /// `true`, then so does `any()`. If they all return `false`, it + /// returns `false`. + /// + /// `any()` is short-circuiting; in other words, it will stop processing + /// as soon as it finds a `true`, given that no matter what else happens, + /// the result will also be `true`. + /// + /// An empty stream returns `false`. + /// + /// Basic usage: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// assert!(stream::iter(&a).any(|&x| x > 0).await); + /// + /// assert!(!stream::iter(&a).any(|&x| x > 5).await); + /// # } + /// ``` + /// + /// Stopping at the first `true`: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// let mut iter = stream::iter(&a); + /// + /// assert!(iter.any(|&x| x != 2).await); + /// + /// // we can still use `iter`, as there are more elements. + /// assert_eq!(iter.next().await, Some(&2)); + /// # } + /// ``` + fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F> + where + Self: Unpin, + F: FnMut(Self::Item) -> bool, + { + AnyFuture::new(self, f) + } + + /// Combine two streams into one by first returning all values from the + /// first stream then all values from the second stream. + /// + /// As long as `self` still has values to emit, no values from `other` are + /// emitted, even if some are ready. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// let one = stream::iter(vec![1, 2, 3]); + /// let two = stream::iter(vec![4, 5, 6]); + /// + /// let mut stream = one.chain(two); + /// + /// assert_eq!(stream.next().await, Some(1)); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, Some(3)); + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// assert_eq!(stream.next().await, None); + /// } + /// ``` + fn chain<U>(self, other: U) -> Chain<Self, U> + where + U: Stream<Item = Self::Item>, + Self: Sized, + { + Chain::new(self, other) + } + + /// A combinator that applies a function to every element in a stream + /// producing a single, final value. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn fold<B, F>(self, init: B, f: F) -> B; + /// ``` + /// + /// # Examples + /// Basic usage: + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, *}; + /// + /// let s = stream::iter(vec![1u8, 2, 3]); + /// let sum = s.fold(0, |acc, x| acc + x).await; + /// + /// assert_eq!(sum, 6); + /// # } + /// ``` + fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F> + where + Self: Sized, + F: FnMut(B, Self::Item) -> B, + { + FoldFuture::new(self, init, f) + } + + /// Drain stream pushing all emitted values into a collection. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn collect<T>(self) -> T; + /// ``` + /// + /// `collect` streams all values, awaiting as needed. Values are pushed into + /// a collection. A number of different target collection types are + /// supported, including [`Vec`](std::vec::Vec), + /// [`String`](std::string::String), and [`Bytes`]. + /// + /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html + /// + /// # `Result` + /// + /// `collect()` can also be used with streams of type `Result<T, E>` where + /// `T: FromStream<_>`. In this case, `collect()` will stream as long as + /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered, + /// streaming is terminated and `collect()` returns the `Err`. + /// + /// # Notes + /// + /// `FromStream` is currently a sealed trait. Stabilization is pending + /// enhancements to the Rust language. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// let doubled: Vec<i32> = + /// stream::iter(vec![1, 2, 3]) + /// .map(|x| x * 2) + /// .collect() + /// .await; + /// + /// assert_eq!(vec![2, 4, 6], doubled); + /// } + /// ``` + /// + /// Collecting a stream of `Result` values + /// + /// ``` + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// // A stream containing only `Ok` values will be collected + /// let values: Result<Vec<i32>, &str> = + /// stream::iter(vec![Ok(1), Ok(2), Ok(3)]) + /// .collect() + /// .await; + /// + /// assert_eq!(Ok(vec![1, 2, 3]), values); + /// + /// // A stream containing `Err` values will return the first error. + /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")]; + /// + /// let values: Result<Vec<i32>, &str> = + /// stream::iter(results) + /// .collect() + /// .await; + /// + /// assert_eq!(Err("no"), values); + /// } + /// ``` + fn collect<T>(self) -> Collect<Self, T> + where + T: FromStream<Self::Item>, + Self: Sized, + { + Collect::new(self) + } + + /// Applies a per-item timeout to the passed stream. + /// + /// `timeout()` takes a `Duration` that represents the maximum amount of + /// time each element of the stream has to complete before timing out. + /// + /// If the wrapped stream yields a value before the deadline is reached, the + /// value is returned. Otherwise, an error is returned. The caller may decide + /// to continue consuming the stream and will eventually get the next source + /// stream value once it becomes available. + /// + /// # Notes + /// + /// This function consumes the stream passed into it and returns a + /// wrapped version of it. + /// + /// Polling the returned stream will continue to poll the inner stream even + /// if one or more items time out. + /// + /// # Examples + /// + /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// use std::time::Duration; + /// # let int_stream = stream::iter(1..=3); + /// + /// let int_stream = int_stream.timeout(Duration::from_secs(1)); + /// tokio::pin!(int_stream); + /// + /// // When no items time out, we get the 3 elements in succession: + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // If the second item times out, we get an error and continue polling the stream: + /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert!(int_stream.try_next().await.is_err()); + /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // If we want to stop consuming the source stream the first time an + /// // element times out, we can use the `take_while` operator: + /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); + /// let mut int_stream = int_stream.take_while(Result::is_ok); + /// + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// # } + /// ``` + #[cfg(all(feature = "time"))] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + fn timeout(self, duration: Duration) -> Timeout<Self> + where + Self: Sized, + { + Timeout::new(self, duration) + } + + /// Slows down a stream by enforcing a delay between items. + /// + /// The underlying timer behind this utility has a granularity of one millisecond. + /// + /// # Example + /// + /// Create a throttled stream. + /// ```rust,no_run + /// use std::time::Duration; + /// use tokio_stream::StreamExt; + /// + /// # async fn dox() { + /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2)); + /// tokio::pin!(item_stream); + /// + /// loop { + /// // The string will be produced at most every 2 seconds + /// println!("{:?}", item_stream.next().await); + /// } + /// # } + /// ``` + #[cfg(all(feature = "time"))] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + fn throttle(self, duration: Duration) -> Throttle<Self> + where + Self: Sized, + { + throttle(duration, self) + } + + /// Batches the items in the given stream using a maximum duration and size for each batch. + /// + /// This stream returns the next batch of items in the following situations: + /// 1. The inner stream has returned at least `max_size` many items since the last batch. + /// 2. The time since the first item of a batch is greater than the given duration. + /// 3. The end of the stream is reached. + /// + /// The length of the returned vector is never empty or greater than the maximum size. Empty batches + /// will not be emitted if no items are received upstream. + /// + /// # Panics + /// + /// This function panics if `max_size` is zero + /// + /// # Example + /// + /// ```rust + /// use std::time::Duration; + /// use tokio::time; + /// use tokio_stream::{self as stream, StreamExt}; + /// use futures::FutureExt; + /// + /// #[tokio::main] + /// # async fn _unused() {} + /// # #[tokio::main(flavor = "current_thread", start_paused = true)] + /// async fn main() { + /// let iter = vec![1, 2, 3, 4].into_iter(); + /// let stream0 = stream::iter(iter); + /// + /// let iter = vec![5].into_iter(); + /// let stream1 = stream::iter(iter) + /// .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n)); + /// + /// let chunk_stream = stream0 + /// .chain(stream1) + /// .chunks_timeout(3, Duration::from_secs(2)); + /// tokio::pin!(chunk_stream); + /// + /// // a full batch was received + /// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3])); + /// // deadline was reached before max_size was reached + /// assert_eq!(chunk_stream.next().await, Some(vec![4])); + /// // last element in the stream + /// assert_eq!(chunk_stream.next().await, Some(vec![5])); + /// } + /// ``` + #[cfg(feature = "time")] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + #[track_caller] + fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self> + where + Self: Sized, + { + assert!(max_size > 0, "`max_size` must be non-zero."); + ChunksTimeout::new(self, max_size, duration) + } +} + +impl<St: ?Sized> StreamExt for St where St: Stream {} + +/// Merge the size hints from two streams. +fn merge_size_hints( + (left_low, left_high): (usize, Option<usize>), + (right_low, right_high): (usize, Option<usize>), +) -> (usize, Option<usize>) { + let low = left_low.saturating_add(right_low); + let high = match (left_high, right_high) { + (Some(h1), Some(h2)) => h1.checked_add(h2), + _ => None, + }; + (low, high) +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/all.rs b/third_party/rust/tokio-stream/src/stream_ext/all.rs new file mode 100644 index 0000000000..b4dbc1e97c --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/all.rs @@ -0,0 +1,58 @@ +use crate::Stream; + +use core::future::Future; +use core::marker::PhantomPinned; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`all`](super::StreamExt::all) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct AllFuture<'a, St: ?Sized, F> { + stream: &'a mut St, + f: F, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } +} + +impl<'a, St: ?Sized, F> AllFuture<'a, St, F> { + pub(super) fn new(stream: &'a mut St, f: F) -> Self { + Self { + stream, + f, + _pin: PhantomPinned, + } + } +} + +impl<St, F> Future for AllFuture<'_, St, F> +where + St: ?Sized + Stream + Unpin, + F: FnMut(St::Item) -> bool, +{ + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let me = self.project(); + let mut stream = Pin::new(me.stream); + + // Take a maximum of 32 items from the stream before yielding. + for _ in 0..32 { + match futures_core::ready!(stream.as_mut().poll_next(cx)) { + Some(v) => { + if !(me.f)(v) { + return Poll::Ready(false); + } + } + None => return Poll::Ready(true), + } + } + + cx.waker().wake_by_ref(); + Poll::Pending + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/any.rs b/third_party/rust/tokio-stream/src/stream_ext/any.rs new file mode 100644 index 0000000000..31394f249b --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/any.rs @@ -0,0 +1,58 @@ +use crate::Stream; + +use core::future::Future; +use core::marker::PhantomPinned; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`any`](super::StreamExt::any) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct AnyFuture<'a, St: ?Sized, F> { + stream: &'a mut St, + f: F, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } +} + +impl<'a, St: ?Sized, F> AnyFuture<'a, St, F> { + pub(super) fn new(stream: &'a mut St, f: F) -> Self { + Self { + stream, + f, + _pin: PhantomPinned, + } + } +} + +impl<St, F> Future for AnyFuture<'_, St, F> +where + St: ?Sized + Stream + Unpin, + F: FnMut(St::Item) -> bool, +{ + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let me = self.project(); + let mut stream = Pin::new(me.stream); + + // Take a maximum of 32 items from the stream before yielding. + for _ in 0..32 { + match futures_core::ready!(stream.as_mut().poll_next(cx)) { + Some(v) => { + if (me.f)(v) { + return Poll::Ready(true); + } + } + None => return Poll::Ready(false), + } + } + + cx.waker().wake_by_ref(); + Poll::Pending + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/chain.rs b/third_party/rust/tokio-stream/src/stream_ext/chain.rs new file mode 100644 index 0000000000..bd64f33ce4 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/chain.rs @@ -0,0 +1,50 @@ +use crate::stream_ext::Fuse; +use crate::Stream; + +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream returned by the [`chain`](super::StreamExt::chain) method. + pub struct Chain<T, U> { + #[pin] + a: Fuse<T>, + #[pin] + b: U, + } +} + +impl<T, U> Chain<T, U> { + pub(super) fn new(a: T, b: U) -> Chain<T, U> + where + T: Stream, + U: Stream, + { + Chain { a: Fuse::new(a), b } + } +} + +impl<T, U> Stream for Chain<T, U> +where + T: Stream, + U: Stream<Item = T::Item>, +{ + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { + use Poll::Ready; + + let me = self.project(); + + if let Some(v) = ready!(me.a.poll_next(cx)) { + return Ready(Some(v)); + } + + me.b.poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + super::merge_size_hints(self.a.size_hint(), self.b.size_hint()) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs b/third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs new file mode 100644 index 0000000000..48acd9328b --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs @@ -0,0 +1,86 @@ +use crate::stream_ext::Fuse; +use crate::Stream; +use tokio::time::{sleep, Sleep}; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; +use std::time::Duration; + +pin_project! { + /// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method. + #[must_use = "streams do nothing unless polled"] + #[derive(Debug)] + pub struct ChunksTimeout<S: Stream> { + #[pin] + stream: Fuse<S>, + #[pin] + deadline: Option<Sleep>, + duration: Duration, + items: Vec<S::Item>, + cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 + } +} + +impl<S: Stream> ChunksTimeout<S> { + pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self { + ChunksTimeout { + stream: Fuse::new(stream), + deadline: None, + duration, + items: Vec::with_capacity(max_size), + cap: max_size, + } + } +} + +impl<S: Stream> Stream for ChunksTimeout<S> { + type Item = Vec<S::Item>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut me = self.as_mut().project(); + loop { + match me.stream.as_mut().poll_next(cx) { + Poll::Pending => break, + Poll::Ready(Some(item)) => { + if me.items.is_empty() { + me.deadline.set(Some(sleep(*me.duration))); + me.items.reserve_exact(*me.cap); + } + me.items.push(item); + if me.items.len() >= *me.cap { + return Poll::Ready(Some(std::mem::take(me.items))); + } + } + Poll::Ready(None) => { + // Returning Some here is only correct because we fuse the inner stream. + let last = if me.items.is_empty() { + None + } else { + Some(std::mem::take(me.items)) + }; + + return Poll::Ready(last); + } + } + } + + if !me.items.is_empty() { + if let Some(deadline) = me.deadline.as_pin_mut() { + ready!(deadline.poll(cx)); + } + return Poll::Ready(Some(std::mem::take(me.items))); + } + + Poll::Pending + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let chunk_len = if self.items.is_empty() { 0 } else { 1 }; + let (lower, upper) = self.stream.size_hint(); + let lower = (lower / self.cap).saturating_add(chunk_len); + let upper = upper.and_then(|x| x.checked_add(chunk_len)); + (lower, upper) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/collect.rs b/third_party/rust/tokio-stream/src/stream_ext/collect.rs new file mode 100644 index 0000000000..8548b74556 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/collect.rs @@ -0,0 +1,229 @@ +use crate::Stream; + +use core::future::Future; +use core::marker::PhantomPinned; +use core::mem; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +// Do not export this struct until `FromStream` can be unsealed. +pin_project! { + /// Future returned by the [`collect`](super::StreamExt::collect) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + #[derive(Debug)] + pub struct Collect<T, U> + where + T: Stream, + U: FromStream<T::Item>, + { + #[pin] + stream: T, + collection: U::InternalCollection, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } +} + +/// Convert from a [`Stream`](crate::Stream). +/// +/// This trait is not intended to be used directly. Instead, call +/// [`StreamExt::collect()`](super::StreamExt::collect). +/// +/// # Implementing +/// +/// Currently, this trait may not be implemented by third parties. The trait is +/// sealed in order to make changes in the future. Stabilization is pending +/// enhancements to the Rust language. +pub trait FromStream<T>: sealed::FromStreamPriv<T> {} + +impl<T, U> Collect<T, U> +where + T: Stream, + U: FromStream<T::Item>, +{ + pub(super) fn new(stream: T) -> Collect<T, U> { + let (lower, upper) = stream.size_hint(); + let collection = U::initialize(sealed::Internal, lower, upper); + + Collect { + stream, + collection, + _pin: PhantomPinned, + } + } +} + +impl<T, U> Future for Collect<T, U> +where + T: Stream, + U: FromStream<T::Item>, +{ + type Output = U; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> { + use Poll::Ready; + + loop { + let me = self.as_mut().project(); + + let item = match ready!(me.stream.poll_next(cx)) { + Some(item) => item, + None => { + return Ready(U::finalize(sealed::Internal, me.collection)); + } + }; + + if !U::extend(sealed::Internal, me.collection, item) { + return Ready(U::finalize(sealed::Internal, me.collection)); + } + } + } +} + +// ===== FromStream implementations + +impl FromStream<()> for () {} + +impl sealed::FromStreamPriv<()> for () { + type InternalCollection = (); + + fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) {} + + fn extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool { + true + } + + fn finalize(_: sealed::Internal, _collection: &mut ()) {} +} + +impl<T: AsRef<str>> FromStream<T> for String {} + +impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String { + type InternalCollection = String; + + fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String { + String::new() + } + + fn extend(_: sealed::Internal, collection: &mut String, item: T) -> bool { + collection.push_str(item.as_ref()); + true + } + + fn finalize(_: sealed::Internal, collection: &mut String) -> String { + mem::take(collection) + } +} + +impl<T> FromStream<T> for Vec<T> {} + +impl<T> sealed::FromStreamPriv<T> for Vec<T> { + type InternalCollection = Vec<T>; + + fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T> { + Vec::with_capacity(lower) + } + + fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool { + collection.push(item); + true + } + + fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> { + mem::take(collection) + } +} + +impl<T> FromStream<T> for Box<[T]> {} + +impl<T> sealed::FromStreamPriv<T> for Box<[T]> { + type InternalCollection = Vec<T>; + + fn initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T> { + <Vec<T> as sealed::FromStreamPriv<T>>::initialize(sealed::Internal, lower, upper) + } + + fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool { + <Vec<T> as sealed::FromStreamPriv<T>>::extend(sealed::Internal, collection, item) + } + + fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]> { + <Vec<T> as sealed::FromStreamPriv<T>>::finalize(sealed::Internal, collection) + .into_boxed_slice() + } +} + +impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {} + +impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E> +where + U: FromStream<T>, +{ + type InternalCollection = Result<U::InternalCollection, E>; + + fn initialize( + _: sealed::Internal, + lower: usize, + upper: Option<usize>, + ) -> Result<U::InternalCollection, E> { + Ok(U::initialize(sealed::Internal, lower, upper)) + } + + fn extend( + _: sealed::Internal, + collection: &mut Self::InternalCollection, + item: Result<T, E>, + ) -> bool { + assert!(collection.is_ok()); + match item { + Ok(item) => { + let collection = collection.as_mut().ok().expect("invalid state"); + U::extend(sealed::Internal, collection, item) + } + Err(err) => { + *collection = Err(err); + false + } + } + } + + fn finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E> { + if let Ok(collection) = collection.as_mut() { + Ok(U::finalize(sealed::Internal, collection)) + } else { + let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0)))); + + Err(res.map(drop).unwrap_err()) + } + } +} + +pub(crate) mod sealed { + #[doc(hidden)] + pub trait FromStreamPriv<T> { + /// Intermediate type used during collection process + /// + /// The name of this type is internal and cannot be relied upon. + type InternalCollection; + + /// Initialize the collection + fn initialize( + internal: Internal, + lower: usize, + upper: Option<usize>, + ) -> Self::InternalCollection; + + /// Extend the collection with the received item + /// + /// Return `true` to continue streaming, `false` complete collection. + fn extend(internal: Internal, collection: &mut Self::InternalCollection, item: T) -> bool; + + /// Finalize collection into target type. + fn finalize(internal: Internal, collection: &mut Self::InternalCollection) -> Self; + } + + #[allow(missing_debug_implementations)] + pub struct Internal; +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/filter.rs b/third_party/rust/tokio-stream/src/stream_ext/filter.rs new file mode 100644 index 0000000000..f3dd8716b4 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/filter.rs @@ -0,0 +1,58 @@ +use crate::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream returned by the [`filter`](super::StreamExt::filter) method. + #[must_use = "streams do nothing unless polled"] + pub struct Filter<St, F> { + #[pin] + stream: St, + f: F, + } +} + +impl<St, F> fmt::Debug for Filter<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Filter") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, F> Filter<St, F> { + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f } + } +} + +impl<St, F> Stream for Filter<St, F> +where + St: Stream, + F: FnMut(&St::Item) -> bool, +{ + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> { + loop { + match ready!(self.as_mut().project().stream.poll_next(cx)) { + Some(e) => { + if (self.as_mut().project().f)(&e) { + return Poll::Ready(Some(e)); + } + } + None => return Poll::Ready(None), + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (0, self.stream.size_hint().1) // can't know a lower bound, due to the predicate + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/filter_map.rs b/third_party/rust/tokio-stream/src/stream_ext/filter_map.rs new file mode 100644 index 0000000000..fe604a6f4b --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/filter_map.rs @@ -0,0 +1,58 @@ +use crate::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream returned by the [`filter_map`](super::StreamExt::filter_map) method. + #[must_use = "streams do nothing unless polled"] + pub struct FilterMap<St, F> { + #[pin] + stream: St, + f: F, + } +} + +impl<St, F> fmt::Debug for FilterMap<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FilterMap") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, F> FilterMap<St, F> { + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f } + } +} + +impl<St, F, T> Stream for FilterMap<St, F> +where + St: Stream, + F: FnMut(St::Item) -> Option<T>, +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + loop { + match ready!(self.as_mut().project().stream.poll_next(cx)) { + Some(e) => { + if let Some(e) = (self.as_mut().project().f)(e) { + return Poll::Ready(Some(e)); + } + } + None => return Poll::Ready(None), + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (0, self.stream.size_hint().1) // can't know a lower bound, due to the predicate + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/fold.rs b/third_party/rust/tokio-stream/src/stream_ext/fold.rs new file mode 100644 index 0000000000..e2e97d8f37 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/fold.rs @@ -0,0 +1,57 @@ +use crate::Stream; + +use core::future::Future; +use core::marker::PhantomPinned; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future returned by the [`fold`](super::StreamExt::fold) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct FoldFuture<St, B, F> { + #[pin] + stream: St, + acc: Option<B>, + f: F, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } +} + +impl<St, B, F> FoldFuture<St, B, F> { + pub(super) fn new(stream: St, init: B, f: F) -> Self { + Self { + stream, + acc: Some(init), + f, + _pin: PhantomPinned, + } + } +} + +impl<St, B, F> Future for FoldFuture<St, B, F> +where + St: Stream, + F: FnMut(B, St::Item) -> B, +{ + type Output = B; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut me = self.project(); + loop { + let next = ready!(me.stream.as_mut().poll_next(cx)); + + match next { + Some(v) => { + let old = me.acc.take().unwrap(); + let new = (me.f)(old, v); + *me.acc = Some(new); + } + None => return Poll::Ready(me.acc.take().unwrap()), + } + } + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/fuse.rs b/third_party/rust/tokio-stream/src/stream_ext/fuse.rs new file mode 100644 index 0000000000..2500641d95 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/fuse.rs @@ -0,0 +1,53 @@ +use crate::Stream; + +use pin_project_lite::pin_project; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// Stream returned by [`fuse()`][super::StreamExt::fuse]. + #[derive(Debug)] + pub struct Fuse<T> { + #[pin] + stream: Option<T>, + } +} + +impl<T> Fuse<T> +where + T: Stream, +{ + pub(crate) fn new(stream: T) -> Fuse<T> { + Fuse { + stream: Some(stream), + } + } +} + +impl<T> Stream for Fuse<T> +where + T: Stream, +{ + type Item = T::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { + let res = match Option::as_pin_mut(self.as_mut().project().stream) { + Some(stream) => ready!(stream.poll_next(cx)), + None => return Poll::Ready(None), + }; + + if res.is_none() { + // Do not poll the stream anymore + self.as_mut().project().stream.set(None); + } + + Poll::Ready(res) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + match self.stream { + Some(ref stream) => stream.size_hint(), + None => (0, Some(0)), + } + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/map.rs b/third_party/rust/tokio-stream/src/stream_ext/map.rs new file mode 100644 index 0000000000..e6b47cd258 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/map.rs @@ -0,0 +1,51 @@ +use crate::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`map`](super::StreamExt::map) method. + #[must_use = "streams do nothing unless polled"] + pub struct Map<St, F> { + #[pin] + stream: St, + f: F, + } +} + +impl<St, F> fmt::Debug for Map<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Map").field("stream", &self.stream).finish() + } +} + +impl<St, F> Map<St, F> { + pub(super) fn new(stream: St, f: F) -> Self { + Map { stream, f } + } +} + +impl<St, F, T> Stream for Map<St, F> +where + St: Stream, + F: FnMut(St::Item) -> T, +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.as_mut() + .project() + .stream + .poll_next(cx) + .map(|opt| opt.map(|x| (self.as_mut().project().f)(x))) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.stream.size_hint() + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/map_while.rs b/third_party/rust/tokio-stream/src/stream_ext/map_while.rs new file mode 100644 index 0000000000..d4fd825656 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/map_while.rs @@ -0,0 +1,52 @@ +use crate::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`map_while`](super::StreamExt::map_while) method. + #[must_use = "streams do nothing unless polled"] + pub struct MapWhile<St, F> { + #[pin] + stream: St, + f: F, + } +} + +impl<St, F> fmt::Debug for MapWhile<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MapWhile") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, F> MapWhile<St, F> { + pub(super) fn new(stream: St, f: F) -> Self { + MapWhile { stream, f } + } +} + +impl<St, F, T> Stream for MapWhile<St, F> +where + St: Stream, + F: FnMut(St::Item) -> Option<T>, +{ + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + let me = self.project(); + let f = me.f; + me.stream.poll_next(cx).map(|opt| opt.and_then(f)) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (_, upper) = self.stream.size_hint(); + (0, upper) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/merge.rs b/third_party/rust/tokio-stream/src/stream_ext/merge.rs new file mode 100644 index 0000000000..9d5123c85a --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/merge.rs @@ -0,0 +1,90 @@ +use crate::stream_ext::Fuse; +use crate::Stream; + +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream returned by the [`merge`](super::StreamExt::merge) method. + pub struct Merge<T, U> { + #[pin] + a: Fuse<T>, + #[pin] + b: Fuse<U>, + // When `true`, poll `a` first, otherwise, `poll` b`. + a_first: bool, + } +} + +impl<T, U> Merge<T, U> { + pub(super) fn new(a: T, b: U) -> Merge<T, U> + where + T: Stream, + U: Stream, + { + Merge { + a: Fuse::new(a), + b: Fuse::new(b), + a_first: true, + } + } +} + +impl<T, U> Stream for Merge<T, U> +where + T: Stream, + U: Stream<Item = T::Item>, +{ + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { + let me = self.project(); + let a_first = *me.a_first; + + // Toggle the flag + *me.a_first = !a_first; + + if a_first { + poll_next(me.a, me.b, cx) + } else { + poll_next(me.b, me.a, cx) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + super::merge_size_hints(self.a.size_hint(), self.b.size_hint()) + } +} + +fn poll_next<T, U>( + first: Pin<&mut T>, + second: Pin<&mut U>, + cx: &mut Context<'_>, +) -> Poll<Option<T::Item>> +where + T: Stream, + U: Stream<Item = T::Item>, +{ + use Poll::*; + + let mut done = true; + + match first.poll_next(cx) { + Ready(Some(val)) => return Ready(Some(val)), + Ready(None) => {} + Pending => done = false, + } + + match second.poll_next(cx) { + Ready(Some(val)) => return Ready(Some(val)), + Ready(None) => {} + Pending => done = false, + } + + if done { + Ready(None) + } else { + Pending + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/next.rs b/third_party/rust/tokio-stream/src/stream_ext/next.rs new file mode 100644 index 0000000000..706069fa6e --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/next.rs @@ -0,0 +1,44 @@ +use crate::Stream; + +use core::future::Future; +use core::marker::PhantomPinned; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`next`](super::StreamExt::next) method. + /// + /// # Cancel safety + /// + /// This method is cancel safe. It only + /// holds onto a reference to the underlying stream, + /// so dropping it will never lose a value. + /// + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Next<'a, St: ?Sized> { + stream: &'a mut St, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } +} + +impl<'a, St: ?Sized> Next<'a, St> { + pub(super) fn new(stream: &'a mut St) -> Self { + Next { + stream, + _pin: PhantomPinned, + } + } +} + +impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> { + type Output = Option<St::Item>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let me = self.project(); + Pin::new(me.stream).poll_next(cx) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/skip.rs b/third_party/rust/tokio-stream/src/stream_ext/skip.rs new file mode 100644 index 0000000000..80a0a0aff0 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/skip.rs @@ -0,0 +1,63 @@ +use crate::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`skip`](super::StreamExt::skip) method. + #[must_use = "streams do nothing unless polled"] + pub struct Skip<St> { + #[pin] + stream: St, + remaining: usize, + } +} + +impl<St> fmt::Debug for Skip<St> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Skip") + .field("stream", &self.stream) + .finish() + } +} + +impl<St> Skip<St> { + pub(super) fn new(stream: St, remaining: usize) -> Self { + Self { stream, remaining } + } +} + +impl<St> Stream for Skip<St> +where + St: Stream, +{ + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + loop { + match ready!(self.as_mut().project().stream.poll_next(cx)) { + Some(e) => { + if self.remaining == 0 { + return Poll::Ready(Some(e)); + } + *self.as_mut().project().remaining -= 1; + } + None => return Poll::Ready(None), + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (lower, upper) = self.stream.size_hint(); + + let lower = lower.saturating_sub(self.remaining); + let upper = upper.map(|x| x.saturating_sub(self.remaining)); + + (lower, upper) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/skip_while.rs b/third_party/rust/tokio-stream/src/stream_ext/skip_while.rs new file mode 100644 index 0000000000..985a92666e --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/skip_while.rs @@ -0,0 +1,73 @@ +use crate::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`skip_while`](super::StreamExt::skip_while) method. + #[must_use = "streams do nothing unless polled"] + pub struct SkipWhile<St, F> { + #[pin] + stream: St, + predicate: Option<F>, + } +} + +impl<St, F> fmt::Debug for SkipWhile<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SkipWhile") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, F> SkipWhile<St, F> { + pub(super) fn new(stream: St, predicate: F) -> Self { + Self { + stream, + predicate: Some(predicate), + } + } +} + +impl<St, F> Stream for SkipWhile<St, F> +where + St: Stream, + F: FnMut(&St::Item) -> bool, +{ + type Item = St::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut this = self.project(); + if let Some(predicate) = this.predicate { + loop { + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(item) => { + if !(predicate)(&item) { + *this.predicate = None; + return Poll::Ready(Some(item)); + } + } + None => return Poll::Ready(None), + } + } + } else { + this.stream.poll_next(cx) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (lower, upper) = self.stream.size_hint(); + + if self.predicate.is_some() { + return (0, upper); + } + + (lower, upper) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/take.rs b/third_party/rust/tokio-stream/src/stream_ext/take.rs new file mode 100644 index 0000000000..c75648f606 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/take.rs @@ -0,0 +1,76 @@ +use crate::Stream; + +use core::cmp; +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`take`](super::StreamExt::take) method. + #[must_use = "streams do nothing unless polled"] + pub struct Take<St> { + #[pin] + stream: St, + remaining: usize, + } +} + +impl<St> fmt::Debug for Take<St> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Take") + .field("stream", &self.stream) + .finish() + } +} + +impl<St> Take<St> { + pub(super) fn new(stream: St, remaining: usize) -> Self { + Self { stream, remaining } + } +} + +impl<St> Stream for Take<St> +where + St: Stream, +{ + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + if *self.as_mut().project().remaining > 0 { + self.as_mut().project().stream.poll_next(cx).map(|ready| { + match &ready { + Some(_) => { + *self.as_mut().project().remaining -= 1; + } + None => { + *self.as_mut().project().remaining = 0; + } + } + ready + }) + } else { + Poll::Ready(None) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + if self.remaining == 0 { + return (0, Some(0)); + } + + let (lower, upper) = self.stream.size_hint(); + + let lower = cmp::min(lower, self.remaining as usize); + + let upper = match upper { + Some(x) if x < self.remaining as usize => Some(x), + _ => Some(self.remaining as usize), + }; + + (lower, upper) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/take_while.rs b/third_party/rust/tokio-stream/src/stream_ext/take_while.rs new file mode 100644 index 0000000000..5ce4dd98a9 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/take_while.rs @@ -0,0 +1,79 @@ +use crate::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`take_while`](super::StreamExt::take_while) method. + #[must_use = "streams do nothing unless polled"] + pub struct TakeWhile<St, F> { + #[pin] + stream: St, + predicate: F, + done: bool, + } +} + +impl<St, F> fmt::Debug for TakeWhile<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TakeWhile") + .field("stream", &self.stream) + .field("done", &self.done) + .finish() + } +} + +impl<St, F> TakeWhile<St, F> { + pub(super) fn new(stream: St, predicate: F) -> Self { + Self { + stream, + predicate, + done: false, + } + } +} + +impl<St, F> Stream for TakeWhile<St, F> +where + St: Stream, + F: FnMut(&St::Item) -> bool, +{ + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + if !*self.as_mut().project().done { + self.as_mut().project().stream.poll_next(cx).map(|ready| { + let ready = ready.and_then(|item| { + if !(self.as_mut().project().predicate)(&item) { + None + } else { + Some(item) + } + }); + + if ready.is_none() { + *self.as_mut().project().done = true; + } + + ready + }) + } else { + Poll::Ready(None) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + if self.done { + return (0, Some(0)); + } + + let (_, upper) = self.stream.size_hint(); + + (0, upper) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/then.rs b/third_party/rust/tokio-stream/src/stream_ext/then.rs new file mode 100644 index 0000000000..cc7caa721e --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/then.rs @@ -0,0 +1,83 @@ +use crate::Stream; + +use core::fmt; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`then`](super::StreamExt::then) method. + #[must_use = "streams do nothing unless polled"] + pub struct Then<St, Fut, F> { + #[pin] + stream: St, + #[pin] + future: Option<Fut>, + f: F, + } +} + +impl<St, Fut, F> fmt::Debug for Then<St, Fut, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Then") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, Fut, F> Then<St, Fut, F> { + pub(super) fn new(stream: St, f: F) -> Self { + Then { + stream, + future: None, + f, + } + } +} + +impl<St, F, Fut> Stream for Then<St, Fut, F> +where + St: Stream, + Fut: Future, + F: FnMut(St::Item) -> Fut, +{ + type Item = Fut::Output; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Fut::Output>> { + let mut me = self.project(); + + loop { + if let Some(future) = me.future.as_mut().as_pin_mut() { + match future.poll(cx) { + Poll::Ready(item) => { + me.future.set(None); + return Poll::Ready(Some(item)); + } + Poll::Pending => return Poll::Pending, + } + } + + match me.stream.as_mut().poll_next(cx) { + Poll::Ready(Some(item)) => { + me.future.set(Some((me.f)(item))); + } + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let future_len = usize::from(self.future.is_some()); + let (lower, upper) = self.stream.size_hint(); + + let lower = lower.saturating_add(future_len); + let upper = upper.and_then(|upper| upper.checked_add(future_len)); + + (lower, upper) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/throttle.rs b/third_party/rust/tokio-stream/src/stream_ext/throttle.rs new file mode 100644 index 0000000000..50001392ee --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/throttle.rs @@ -0,0 +1,96 @@ +//! Slow down a stream by enforcing a delay between items. + +use crate::Stream; +use tokio::time::{Duration, Instant, Sleep}; + +use std::future::Future; +use std::pin::Pin; +use std::task::{self, Poll}; + +use pin_project_lite::pin_project; + +pub(super) fn throttle<T>(duration: Duration, stream: T) -> Throttle<T> +where + T: Stream, +{ + Throttle { + delay: tokio::time::sleep_until(Instant::now() + duration), + duration, + has_delayed: true, + stream, + } +} + +pin_project! { + /// Stream for the [`throttle`](throttle) function. This object is `!Unpin`. If you need it to + /// implement `Unpin` you can pin your throttle like this: `Box::pin(your_throttle)`. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Throttle<T> { + #[pin] + delay: Sleep, + duration: Duration, + + // Set to true when `delay` has returned ready, but `stream` hasn't. + has_delayed: bool, + + // The stream to throttle + #[pin] + stream: T, + } +} + +impl<T> Throttle<T> { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &T { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this combinator + /// is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the stream + /// which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut T { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so care + /// should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> T { + self.stream + } +} + +impl<T: Stream> Stream for Throttle<T> { + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { + let mut me = self.project(); + let dur = *me.duration; + + if !*me.has_delayed && !is_zero(dur) { + ready!(me.delay.as_mut().poll(cx)); + *me.has_delayed = true; + } + + let value = ready!(me.stream.poll_next(cx)); + + if value.is_some() { + if !is_zero(dur) { + me.delay.reset(Instant::now() + dur); + } + + *me.has_delayed = false; + } + + Poll::Ready(value) + } +} + +fn is_zero(dur: Duration) -> bool { + dur == Duration::from_millis(0) +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/timeout.rs b/third_party/rust/tokio-stream/src/stream_ext/timeout.rs new file mode 100644 index 0000000000..a440d203ec --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/timeout.rs @@ -0,0 +1,107 @@ +use crate::stream_ext::Fuse; +use crate::Stream; +use tokio::time::{Instant, Sleep}; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; +use std::fmt; +use std::time::Duration; + +pin_project! { + /// Stream returned by the [`timeout`](super::StreamExt::timeout) method. + #[must_use = "streams do nothing unless polled"] + #[derive(Debug)] + pub struct Timeout<S> { + #[pin] + stream: Fuse<S>, + #[pin] + deadline: Sleep, + duration: Duration, + poll_deadline: bool, + } +} + +/// Error returned by `Timeout`. +#[derive(Debug, PartialEq, Eq)] +pub struct Elapsed(()); + +impl<S: Stream> Timeout<S> { + pub(super) fn new(stream: S, duration: Duration) -> Self { + let next = Instant::now() + duration; + let deadline = tokio::time::sleep_until(next); + + Timeout { + stream: Fuse::new(stream), + deadline, + duration, + poll_deadline: true, + } + } +} + +impl<S: Stream> Stream for Timeout<S> { + type Item = Result<S::Item, Elapsed>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let me = self.project(); + + match me.stream.poll_next(cx) { + Poll::Ready(v) => { + if v.is_some() { + let next = Instant::now() + *me.duration; + me.deadline.reset(next); + *me.poll_deadline = true; + } + return Poll::Ready(v.map(Ok)); + } + Poll::Pending => {} + }; + + if *me.poll_deadline { + ready!(me.deadline.poll(cx)); + *me.poll_deadline = false; + return Poll::Ready(Some(Err(Elapsed::new()))); + } + + Poll::Pending + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (lower, upper) = self.stream.size_hint(); + + // The timeout stream may insert an error before and after each message + // from the underlying stream, but no more than one error between each + // message. Hence the upper bound is computed as 2x+1. + + // Using a helper function to enable use of question mark operator. + fn twice_plus_one(value: Option<usize>) -> Option<usize> { + value?.checked_mul(2)?.checked_add(1) + } + + (lower, twice_plus_one(upper)) + } +} + +// ===== impl Elapsed ===== + +impl Elapsed { + pub(crate) fn new() -> Self { + Elapsed(()) + } +} + +impl fmt::Display for Elapsed { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + "deadline has elapsed".fmt(fmt) + } +} + +impl std::error::Error for Elapsed {} + +impl From<Elapsed> for std::io::Error { + fn from(_err: Elapsed) -> std::io::Error { + std::io::ErrorKind::TimedOut.into() + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/try_next.rs b/third_party/rust/tokio-stream/src/stream_ext/try_next.rs new file mode 100644 index 0000000000..93aa3bc15f --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/try_next.rs @@ -0,0 +1,45 @@ +use crate::stream_ext::Next; +use crate::Stream; + +use core::future::Future; +use core::marker::PhantomPinned; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`try_next`](super::StreamExt::try_next) method. + /// + /// # Cancel safety + /// + /// This method is cancel safe. It only + /// holds onto a reference to the underlying stream, + /// so dropping it will never lose a value. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct TryNext<'a, St: ?Sized> { + #[pin] + inner: Next<'a, St>, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } +} + +impl<'a, St: ?Sized> TryNext<'a, St> { + pub(super) fn new(stream: &'a mut St) -> Self { + Self { + inner: Next::new(stream), + _pin: PhantomPinned, + } + } +} + +impl<T, E, St: ?Sized + Stream<Item = Result<T, E>> + Unpin> Future for TryNext<'_, St> { + type Output = Result<Option<T>, E>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let me = self.project(); + me.inner.poll(cx).map(Option::transpose) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_map.rs b/third_party/rust/tokio-stream/src/stream_map.rs new file mode 100644 index 0000000000..215980474b --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_map.rs @@ -0,0 +1,690 @@ +use crate::Stream; + +use std::borrow::Borrow; +use std::hash::Hash; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Combine many streams into one, indexing each source stream with a unique +/// key. +/// +/// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source +/// streams into a single merged stream that yields values in the order that +/// they arrive from the source streams. However, `StreamMap` has a lot more +/// flexibility in usage patterns. +/// +/// `StreamMap` can: +/// +/// * Merge an arbitrary number of streams. +/// * Track which source stream the value was received from. +/// * Handle inserting and removing streams from the set of managed streams at +/// any point during iteration. +/// +/// All source streams held by `StreamMap` are indexed using a key. This key is +/// included with the value when a source stream yields a value. The key is also +/// used to remove the stream from the `StreamMap` before the stream has +/// completed streaming. +/// +/// # `Unpin` +/// +/// Because the `StreamMap` API moves streams during runtime, both streams and +/// keys must be `Unpin`. In order to insert a `!Unpin` stream into a +/// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to +/// pin the stream in the heap. +/// +/// # Implementation +/// +/// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this +/// internal implementation detail will persist in future versions, but it is +/// important to know the runtime implications. In general, `StreamMap` works +/// best with a "smallish" number of streams as all entries are scanned on +/// insert, remove, and polling. In cases where a large number of streams need +/// to be merged, it may be advisable to use tasks sending values on a shared +/// [`mpsc`] channel. +/// +/// [`StreamExt::merge`]: crate::StreamExt::merge +/// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html +/// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html +/// [`Box::pin`]: std::boxed::Box::pin +/// +/// # Examples +/// +/// Merging two streams, then remove them after receiving the first value +/// +/// ``` +/// use tokio_stream::{StreamExt, StreamMap, Stream}; +/// use tokio::sync::mpsc; +/// use std::pin::Pin; +/// +/// #[tokio::main] +/// async fn main() { +/// let (tx1, mut rx1) = mpsc::channel::<usize>(10); +/// let (tx2, mut rx2) = mpsc::channel::<usize>(10); +/// +/// // Convert the channels to a `Stream`. +/// let rx1 = Box::pin(async_stream::stream! { +/// while let Some(item) = rx1.recv().await { +/// yield item; +/// } +/// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; +/// +/// let rx2 = Box::pin(async_stream::stream! { +/// while let Some(item) = rx2.recv().await { +/// yield item; +/// } +/// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; +/// +/// tokio::spawn(async move { +/// tx1.send(1).await.unwrap(); +/// +/// // This value will never be received. The send may or may not return +/// // `Err` depending on if the remote end closed first or not. +/// let _ = tx1.send(2).await; +/// }); +/// +/// tokio::spawn(async move { +/// tx2.send(3).await.unwrap(); +/// let _ = tx2.send(4).await; +/// }); +/// +/// let mut map = StreamMap::new(); +/// +/// // Insert both streams +/// map.insert("one", rx1); +/// map.insert("two", rx2); +/// +/// // Read twice +/// for _ in 0..2 { +/// let (key, val) = map.next().await.unwrap(); +/// +/// if key == "one" { +/// assert_eq!(val, 1); +/// } else { +/// assert_eq!(val, 3); +/// } +/// +/// // Remove the stream to prevent reading the next value +/// map.remove(key); +/// } +/// } +/// ``` +/// +/// This example models a read-only client to a chat system with channels. The +/// client sends commands to join and leave channels. `StreamMap` is used to +/// manage active channel subscriptions. +/// +/// For simplicity, messages are displayed with `println!`, but they could be +/// sent to the client over a socket. +/// +/// ```no_run +/// use tokio_stream::{Stream, StreamExt, StreamMap}; +/// +/// enum Command { +/// Join(String), +/// Leave(String), +/// } +/// +/// fn commands() -> impl Stream<Item = Command> { +/// // Streams in user commands by parsing `stdin`. +/// # tokio_stream::pending() +/// } +/// +/// // Join a channel, returns a stream of messages received on the channel. +/// fn join(channel: &str) -> impl Stream<Item = String> + Unpin { +/// // left as an exercise to the reader +/// # tokio_stream::pending() +/// } +/// +/// #[tokio::main] +/// async fn main() { +/// let mut channels = StreamMap::new(); +/// +/// // Input commands (join / leave channels). +/// let cmds = commands(); +/// tokio::pin!(cmds); +/// +/// loop { +/// tokio::select! { +/// Some(cmd) = cmds.next() => { +/// match cmd { +/// Command::Join(chan) => { +/// // Join the channel and add it to the `channels` +/// // stream map +/// let msgs = join(&chan); +/// channels.insert(chan, msgs); +/// } +/// Command::Leave(chan) => { +/// channels.remove(&chan); +/// } +/// } +/// } +/// Some((chan, msg)) = channels.next() => { +/// // Received a message, display it on stdout with the channel +/// // it originated from. +/// println!("{}: {}", chan, msg); +/// } +/// // Both the `commands` stream and the `channels` stream are +/// // complete. There is no more work to do, so leave the loop. +/// else => break, +/// } +/// } +/// } +/// ``` +#[derive(Debug)] +pub struct StreamMap<K, V> { + /// Streams stored in the map + entries: Vec<(K, V)>, +} + +impl<K, V> StreamMap<K, V> { + /// An iterator visiting all key-value pairs in arbitrary order. + /// + /// The iterator element type is &'a (K, V). + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// + /// map.insert("a", pending::<i32>()); + /// map.insert("b", pending()); + /// map.insert("c", pending()); + /// + /// for (key, stream) in map.iter() { + /// println!("({}, {:?})", key, stream); + /// } + /// ``` + pub fn iter(&self) -> impl Iterator<Item = &(K, V)> { + self.entries.iter() + } + + /// An iterator visiting all key-value pairs mutably in arbitrary order. + /// + /// The iterator element type is &'a mut (K, V). + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// + /// map.insert("a", pending::<i32>()); + /// map.insert("b", pending()); + /// map.insert("c", pending()); + /// + /// for (key, stream) in map.iter_mut() { + /// println!("({}, {:?})", key, stream); + /// } + /// ``` + pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> { + self.entries.iter_mut() + } + + /// Creates an empty `StreamMap`. + /// + /// The stream map is initially created with a capacity of `0`, so it will + /// not allocate until it is first inserted into. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, Pending}; + /// + /// let map: StreamMap<&str, Pending<()>> = StreamMap::new(); + /// ``` + pub fn new() -> StreamMap<K, V> { + StreamMap { entries: vec![] } + } + + /// Creates an empty `StreamMap` with the specified capacity. + /// + /// The stream map will be able to hold at least `capacity` elements without + /// reallocating. If `capacity` is 0, the stream map will not allocate. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, Pending}; + /// + /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10); + /// ``` + pub fn with_capacity(capacity: usize) -> StreamMap<K, V> { + StreamMap { + entries: Vec::with_capacity(capacity), + } + } + + /// Returns an iterator visiting all keys in arbitrary order. + /// + /// The iterator element type is &'a K. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// + /// map.insert("a", pending::<i32>()); + /// map.insert("b", pending()); + /// map.insert("c", pending()); + /// + /// for key in map.keys() { + /// println!("{}", key); + /// } + /// ``` + pub fn keys(&self) -> impl Iterator<Item = &K> { + self.iter().map(|(k, _)| k) + } + + /// An iterator visiting all values in arbitrary order. + /// + /// The iterator element type is &'a V. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// + /// map.insert("a", pending::<i32>()); + /// map.insert("b", pending()); + /// map.insert("c", pending()); + /// + /// for stream in map.values() { + /// println!("{:?}", stream); + /// } + /// ``` + pub fn values(&self) -> impl Iterator<Item = &V> { + self.iter().map(|(_, v)| v) + } + + /// An iterator visiting all values mutably in arbitrary order. + /// + /// The iterator element type is &'a mut V. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// + /// map.insert("a", pending::<i32>()); + /// map.insert("b", pending()); + /// map.insert("c", pending()); + /// + /// for stream in map.values_mut() { + /// println!("{:?}", stream); + /// } + /// ``` + pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> { + self.iter_mut().map(|(_, v)| v) + } + + /// Returns the number of streams the map can hold without reallocating. + /// + /// This number is a lower bound; the `StreamMap` might be able to hold + /// more, but is guaranteed to be able to hold at least this many. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, Pending}; + /// + /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100); + /// assert!(map.capacity() >= 100); + /// ``` + pub fn capacity(&self) -> usize { + self.entries.capacity() + } + + /// Returns the number of streams in the map. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut a = StreamMap::new(); + /// assert_eq!(a.len(), 0); + /// a.insert(1, pending::<i32>()); + /// assert_eq!(a.len(), 1); + /// ``` + pub fn len(&self) -> usize { + self.entries.len() + } + + /// Returns `true` if the map contains no elements. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut a = StreamMap::new(); + /// assert!(a.is_empty()); + /// a.insert(1, pending::<i32>()); + /// assert!(!a.is_empty()); + /// ``` + pub fn is_empty(&self) -> bool { + self.entries.is_empty() + } + + /// Clears the map, removing all key-stream pairs. Keeps the allocated + /// memory for reuse. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut a = StreamMap::new(); + /// a.insert(1, pending::<i32>()); + /// a.clear(); + /// assert!(a.is_empty()); + /// ``` + pub fn clear(&mut self) { + self.entries.clear(); + } + + /// Insert a key-stream pair into the map. + /// + /// If the map did not have this key present, `None` is returned. + /// + /// If the map did have this key present, the new `stream` replaces the old + /// one and the old stream is returned. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// + /// assert!(map.insert(37, pending::<i32>()).is_none()); + /// assert!(!map.is_empty()); + /// + /// map.insert(37, pending()); + /// assert!(map.insert(37, pending()).is_some()); + /// ``` + pub fn insert(&mut self, k: K, stream: V) -> Option<V> + where + K: Hash + Eq, + { + let ret = self.remove(&k); + self.entries.push((k, stream)); + + ret + } + + /// Removes a key from the map, returning the stream at the key if the key was previously in the map. + /// + /// The key may be any borrowed form of the map's key type, but `Hash` and + /// `Eq` on the borrowed form must match those for the key type. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// map.insert(1, pending::<i32>()); + /// assert!(map.remove(&1).is_some()); + /// assert!(map.remove(&1).is_none()); + /// ``` + pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V> + where + K: Borrow<Q>, + Q: Hash + Eq, + { + for i in 0..self.entries.len() { + if self.entries[i].0.borrow() == k { + return Some(self.entries.swap_remove(i).1); + } + } + + None + } + + /// Returns `true` if the map contains a stream for the specified key. + /// + /// The key may be any borrowed form of the map's key type, but `Hash` and + /// `Eq` on the borrowed form must match those for the key type. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// map.insert(1, pending::<i32>()); + /// assert_eq!(map.contains_key(&1), true); + /// assert_eq!(map.contains_key(&2), false); + /// ``` + pub fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool + where + K: Borrow<Q>, + Q: Hash + Eq, + { + for i in 0..self.entries.len() { + if self.entries[i].0.borrow() == k { + return true; + } + } + + false + } +} + +impl<K, V> StreamMap<K, V> +where + K: Unpin, + V: Stream + Unpin, +{ + /// Polls the next value, includes the vec entry index + fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> { + use Poll::*; + + let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize; + let mut idx = start; + + for _ in 0..self.entries.len() { + let (_, stream) = &mut self.entries[idx]; + + match Pin::new(stream).poll_next(cx) { + Ready(Some(val)) => return Ready(Some((idx, val))), + Ready(None) => { + // Remove the entry + self.entries.swap_remove(idx); + + // Check if this was the last entry, if so the cursor needs + // to wrap + if idx == self.entries.len() { + idx = 0; + } else if idx < start && start <= self.entries.len() { + // The stream being swapped into the current index has + // already been polled, so skip it. + idx = idx.wrapping_add(1) % self.entries.len(); + } + } + Pending => { + idx = idx.wrapping_add(1) % self.entries.len(); + } + } + } + + // If the map is empty, then the stream is complete. + if self.entries.is_empty() { + Ready(None) + } else { + Pending + } + } +} + +impl<K, V> Default for StreamMap<K, V> { + fn default() -> Self { + Self::new() + } +} + +impl<K, V> Stream for StreamMap<K, V> +where + K: Clone + Unpin, + V: Stream + Unpin, +{ + type Item = (K, V::Item); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) { + let key = self.entries[idx].0.clone(); + Poll::Ready(Some((key, val))) + } else { + Poll::Ready(None) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let mut ret = (0, Some(0)); + + for (_, stream) in &self.entries { + let hint = stream.size_hint(); + + ret.0 += hint.0; + + match (ret.1, hint.1) { + (Some(a), Some(b)) => ret.1 = Some(a + b), + (Some(_), None) => ret.1 = None, + _ => {} + } + } + + ret + } +} + +impl<K, V> std::iter::FromIterator<(K, V)> for StreamMap<K, V> +where + K: Hash + Eq, +{ + fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self { + let iterator = iter.into_iter(); + let (lower_bound, _) = iterator.size_hint(); + let mut stream_map = Self::with_capacity(lower_bound); + + for (key, value) in iterator { + stream_map.insert(key, value); + } + + stream_map + } +} + +impl<K, V> Extend<(K, V)> for StreamMap<K, V> { + fn extend<T>(&mut self, iter: T) + where + T: IntoIterator<Item = (K, V)>, + { + self.entries.extend(iter); + } +} + +mod rand { + use std::cell::Cell; + + mod loom { + #[cfg(not(loom))] + pub(crate) mod rand { + use std::collections::hash_map::RandomState; + use std::hash::{BuildHasher, Hash, Hasher}; + use std::sync::atomic::AtomicU32; + use std::sync::atomic::Ordering::Relaxed; + + static COUNTER: AtomicU32 = AtomicU32::new(1); + + pub(crate) fn seed() -> u64 { + let rand_state = RandomState::new(); + + let mut hasher = rand_state.build_hasher(); + + // Hash some unique-ish data to generate some new state + COUNTER.fetch_add(1, Relaxed).hash(&mut hasher); + + // Get the seed + hasher.finish() + } + } + + #[cfg(loom)] + pub(crate) mod rand { + pub(crate) fn seed() -> u64 { + 1 + } + } + } + + /// Fast random number generate + /// + /// Implement xorshift64+: 2 32-bit xorshift sequences added together. + /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's + /// Xorshift paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf> + /// This generator passes the SmallCrush suite, part of TestU01 framework: + /// <http://simul.iro.umontreal.ca/testu01/tu01.html> + #[derive(Debug)] + pub(crate) struct FastRand { + one: Cell<u32>, + two: Cell<u32>, + } + + impl FastRand { + /// Initialize a new, thread-local, fast random number generator. + pub(crate) fn new(seed: u64) -> FastRand { + let one = (seed >> 32) as u32; + let mut two = seed as u32; + + if two == 0 { + // This value cannot be zero + two = 1; + } + + FastRand { + one: Cell::new(one), + two: Cell::new(two), + } + } + + pub(crate) fn fastrand_n(&self, n: u32) -> u32 { + // This is similar to fastrand() % n, but faster. + // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ + let mul = (self.fastrand() as u64).wrapping_mul(n as u64); + (mul >> 32) as u32 + } + + fn fastrand(&self) -> u32 { + let mut s1 = self.one.get(); + let s0 = self.two.get(); + + s1 ^= s1 << 17; + s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16; + + self.one.set(s0); + self.two.set(s1); + + s0.wrapping_add(s1) + } + } + + // Used by `StreamMap` + pub(crate) fn thread_rng_n(n: u32) -> u32 { + thread_local! { + static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed()); + } + + THREAD_RNG.with(|rng| rng.fastrand_n(n)) + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers.rs b/third_party/rust/tokio-stream/src/wrappers.rs new file mode 100644 index 0000000000..62cabe4f7d --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers.rs @@ -0,0 +1,62 @@ +//! Wrappers for Tokio types that implement `Stream`. + +/// Error types for the wrappers. +pub mod errors { + cfg_sync! { + pub use crate::wrappers::broadcast::BroadcastStreamRecvError; + } +} + +mod mpsc_bounded; +pub use mpsc_bounded::ReceiverStream; + +mod mpsc_unbounded; +pub use mpsc_unbounded::UnboundedReceiverStream; + +cfg_sync! { + mod broadcast; + pub use broadcast::BroadcastStream; + + mod watch; + pub use watch::WatchStream; +} + +cfg_signal! { + #[cfg(unix)] + mod signal_unix; + #[cfg(unix)] + pub use signal_unix::SignalStream; + + #[cfg(any(windows, docsrs))] + mod signal_windows; + #[cfg(any(windows, docsrs))] + pub use signal_windows::{CtrlCStream, CtrlBreakStream}; +} + +cfg_time! { + mod interval; + pub use interval::IntervalStream; +} + +cfg_net! { + mod tcp_listener; + pub use tcp_listener::TcpListenerStream; + + #[cfg(unix)] + mod unix_listener; + #[cfg(unix)] + pub use unix_listener::UnixListenerStream; +} + +cfg_io_util! { + mod split; + pub use split::SplitStream; + + mod lines; + pub use lines::LinesStream; +} + +cfg_fs! { + mod read_dir; + pub use read_dir::ReadDirStream; +} diff --git a/third_party/rust/tokio-stream/src/wrappers/broadcast.rs b/third_party/rust/tokio-stream/src/wrappers/broadcast.rs new file mode 100644 index 0000000000..711066466a --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/broadcast.rs @@ -0,0 +1,79 @@ +use std::pin::Pin; +use tokio::sync::broadcast::error::RecvError; +use tokio::sync::broadcast::Receiver; + +use futures_core::Stream; +use tokio_util::sync::ReusableBoxFuture; + +use std::fmt; +use std::task::{Context, Poll}; + +/// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`]. +/// +/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver +/// [`Stream`]: trait@crate::Stream +#[cfg_attr(docsrs, doc(cfg(feature = "sync")))] +pub struct BroadcastStream<T> { + inner: ReusableBoxFuture<'static, (Result<T, RecvError>, Receiver<T>)>, +} + +/// An error returned from the inner stream of a [`BroadcastStream`]. +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum BroadcastStreamRecvError { + /// The receiver lagged too far behind. Attempting to receive again will + /// return the oldest message still retained by the channel. + /// + /// Includes the number of skipped messages. + Lagged(u64), +} + +impl fmt::Display for BroadcastStreamRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + BroadcastStreamRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt), + } + } +} + +impl std::error::Error for BroadcastStreamRecvError {} + +async fn make_future<T: Clone>(mut rx: Receiver<T>) -> (Result<T, RecvError>, Receiver<T>) { + let result = rx.recv().await; + (result, rx) +} + +impl<T: 'static + Clone + Send> BroadcastStream<T> { + /// Create a new `BroadcastStream`. + pub fn new(rx: Receiver<T>) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } +} + +impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> { + type Item = Result<T, BroadcastStreamRecvError>; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let (result, rx) = ready!(self.inner.poll(cx)); + self.inner.set(make_future(rx)); + match result { + Ok(item) => Poll::Ready(Some(Ok(item))), + Err(RecvError::Closed) => Poll::Ready(None), + Err(RecvError::Lagged(n)) => { + Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))) + } + } + } +} + +impl<T> fmt::Debug for BroadcastStream<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BroadcastStream").finish() + } +} + +impl<T: 'static + Clone + Send> From<Receiver<T>> for BroadcastStream<T> { + fn from(recv: Receiver<T>) -> Self { + Self::new(recv) + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/interval.rs b/third_party/rust/tokio-stream/src/wrappers/interval.rs new file mode 100644 index 0000000000..2bf0194bd0 --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/interval.rs @@ -0,0 +1,50 @@ +use crate::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::time::{Instant, Interval}; + +/// A wrapper around [`Interval`] that implements [`Stream`]. +/// +/// [`Interval`]: struct@tokio::time::Interval +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "time")))] +pub struct IntervalStream { + inner: Interval, +} + +impl IntervalStream { + /// Create a new `IntervalStream`. + pub fn new(interval: Interval) -> Self { + Self { inner: interval } + } + + /// Get back the inner `Interval`. + pub fn into_inner(self) -> Interval { + self.inner + } +} + +impl Stream for IntervalStream { + type Item = Instant; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Instant>> { + self.inner.poll_tick(cx).map(Some) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (std::usize::MAX, None) + } +} + +impl AsRef<Interval> for IntervalStream { + fn as_ref(&self) -> &Interval { + &self.inner + } +} + +impl AsMut<Interval> for IntervalStream { + fn as_mut(&mut self) -> &mut Interval { + &mut self.inner + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/lines.rs b/third_party/rust/tokio-stream/src/wrappers/lines.rs new file mode 100644 index 0000000000..ad3c25349f --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/lines.rs @@ -0,0 +1,60 @@ +use crate::Stream; +use pin_project_lite::pin_project; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncBufRead, Lines}; + +pin_project! { + /// A wrapper around [`tokio::io::Lines`] that implements [`Stream`]. + /// + /// [`tokio::io::Lines`]: struct@tokio::io::Lines + /// [`Stream`]: trait@crate::Stream + #[derive(Debug)] + #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] + pub struct LinesStream<R> { + #[pin] + inner: Lines<R>, + } +} + +impl<R> LinesStream<R> { + /// Create a new `LinesStream`. + pub fn new(lines: Lines<R>) -> Self { + Self { inner: lines } + } + + /// Get back the inner `Lines`. + pub fn into_inner(self) -> Lines<R> { + self.inner + } + + /// Obtain a pinned reference to the inner `Lines<R>`. + #[allow(clippy::wrong_self_convention)] // https://github.com/rust-lang/rust-clippy/issues/4546 + pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Lines<R>> { + self.project().inner + } +} + +impl<R: AsyncBufRead> Stream for LinesStream<R> { + type Item = io::Result<String>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.project() + .inner + .poll_next_line(cx) + .map(Result::transpose) + } +} + +impl<R> AsRef<Lines<R>> for LinesStream<R> { + fn as_ref(&self) -> &Lines<R> { + &self.inner + } +} + +impl<R> AsMut<Lines<R>> for LinesStream<R> { + fn as_mut(&mut self) -> &mut Lines<R> { + &mut self.inner + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/mpsc_bounded.rs b/third_party/rust/tokio-stream/src/wrappers/mpsc_bounded.rs new file mode 100644 index 0000000000..b5362680ee --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/mpsc_bounded.rs @@ -0,0 +1,65 @@ +use crate::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::sync::mpsc::Receiver; + +/// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`]. +/// +/// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +pub struct ReceiverStream<T> { + inner: Receiver<T>, +} + +impl<T> ReceiverStream<T> { + /// Create a new `ReceiverStream`. + pub fn new(recv: Receiver<T>) -> Self { + Self { inner: recv } + } + + /// Get back the inner `Receiver`. + pub fn into_inner(self) -> Receiver<T> { + self.inner + } + + /// Closes the receiving half of a channel without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. Any + /// outstanding [`Permit`] values will still be able to send messages. + /// + /// To guarantee no messages are dropped, after calling `close()`, you must + /// receive all items from the stream until `None` is returned. + /// + /// [`Permit`]: struct@tokio::sync::mpsc::Permit + pub fn close(&mut self) { + self.inner.close() + } +} + +impl<T> Stream for ReceiverStream<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.inner.poll_recv(cx) + } +} + +impl<T> AsRef<Receiver<T>> for ReceiverStream<T> { + fn as_ref(&self) -> &Receiver<T> { + &self.inner + } +} + +impl<T> AsMut<Receiver<T>> for ReceiverStream<T> { + fn as_mut(&mut self) -> &mut Receiver<T> { + &mut self.inner + } +} + +impl<T> From<Receiver<T>> for ReceiverStream<T> { + fn from(recv: Receiver<T>) -> Self { + Self::new(recv) + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/mpsc_unbounded.rs b/third_party/rust/tokio-stream/src/wrappers/mpsc_unbounded.rs new file mode 100644 index 0000000000..54597b7f6f --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/mpsc_unbounded.rs @@ -0,0 +1,59 @@ +use crate::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::sync::mpsc::UnboundedReceiver; + +/// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`]. +/// +/// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +pub struct UnboundedReceiverStream<T> { + inner: UnboundedReceiver<T>, +} + +impl<T> UnboundedReceiverStream<T> { + /// Create a new `UnboundedReceiverStream`. + pub fn new(recv: UnboundedReceiver<T>) -> Self { + Self { inner: recv } + } + + /// Get back the inner `UnboundedReceiver`. + pub fn into_inner(self) -> UnboundedReceiver<T> { + self.inner + } + + /// Closes the receiving half of a channel without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + self.inner.close() + } +} + +impl<T> Stream for UnboundedReceiverStream<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.inner.poll_recv(cx) + } +} + +impl<T> AsRef<UnboundedReceiver<T>> for UnboundedReceiverStream<T> { + fn as_ref(&self) -> &UnboundedReceiver<T> { + &self.inner + } +} + +impl<T> AsMut<UnboundedReceiver<T>> for UnboundedReceiverStream<T> { + fn as_mut(&mut self) -> &mut UnboundedReceiver<T> { + &mut self.inner + } +} + +impl<T> From<UnboundedReceiver<T>> for UnboundedReceiverStream<T> { + fn from(recv: UnboundedReceiver<T>) -> Self { + Self::new(recv) + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/read_dir.rs b/third_party/rust/tokio-stream/src/wrappers/read_dir.rs new file mode 100644 index 0000000000..b5cf54f79e --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/read_dir.rs @@ -0,0 +1,47 @@ +use crate::Stream; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::fs::{DirEntry, ReadDir}; + +/// A wrapper around [`tokio::fs::ReadDir`] that implements [`Stream`]. +/// +/// [`tokio::fs::ReadDir`]: struct@tokio::fs::ReadDir +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "fs")))] +pub struct ReadDirStream { + inner: ReadDir, +} + +impl ReadDirStream { + /// Create a new `ReadDirStream`. + pub fn new(read_dir: ReadDir) -> Self { + Self { inner: read_dir } + } + + /// Get back the inner `ReadDir`. + pub fn into_inner(self) -> ReadDir { + self.inner + } +} + +impl Stream for ReadDirStream { + type Item = io::Result<DirEntry>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.inner.poll_next_entry(cx).map(Result::transpose) + } +} + +impl AsRef<ReadDir> for ReadDirStream { + fn as_ref(&self) -> &ReadDir { + &self.inner + } +} + +impl AsMut<ReadDir> for ReadDirStream { + fn as_mut(&mut self) -> &mut ReadDir { + &mut self.inner + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/signal_unix.rs b/third_party/rust/tokio-stream/src/wrappers/signal_unix.rs new file mode 100644 index 0000000000..2f74e7d152 --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/signal_unix.rs @@ -0,0 +1,46 @@ +use crate::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::signal::unix::Signal; + +/// A wrapper around [`Signal`] that implements [`Stream`]. +/// +/// [`Signal`]: struct@tokio::signal::unix::Signal +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "signal"))))] +pub struct SignalStream { + inner: Signal, +} + +impl SignalStream { + /// Create a new `SignalStream`. + pub fn new(interval: Signal) -> Self { + Self { inner: interval } + } + + /// Get back the inner `Signal`. + pub fn into_inner(self) -> Signal { + self.inner + } +} + +impl Stream for SignalStream { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.inner.poll_recv(cx) + } +} + +impl AsRef<Signal> for SignalStream { + fn as_ref(&self) -> &Signal { + &self.inner + } +} + +impl AsMut<Signal> for SignalStream { + fn as_mut(&mut self) -> &mut Signal { + &mut self.inner + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/signal_windows.rs b/third_party/rust/tokio-stream/src/wrappers/signal_windows.rs new file mode 100644 index 0000000000..4631fbad8d --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/signal_windows.rs @@ -0,0 +1,88 @@ +use crate::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::signal::windows::{CtrlBreak, CtrlC}; + +/// A wrapper around [`CtrlC`] that implements [`Stream`]. +/// +/// [`CtrlC`]: struct@tokio::signal::windows::CtrlC +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(all(windows, feature = "signal"))))] +pub struct CtrlCStream { + inner: CtrlC, +} + +impl CtrlCStream { + /// Create a new `CtrlCStream`. + pub fn new(interval: CtrlC) -> Self { + Self { inner: interval } + } + + /// Get back the inner `CtrlC`. + pub fn into_inner(self) -> CtrlC { + self.inner + } +} + +impl Stream for CtrlCStream { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.inner.poll_recv(cx) + } +} + +impl AsRef<CtrlC> for CtrlCStream { + fn as_ref(&self) -> &CtrlC { + &self.inner + } +} + +impl AsMut<CtrlC> for CtrlCStream { + fn as_mut(&mut self) -> &mut CtrlC { + &mut self.inner + } +} + +/// A wrapper around [`CtrlBreak`] that implements [`Stream`]. +/// +/// [`CtrlBreak`]: struct@tokio::signal::windows::CtrlBreak +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(all(windows, feature = "signal"))))] +pub struct CtrlBreakStream { + inner: CtrlBreak, +} + +impl CtrlBreakStream { + /// Create a new `CtrlBreakStream`. + pub fn new(interval: CtrlBreak) -> Self { + Self { inner: interval } + } + + /// Get back the inner `CtrlBreak`. + pub fn into_inner(self) -> CtrlBreak { + self.inner + } +} + +impl Stream for CtrlBreakStream { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.inner.poll_recv(cx) + } +} + +impl AsRef<CtrlBreak> for CtrlBreakStream { + fn as_ref(&self) -> &CtrlBreak { + &self.inner + } +} + +impl AsMut<CtrlBreak> for CtrlBreakStream { + fn as_mut(&mut self) -> &mut CtrlBreak { + &mut self.inner + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/split.rs b/third_party/rust/tokio-stream/src/wrappers/split.rs new file mode 100644 index 0000000000..5a6bb2d408 --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/split.rs @@ -0,0 +1,60 @@ +use crate::Stream; +use pin_project_lite::pin_project; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncBufRead, Split}; + +pin_project! { + /// A wrapper around [`tokio::io::Split`] that implements [`Stream`]. + /// + /// [`tokio::io::Split`]: struct@tokio::io::Split + /// [`Stream`]: trait@crate::Stream + #[derive(Debug)] + #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] + pub struct SplitStream<R> { + #[pin] + inner: Split<R>, + } +} + +impl<R> SplitStream<R> { + /// Create a new `SplitStream`. + pub fn new(split: Split<R>) -> Self { + Self { inner: split } + } + + /// Get back the inner `Split`. + pub fn into_inner(self) -> Split<R> { + self.inner + } + + /// Obtain a pinned reference to the inner `Split<R>`. + #[allow(clippy::wrong_self_convention)] // https://github.com/rust-lang/rust-clippy/issues/4546 + pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Split<R>> { + self.project().inner + } +} + +impl<R: AsyncBufRead> Stream for SplitStream<R> { + type Item = io::Result<Vec<u8>>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.project() + .inner + .poll_next_segment(cx) + .map(Result::transpose) + } +} + +impl<R> AsRef<Split<R>> for SplitStream<R> { + fn as_ref(&self) -> &Split<R> { + &self.inner + } +} + +impl<R> AsMut<Split<R>> for SplitStream<R> { + fn as_mut(&mut self) -> &mut Split<R> { + &mut self.inner + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/tcp_listener.rs b/third_party/rust/tokio-stream/src/wrappers/tcp_listener.rs new file mode 100644 index 0000000000..ce7cb16350 --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/tcp_listener.rs @@ -0,0 +1,54 @@ +use crate::Stream; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::net::{TcpListener, TcpStream}; + +/// A wrapper around [`TcpListener`] that implements [`Stream`]. +/// +/// [`TcpListener`]: struct@tokio::net::TcpListener +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "net")))] +pub struct TcpListenerStream { + inner: TcpListener, +} + +impl TcpListenerStream { + /// Create a new `TcpListenerStream`. + pub fn new(listener: TcpListener) -> Self { + Self { inner: listener } + } + + /// Get back the inner `TcpListener`. + pub fn into_inner(self) -> TcpListener { + self.inner + } +} + +impl Stream for TcpListenerStream { + type Item = io::Result<TcpStream>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<io::Result<TcpStream>>> { + match self.inner.poll_accept(cx) { + Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + Poll::Pending => Poll::Pending, + } + } +} + +impl AsRef<TcpListener> for TcpListenerStream { + fn as_ref(&self) -> &TcpListener { + &self.inner + } +} + +impl AsMut<TcpListener> for TcpListenerStream { + fn as_mut(&mut self) -> &mut TcpListener { + &mut self.inner + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/unix_listener.rs b/third_party/rust/tokio-stream/src/wrappers/unix_listener.rs new file mode 100644 index 0000000000..0beba588c2 --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/unix_listener.rs @@ -0,0 +1,54 @@ +use crate::Stream; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::net::{UnixListener, UnixStream}; + +/// A wrapper around [`UnixListener`] that implements [`Stream`]. +/// +/// [`UnixListener`]: struct@tokio::net::UnixListener +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "net"))))] +pub struct UnixListenerStream { + inner: UnixListener, +} + +impl UnixListenerStream { + /// Create a new `UnixListenerStream`. + pub fn new(listener: UnixListener) -> Self { + Self { inner: listener } + } + + /// Get back the inner `UnixListener`. + pub fn into_inner(self) -> UnixListener { + self.inner + } +} + +impl Stream for UnixListenerStream { + type Item = io::Result<UnixStream>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<io::Result<UnixStream>>> { + match self.inner.poll_accept(cx) { + Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + Poll::Pending => Poll::Pending, + } + } +} + +impl AsRef<UnixListener> for UnixListenerStream { + fn as_ref(&self) -> &UnixListener { + &self.inner + } +} + +impl AsMut<UnixListener> for UnixListenerStream { + fn as_mut(&mut self) -> &mut UnixListener { + &mut self.inner + } +} diff --git a/third_party/rust/tokio-stream/src/wrappers/watch.rs b/third_party/rust/tokio-stream/src/wrappers/watch.rs new file mode 100644 index 0000000000..ec8ead06da --- /dev/null +++ b/third_party/rust/tokio-stream/src/wrappers/watch.rs @@ -0,0 +1,132 @@ +use std::pin::Pin; +use tokio::sync::watch::Receiver; + +use futures_core::Stream; +use tokio_util::sync::ReusableBoxFuture; + +use std::fmt; +use std::task::{Context, Poll}; +use tokio::sync::watch::error::RecvError; + +/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. +/// +/// This stream will start by yielding the current value when the WatchStream is polled, +/// regardless of whether it was the initial value or sent afterwards, +/// unless you use [`WatchStream<T>::from_changes`]. +/// +/// # Examples +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// use tokio::sync::watch; +/// +/// let (tx, rx) = watch::channel("hello"); +/// let mut rx = WatchStream::new(rx); +/// +/// assert_eq!(rx.next().await, Some("hello")); +/// +/// tx.send("goodbye").unwrap(); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// use tokio::sync::watch; +/// +/// let (tx, rx) = watch::channel("hello"); +/// let mut rx = WatchStream::new(rx); +/// +/// // existing rx output with "hello" is ignored here +/// +/// tx.send("goodbye").unwrap(); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// +/// Example with [`WatchStream<T>::from_changes`]: +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use futures::future::FutureExt; +/// use tokio::sync::watch; +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// +/// let (tx, rx) = watch::channel("hello"); +/// let mut rx = WatchStream::from_changes(rx); +/// +/// // no output from rx is available at this point - let's check this: +/// assert!(rx.next().now_or_never().is_none()); +/// +/// tx.send("goodbye").unwrap(); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// +/// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver +/// [`Stream`]: trait@crate::Stream +#[cfg_attr(docsrs, doc(cfg(feature = "sync")))] +pub struct WatchStream<T> { + inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<T>)>, +} + +async fn make_future<T: Clone + Send + Sync>( + mut rx: Receiver<T>, +) -> (Result<(), RecvError>, Receiver<T>) { + let result = rx.changed().await; + (result, rx) +} + +impl<T: 'static + Clone + Send + Sync> WatchStream<T> { + /// Create a new `WatchStream`. + pub fn new(rx: Receiver<T>) -> Self { + Self { + inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), + } + } + + /// Create a new `WatchStream` that waits for the value to be changed. + pub fn from_changes(rx: Receiver<T>) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } +} + +impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let (result, mut rx) = ready!(self.inner.poll(cx)); + match result { + Ok(_) => { + let received = (*rx.borrow_and_update()).clone(); + self.inner.set(make_future(rx)); + Poll::Ready(Some(received)) + } + Err(_) => { + self.inner.set(make_future(rx)); + Poll::Ready(None) + } + } + } +} + +impl<T> Unpin for WatchStream<T> {} + +impl<T> fmt::Debug for WatchStream<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WatchStream").finish() + } +} + +impl<T: 'static + Clone + Send + Sync> From<Receiver<T>> for WatchStream<T> { + fn from(recv: Receiver<T>) -> Self { + Self::new(recv) + } +} diff --git a/third_party/rust/tokio-stream/tests/async_send_sync.rs b/third_party/rust/tokio-stream/tests/async_send_sync.rs new file mode 100644 index 0000000000..f1c8b4efe2 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/async_send_sync.rs @@ -0,0 +1,107 @@ +#![allow(clippy::diverging_sub_expression)] + +use std::rc::Rc; + +#[allow(dead_code)] +type BoxStream<T> = std::pin::Pin<Box<dyn tokio_stream::Stream<Item = T>>>; + +#[allow(dead_code)] +fn require_send<T: Send>(_t: &T) {} +#[allow(dead_code)] +fn require_sync<T: Sync>(_t: &T) {} +#[allow(dead_code)] +fn require_unpin<T: Unpin>(_t: &T) {} + +#[allow(dead_code)] +struct Invalid; + +trait AmbiguousIfSend<A> { + fn some_item(&self) {} +} +impl<T: ?Sized> AmbiguousIfSend<()> for T {} +impl<T: ?Sized + Send> AmbiguousIfSend<Invalid> for T {} + +trait AmbiguousIfSync<A> { + fn some_item(&self) {} +} +impl<T: ?Sized> AmbiguousIfSync<()> for T {} +impl<T: ?Sized + Sync> AmbiguousIfSync<Invalid> for T {} + +trait AmbiguousIfUnpin<A> { + fn some_item(&self) {} +} +impl<T: ?Sized> AmbiguousIfUnpin<()> for T {} +impl<T: ?Sized + Unpin> AmbiguousIfUnpin<Invalid> for T {} + +macro_rules! into_todo { + ($typ:ty) => {{ + let x: $typ = todo!(); + x + }}; +} + +macro_rules! async_assert_fn { + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + require_send(&f); + require_sync(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & !Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + require_send(&f); + AmbiguousIfSync::some_item(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Send & Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + AmbiguousIfSend::some_item(&f); + require_sync(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Send & !Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + AmbiguousIfSend::some_item(&f); + AmbiguousIfSync::some_item(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Unpin) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + AmbiguousIfUnpin::some_item(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Unpin) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + require_unpin(&f); + }; + }; +} + +async_assert_fn!(tokio_stream::empty<Rc<u8>>(): Send & Sync); +async_assert_fn!(tokio_stream::pending<Rc<u8>>(): Send & Sync); +async_assert_fn!(tokio_stream::iter(std::vec::IntoIter<u8>): Send & Sync); + +async_assert_fn!(tokio_stream::StreamExt::next(&mut BoxStream<()>): !Unpin); +async_assert_fn!(tokio_stream::StreamExt::try_next(&mut BoxStream<Result<(), ()>>): !Unpin); +async_assert_fn!(tokio_stream::StreamExt::all(&mut BoxStream<()>, fn(())->bool): !Unpin); +async_assert_fn!(tokio_stream::StreamExt::any(&mut BoxStream<()>, fn(())->bool): !Unpin); +async_assert_fn!(tokio_stream::StreamExt::fold(&mut BoxStream<()>, (), fn((), ())->()): !Unpin); +async_assert_fn!(tokio_stream::StreamExt::collect<Vec<()>>(&mut BoxStream<()>): !Unpin); diff --git a/third_party/rust/tokio-stream/tests/chunks_timeout.rs b/third_party/rust/tokio-stream/tests/chunks_timeout.rs new file mode 100644 index 0000000000..ffc7deadd7 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/chunks_timeout.rs @@ -0,0 +1,84 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))] + +use tokio::time; +use tokio_stream::{self as stream, StreamExt}; +use tokio_test::assert_pending; +use tokio_test::task; + +use futures::FutureExt; +use std::time::Duration; + +#[tokio::test(start_paused = true)] +async fn usage() { + let iter = vec![1, 2, 3].into_iter(); + let stream0 = stream::iter(iter); + + let iter = vec![4].into_iter(); + let stream1 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n)); + + let chunk_stream = stream0 + .chain(stream1) + .chunks_timeout(4, Duration::from_secs(2)); + + let mut chunk_stream = task::spawn(chunk_stream); + + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); + + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![4])); +} + +#[tokio::test(start_paused = true)] +async fn full_chunk_with_timeout() { + let iter = vec![1, 2].into_iter(); + let stream0 = stream::iter(iter); + + let iter = vec![3].into_iter(); + let stream1 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(1)).map(move |_| n)); + + let iter = vec![4].into_iter(); + let stream2 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n)); + + let chunk_stream = stream0 + .chain(stream1) + .chain(stream2) + .chunks_timeout(3, Duration::from_secs(2)); + + let mut chunk_stream = task::spawn(chunk_stream); + + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); + + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![4])); +} + +#[tokio::test] +#[ignore] +async fn real_time() { + let iter = vec![1, 2, 3, 4].into_iter(); + let stream0 = stream::iter(iter); + + let iter = vec![5].into_iter(); + let stream1 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n)); + + let chunk_stream = stream0 + .chain(stream1) + .chunks_timeout(3, Duration::from_secs(2)); + + let mut chunk_stream = task::spawn(chunk_stream); + + assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); + assert_eq!(chunk_stream.next().await, Some(vec![4])); + assert_eq!(chunk_stream.next().await, Some(vec![5])); +} diff --git a/third_party/rust/tokio-stream/tests/stream_chain.rs b/third_party/rust/tokio-stream/tests/stream_chain.rs new file mode 100644 index 0000000000..f3b7edb16a --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_chain.rs @@ -0,0 +1,100 @@ +use tokio_stream::{self as stream, Stream, StreamExt}; +use tokio_test::{assert_pending, assert_ready, task}; + +mod support { + pub(crate) mod mpsc; +} + +use support::mpsc; + +#[tokio::test] +async fn basic_usage() { + let one = stream::iter(vec![1, 2, 3]); + let two = stream::iter(vec![4, 5, 6]); + + let mut stream = one.chain(two); + + assert_eq!(stream.size_hint(), (6, Some(6))); + assert_eq!(stream.next().await, Some(1)); + + assert_eq!(stream.size_hint(), (5, Some(5))); + assert_eq!(stream.next().await, Some(2)); + + assert_eq!(stream.size_hint(), (4, Some(4))); + assert_eq!(stream.next().await, Some(3)); + + assert_eq!(stream.size_hint(), (3, Some(3))); + assert_eq!(stream.next().await, Some(4)); + + assert_eq!(stream.size_hint(), (2, Some(2))); + assert_eq!(stream.next().await, Some(5)); + + assert_eq!(stream.size_hint(), (1, Some(1))); + assert_eq!(stream.next().await, Some(6)); + + assert_eq!(stream.size_hint(), (0, Some(0))); + assert_eq!(stream.next().await, None); + + assert_eq!(stream.size_hint(), (0, Some(0))); + assert_eq!(stream.next().await, None); +} + +#[tokio::test] +async fn pending_first() { + let (tx1, rx1) = mpsc::unbounded_channel_stream(); + let (tx2, rx2) = mpsc::unbounded_channel_stream(); + + let mut stream = task::spawn(rx1.chain(rx2)); + assert_eq!(stream.size_hint(), (0, None)); + + assert_pending!(stream.poll_next()); + + tx2.send(2).unwrap(); + assert!(!stream.is_woken()); + + assert_pending!(stream.poll_next()); + + tx1.send(1).unwrap(); + assert!(stream.is_woken()); + assert_eq!(Some(1), assert_ready!(stream.poll_next())); + + assert_pending!(stream.poll_next()); + + drop(tx1); + + assert_eq!(stream.size_hint(), (0, None)); + + assert!(stream.is_woken()); + assert_eq!(Some(2), assert_ready!(stream.poll_next())); + + assert_eq!(stream.size_hint(), (0, None)); + + drop(tx2); + + assert_eq!(stream.size_hint(), (0, None)); + assert_eq!(None, assert_ready!(stream.poll_next())); +} + +#[test] +fn size_overflow() { + struct Monster; + + impl tokio_stream::Stream for Monster { + type Item = (); + fn poll_next( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Option<()>> { + panic!() + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (usize::MAX, Some(usize::MAX)) + } + } + + let m1 = Monster; + let m2 = Monster; + let m = m1.chain(m2); + assert_eq!(m.size_hint(), (usize::MAX, None)); +} diff --git a/third_party/rust/tokio-stream/tests/stream_collect.rs b/third_party/rust/tokio-stream/tests/stream_collect.rs new file mode 100644 index 0000000000..07659a1fc3 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_collect.rs @@ -0,0 +1,146 @@ +use tokio_stream::{self as stream, StreamExt}; +use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task}; + +mod support { + pub(crate) mod mpsc; +} + +use support::mpsc; + +#[allow(clippy::let_unit_value)] +#[tokio::test] +async fn empty_unit() { + // Drains the stream. + let mut iter = vec![(), (), ()].into_iter(); + let _: () = stream::iter(&mut iter).collect().await; + assert!(iter.next().is_none()); +} + +#[tokio::test] +async fn empty_vec() { + let coll: Vec<u32> = stream::empty().collect().await; + assert!(coll.is_empty()); +} + +#[tokio::test] +async fn empty_box_slice() { + let coll: Box<[u32]> = stream::empty().collect().await; + assert!(coll.is_empty()); +} + +#[tokio::test] +async fn empty_string() { + let coll: String = stream::empty::<&str>().collect().await; + assert!(coll.is_empty()); +} + +#[tokio::test] +async fn empty_result() { + let coll: Result<Vec<u32>, &str> = stream::empty().collect().await; + assert_eq!(Ok(vec![]), coll); +} + +#[tokio::test] +async fn collect_vec_items() { + let (tx, rx) = mpsc::unbounded_channel_stream(); + let mut fut = task::spawn(rx.collect::<Vec<i32>>()); + + assert_pending!(fut.poll()); + + tx.send(1).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send(2).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + drop(tx); + assert!(fut.is_woken()); + let coll = assert_ready!(fut.poll()); + assert_eq!(vec![1, 2], coll); +} + +#[tokio::test] +async fn collect_string_items() { + let (tx, rx) = mpsc::unbounded_channel_stream(); + + let mut fut = task::spawn(rx.collect::<String>()); + + assert_pending!(fut.poll()); + + tx.send("hello ".to_string()).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send("world".to_string()).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + drop(tx); + assert!(fut.is_woken()); + let coll = assert_ready!(fut.poll()); + assert_eq!("hello world", coll); +} + +#[tokio::test] +async fn collect_str_items() { + let (tx, rx) = mpsc::unbounded_channel_stream(); + + let mut fut = task::spawn(rx.collect::<String>()); + + assert_pending!(fut.poll()); + + tx.send("hello ").unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send("world").unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + drop(tx); + assert!(fut.is_woken()); + let coll = assert_ready!(fut.poll()); + assert_eq!("hello world", coll); +} + +#[tokio::test] +async fn collect_results_ok() { + let (tx, rx) = mpsc::unbounded_channel_stream(); + + let mut fut = task::spawn(rx.collect::<Result<String, &str>>()); + + assert_pending!(fut.poll()); + + tx.send(Ok("hello ")).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send(Ok("world")).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + drop(tx); + assert!(fut.is_woken()); + let coll = assert_ready_ok!(fut.poll()); + assert_eq!("hello world", coll); +} + +#[tokio::test] +async fn collect_results_err() { + let (tx, rx) = mpsc::unbounded_channel_stream(); + + let mut fut = task::spawn(rx.collect::<Result<String, &str>>()); + + assert_pending!(fut.poll()); + + tx.send(Ok("hello ")).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send(Err("oh no")).unwrap(); + assert!(fut.is_woken()); + let err = assert_ready_err!(fut.poll()); + assert_eq!("oh no", err); +} diff --git a/third_party/rust/tokio-stream/tests/stream_empty.rs b/third_party/rust/tokio-stream/tests/stream_empty.rs new file mode 100644 index 0000000000..c06f5c41c0 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_empty.rs @@ -0,0 +1,11 @@ +use tokio_stream::{self as stream, Stream, StreamExt}; + +#[tokio::test] +async fn basic_usage() { + let mut stream = stream::empty::<i32>(); + + for _ in 0..2 { + assert_eq!(stream.size_hint(), (0, Some(0))); + assert_eq!(None, stream.next().await); + } +} diff --git a/third_party/rust/tokio-stream/tests/stream_fuse.rs b/third_party/rust/tokio-stream/tests/stream_fuse.rs new file mode 100644 index 0000000000..9b6cf054cf --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_fuse.rs @@ -0,0 +1,50 @@ +use tokio_stream::{Stream, StreamExt}; + +use std::pin::Pin; +use std::task::{Context, Poll}; + +// a stream which alternates between Some and None +struct Alternate { + state: i32, +} + +impl Stream for Alternate { + type Item = i32; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> { + let val = self.state; + self.state += 1; + + // if it's even, Some(i32), else None + if val % 2 == 0 { + Poll::Ready(Some(val)) + } else { + Poll::Ready(None) + } + } +} + +#[tokio::test] +async fn basic_usage() { + let mut stream = Alternate { state: 0 }; + + // the stream goes back and forth + assert_eq!(stream.next().await, Some(0)); + assert_eq!(stream.next().await, None); + assert_eq!(stream.next().await, Some(2)); + assert_eq!(stream.next().await, None); + + // however, once it is fused + let mut stream = stream.fuse(); + + assert_eq!(stream.size_hint(), (0, None)); + assert_eq!(stream.next().await, Some(4)); + + assert_eq!(stream.size_hint(), (0, None)); + assert_eq!(stream.next().await, None); + + // it will always return `None` after the first time. + assert_eq!(stream.size_hint(), (0, Some(0))); + assert_eq!(stream.next().await, None); + assert_eq!(stream.size_hint(), (0, Some(0))); +} diff --git a/third_party/rust/tokio-stream/tests/stream_iter.rs b/third_party/rust/tokio-stream/tests/stream_iter.rs new file mode 100644 index 0000000000..8b9ee3ce5b --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_iter.rs @@ -0,0 +1,18 @@ +use tokio_stream as stream; +use tokio_test::task; + +use std::iter; + +#[tokio::test] +async fn coop() { + let mut stream = task::spawn(stream::iter(iter::repeat(1))); + + for _ in 0..10_000 { + if stream.poll_next().is_pending() { + assert!(stream.is_woken()); + return; + } + } + + panic!("did not yield"); +} diff --git a/third_party/rust/tokio-stream/tests/stream_merge.rs b/third_party/rust/tokio-stream/tests/stream_merge.rs new file mode 100644 index 0000000000..f603bccf88 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_merge.rs @@ -0,0 +1,83 @@ +use tokio_stream::{self as stream, Stream, StreamExt}; +use tokio_test::task; +use tokio_test::{assert_pending, assert_ready}; + +mod support { + pub(crate) mod mpsc; +} + +use support::mpsc; + +#[tokio::test] +async fn merge_sync_streams() { + let mut s = stream::iter(vec![0, 2, 4, 6]).merge(stream::iter(vec![1, 3, 5])); + + for i in 0..7 { + let rem = 7 - i; + assert_eq!(s.size_hint(), (rem, Some(rem))); + assert_eq!(Some(i), s.next().await); + } + + assert!(s.next().await.is_none()); +} + +#[tokio::test] +async fn merge_async_streams() { + let (tx1, rx1) = mpsc::unbounded_channel_stream(); + let (tx2, rx2) = mpsc::unbounded_channel_stream(); + + let mut rx = task::spawn(rx1.merge(rx2)); + + assert_eq!(rx.size_hint(), (0, None)); + + assert_pending!(rx.poll_next()); + + tx1.send(1).unwrap(); + + assert!(rx.is_woken()); + assert_eq!(Some(1), assert_ready!(rx.poll_next())); + + assert_pending!(rx.poll_next()); + tx2.send(2).unwrap(); + + assert!(rx.is_woken()); + assert_eq!(Some(2), assert_ready!(rx.poll_next())); + assert_pending!(rx.poll_next()); + + drop(tx1); + assert!(rx.is_woken()); + assert_pending!(rx.poll_next()); + + tx2.send(3).unwrap(); + assert!(rx.is_woken()); + assert_eq!(Some(3), assert_ready!(rx.poll_next())); + assert_pending!(rx.poll_next()); + + drop(tx2); + assert!(rx.is_woken()); + assert_eq!(None, assert_ready!(rx.poll_next())); +} + +#[test] +fn size_overflow() { + struct Monster; + + impl tokio_stream::Stream for Monster { + type Item = (); + fn poll_next( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Option<()>> { + panic!() + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (usize::MAX, Some(usize::MAX)) + } + } + + let m1 = Monster; + let m2 = Monster; + let m = m1.merge(m2); + assert_eq!(m.size_hint(), (usize::MAX, None)); +} diff --git a/third_party/rust/tokio-stream/tests/stream_once.rs b/third_party/rust/tokio-stream/tests/stream_once.rs new file mode 100644 index 0000000000..f32bad3a12 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_once.rs @@ -0,0 +1,12 @@ +use tokio_stream::{self as stream, Stream, StreamExt}; + +#[tokio::test] +async fn basic_usage() { + let mut one = stream::once(1); + + assert_eq!(one.size_hint(), (1, Some(1))); + assert_eq!(Some(1), one.next().await); + + assert_eq!(one.size_hint(), (0, Some(0))); + assert_eq!(None, one.next().await); +} diff --git a/third_party/rust/tokio-stream/tests/stream_panic.rs b/third_party/rust/tokio-stream/tests/stream_panic.rs new file mode 100644 index 0000000000..22c1c20800 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_panic.rs @@ -0,0 +1,55 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "time", not(target_os = "wasi")))] // Wasi does not support panic recovery + +use parking_lot::{const_mutex, Mutex}; +use std::error::Error; +use std::panic; +use std::sync::Arc; +use tokio::time::Duration; +use tokio_stream::{self as stream, StreamExt}; + +fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<String> { + static PANIC_MUTEX: Mutex<()> = const_mutex(()); + + { + let _guard = PANIC_MUTEX.lock(); + let panic_file: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None)); + + let prev_hook = panic::take_hook(); + { + let panic_file = panic_file.clone(); + panic::set_hook(Box::new(move |panic_info| { + let panic_location = panic_info.location().unwrap(); + panic_file + .lock() + .clone_from(&Some(panic_location.file().to_string())); + })); + } + + let result = panic::catch_unwind(func); + // Return to the previously set panic hook (maybe default) so that we get nice error + // messages in the tests. + panic::set_hook(prev_hook); + + if result.is_err() { + panic_file.lock().clone() + } else { + None + } + } +} + +#[test] +fn stream_chunks_timeout_panic_caller() -> Result<(), Box<dyn Error>> { + let panic_location_file = test_panic(|| { + let iter = vec![1, 2, 3].into_iter(); + let stream0 = stream::iter(iter); + + let _chunk_stream = stream0.chunks_timeout(0, Duration::from_secs(2)); + }); + + // The panic location should be in this file + assert_eq!(&panic_location_file.unwrap(), file!()); + + Ok(()) +} diff --git a/third_party/rust/tokio-stream/tests/stream_pending.rs b/third_party/rust/tokio-stream/tests/stream_pending.rs new file mode 100644 index 0000000000..87b5d03bda --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_pending.rs @@ -0,0 +1,14 @@ +use tokio_stream::{self as stream, Stream, StreamExt}; +use tokio_test::{assert_pending, task}; + +#[tokio::test] +async fn basic_usage() { + let mut stream = stream::pending::<i32>(); + + for _ in 0..2 { + assert_eq!(stream.size_hint(), (0, None)); + + let mut next = task::spawn(async { stream.next().await }); + assert_pending!(next.poll()); + } +} diff --git a/third_party/rust/tokio-stream/tests/stream_stream_map.rs b/third_party/rust/tokio-stream/tests/stream_stream_map.rs new file mode 100644 index 0000000000..b6b87e9d0a --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_stream_map.rs @@ -0,0 +1,330 @@ +use tokio_stream::{self as stream, pending, Stream, StreamExt, StreamMap}; +use tokio_test::{assert_ok, assert_pending, assert_ready, task}; + +mod support { + pub(crate) mod mpsc; +} + +use support::mpsc; + +use std::pin::Pin; + +macro_rules! assert_ready_some { + ($($t:tt)*) => { + match assert_ready!($($t)*) { + Some(v) => v, + None => panic!("expected `Some`, got `None`"), + } + }; +} + +macro_rules! assert_ready_none { + ($($t:tt)*) => { + match assert_ready!($($t)*) { + None => {} + Some(v) => panic!("expected `None`, got `Some({:?})`", v), + } + }; +} + +#[tokio::test] +async fn empty() { + let mut map = StreamMap::<&str, stream::Pending<()>>::new(); + + assert_eq!(map.len(), 0); + assert!(map.is_empty()); + + assert!(map.next().await.is_none()); + assert!(map.next().await.is_none()); + + assert!(map.remove("foo").is_none()); +} + +#[tokio::test] +async fn single_entry() { + let mut map = task::spawn(StreamMap::new()); + let (tx, rx) = mpsc::unbounded_channel_stream(); + let rx = Box::pin(rx); + + assert_ready_none!(map.poll_next()); + + assert!(map.insert("foo", rx).is_none()); + assert!(map.contains_key("foo")); + assert!(!map.contains_key("bar")); + + assert_eq!(map.len(), 1); + assert!(!map.is_empty()); + + assert_pending!(map.poll_next()); + + assert_ok!(tx.send(1)); + + assert!(map.is_woken()); + let (k, v) = assert_ready_some!(map.poll_next()); + assert_eq!(k, "foo"); + assert_eq!(v, 1); + + assert_pending!(map.poll_next()); + + assert_ok!(tx.send(2)); + + assert!(map.is_woken()); + let (k, v) = assert_ready_some!(map.poll_next()); + assert_eq!(k, "foo"); + assert_eq!(v, 2); + + assert_pending!(map.poll_next()); + drop(tx); + assert!(map.is_woken()); + assert_ready_none!(map.poll_next()); +} + +#[tokio::test] +async fn multiple_entries() { + let mut map = task::spawn(StreamMap::new()); + let (tx1, rx1) = mpsc::unbounded_channel_stream(); + let (tx2, rx2) = mpsc::unbounded_channel_stream(); + + let rx1 = Box::pin(rx1); + let rx2 = Box::pin(rx2); + + map.insert("foo", rx1); + map.insert("bar", rx2); + + assert_pending!(map.poll_next()); + + assert_ok!(tx1.send(1)); + + assert!(map.is_woken()); + let (k, v) = assert_ready_some!(map.poll_next()); + assert_eq!(k, "foo"); + assert_eq!(v, 1); + + assert_pending!(map.poll_next()); + + assert_ok!(tx2.send(2)); + + assert!(map.is_woken()); + let (k, v) = assert_ready_some!(map.poll_next()); + assert_eq!(k, "bar"); + assert_eq!(v, 2); + + assert_pending!(map.poll_next()); + + assert_ok!(tx1.send(3)); + assert_ok!(tx2.send(4)); + + assert!(map.is_woken()); + + // Given the randomization, there is no guarantee what order the values will + // be received in. + let mut v = (0..2) + .map(|_| assert_ready_some!(map.poll_next())) + .collect::<Vec<_>>(); + + assert_pending!(map.poll_next()); + + v.sort_unstable(); + assert_eq!(v[0].0, "bar"); + assert_eq!(v[0].1, 4); + assert_eq!(v[1].0, "foo"); + assert_eq!(v[1].1, 3); + + drop(tx1); + assert!(map.is_woken()); + assert_pending!(map.poll_next()); + drop(tx2); + + assert_ready_none!(map.poll_next()); +} + +#[tokio::test] +async fn insert_remove() { + let mut map = task::spawn(StreamMap::new()); + let (tx, rx) = mpsc::unbounded_channel_stream(); + + let rx = Box::pin(rx); + + assert_ready_none!(map.poll_next()); + + assert!(map.insert("foo", rx).is_none()); + let rx = map.remove("foo").unwrap(); + + assert_ok!(tx.send(1)); + + assert!(!map.is_woken()); + assert_ready_none!(map.poll_next()); + + assert!(map.insert("bar", rx).is_none()); + + let v = assert_ready_some!(map.poll_next()); + assert_eq!(v.0, "bar"); + assert_eq!(v.1, 1); + + assert!(map.remove("bar").is_some()); + assert_ready_none!(map.poll_next()); + + assert!(map.is_empty()); + assert_eq!(0, map.len()); +} + +#[tokio::test] +async fn replace() { + let mut map = task::spawn(StreamMap::new()); + let (tx1, rx1) = mpsc::unbounded_channel_stream(); + let (tx2, rx2) = mpsc::unbounded_channel_stream(); + + let rx1 = Box::pin(rx1); + let rx2 = Box::pin(rx2); + + assert!(map.insert("foo", rx1).is_none()); + + assert_pending!(map.poll_next()); + + let _rx1 = map.insert("foo", rx2).unwrap(); + + assert_pending!(map.poll_next()); + + tx1.send(1).unwrap(); + assert_pending!(map.poll_next()); + + tx2.send(2).unwrap(); + assert!(map.is_woken()); + let v = assert_ready_some!(map.poll_next()); + assert_eq!(v.0, "foo"); + assert_eq!(v.1, 2); +} + +#[test] +fn size_hint_with_upper() { + let mut map = StreamMap::new(); + + map.insert("a", stream::iter(vec![1])); + map.insert("b", stream::iter(vec![1, 2])); + map.insert("c", stream::iter(vec![1, 2, 3])); + + assert_eq!(3, map.len()); + assert!(!map.is_empty()); + + let size_hint = map.size_hint(); + assert_eq!(size_hint, (6, Some(6))); +} + +#[test] +fn size_hint_without_upper() { + let mut map = StreamMap::new(); + + map.insert("a", pin_box(stream::iter(vec![1]))); + map.insert("b", pin_box(stream::iter(vec![1, 2]))); + map.insert("c", pin_box(pending())); + + let size_hint = map.size_hint(); + assert_eq!(size_hint, (3, None)); +} + +#[test] +fn new_capacity_zero() { + let map = StreamMap::<&str, stream::Pending<()>>::new(); + assert_eq!(0, map.capacity()); + + assert!(map.keys().next().is_none()); +} + +#[test] +fn with_capacity() { + let map = StreamMap::<&str, stream::Pending<()>>::with_capacity(10); + assert!(10 <= map.capacity()); + + assert!(map.keys().next().is_none()); +} + +#[test] +fn iter_keys() { + let mut map = StreamMap::new(); + + map.insert("a", pending::<i32>()); + map.insert("b", pending()); + map.insert("c", pending()); + + let mut keys = map.keys().collect::<Vec<_>>(); + keys.sort_unstable(); + + assert_eq!(&keys[..], &[&"a", &"b", &"c"]); +} + +#[test] +fn iter_values() { + let mut map = StreamMap::new(); + + map.insert("a", stream::iter(vec![1])); + map.insert("b", stream::iter(vec![1, 2])); + map.insert("c", stream::iter(vec![1, 2, 3])); + + let mut size_hints = map.values().map(|s| s.size_hint().0).collect::<Vec<_>>(); + + size_hints.sort_unstable(); + + assert_eq!(&size_hints[..], &[1, 2, 3]); +} + +#[test] +fn iter_values_mut() { + let mut map = StreamMap::new(); + + map.insert("a", stream::iter(vec![1])); + map.insert("b", stream::iter(vec![1, 2])); + map.insert("c", stream::iter(vec![1, 2, 3])); + + let mut size_hints = map + .values_mut() + .map(|s: &mut _| s.size_hint().0) + .collect::<Vec<_>>(); + + size_hints.sort_unstable(); + + assert_eq!(&size_hints[..], &[1, 2, 3]); +} + +#[test] +fn clear() { + let mut map = task::spawn(StreamMap::new()); + + map.insert("a", stream::iter(vec![1])); + map.insert("b", stream::iter(vec![1, 2])); + map.insert("c", stream::iter(vec![1, 2, 3])); + + assert_ready_some!(map.poll_next()); + + map.clear(); + + assert_ready_none!(map.poll_next()); + assert!(map.is_empty()); +} + +#[test] +fn contains_key_borrow() { + let mut map = StreamMap::new(); + map.insert("foo".to_string(), pending::<()>()); + + assert!(map.contains_key("foo")); +} + +#[test] +fn one_ready_many_none() { + // Run a few times because of randomness + for _ in 0..100 { + let mut map = task::spawn(StreamMap::new()); + + map.insert(0, pin_box(stream::empty())); + map.insert(1, pin_box(stream::empty())); + map.insert(2, pin_box(stream::once("hello"))); + map.insert(3, pin_box(stream::pending())); + + let v = assert_ready_some!(map.poll_next()); + assert_eq!(v, (2, "hello")); + } +} + +fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> { + Box::pin(s) +} diff --git a/third_party/rust/tokio-stream/tests/stream_timeout.rs b/third_party/rust/tokio-stream/tests/stream_timeout.rs new file mode 100644 index 0000000000..2338f83358 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_timeout.rs @@ -0,0 +1,109 @@ +#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))] + +use tokio::time::{self, sleep, Duration}; +use tokio_stream::{self, StreamExt}; +use tokio_test::*; + +use futures::stream; + +async fn maybe_sleep(idx: i32) -> i32 { + if idx % 2 == 0 { + sleep(ms(200)).await; + } + idx +} + +fn ms(n: u64) -> Duration { + Duration::from_millis(n) +} + +#[tokio::test] +async fn basic_usage() { + time::pause(); + + // Items 2 and 4 time out. If we run the stream until it completes, + // we end up with the following items: + // + // [Ok(1), Err(Elapsed), Ok(2), Ok(3), Err(Elapsed), Ok(4)] + + let stream = stream::iter(1..=4).then(maybe_sleep).timeout(ms(100)); + let mut stream = task::spawn(stream); + + // First item completes immediately + assert_ready_eq!(stream.poll_next(), Some(Ok(1))); + + // Second item is delayed 200ms, times out after 100ms + assert_pending!(stream.poll_next()); + + time::advance(ms(150)).await; + let v = assert_ready!(stream.poll_next()); + assert!(v.unwrap().is_err()); + + assert_pending!(stream.poll_next()); + + time::advance(ms(100)).await; + assert_ready_eq!(stream.poll_next(), Some(Ok(2))); + + // Third item is ready immediately + assert_ready_eq!(stream.poll_next(), Some(Ok(3))); + + // Fourth item is delayed 200ms, times out after 100ms + assert_pending!(stream.poll_next()); + + time::advance(ms(60)).await; + assert_pending!(stream.poll_next()); // nothing ready yet + + time::advance(ms(60)).await; + let v = assert_ready!(stream.poll_next()); + assert!(v.unwrap().is_err()); // timeout! + + time::advance(ms(120)).await; + assert_ready_eq!(stream.poll_next(), Some(Ok(4))); + + // Done. + assert_ready_eq!(stream.poll_next(), None); +} + +#[tokio::test] +async fn return_elapsed_errors_only_once() { + time::pause(); + + let stream = stream::iter(1..=3).then(maybe_sleep).timeout(ms(50)); + let mut stream = task::spawn(stream); + + // First item completes immediately + assert_ready_eq!(stream.poll_next(), Some(Ok(1))); + + // Second item is delayed 200ms, times out after 50ms. Only one `Elapsed` + // error is returned. + assert_pending!(stream.poll_next()); + // + time::advance(ms(51)).await; + let v = assert_ready!(stream.poll_next()); + assert!(v.unwrap().is_err()); // timeout! + + // deadline elapses again, but no error is returned + time::advance(ms(50)).await; + assert_pending!(stream.poll_next()); + + time::advance(ms(100)).await; + assert_ready_eq!(stream.poll_next(), Some(Ok(2))); + assert_ready_eq!(stream.poll_next(), Some(Ok(3))); + + // Done + assert_ready_eq!(stream.poll_next(), None); +} + +#[tokio::test] +async fn no_timeouts() { + let stream = stream::iter(vec![1, 3, 5]) + .then(maybe_sleep) + .timeout(ms(100)); + + let mut stream = task::spawn(stream); + + assert_ready_eq!(stream.poll_next(), Some(Ok(1))); + assert_ready_eq!(stream.poll_next(), Some(Ok(3))); + assert_ready_eq!(stream.poll_next(), Some(Ok(5))); + assert_ready_eq!(stream.poll_next(), None); +} diff --git a/third_party/rust/tokio-stream/tests/support/mpsc.rs b/third_party/rust/tokio-stream/tests/support/mpsc.rs new file mode 100644 index 0000000000..09dbe04215 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/support/mpsc.rs @@ -0,0 +1,15 @@ +use async_stream::stream; +use tokio::sync::mpsc::{self, UnboundedSender}; +use tokio_stream::Stream; + +pub fn unbounded_channel_stream<T: Unpin>() -> (UnboundedSender<T>, impl Stream<Item = T>) { + let (tx, mut rx) = mpsc::unbounded_channel(); + + let stream = stream! { + while let Some(item) = rx.recv().await { + yield item; + } + }; + + (tx, stream) +} diff --git a/third_party/rust/tokio-stream/tests/time_throttle.rs b/third_party/rust/tokio-stream/tests/time_throttle.rs new file mode 100644 index 0000000000..e6c9917be3 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/time_throttle.rs @@ -0,0 +1,28 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))] + +use tokio::time; +use tokio_stream::StreamExt; +use tokio_test::*; + +use std::time::Duration; + +#[tokio::test] +async fn usage() { + time::pause(); + + let mut stream = task::spawn(futures::stream::repeat(()).throttle(Duration::from_millis(100))); + + assert_ready!(stream.poll_next()); + assert_pending!(stream.poll_next()); + + time::advance(Duration::from_millis(90)).await; + + assert_pending!(stream.poll_next()); + + time::advance(Duration::from_millis(101)).await; + + assert!(stream.is_woken()); + + assert_ready!(stream.poll_next()); +} diff --git a/third_party/rust/tokio-stream/tests/watch.rs b/third_party/rust/tokio-stream/tests/watch.rs new file mode 100644 index 0000000000..3a39aaf3db --- /dev/null +++ b/third_party/rust/tokio-stream/tests/watch.rs @@ -0,0 +1,57 @@ +#![cfg(feature = "sync")] + +use tokio::sync::watch; +use tokio_stream::wrappers::WatchStream; +use tokio_stream::StreamExt; +use tokio_test::assert_pending; +use tokio_test::task::spawn; + +#[tokio::test] +async fn watch_stream_message_not_twice() { + let (tx, rx) = watch::channel("hello"); + + let mut counter = 0; + let mut stream = WatchStream::new(rx).map(move |payload| { + println!("{}", payload); + if payload == "goodbye" { + counter += 1; + } + if counter >= 2 { + panic!("too many goodbyes"); + } + }); + + let task = tokio::spawn(async move { while stream.next().await.is_some() {} }); + + // Send goodbye just once + tx.send("goodbye").unwrap(); + + drop(tx); + task.await.unwrap(); +} + +#[tokio::test] +async fn watch_stream_from_rx() { + let (tx, rx) = watch::channel("hello"); + + let mut stream = WatchStream::from(rx); + + assert_eq!(stream.next().await.unwrap(), "hello"); + + tx.send("bye").unwrap(); + + assert_eq!(stream.next().await.unwrap(), "bye"); +} + +#[tokio::test] +async fn watch_stream_from_changes() { + let (tx, rx) = watch::channel("hello"); + + let mut stream = WatchStream::from_changes(rx); + + assert_pending!(spawn(&mut stream).poll_next()); + + tx.send("bye").unwrap(); + + assert_eq!(stream.next().await.unwrap(), "bye"); +} |