From 43a97878ce14b72f0981164f87f2e35e14151312 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 11:22:09 +0200 Subject: Adding upstream version 110.0.1. Signed-off-by: Daniel Baumann --- third_party/rust/futures-0.1.31/src/stream/then.rs | 81 ++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 third_party/rust/futures-0.1.31/src/stream/then.rs (limited to 'third_party/rust/futures-0.1.31/src/stream/then.rs') diff --git a/third_party/rust/futures-0.1.31/src/stream/then.rs b/third_party/rust/futures-0.1.31/src/stream/then.rs new file mode 100644 index 0000000000..cab338e922 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/then.rs @@ -0,0 +1,81 @@ +use {Async, IntoFuture, Future, Poll}; +use stream::Stream; + +/// A stream combinator which chains a computation onto each item produced by a +/// stream. +/// +/// This structure is produced by the `Stream::then` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Then + where U: IntoFuture, +{ + stream: S, + future: Option, + f: F, +} + +pub fn new(s: S, f: F) -> Then + where S: Stream, + F: FnMut(Result) -> U, + U: IntoFuture, +{ + Then { + stream: s, + future: None, + f: f, + } +} + +// Forwarding impl of Sink from the underlying stream +impl ::sink::Sink for Then + where S: ::sink::Sink, U: IntoFuture, +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl Stream for Then + where S: Stream, + F: FnMut(Result) -> U, + U: IntoFuture, +{ + type Item = U::Item; + type Error = U::Error; + + fn poll(&mut self) -> Poll, U::Error> { + if self.future.is_none() { + let item = match self.stream.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), + Ok(Async::Ready(Some(e))) => Ok(e), + Err(e) => Err(e), + }; + self.future = Some((self.f)(item).into_future()); + } + assert!(self.future.is_some()); + match self.future.as_mut().unwrap().poll() { + Ok(Async::Ready(e)) => { + self.future = None; + Ok(Async::Ready(Some(e))) + } + Err(e) => { + self.future = None; + Err(e) + } + Ok(Async::NotReady) => Ok(Async::NotReady) + } + } +} -- cgit v1.2.3