diff options
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/stream/and_then.rs')
-rw-r--r-- | third_party/rust/futures-0.1.31/src/stream/and_then.rs | 106 |
1 files changed, 106 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/stream/and_then.rs b/third_party/rust/futures-0.1.31/src/stream/and_then.rs new file mode 100644 index 0000000000..1fac8b952d --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/and_then.rs @@ -0,0 +1,106 @@ +use {IntoFuture, Future, Poll, Async}; +use stream::Stream; + +/// A stream combinator which chains a computation onto values produced by a +/// stream. +/// +/// This structure is produced by the `Stream::and_then` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct AndThen<S, F, U> + where U: IntoFuture, +{ + stream: S, + future: Option<U::Future>, + f: F, +} + +pub fn new<S, F, U>(s: S, f: F) -> AndThen<S, F, U> + where S: Stream, + F: FnMut(S::Item) -> U, + U: IntoFuture<Error=S::Error>, +{ + AndThen { + stream: s, + future: None, + f: f, + } +} + +impl<S, F, U> AndThen<S, F, U> + where U: IntoFuture, +{ + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &S { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut S { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.stream + } +} + +// Forwarding impl of Sink from the underlying stream +impl<S, F, U: IntoFuture> ::sink::Sink for AndThen<S, F, U> + where S: ::sink::Sink +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { + self.stream.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.stream.poll_complete() + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.stream.close() + } +} + +impl<S, F, U> Stream for AndThen<S, F, U> + where S: Stream, + F: FnMut(S::Item) -> U, + U: IntoFuture<Error=S::Error>, +{ + type Item = U::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<U::Item>, S::Error> { + if self.future.is_none() { + let item = match try_ready!(self.stream.poll()) { + None => return Ok(Async::Ready(None)), + Some(e) => e, + }; + self.future = Some((self.f)(item).into_future()); + } + assert!(self.future.is_some()); + match self.future.as_mut().unwrap().poll() { + Ok(Async::Ready(e)) => { + self.future = None; + Ok(Async::Ready(Some(e))) + } + Err(e) => { + self.future = None; + Err(e) + } + Ok(Async::NotReady) => Ok(Async::NotReady) + } + } +} |