diff options
Diffstat (limited to 'third_party/rust/tokio-stream/src/stream_ext/collect.rs')
-rw-r--r-- | third_party/rust/tokio-stream/src/stream_ext/collect.rs | 229 |
1 files changed, 229 insertions, 0 deletions
diff --git a/third_party/rust/tokio-stream/src/stream_ext/collect.rs b/third_party/rust/tokio-stream/src/stream_ext/collect.rs new file mode 100644 index 0000000000..8548b74556 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/collect.rs @@ -0,0 +1,229 @@ +use crate::Stream; + +use core::future::Future; +use core::marker::PhantomPinned; +use core::mem; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +// Do not export this struct until `FromStream` can be unsealed. +pin_project! { + /// Future returned by the [`collect`](super::StreamExt::collect) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + #[derive(Debug)] + pub struct Collect<T, U> + where + T: Stream, + U: FromStream<T::Item>, + { + #[pin] + stream: T, + collection: U::InternalCollection, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } +} + +/// Convert from a [`Stream`](crate::Stream). +/// +/// This trait is not intended to be used directly. Instead, call +/// [`StreamExt::collect()`](super::StreamExt::collect). +/// +/// # Implementing +/// +/// Currently, this trait may not be implemented by third parties. The trait is +/// sealed in order to make changes in the future. Stabilization is pending +/// enhancements to the Rust language. +pub trait FromStream<T>: sealed::FromStreamPriv<T> {} + +impl<T, U> Collect<T, U> +where + T: Stream, + U: FromStream<T::Item>, +{ + pub(super) fn new(stream: T) -> Collect<T, U> { + let (lower, upper) = stream.size_hint(); + let collection = U::initialize(sealed::Internal, lower, upper); + + Collect { + stream, + collection, + _pin: PhantomPinned, + } + } +} + +impl<T, U> Future for Collect<T, U> +where + T: Stream, + U: FromStream<T::Item>, +{ + type Output = U; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> { + use Poll::Ready; + + loop { + let me = self.as_mut().project(); + + let item = match ready!(me.stream.poll_next(cx)) { + Some(item) => item, + None => { + return Ready(U::finalize(sealed::Internal, me.collection)); + } + }; + + if !U::extend(sealed::Internal, me.collection, item) { + return Ready(U::finalize(sealed::Internal, me.collection)); + } + } + } +} + +// ===== FromStream implementations + +impl FromStream<()> for () {} + +impl sealed::FromStreamPriv<()> for () { + type InternalCollection = (); + + fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) {} + + fn extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool { + true + } + + fn finalize(_: sealed::Internal, _collection: &mut ()) {} +} + +impl<T: AsRef<str>> FromStream<T> for String {} + +impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String { + type InternalCollection = String; + + fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String { + String::new() + } + + fn extend(_: sealed::Internal, collection: &mut String, item: T) -> bool { + collection.push_str(item.as_ref()); + true + } + + fn finalize(_: sealed::Internal, collection: &mut String) -> String { + mem::take(collection) + } +} + +impl<T> FromStream<T> for Vec<T> {} + +impl<T> sealed::FromStreamPriv<T> for Vec<T> { + type InternalCollection = Vec<T>; + + fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T> { + Vec::with_capacity(lower) + } + + fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool { + collection.push(item); + true + } + + fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> { + mem::take(collection) + } +} + +impl<T> FromStream<T> for Box<[T]> {} + +impl<T> sealed::FromStreamPriv<T> for Box<[T]> { + type InternalCollection = Vec<T>; + + fn initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T> { + <Vec<T> as sealed::FromStreamPriv<T>>::initialize(sealed::Internal, lower, upper) + } + + fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool { + <Vec<T> as sealed::FromStreamPriv<T>>::extend(sealed::Internal, collection, item) + } + + fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]> { + <Vec<T> as sealed::FromStreamPriv<T>>::finalize(sealed::Internal, collection) + .into_boxed_slice() + } +} + +impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {} + +impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E> +where + U: FromStream<T>, +{ + type InternalCollection = Result<U::InternalCollection, E>; + + fn initialize( + _: sealed::Internal, + lower: usize, + upper: Option<usize>, + ) -> Result<U::InternalCollection, E> { + Ok(U::initialize(sealed::Internal, lower, upper)) + } + + fn extend( + _: sealed::Internal, + collection: &mut Self::InternalCollection, + item: Result<T, E>, + ) -> bool { + assert!(collection.is_ok()); + match item { + Ok(item) => { + let collection = collection.as_mut().ok().expect("invalid state"); + U::extend(sealed::Internal, collection, item) + } + Err(err) => { + *collection = Err(err); + false + } + } + } + + fn finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E> { + if let Ok(collection) = collection.as_mut() { + Ok(U::finalize(sealed::Internal, collection)) + } else { + let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0)))); + + Err(res.map(drop).unwrap_err()) + } + } +} + +pub(crate) mod sealed { + #[doc(hidden)] + pub trait FromStreamPriv<T> { + /// Intermediate type used during collection process + /// + /// The name of this type is internal and cannot be relied upon. + type InternalCollection; + + /// Initialize the collection + fn initialize( + internal: Internal, + lower: usize, + upper: Option<usize>, + ) -> Self::InternalCollection; + + /// Extend the collection with the received item + /// + /// Return `true` to continue streaming, `false` complete collection. + fn extend(internal: Internal, collection: &mut Self::InternalCollection, item: T) -> bool; + + /// Finalize collection into target type. + fn finalize(internal: Internal, collection: &mut Self::InternalCollection) -> Self; + } + + #[allow(missing_debug_implementations)] + pub struct Internal; +} |