use super::assert_sink; use crate::unfold_state::UnfoldState; use core::{future::Future, pin::Pin}; use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_sink::Sink; use pin_project_lite::pin_project; pin_project! { /// Sink for the [`unfold`] function. #[derive(Debug)] #[must_use = "sinks do nothing unless polled"] pub struct Unfold { function: F, #[pin] state: UnfoldState, } } /// Create a sink from a function which processes one item at a time. /// /// # Examples /// /// ``` /// # futures::executor::block_on(async { /// use futures::sink::{self, SinkExt}; /// /// let unfold = sink::unfold(0, |mut sum, i: i32| { /// async move { /// sum += i; /// eprintln!("{}", i); /// Ok::<_, futures::never::Never>(sum) /// } /// }); /// futures::pin_mut!(unfold); /// unfold.send(5).await?; /// # Ok::<(), futures::never::Never>(()) }).unwrap(); /// ``` pub fn unfold(init: T, function: F) -> Unfold where F: FnMut(T, Item) -> R, R: Future>, { assert_sink::(Unfold { function, state: UnfoldState::Value { value: init } }) } impl Sink for Unfold where F: FnMut(T, Item) -> R, R: Future>, { type Error = E; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.poll_flush(cx) } fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { let mut this = self.project(); let future = match this.state.as_mut().take_value() { Some(value) => (this.function)(value, item), None => panic!("start_send called without poll_ready being called first"), }; this.state.set(UnfoldState::Future { future }); Ok(()) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); Poll::Ready(if let Some(future) = this.state.as_mut().project_future() { match ready!(future.poll(cx)) { Ok(state) => { this.state.set(UnfoldState::Value { value: state }); Ok(()) } Err(err) => { this.state.set(UnfoldState::Empty); Err(err) } } } else { Ok(()) }) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.poll_flush(cx) } }