1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
use super::assert_sink;
use crate::unfold_state::UnfoldState;
use core::{future::Future, pin::Pin};
use futures_core::ready;
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
use pin_project_lite::pin_project;
pin_project! {
/// Sink for the [`unfold`] function.
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct Unfold<T, F, R> {
function: F,
#[pin]
state: UnfoldState<T, R>,
}
}
/// Create a sink from a function which processes one item at a time.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::sink::{self, SinkExt};
///
/// let unfold = sink::unfold(0, |mut sum, i: i32| {
/// async move {
/// sum += i;
/// eprintln!("{}", i);
/// Ok::<_, futures::never::Never>(sum)
/// }
/// });
/// futures::pin_mut!(unfold);
/// unfold.send(5).await?;
/// # Ok::<(), futures::never::Never>(()) }).unwrap();
/// ```
pub fn unfold<T, F, R, Item, E>(init: T, function: F) -> Unfold<T, F, R>
where
F: FnMut(T, Item) -> R,
R: Future<Output = Result<T, E>>,
{
assert_sink::<Item, E, _>(Unfold { function, state: UnfoldState::Value { value: init } })
}
impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R>
where
F: FnMut(T, Item) -> R,
R: Future<Output = Result<T, E>>,
{
type Error = E;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
let mut this = self.project();
let future = match this.state.as_mut().take_value() {
Some(value) => (this.function)(value, item),
None => panic!("start_send called without poll_ready being called first"),
};
this.state.set(UnfoldState::Future { future });
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
Poll::Ready(if let Some(future) = this.state.as_mut().project_future() {
match ready!(future.poll(cx)) {
Ok(state) => {
this.state.set(UnfoldState::Value { value: state });
Ok(())
}
Err(err) => {
this.state.set(UnfoldState::Empty);
Err(err)
}
}
} else {
Ok(())
})
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}
}
|