From 2aa4a82499d4becd2284cdb482213d541b8804dd Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 28 Apr 2024 16:29:10 +0200 Subject: Adding upstream version 86.0.1. Signed-off-by: Daniel Baumann --- .../rust/futures-0.1.29/src/stream/flatten.rs | 96 ++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 third_party/rust/futures-0.1.29/src/stream/flatten.rs (limited to 'third_party/rust/futures-0.1.29/src/stream/flatten.rs') diff --git a/third_party/rust/futures-0.1.29/src/stream/flatten.rs b/third_party/rust/futures-0.1.29/src/stream/flatten.rs new file mode 100644 index 0000000000..4baf9045a0 --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/stream/flatten.rs @@ -0,0 +1,96 @@ +use {Poll, Async}; +use stream::Stream; + +/// A combinator used to flatten a stream-of-streams into one long stream of +/// elements. +/// +/// This combinator is created by the `Stream::flatten` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Flatten + where S: Stream, +{ + stream: S, + next: Option, +} + +pub fn new(s: S) -> Flatten + where S: Stream, + S::Item: Stream, + ::Error: From, +{ + Flatten { + stream: s, + next: None, + } +} + +impl Flatten { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream + } +} + +// Forwarding impl of Sink from the underlying stream +impl ::sink::Sink for Flatten + where S: ::sink::Sink + Stream +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl Stream for Flatten + where S: Stream, + S::Item: Stream, + ::Error: From, +{ + type Item = ::Item; + type Error = ::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + if self.next.is_none() { + match try_ready!(self.stream.poll()) { + Some(e) => self.next = Some(e), + None => return Ok(Async::Ready(None)), + } + } + assert!(self.next.is_some()); + match self.next.as_mut().unwrap().poll() { + Ok(Async::Ready(None)) => self.next = None, + other => return other, + } + } + } +} -- cgit v1.2.3