use core::marker::PhantomData; use {Poll, Async, StartSend, AsyncSink}; use sink::Sink; use stream::Stream; /// Sink for the `Sink::with_flat_map` combinator, chaining a computation that returns an iterator /// to run prior to pushing a value into the underlying sink #[derive(Debug)] #[must_use = "sinks do nothing unless polled"] pub struct WithFlatMap where S: Sink, F: FnMut(U) -> St, St: Stream, { sink: S, f: F, stream: Option, buffer: Option, _phantom: PhantomData, } pub fn new(sink: S, f: F) -> WithFlatMap where S: Sink, F: FnMut(U) -> St, St: Stream, { WithFlatMap { sink: sink, f: f, stream: None, buffer: None, _phantom: PhantomData, } } impl WithFlatMap where S: Sink, F: FnMut(U) -> St, St: Stream, { /// Get a shared reference to the inner sink. pub fn get_ref(&self) -> &S { &self.sink } /// Get a mutable reference to the inner sink. pub fn get_mut(&mut self) -> &mut S { &mut self.sink } /// Consumes this combinator, returning the underlying sink. /// /// Note that this may discard intermediate state of this combinator, so /// care should be taken to avoid losing resources when this is called. pub fn into_inner(self) -> S { self.sink } fn try_empty_stream(&mut self) -> Poll<(), S::SinkError> { if let Some(x) = self.buffer.take() { if let AsyncSink::NotReady(x) = self.sink.start_send(x)? { self.buffer = Some(x); return Ok(Async::NotReady); } } if let Some(mut stream) = self.stream.take() { while let Some(x) = try_ready!(stream.poll()) { if let AsyncSink::NotReady(x) = self.sink.start_send(x)? { self.stream = Some(stream); self.buffer = Some(x); return Ok(Async::NotReady); } } } Ok(Async::Ready(())) } } impl Stream for WithFlatMap where S: Stream + Sink, F: FnMut(U) -> St, St: Stream, { type Item = S::Item; type Error = S::Error; fn poll(&mut self) -> Poll, S::Error> { self.sink.poll() } } impl Sink for WithFlatMap where S: Sink, F: FnMut(U) -> St, St: Stream, { type SinkItem = U; type SinkError = S::SinkError; fn start_send(&mut self, i: Self::SinkItem) -> StartSend { if self.try_empty_stream()?.is_not_ready() { return Ok(AsyncSink::NotReady(i)); } assert!(self.stream.is_none()); self.stream = Some((self.f)(i)); self.try_empty_stream()?; Ok(AsyncSink::Ready) } fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { if self.try_empty_stream()?.is_not_ready() { return Ok(Async::NotReady); } self.sink.poll_complete() } fn close(&mut self) -> Poll<(), Self::SinkError> { if self.try_empty_stream()?.is_not_ready() { return Ok(Async::NotReady); } assert!(self.stream.is_none()); self.sink.close() } }