diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio/src/stream/collect.rs | |
parent | Initial commit. (diff) | |
download | firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/stream/collect.rs')
-rw-r--r-- | third_party/rust/tokio/src/stream/collect.rs | 246 |
1 files changed, 246 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/stream/collect.rs b/third_party/rust/tokio/src/stream/collect.rs new file mode 100644 index 0000000000..f44c72b7b3 --- /dev/null +++ b/third_party/rust/tokio/src/stream/collect.rs @@ -0,0 +1,246 @@ +use crate::stream::Stream; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use core::future::Future; +use core::mem; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +// Do not export this struct until `FromStream` can be unsealed. +pin_project! { + /// Future returned by the [`collect`](super::StreamExt::collect) method. + #[must_use = "streams do nothing unless polled"] + #[derive(Debug)] + pub struct Collect<T, U> + where + T: Stream, + U: FromStream<T::Item>, + { + #[pin] + stream: T, + collection: U::Collection, + } +} + +/// Convert from a [`Stream`](crate::stream::Stream). +/// +/// This trait is not intended to be used directly. Instead, call +/// [`StreamExt::collect()`](super::StreamExt::collect). +/// +/// # Implementing +/// +/// Currently, this trait may not be implemented by third parties. The trait is +/// sealed in order to make changes in the future. Stabilization is pending +/// enhancements to the Rust langague. +pub trait FromStream<T>: sealed::FromStreamPriv<T> {} + +impl<T, U> Collect<T, U> +where + T: Stream, + U: FromStream<T::Item>, +{ + pub(super) fn new(stream: T) -> Collect<T, U> { + let (lower, upper) = stream.size_hint(); + let collection = U::initialize(lower, upper); + + Collect { stream, collection } + } +} + +impl<T, U> Future for Collect<T, U> +where + T: Stream, + U: FromStream<T::Item>, +{ + type Output = U; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> { + use Poll::Ready; + + loop { + let mut me = self.as_mut().project(); + + let item = match ready!(me.stream.poll_next(cx)) { + Some(item) => item, + None => { + return Ready(U::finalize(&mut me.collection)); + } + }; + + if !U::extend(&mut me.collection, item) { + return Ready(U::finalize(&mut me.collection)); + } + } + } +} + +// ===== FromStream implementations + +impl FromStream<()> for () {} + +impl sealed::FromStreamPriv<()> for () { + type Collection = (); + + fn initialize(_lower: usize, _upper: Option<usize>) {} + + fn extend(_collection: &mut (), _item: ()) -> bool { + true + } + + fn finalize(_collection: &mut ()) {} +} + +impl<T: AsRef<str>> FromStream<T> for String {} + +impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String { + type Collection = String; + + fn initialize(_lower: usize, _upper: Option<usize>) -> String { + String::new() + } + + fn extend(collection: &mut String, item: T) -> bool { + collection.push_str(item.as_ref()); + true + } + + fn finalize(collection: &mut String) -> String { + mem::replace(collection, String::new()) + } +} + +impl<T> FromStream<T> for Vec<T> {} + +impl<T> sealed::FromStreamPriv<T> for Vec<T> { + type Collection = Vec<T>; + + fn initialize(lower: usize, _upper: Option<usize>) -> Vec<T> { + Vec::with_capacity(lower) + } + + fn extend(collection: &mut Vec<T>, item: T) -> bool { + collection.push(item); + true + } + + fn finalize(collection: &mut Vec<T>) -> Vec<T> { + mem::replace(collection, vec![]) + } +} + +impl<T> FromStream<T> for Box<[T]> {} + +impl<T> sealed::FromStreamPriv<T> for Box<[T]> { + type Collection = Vec<T>; + + fn initialize(lower: usize, upper: Option<usize>) -> Vec<T> { + <Vec<T> as sealed::FromStreamPriv<T>>::initialize(lower, upper) + } + + fn extend(collection: &mut Vec<T>, item: T) -> bool { + <Vec<T> as sealed::FromStreamPriv<T>>::extend(collection, item) + } + + fn finalize(collection: &mut Vec<T>) -> Box<[T]> { + <Vec<T> as sealed::FromStreamPriv<T>>::finalize(collection).into_boxed_slice() + } +} + +impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {} + +impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E> +where + U: FromStream<T>, +{ + type Collection = Result<U::Collection, E>; + + fn initialize(lower: usize, upper: Option<usize>) -> Result<U::Collection, E> { + Ok(U::initialize(lower, upper)) + } + + fn extend(collection: &mut Self::Collection, item: Result<T, E>) -> bool { + assert!(collection.is_ok()); + match item { + Ok(item) => { + let collection = collection.as_mut().ok().expect("invalid state"); + U::extend(collection, item) + } + Err(err) => { + *collection = Err(err); + false + } + } + } + + fn finalize(collection: &mut Self::Collection) -> Result<U, E> { + if let Ok(collection) = collection.as_mut() { + Ok(U::finalize(collection)) + } else { + let res = mem::replace(collection, Ok(U::initialize(0, Some(0)))); + + if let Err(err) = res { + Err(err) + } else { + unreachable!(); + } + } + } +} + +impl<T: Buf> FromStream<T> for Bytes {} + +impl<T: Buf> sealed::FromStreamPriv<T> for Bytes { + type Collection = BytesMut; + + fn initialize(_lower: usize, _upper: Option<usize>) -> BytesMut { + BytesMut::new() + } + + fn extend(collection: &mut BytesMut, item: T) -> bool { + collection.put(item); + true + } + + fn finalize(collection: &mut BytesMut) -> Bytes { + mem::replace(collection, BytesMut::new()).freeze() + } +} + +impl<T: Buf> FromStream<T> for BytesMut {} + +impl<T: Buf> sealed::FromStreamPriv<T> for BytesMut { + type Collection = BytesMut; + + fn initialize(_lower: usize, _upper: Option<usize>) -> BytesMut { + BytesMut::new() + } + + fn extend(collection: &mut BytesMut, item: T) -> bool { + collection.put(item); + true + } + + fn finalize(collection: &mut BytesMut) -> BytesMut { + mem::replace(collection, BytesMut::new()) + } +} + +pub(crate) mod sealed { + #[doc(hidden)] + pub trait FromStreamPriv<T> { + /// Intermediate type used during collection process + type Collection; + + /// Initialize the collection + fn initialize(lower: usize, upper: Option<usize>) -> Self::Collection; + + /// Extend the collection with the received item + /// + /// Return `true` to continue streaming, `false` complete collection. + fn extend(collection: &mut Self::Collection, item: T) -> bool; + + /// Finalize collection into target type. + fn finalize(collection: &mut Self::Collection) -> Self; + } +} |