use {Async, Poll}; use stream::{Stream, Fuse}; /// A `Stream` that implements a `peek` method. /// /// The `peek` method can be used to retrieve a reference /// to the next `Stream::Item` if available. A subsequent /// call to `poll` will return the owned item. #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Peekable { stream: Fuse, peeked: Option, } pub fn new(stream: S) -> Peekable { Peekable { stream: stream.fuse(), peeked: None } } // Forwarding impl of Sink from the underlying stream impl ::sink::Sink for Peekable where S: ::sink::Sink + Stream { type SinkItem = S::SinkItem; type SinkError = S::SinkError; fn start_send(&mut self, item: S::SinkItem) -> ::StartSend { self.stream.start_send(item) } fn poll_complete(&mut self) -> Poll<(), S::SinkError> { self.stream.poll_complete() } fn close(&mut self) -> Poll<(), S::SinkError> { self.stream.close() } } impl Stream for Peekable { type Item = S::Item; type Error = S::Error; fn poll(&mut self) -> Poll, Self::Error> { if let Some(item) = self.peeked.take() { return Ok(Async::Ready(Some(item))) } self.stream.poll() } } impl Peekable { /// Peek retrieves a reference to the next item in the stream. /// /// This method polls the underlying stream and return either a reference /// to the next item if the stream is ready or passes through any errors. pub fn peek(&mut self) -> Poll, S::Error> { if self.peeked.is_some() { return Ok(Async::Ready(self.peeked.as_ref())) } match try_ready!(self.poll()) { None => Ok(Async::Ready(None)), Some(item) => { self.peeked = Some(item); Ok(Async::Ready(self.peeked.as_ref())) } } } }