use core::mem; use core::marker::PhantomData; use {IntoFuture, Future, Poll, Async, StartSend, AsyncSink}; use sink::Sink; use stream::Stream; /// Sink for the `Sink::with` combinator, chaining a computation to run *prior* /// to pushing a value into the underlying sink. #[derive(Clone, Debug)] #[must_use = "sinks do nothing unless polled"] pub struct With where S: Sink, F: FnMut(U) -> Fut, Fut: IntoFuture, { sink: S, f: F, state: State, _phantom: PhantomData, } #[derive(Clone, Debug)] enum State { Empty, Process(Fut), Buffered(T), } impl State { fn is_empty(&self) -> bool { if let State::Empty = *self { true } else { false } } } pub fn new(sink: S, f: F) -> With where S: Sink, F: FnMut(U) -> Fut, Fut: IntoFuture, Fut::Error: From, { With { state: State::Empty, sink: sink, f: f, _phantom: PhantomData, } } // Forwarding impl of Stream from the underlying sink impl Stream for With where S: Stream + Sink, F: FnMut(U) -> Fut, Fut: IntoFuture { type Item = S::Item; type Error = S::Error; fn poll(&mut self) -> Poll, S::Error> { self.sink.poll() } } impl With where S: Sink, F: FnMut(U) -> Fut, Fut: IntoFuture, Fut::Error: From, { /// Get a shared reference to the inner sink. pub fn get_ref(&self) -> &S { &self.sink } /// Get a mutable reference to the inner sink. pub fn get_mut(&mut self) -> &mut S { &mut self.sink } /// Consumes this combinator, returning the underlying sink. /// /// Note that this may discard intermediate state of this combinator, so /// care should be taken to avoid losing resources when this is called. pub fn into_inner(self) -> S { self.sink } fn poll(&mut self) -> Poll<(), Fut::Error> { loop { match mem::replace(&mut self.state, State::Empty) { State::Empty => break, State::Process(mut fut) => { match fut.poll()? { Async::Ready(item) => { self.state = State::Buffered(item); } Async::NotReady => { self.state = State::Process(fut); break } } } State::Buffered(item) => { if let AsyncSink::NotReady(item) = self.sink.start_send(item)? { self.state = State::Buffered(item); break } } } } if self.state.is_empty() { Ok(Async::Ready(())) } else { Ok(Async::NotReady) } } } impl Sink for With where S: Sink, F: FnMut(U) -> Fut, Fut: IntoFuture, Fut::Error: From, { type SinkItem = U; type SinkError = Fut::Error; fn start_send(&mut self, item: Self::SinkItem) -> StartSend { if self.poll()?.is_not_ready() { return Ok(AsyncSink::NotReady(item)) } self.state = State::Process((self.f)(item).into_future()); Ok(AsyncSink::Ready) } fn poll_complete(&mut self) -> Poll<(), Fut::Error> { // poll ourselves first, to push data downward let me_ready = self.poll()?; // always propagate `poll_complete` downward to attempt to make progress try_ready!(self.sink.poll_complete()); Ok(me_ready) } fn close(&mut self) -> Poll<(), Fut::Error> { try_ready!(self.poll()); Ok(self.sink.close()?) } }