From 2aa4a82499d4becd2284cdb482213d541b8804dd Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 28 Apr 2024 16:29:10 +0200 Subject: Adding upstream version 86.0.1. Signed-off-by: Daniel Baumann --- third_party/rust/tokio/src/stream/collect.rs | 246 +++++++++++++++++++++++++++ 1 file changed, 246 insertions(+) create mode 100644 third_party/rust/tokio/src/stream/collect.rs (limited to 'third_party/rust/tokio/src/stream/collect.rs') diff --git a/third_party/rust/tokio/src/stream/collect.rs b/third_party/rust/tokio/src/stream/collect.rs new file mode 100644 index 0000000000..f44c72b7b3 --- /dev/null +++ b/third_party/rust/tokio/src/stream/collect.rs @@ -0,0 +1,246 @@ +use crate::stream::Stream; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use core::future::Future; +use core::mem; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +// Do not export this struct until `FromStream` can be unsealed. +pin_project! { + /// Future returned by the [`collect`](super::StreamExt::collect) method. + #[must_use = "streams do nothing unless polled"] + #[derive(Debug)] + pub struct Collect + where + T: Stream, + U: FromStream, + { + #[pin] + stream: T, + collection: U::Collection, + } +} + +/// Convert from a [`Stream`](crate::stream::Stream). +/// +/// This trait is not intended to be used directly. Instead, call +/// [`StreamExt::collect()`](super::StreamExt::collect). +/// +/// # Implementing +/// +/// Currently, this trait may not be implemented by third parties. The trait is +/// sealed in order to make changes in the future. Stabilization is pending +/// enhancements to the Rust langague. +pub trait FromStream: sealed::FromStreamPriv {} + +impl Collect +where + T: Stream, + U: FromStream, +{ + pub(super) fn new(stream: T) -> Collect { + let (lower, upper) = stream.size_hint(); + let collection = U::initialize(lower, upper); + + Collect { stream, collection } + } +} + +impl Future for Collect +where + T: Stream, + U: FromStream, +{ + type Output = U; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use Poll::Ready; + + loop { + let mut me = self.as_mut().project(); + + let item = match ready!(me.stream.poll_next(cx)) { + Some(item) => item, + None => { + return Ready(U::finalize(&mut me.collection)); + } + }; + + if !U::extend(&mut me.collection, item) { + return Ready(U::finalize(&mut me.collection)); + } + } + } +} + +// ===== FromStream implementations + +impl FromStream<()> for () {} + +impl sealed::FromStreamPriv<()> for () { + type Collection = (); + + fn initialize(_lower: usize, _upper: Option) {} + + fn extend(_collection: &mut (), _item: ()) -> bool { + true + } + + fn finalize(_collection: &mut ()) {} +} + +impl> FromStream for String {} + +impl> sealed::FromStreamPriv for String { + type Collection = String; + + fn initialize(_lower: usize, _upper: Option) -> String { + String::new() + } + + fn extend(collection: &mut String, item: T) -> bool { + collection.push_str(item.as_ref()); + true + } + + fn finalize(collection: &mut String) -> String { + mem::replace(collection, String::new()) + } +} + +impl FromStream for Vec {} + +impl sealed::FromStreamPriv for Vec { + type Collection = Vec; + + fn initialize(lower: usize, _upper: Option) -> Vec { + Vec::with_capacity(lower) + } + + fn extend(collection: &mut Vec, item: T) -> bool { + collection.push(item); + true + } + + fn finalize(collection: &mut Vec) -> Vec { + mem::replace(collection, vec![]) + } +} + +impl FromStream for Box<[T]> {} + +impl sealed::FromStreamPriv for Box<[T]> { + type Collection = Vec; + + fn initialize(lower: usize, upper: Option) -> Vec { + as sealed::FromStreamPriv>::initialize(lower, upper) + } + + fn extend(collection: &mut Vec, item: T) -> bool { + as sealed::FromStreamPriv>::extend(collection, item) + } + + fn finalize(collection: &mut Vec) -> Box<[T]> { + as sealed::FromStreamPriv>::finalize(collection).into_boxed_slice() + } +} + +impl FromStream> for Result where U: FromStream {} + +impl sealed::FromStreamPriv> for Result +where + U: FromStream, +{ + type Collection = Result; + + fn initialize(lower: usize, upper: Option) -> Result { + Ok(U::initialize(lower, upper)) + } + + fn extend(collection: &mut Self::Collection, item: Result) -> bool { + assert!(collection.is_ok()); + match item { + Ok(item) => { + let collection = collection.as_mut().ok().expect("invalid state"); + U::extend(collection, item) + } + Err(err) => { + *collection = Err(err); + false + } + } + } + + fn finalize(collection: &mut Self::Collection) -> Result { + if let Ok(collection) = collection.as_mut() { + Ok(U::finalize(collection)) + } else { + let res = mem::replace(collection, Ok(U::initialize(0, Some(0)))); + + if let Err(err) = res { + Err(err) + } else { + unreachable!(); + } + } + } +} + +impl FromStream for Bytes {} + +impl sealed::FromStreamPriv for Bytes { + type Collection = BytesMut; + + fn initialize(_lower: usize, _upper: Option) -> BytesMut { + BytesMut::new() + } + + fn extend(collection: &mut BytesMut, item: T) -> bool { + collection.put(item); + true + } + + fn finalize(collection: &mut BytesMut) -> Bytes { + mem::replace(collection, BytesMut::new()).freeze() + } +} + +impl FromStream for BytesMut {} + +impl sealed::FromStreamPriv for BytesMut { + type Collection = BytesMut; + + fn initialize(_lower: usize, _upper: Option) -> BytesMut { + BytesMut::new() + } + + fn extend(collection: &mut BytesMut, item: T) -> bool { + collection.put(item); + true + } + + fn finalize(collection: &mut BytesMut) -> BytesMut { + mem::replace(collection, BytesMut::new()) + } +} + +pub(crate) mod sealed { + #[doc(hidden)] + pub trait FromStreamPriv { + /// Intermediate type used during collection process + type Collection; + + /// Initialize the collection + fn initialize(lower: usize, upper: Option) -> Self::Collection; + + /// Extend the collection with the received item + /// + /// Return `true` to continue streaming, `false` complete collection. + fn extend(collection: &mut Self::Collection, item: T) -> bool; + + /// Finalize collection into target type. + fn finalize(collection: &mut Self::Collection) -> Self; + } +} -- cgit v1.2.3