summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-util/src/sink
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-util/src/sink')
-rw-r--r--third_party/rust/futures-util/src/sink/buffer.rs105
-rw-r--r--third_party/rust/futures-util/src/sink/close.rs32
-rw-r--r--third_party/rust/futures-util/src/sink/drain.rs59
-rw-r--r--third_party/rust/futures-util/src/sink/err_into.rs57
-rw-r--r--third_party/rust/futures-util/src/sink/fanout.rs111
-rw-r--r--third_party/rust/futures-util/src/sink/feed.rs43
-rw-r--r--third_party/rust/futures-util/src/sink/flush.rs36
-rw-r--r--third_party/rust/futures-util/src/sink/map_err.rs65
-rw-r--r--third_party/rust/futures-util/src/sink/mod.rs344
-rw-r--r--third_party/rust/futures-util/src/sink/send.rs41
-rw-r--r--third_party/rust/futures-util/src/sink/send_all.rs100
-rw-r--r--third_party/rust/futures-util/src/sink/unfold.rs89
-rw-r--r--third_party/rust/futures-util/src/sink/with.rs134
-rw-r--r--third_party/rust/futures-util/src/sink/with_flat_map.rs127
14 files changed, 1343 insertions, 0 deletions
diff --git a/third_party/rust/futures-util/src/sink/buffer.rs b/third_party/rust/futures-util/src/sink/buffer.rs
new file mode 100644
index 0000000000..4aa6c36033
--- /dev/null
+++ b/third_party/rust/futures-util/src/sink/buffer.rs
@@ -0,0 +1,105 @@
+use alloc::collections::VecDeque;
+use core::pin::Pin;
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream};
+use futures_core::task::{Context, Poll};
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Sink for the [`buffer`](super::SinkExt::buffer) method.
+ #[derive(Debug)]
+ #[must_use = "sinks do nothing unless polled"]
+ pub struct Buffer<Si, Item> {
+ #[pin]
+ sink: Si,
+ buf: VecDeque<Item>,
+
+ // Track capacity separately from the `VecDeque`, which may be rounded up
+ capacity: usize,
+ }
+}
+
+impl<Si: Sink<Item>, Item> Buffer<Si, Item> {
+ pub(super) fn new(sink: Si, capacity: usize) -> Self {
+ Self { sink, buf: VecDeque::with_capacity(capacity), capacity }
+ }
+
+ delegate_access_inner!(sink, Si, ());
+
+ fn try_empty_buffer(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>> {
+ let mut this = self.project();
+ ready!(this.sink.as_mut().poll_ready(cx))?;
+ while let Some(item) = this.buf.pop_front() {
+ this.sink.as_mut().start_send(item)?;
+ if !this.buf.is_empty() {
+ ready!(this.sink.as_mut().poll_ready(cx))?;
+ }
+ }
+ Poll::Ready(Ok(()))
+ }
+}
+
+// Forwarding impl of Stream from the underlying sink
+impl<S, Item> Stream for Buffer<S, Item>
+where
+ S: Sink<Item> + Stream,
+{
+ type Item = S::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
+ self.project().sink.poll_next(cx)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.sink.size_hint()
+ }
+}
+
+impl<S, Item> FusedStream for Buffer<S, Item>
+where
+ S: Sink<Item> + FusedStream,
+{
+ fn is_terminated(&self) -> bool {
+ self.sink.is_terminated()
+ }
+}
+
+impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> {
+ type Error = Si::Error;
+
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if self.capacity == 0 {
+ return self.project().sink.poll_ready(cx);
+ }
+
+ let _ = self.as_mut().try_empty_buffer(cx)?;
+
+ if self.buf.len() >= self.capacity {
+ Poll::Pending
+ } else {
+ Poll::Ready(Ok(()))
+ }
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
+ if self.capacity == 0 {
+ self.project().sink.start_send(item)
+ } else {
+ self.project().buf.push_back(item);
+ Ok(())
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ ready!(self.as_mut().try_empty_buffer(cx))?;
+ debug_assert!(self.buf.is_empty());
+ self.project().sink.poll_flush(cx)
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ ready!(self.as_mut().try_empty_buffer(cx))?;
+ debug_assert!(self.buf.is_empty());
+ self.project().sink.poll_close(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/sink/close.rs b/third_party/rust/futures-util/src/sink/close.rs
new file mode 100644
index 0000000000..43eea74b0f
--- /dev/null
+++ b/third_party/rust/futures-util/src/sink/close.rs
@@ -0,0 +1,32 @@
+use core::marker::PhantomData;
+use core::pin::Pin;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_sink::Sink;
+
+/// Future for the [`close`](super::SinkExt::close) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Close<'a, Si: ?Sized, Item> {
+ sink: &'a mut Si,
+ _phantom: PhantomData<fn(Item)>,
+}
+
+impl<Si: Unpin + ?Sized, Item> Unpin for Close<'_, Si, Item> {}
+
+/// A future that completes when the sink has finished closing.
+///
+/// The sink itself is returned after closing is complete.
+impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Close<'a, Si, Item> {
+ pub(super) fn new(sink: &'a mut Si) -> Self {
+ Self { sink, _phantom: PhantomData }
+ }
+}
+
+impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Close<'_, Si, Item> {
+ type Output = Result<(), Si::Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ Pin::new(&mut self.sink).poll_close(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/sink/drain.rs b/third_party/rust/futures-util/src/sink/drain.rs
new file mode 100644
index 0000000000..1a5480c0d6
--- /dev/null
+++ b/third_party/rust/futures-util/src/sink/drain.rs
@@ -0,0 +1,59 @@
+use super::assert_sink;
+use crate::never::Never;
+use core::marker::PhantomData;
+use core::pin::Pin;
+use futures_core::task::{Context, Poll};
+use futures_sink::Sink;
+
+/// Sink for the [`drain`] function.
+#[derive(Debug)]
+#[must_use = "sinks do nothing unless polled"]
+pub struct Drain<T> {
+ marker: PhantomData<T>,
+}
+
+/// Create a sink that will just discard all items given to it.
+///
+/// Similar to [`io::Sink`](::std::io::Sink).
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::sink::{self, SinkExt};
+///
+/// let mut drain = sink::drain();
+/// drain.send(5).await?;
+/// # Ok::<(), futures::never::Never>(()) }).unwrap();
+/// ```
+pub fn drain<T>() -> Drain<T> {
+ assert_sink::<T, Never, _>(Drain { marker: PhantomData })
+}
+
+impl<T> Unpin for Drain<T> {}
+
+impl<T> Clone for Drain<T> {
+ fn clone(&self) -> Self {
+ drain()
+ }
+}
+
+impl<T> Sink<T> for Drain<T> {
+ type Error = Never;
+
+ fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn start_send(self: Pin<&mut Self>, _item: T) -> Result<(), Self::Error> {
+ Ok(())
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/third_party/rust/futures-util/src/sink/err_into.rs b/third_party/rust/futures-util/src/sink/err_into.rs
new file mode 100644
index 0000000000..a64d1337ba
--- /dev/null
+++ b/third_party/rust/futures-util/src/sink/err_into.rs
@@ -0,0 +1,57 @@
+use crate::sink::{SinkExt, SinkMapErr};
+use futures_core::stream::{FusedStream, Stream};
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Sink for the [`sink_err_into`](super::SinkExt::sink_err_into) method.
+ #[derive(Debug)]
+ #[must_use = "sinks do nothing unless polled"]
+ pub struct SinkErrInto<Si: Sink<Item>, Item, E> {
+ #[pin]
+ sink: SinkMapErr<Si, fn(Si::Error) -> E>,
+ }
+}
+
+impl<Si, E, Item> SinkErrInto<Si, Item, E>
+where
+ Si: Sink<Item>,
+ Si::Error: Into<E>,
+{
+ pub(super) fn new(sink: Si) -> Self {
+ Self { sink: SinkExt::sink_map_err(sink, Into::into) }
+ }
+
+ delegate_access_inner!(sink, Si, (.));
+}
+
+impl<Si, Item, E> Sink<Item> for SinkErrInto<Si, Item, E>
+where
+ Si: Sink<Item>,
+ Si::Error: Into<E>,
+{
+ type Error = E;
+
+ delegate_sink!(sink, Item);
+}
+
+// Forwarding impl of Stream from the underlying sink
+impl<S, Item, E> Stream for SinkErrInto<S, Item, E>
+where
+ S: Sink<Item> + Stream,
+ S::Error: Into<E>,
+{
+ type Item = S::Item;
+
+ delegate_stream!(sink);
+}
+
+impl<S, Item, E> FusedStream for SinkErrInto<S, Item, E>
+where
+ S: Sink<Item> + FusedStream,
+ S::Error: Into<E>,
+{
+ fn is_terminated(&self) -> bool {
+ self.sink.is_terminated()
+ }
+}
diff --git a/third_party/rust/futures-util/src/sink/fanout.rs b/third_party/rust/futures-util/src/sink/fanout.rs
new file mode 100644
index 0000000000..fe2038f27f
--- /dev/null
+++ b/third_party/rust/futures-util/src/sink/fanout.rs
@@ -0,0 +1,111 @@
+use core::fmt::{Debug, Formatter, Result as FmtResult};
+use core::pin::Pin;
+use futures_core::task::{Context, Poll};
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Sink that clones incoming items and forwards them to two sinks at the same time.
+ ///
+ /// Backpressure from any downstream sink propagates up, which means that this sink
+ /// can only process items as fast as its _slowest_ downstream sink.
+ #[must_use = "sinks do nothing unless polled"]
+ pub struct Fanout<Si1, Si2> {
+ #[pin]
+ sink1: Si1,
+ #[pin]
+ sink2: Si2
+ }
+}
+
+impl<Si1, Si2> Fanout<Si1, Si2> {
+ pub(super) fn new(sink1: Si1, sink2: Si2) -> Self {
+ Self { sink1, sink2 }
+ }
+
+ /// Get a shared reference to the inner sinks.
+ pub fn get_ref(&self) -> (&Si1, &Si2) {
+ (&self.sink1, &self.sink2)
+ }
+
+ /// Get a mutable reference to the inner sinks.
+ pub fn get_mut(&mut self) -> (&mut Si1, &mut Si2) {
+ (&mut self.sink1, &mut self.sink2)
+ }
+
+ /// Get a pinned mutable reference to the inner sinks.
+ pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>) {
+ let this = self.project();
+ (this.sink1, this.sink2)
+ }
+
+ /// Consumes this combinator, returning the underlying sinks.
+ ///
+ /// Note that this may discard intermediate state of this combinator,
+ /// so care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> (Si1, Si2) {
+ (self.sink1, self.sink2)
+ }
+}
+
+impl<Si1: Debug, Si2: Debug> Debug for Fanout<Si1, Si2> {
+ fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
+ f.debug_struct("Fanout").field("sink1", &self.sink1).field("sink2", &self.sink2).finish()
+ }
+}
+
+impl<Si1, Si2, Item> Sink<Item> for Fanout<Si1, Si2>
+where
+ Si1: Sink<Item>,
+ Item: Clone,
+ Si2: Sink<Item, Error = Si1::Error>,
+{
+ type Error = Si1::Error;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ let this = self.project();
+
+ let sink1_ready = this.sink1.poll_ready(cx)?.is_ready();
+ let sink2_ready = this.sink2.poll_ready(cx)?.is_ready();
+ let ready = sink1_ready && sink2_ready;
+ if ready {
+ Poll::Ready(Ok(()))
+ } else {
+ Poll::Pending
+ }
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
+ let this = self.project();
+
+ this.sink1.start_send(item.clone())?;
+ this.sink2.start_send(item)?;
+ Ok(())
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ let this = self.project();
+
+ let sink1_ready = this.sink1.poll_flush(cx)?.is_ready();
+ let sink2_ready = this.sink2.poll_flush(cx)?.is_ready();
+ let ready = sink1_ready && sink2_ready;
+ if ready {
+ Poll::Ready(Ok(()))
+ } else {
+ Poll::Pending
+ }
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ let this = self.project();
+
+ let sink1_ready = this.sink1.poll_close(cx)?.is_ready();
+ let sink2_ready = this.sink2.poll_close(cx)?.is_ready();
+ let ready = sink1_ready && sink2_ready;
+ if ready {
+ Poll::Ready(Ok(()))
+ } else {
+ Poll::Pending
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/sink/feed.rs b/third_party/rust/futures-util/src/sink/feed.rs
new file mode 100644
index 0000000000..6701f7a1b4
--- /dev/null
+++ b/third_party/rust/futures-util/src/sink/feed.rs
@@ -0,0 +1,43 @@
+use core::pin::Pin;
+use futures_core::future::Future;
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_sink::Sink;
+
+/// Future for the [`feed`](super::SinkExt::feed) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Feed<'a, Si: ?Sized, Item> {
+ sink: &'a mut Si,
+ item: Option<Item>,
+}
+
+// Pinning is never projected to children
+impl<Si: Unpin + ?Sized, Item> Unpin for Feed<'_, Si, Item> {}
+
+impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Feed<'a, Si, Item> {
+ pub(super) fn new(sink: &'a mut Si, item: Item) -> Self {
+ Feed { sink, item: Some(item) }
+ }
+
+ pub(super) fn sink_pin_mut(&mut self) -> Pin<&mut Si> {
+ Pin::new(self.sink)
+ }
+
+ pub(super) fn is_item_pending(&self) -> bool {
+ self.item.is_some()
+ }
+}
+
+impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Feed<'_, Si, Item> {
+ type Output = Result<(), Si::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = self.get_mut();
+ let mut sink = Pin::new(&mut this.sink);
+ ready!(sink.as_mut().poll_ready(cx))?;
+ let item = this.item.take().expect("polled Feed after completion");
+ sink.as_mut().start_send(item)?;
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/third_party/rust/futures-util/src/sink/flush.rs b/third_party/rust/futures-util/src/sink/flush.rs
new file mode 100644
index 0000000000..35a8372de7
--- /dev/null
+++ b/third_party/rust/futures-util/src/sink/flush.rs
@@ -0,0 +1,36 @@
+use core::marker::PhantomData;
+use core::pin::Pin;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_sink::Sink;
+
+/// Future for the [`flush`](super::SinkExt::flush) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Flush<'a, Si: ?Sized, Item> {
+ sink: &'a mut Si,
+ _phantom: PhantomData<fn(Item)>,
+}
+
+// Pin is never projected to a field.
+impl<Si: Unpin + ?Sized, Item> Unpin for Flush<'_, Si, Item> {}
+
+/// A future that completes when the sink has finished processing all
+/// pending requests.
+///
+/// The sink itself is returned after flushing is complete; this adapter is
+/// intended to be used when you want to stop sending to the sink until
+/// all current requests are processed.
+impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Flush<'a, Si, Item> {
+ pub(super) fn new(sink: &'a mut Si) -> Self {
+ Self { sink, _phantom: PhantomData }
+ }
+}
+
+impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Flush<'_, Si, Item> {
+ type Output = Result<(), Si::Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ Pin::new(&mut self.sink).poll_flush(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/sink/map_err.rs b/third_party/rust/futures-util/src/sink/map_err.rs
new file mode 100644
index 0000000000..9d2ab7b24b
--- /dev/null
+++ b/third_party/rust/futures-util/src/sink/map_err.rs
@@ -0,0 +1,65 @@
+use core::pin::Pin;
+use futures_core::stream::{FusedStream, Stream};
+use futures_core::task::{Context, Poll};
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Sink for the [`sink_map_err`](super::SinkExt::sink_map_err) method.
+ #[derive(Debug, Clone)]
+ #[must_use = "sinks do nothing unless polled"]
+ pub struct SinkMapErr<Si, F> {
+ #[pin]
+ sink: Si,
+ f: Option<F>,
+ }
+}
+
+impl<Si, F> SinkMapErr<Si, F> {
+ pub(super) fn new(sink: Si, f: F) -> Self {
+ Self { sink, f: Some(f) }
+ }
+
+ delegate_access_inner!(sink, Si, ());
+
+ fn take_f(self: Pin<&mut Self>) -> F {
+ self.project().f.take().expect("polled MapErr after completion")
+ }
+}
+
+impl<Si, F, E, Item> Sink<Item> for SinkMapErr<Si, F>
+where
+ Si: Sink<Item>,
+ F: FnOnce(Si::Error) -> E,
+{
+ type Error = E;
+
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.as_mut().project().sink.poll_ready(cx).map_err(|e| self.as_mut().take_f()(e))
+ }
+
+ fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
+ self.as_mut().project().sink.start_send(item).map_err(|e| self.as_mut().take_f()(e))
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.as_mut().project().sink.poll_flush(cx).map_err(|e| self.as_mut().take_f()(e))
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.as_mut().project().sink.poll_close(cx).map_err(|e| self.as_mut().take_f()(e))
+ }
+}
+
+// Forwarding impl of Stream from the underlying sink
+impl<S: Stream, F> Stream for SinkMapErr<S, F> {
+ type Item = S::Item;
+
+ delegate_stream!(sink);
+}
+
+impl<S: FusedStream, F> FusedStream for SinkMapErr<S, F> {
+ fn is_terminated(&self) -> bool {
+ self.sink.is_terminated()
+ }
+}
diff --git a/third_party/rust/futures-util/src/sink/mod.rs b/third_party/rust/futures-util/src/sink/mod.rs
new file mode 100644
index 0000000000..147e9adc93
--- /dev/null
+++ b/third_party/rust/futures-util/src/sink/mod.rs
@@ -0,0 +1,344 @@
+//! Asynchronous sinks.
+//!
+//! This module contains:
+//!
+//! - The [`Sink`] trait, which allows you to asynchronously write data.
+//! - The [`SinkExt`] trait, which provides adapters for chaining and composing
+//! sinks.
+
+use crate::future::{assert_future, Either};
+use core::pin::Pin;
+use futures_core::future::Future;
+use futures_core::stream::{Stream, TryStream};
+use futures_core::task::{Context, Poll};
+
+#[cfg(feature = "compat")]
+use crate::compat::CompatSink;
+
+pub use futures_sink::Sink;
+
+mod close;
+pub use self::close::Close;
+
+mod drain;
+pub use self::drain::{drain, Drain};
+
+mod fanout;
+pub use self::fanout::Fanout;
+
+mod feed;
+pub use self::feed::Feed;
+
+mod flush;
+pub use self::flush::Flush;
+
+mod err_into;
+pub use self::err_into::SinkErrInto;
+
+mod map_err;
+pub use self::map_err::SinkMapErr;
+
+mod send;
+pub use self::send::Send;
+
+mod send_all;
+pub use self::send_all::SendAll;
+
+mod unfold;
+pub use self::unfold::{unfold, Unfold};
+
+mod with;
+pub use self::with::With;
+
+mod with_flat_map;
+pub use self::with_flat_map::WithFlatMap;
+
+#[cfg(feature = "alloc")]
+mod buffer;
+#[cfg(feature = "alloc")]
+pub use self::buffer::Buffer;
+
+impl<T: ?Sized, Item> SinkExt<Item> for T where T: Sink<Item> {}
+
+/// An extension trait for `Sink`s that provides a variety of convenient
+/// combinator functions.
+pub trait SinkExt<Item>: Sink<Item> {
+ /// Composes a function *in front of* the sink.
+ ///
+ /// This adapter produces a new sink that passes each value through the
+ /// given function `f` before sending it to `self`.
+ ///
+ /// To process each value, `f` produces a *future*, which is then polled to
+ /// completion before passing its result down to the underlying sink. If the
+ /// future produces an error, that error is returned by the new sink.
+ ///
+ /// Note that this function consumes the given sink, returning a wrapped
+ /// version, much like `Iterator::map`.
+ fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
+ where
+ F: FnMut(U) -> Fut,
+ Fut: Future<Output = Result<Item, E>>,
+ E: From<Self::Error>,
+ Self: Sized,
+ {
+ assert_sink::<U, E, _>(With::new(self, f))
+ }
+
+ /// Composes a function *in front of* the sink.
+ ///
+ /// This adapter produces a new sink that passes each value through the
+ /// given function `f` before sending it to `self`.
+ ///
+ /// To process each value, `f` produces a *stream*, of which each value
+ /// is passed to the underlying sink. A new value will not be accepted until
+ /// the stream has been drained
+ ///
+ /// Note that this function consumes the given sink, returning a wrapped
+ /// version, much like `Iterator::flat_map`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::mpsc;
+ /// use futures::sink::SinkExt;
+ /// use futures::stream::{self, StreamExt};
+ ///
+ /// let (tx, rx) = mpsc::channel(5);
+ ///
+ /// let mut tx = tx.with_flat_map(|x| {
+ /// stream::iter(vec![Ok(42); x])
+ /// });
+ ///
+ /// tx.send(5).await.unwrap();
+ /// drop(tx);
+ /// let received: Vec<i32> = rx.collect().await;
+ /// assert_eq!(received, vec![42, 42, 42, 42, 42]);
+ /// # });
+ /// ```
+ fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
+ where
+ F: FnMut(U) -> St,
+ St: Stream<Item = Result<Item, Self::Error>>,
+ Self: Sized,
+ {
+ assert_sink::<U, Self::Error, _>(WithFlatMap::new(self, f))
+ }
+
+ /*
+ fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F>
+ where F: FnMut(U) -> Self::SinkItem,
+ Self: Sized;
+
+ fn with_filter<F>(self, f: F) -> WithFilter<Self, F>
+ where F: FnMut(Self::SinkItem) -> bool,
+ Self: Sized;
+
+ fn with_filter_map<U, F>(self, f: F) -> WithFilterMap<Self, U, F>
+ where F: FnMut(U) -> Option<Self::SinkItem>,
+ Self: Sized;
+ */
+
+ /// Transforms the error returned by the sink.
+ fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
+ where
+ F: FnOnce(Self::Error) -> E,
+ Self: Sized,
+ {
+ assert_sink::<Item, E, _>(SinkMapErr::new(self, f))
+ }
+
+ /// Map this sink's error to a different error type using the `Into` trait.
+ ///
+ /// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`.
+ fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E>
+ where
+ Self: Sized,
+ Self::Error: Into<E>,
+ {
+ assert_sink::<Item, E, _>(SinkErrInto::new(self))
+ }
+
+ /// Adds a fixed-size buffer to the current sink.
+ ///
+ /// The resulting sink will buffer up to `capacity` items when the
+ /// underlying sink is unwilling to accept additional items. Calling `flush`
+ /// on the buffered sink will attempt to both empty the buffer and complete
+ /// processing on the underlying sink.
+ ///
+ /// Note that this function consumes the given sink, returning a wrapped
+ /// version, much like `Iterator::map`.
+ ///
+ /// This method is only available when the `std` or `alloc` feature of this
+ /// library is activated, and it is activated by default.
+ #[cfg(feature = "alloc")]
+ fn buffer(self, capacity: usize) -> Buffer<Self, Item>
+ where
+ Self: Sized,
+ {
+ assert_sink::<Item, Self::Error, _>(Buffer::new(self, capacity))
+ }
+
+ /// Close the sink.
+ fn close(&mut self) -> Close<'_, Self, Item>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<(), Self::Error>, _>(Close::new(self))
+ }
+
+ /// Fanout items to multiple sinks.
+ ///
+ /// This adapter clones each incoming item and forwards it to both this as well as
+ /// the other sink at the same time.
+ fn fanout<Si>(self, other: Si) -> Fanout<Self, Si>
+ where
+ Self: Sized,
+ Item: Clone,
+ Si: Sink<Item, Error = Self::Error>,
+ {
+ assert_sink::<Item, Self::Error, _>(Fanout::new(self, other))
+ }
+
+ /// Flush the sink, processing all pending items.
+ ///
+ /// This adapter is intended to be used when you want to stop sending to the sink
+ /// until all current requests are processed.
+ fn flush(&mut self) -> Flush<'_, Self, Item>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<(), Self::Error>, _>(Flush::new(self))
+ }
+
+ /// A future that completes after the given item has been fully processed
+ /// into the sink, including flushing.
+ ///
+ /// Note that, **because of the flushing requirement, it is usually better
+ /// to batch together items to send via `feed` or `send_all`,
+ /// rather than flushing between each item.**
+ fn send(&mut self, item: Item) -> Send<'_, Self, Item>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<(), Self::Error>, _>(Send::new(self, item))
+ }
+
+ /// A future that completes after the given item has been received
+ /// by the sink.
+ ///
+ /// Unlike `send`, the returned future does not flush the sink.
+ /// It is the caller's responsibility to ensure all pending items
+ /// are processed, which can be done via `flush` or `close`.
+ fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<(), Self::Error>, _>(Feed::new(self, item))
+ }
+
+ /// A future that completes after the given stream has been fully processed
+ /// into the sink, including flushing.
+ ///
+ /// This future will drive the stream to keep producing items until it is
+ /// exhausted, sending each item to the sink. It will complete once both the
+ /// stream is exhausted, the sink has received all items, and the sink has
+ /// been flushed. Note that the sink is **not** closed. If the stream produces
+ /// an error, that error will be returned by this future without flushing the sink.
+ ///
+ /// Doing `sink.send_all(stream)` is roughly equivalent to
+ /// `stream.forward(sink)`. The returned future will exhaust all items from
+ /// `stream` and send them to `self`.
+ fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
+ where
+ St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
+ // St: Stream<Item = Result<Item, Self::Error>> + Unpin + ?Sized,
+ Self: Unpin,
+ {
+ // TODO: type mismatch resolving `<St as Stream>::Item == std::result::Result<Item, <Self as futures_sink::Sink<Item>>::Error>`
+ // assert_future::<Result<(), Self::Error>, _>(SendAll::new(self, stream))
+ SendAll::new(self, stream)
+ }
+
+ /// Wrap this sink in an `Either` sink, making it the left-hand variant
+ /// of that `Either`.
+ ///
+ /// This can be used in combination with the `right_sink` method to write `if`
+ /// statements that evaluate to different streams in different branches.
+ fn left_sink<Si2>(self) -> Either<Self, Si2>
+ where
+ Si2: Sink<Item, Error = Self::Error>,
+ Self: Sized,
+ {
+ assert_sink::<Item, Self::Error, _>(Either::Left(self))
+ }
+
+ /// Wrap this stream in an `Either` stream, making it the right-hand variant
+ /// of that `Either`.
+ ///
+ /// This can be used in combination with the `left_sink` method to write `if`
+ /// statements that evaluate to different streams in different branches.
+ fn right_sink<Si1>(self) -> Either<Si1, Self>
+ where
+ Si1: Sink<Item, Error = Self::Error>,
+ Self: Sized,
+ {
+ assert_sink::<Item, Self::Error, _>(Either::Right(self))
+ }
+
+ /// Wraps a [`Sink`] into a sink compatible with libraries using
+ /// futures 0.1 `Sink`. Requires the `compat` feature to be enabled.
+ #[cfg(feature = "compat")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
+ fn compat(self) -> CompatSink<Self, Item>
+ where
+ Self: Sized + Unpin,
+ {
+ CompatSink::new(self)
+ }
+
+ /// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`]
+ /// sink types.
+ fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
+ where
+ Self: Unpin,
+ {
+ Pin::new(self).poll_ready(cx)
+ }
+
+ /// A convenience method for calling [`Sink::start_send`] on [`Unpin`]
+ /// sink types.
+ fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>
+ where
+ Self: Unpin,
+ {
+ Pin::new(self).start_send(item)
+ }
+
+ /// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`]
+ /// sink types.
+ fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
+ where
+ Self: Unpin,
+ {
+ Pin::new(self).poll_flush(cx)
+ }
+
+ /// A convenience method for calling [`Sink::poll_close`] on [`Unpin`]
+ /// sink types.
+ fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
+ where
+ Self: Unpin,
+ {
+ Pin::new(self).poll_close(cx)
+ }
+}
+
+// Just a helper function to ensure the sinks we're returning all have the
+// right implementations.
+pub(crate) fn assert_sink<T, E, S>(sink: S) -> S
+where
+ S: Sink<T, Error = E>,
+{
+ sink
+}
diff --git a/third_party/rust/futures-util/src/sink/send.rs b/third_party/rust/futures-util/src/sink/send.rs
new file mode 100644
index 0000000000..6d21f33fe4
--- /dev/null
+++ b/third_party/rust/futures-util/src/sink/send.rs
@@ -0,0 +1,41 @@
+use super::Feed;
+use core::pin::Pin;
+use futures_core::future::Future;
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_sink::Sink;
+
+/// Future for the [`send`](super::SinkExt::send) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Send<'a, Si: ?Sized, Item> {
+ feed: Feed<'a, Si, Item>,
+}
+
+// Pinning is never projected to children
+impl<Si: Unpin + ?Sized, Item> Unpin for Send<'_, Si, Item> {}
+
+impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Send<'a, Si, Item> {
+ pub(super) fn new(sink: &'a mut Si, item: Item) -> Self {
+ Self { feed: Feed::new(sink, item) }
+ }
+}
+
+impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Send<'_, Si, Item> {
+ type Output = Result<(), Si::Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+
+ if this.feed.is_item_pending() {
+ ready!(Pin::new(&mut this.feed).poll(cx))?;
+ debug_assert!(!this.feed.is_item_pending());
+ }
+
+ // we're done sending the item, but want to block on flushing the
+ // sink
+ ready!(this.feed.sink_pin_mut().poll_flush(cx))?;
+
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/third_party/rust/futures-util/src/sink/send_all.rs b/third_party/rust/futures-util/src/sink/send_all.rs
new file mode 100644
index 0000000000..1302dd2148
--- /dev/null
+++ b/third_party/rust/futures-util/src/sink/send_all.rs
@@ -0,0 +1,100 @@
+use crate::stream::{Fuse, StreamExt, TryStreamExt};
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::Future;
+use futures_core::ready;
+use futures_core::stream::{Stream, TryStream};
+use futures_core::task::{Context, Poll};
+use futures_sink::Sink;
+
+/// Future for the [`send_all`](super::SinkExt::send_all) method.
+#[allow(explicit_outlives_requirements)] // https://github.com/rust-lang/rust/issues/60993
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct SendAll<'a, Si, St>
+where
+ Si: ?Sized,
+ St: ?Sized + TryStream,
+{
+ sink: &'a mut Si,
+ stream: Fuse<&'a mut St>,
+ buffered: Option<St::Ok>,
+}
+
+impl<Si, St> fmt::Debug for SendAll<'_, Si, St>
+where
+ Si: fmt::Debug + ?Sized,
+ St: fmt::Debug + ?Sized + TryStream,
+ St::Ok: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("SendAll")
+ .field("sink", &self.sink)
+ .field("stream", &self.stream)
+ .field("buffered", &self.buffered)
+ .finish()
+ }
+}
+
+// Pinning is never projected to any fields
+impl<Si, St> Unpin for SendAll<'_, Si, St>
+where
+ Si: Unpin + ?Sized,
+ St: TryStream + Unpin + ?Sized,
+{
+}
+
+impl<'a, Si, St, Ok, Error> SendAll<'a, Si, St>
+where
+ Si: Sink<Ok, Error = Error> + Unpin + ?Sized,
+ St: TryStream<Ok = Ok, Error = Error> + Stream + Unpin + ?Sized,
+{
+ pub(super) fn new(sink: &'a mut Si, stream: &'a mut St) -> Self {
+ Self { sink, stream: stream.fuse(), buffered: None }
+ }
+
+ fn try_start_send(
+ &mut self,
+ cx: &mut Context<'_>,
+ item: St::Ok,
+ ) -> Poll<Result<(), Si::Error>> {
+ debug_assert!(self.buffered.is_none());
+ match Pin::new(&mut self.sink).poll_ready(cx)? {
+ Poll::Ready(()) => Poll::Ready(Pin::new(&mut self.sink).start_send(item)),
+ Poll::Pending => {
+ self.buffered = Some(item);
+ Poll::Pending
+ }
+ }
+ }
+}
+
+impl<Si, St, Ok, Error> Future for SendAll<'_, Si, St>
+where
+ Si: Sink<Ok, Error = Error> + Unpin + ?Sized,
+ St: Stream<Item = Result<Ok, Error>> + Unpin + ?Sized,
+{
+ type Output = Result<(), Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ // If we've got an item buffered already, we need to write it to the
+ // sink before we can do anything else
+ if let Some(item) = this.buffered.take() {
+ ready!(this.try_start_send(cx, item))?
+ }
+
+ loop {
+ match this.stream.try_poll_next_unpin(cx)? {
+ Poll::Ready(Some(item)) => ready!(this.try_start_send(cx, item))?,
+ Poll::Ready(None) => {
+ ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
+ return Poll::Ready(Ok(()));
+ }
+ Poll::Pending => {
+ ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
+ return Poll::Pending;
+ }
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/sink/unfold.rs b/third_party/rust/futures-util/src/sink/unfold.rs
new file mode 100644
index 0000000000..dea1307b66
--- /dev/null
+++ b/third_party/rust/futures-util/src/sink/unfold.rs
@@ -0,0 +1,89 @@
+use super::assert_sink;
+use crate::unfold_state::UnfoldState;
+use core::{future::Future, pin::Pin};
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Sink for the [`unfold`] function.
+ #[derive(Debug)]
+ #[must_use = "sinks do nothing unless polled"]
+ pub struct Unfold<T, F, R> {
+ function: F,
+ #[pin]
+ state: UnfoldState<T, R>,
+ }
+}
+
+/// Create a sink from a function which processes one item at a time.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::sink::{self, SinkExt};
+///
+/// let unfold = sink::unfold(0, |mut sum, i: i32| {
+/// async move {
+/// sum += i;
+/// eprintln!("{}", i);
+/// Ok::<_, futures::never::Never>(sum)
+/// }
+/// });
+/// futures::pin_mut!(unfold);
+/// unfold.send(5).await?;
+/// # Ok::<(), futures::never::Never>(()) }).unwrap();
+/// ```
+pub fn unfold<T, F, R, Item, E>(init: T, function: F) -> Unfold<T, F, R>
+where
+ F: FnMut(T, Item) -> R,
+ R: Future<Output = Result<T, E>>,
+{
+ assert_sink::<Item, E, _>(Unfold { function, state: UnfoldState::Value { value: init } })
+}
+
+impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R>
+where
+ F: FnMut(T, Item) -> R,
+ R: Future<Output = Result<T, E>>,
+{
+ type Error = E;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.poll_flush(cx)
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
+ let mut this = self.project();
+ let future = match this.state.as_mut().take_value() {
+ Some(value) => (this.function)(value, item),
+ None => panic!("start_send called without poll_ready being called first"),
+ };
+ this.state.set(UnfoldState::Future { future });
+ Ok(())
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ let mut this = self.project();
+ Poll::Ready(if let Some(future) = this.state.as_mut().project_future() {
+ match ready!(future.poll(cx)) {
+ Ok(state) => {
+ this.state.set(UnfoldState::Value { value: state });
+ Ok(())
+ }
+ Err(err) => {
+ this.state.set(UnfoldState::Empty);
+ Err(err)
+ }
+ }
+ } else {
+ Ok(())
+ })
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.poll_flush(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/sink/with.rs b/third_party/rust/futures-util/src/sink/with.rs
new file mode 100644
index 0000000000..86d3dcc7b8
--- /dev/null
+++ b/third_party/rust/futures-util/src/sink/with.rs
@@ -0,0 +1,134 @@
+use core::fmt;
+use core::marker::PhantomData;
+use core::pin::Pin;
+use futures_core::future::Future;
+use futures_core::ready;
+use futures_core::stream::Stream;
+use futures_core::task::{Context, Poll};
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Sink for the [`with`](super::SinkExt::with) method.
+ #[must_use = "sinks do nothing unless polled"]
+ pub struct With<Si, Item, U, Fut, F> {
+ #[pin]
+ sink: Si,
+ f: F,
+ #[pin]
+ state: Option<Fut>,
+ _phantom: PhantomData<fn(U) -> Item>,
+ }
+}
+
+impl<Si, Item, U, Fut, F> fmt::Debug for With<Si, Item, U, Fut, F>
+where
+ Si: fmt::Debug,
+ Fut: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("With").field("sink", &self.sink).field("state", &self.state).finish()
+ }
+}
+
+impl<Si, Item, U, Fut, F> With<Si, Item, U, Fut, F>
+where
+ Si: Sink<Item>,
+ F: FnMut(U) -> Fut,
+ Fut: Future,
+{
+ pub(super) fn new<E>(sink: Si, f: F) -> Self
+ where
+ Fut: Future<Output = Result<Item, E>>,
+ E: From<Si::Error>,
+ {
+ Self { state: None, sink, f, _phantom: PhantomData }
+ }
+}
+
+impl<Si, Item, U, Fut, F> Clone for With<Si, Item, U, Fut, F>
+where
+ Si: Clone,
+ F: Clone,
+ Fut: Clone,
+{
+ fn clone(&self) -> Self {
+ Self {
+ state: self.state.clone(),
+ sink: self.sink.clone(),
+ f: self.f.clone(),
+ _phantom: PhantomData,
+ }
+ }
+}
+
+// Forwarding impl of Stream from the underlying sink
+impl<S, Item, U, Fut, F> Stream for With<S, Item, U, Fut, F>
+where
+ S: Stream + Sink<Item>,
+ F: FnMut(U) -> Fut,
+ Fut: Future,
+{
+ type Item = S::Item;
+
+ delegate_stream!(sink);
+}
+
+impl<Si, Item, U, Fut, F, E> With<Si, Item, U, Fut, F>
+where
+ Si: Sink<Item>,
+ F: FnMut(U) -> Fut,
+ Fut: Future<Output = Result<Item, E>>,
+ E: From<Si::Error>,
+{
+ delegate_access_inner!(sink, Si, ());
+
+ /// Completes the processing of previous item if any.
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), E>> {
+ let mut this = self.project();
+
+ let item = match this.state.as_mut().as_pin_mut() {
+ None => return Poll::Ready(Ok(())),
+ Some(fut) => ready!(fut.poll(cx))?,
+ };
+ this.state.set(None);
+ this.sink.start_send(item)?;
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl<Si, Item, U, Fut, F, E> Sink<U> for With<Si, Item, U, Fut, F>
+where
+ Si: Sink<Item>,
+ F: FnMut(U) -> Fut,
+ Fut: Future<Output = Result<Item, E>>,
+ E: From<Si::Error>,
+{
+ type Error = E;
+
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ ready!(self.as_mut().poll(cx))?;
+ ready!(self.project().sink.poll_ready(cx)?);
+ Poll::Ready(Ok(()))
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: U) -> Result<(), Self::Error> {
+ let mut this = self.project();
+
+ assert!(this.state.is_none());
+ this.state.set(Some((this.f)(item)));
+ Ok(())
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ ready!(self.as_mut().poll(cx))?;
+ ready!(self.project().sink.poll_flush(cx)?);
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ ready!(self.as_mut().poll(cx))?;
+ ready!(self.project().sink.poll_close(cx)?);
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/third_party/rust/futures-util/src/sink/with_flat_map.rs b/third_party/rust/futures-util/src/sink/with_flat_map.rs
new file mode 100644
index 0000000000..2ae877a24b
--- /dev/null
+++ b/third_party/rust/futures-util/src/sink/with_flat_map.rs
@@ -0,0 +1,127 @@
+use core::fmt;
+use core::marker::PhantomData;
+use core::pin::Pin;
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream};
+use futures_core::task::{Context, Poll};
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Sink for the [`with_flat_map`](super::SinkExt::with_flat_map) method.
+ #[must_use = "sinks do nothing unless polled"]
+ pub struct WithFlatMap<Si, Item, U, St, F> {
+ #[pin]
+ sink: Si,
+ f: F,
+ #[pin]
+ stream: Option<St>,
+ buffer: Option<Item>,
+ _marker: PhantomData<fn(U)>,
+ }
+}
+
+impl<Si, Item, U, St, F> fmt::Debug for WithFlatMap<Si, Item, U, St, F>
+where
+ Si: fmt::Debug,
+ St: fmt::Debug,
+ Item: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("WithFlatMap")
+ .field("sink", &self.sink)
+ .field("stream", &self.stream)
+ .field("buffer", &self.buffer)
+ .finish()
+ }
+}
+
+impl<Si, Item, U, St, F> WithFlatMap<Si, Item, U, St, F>
+where
+ Si: Sink<Item>,
+ F: FnMut(U) -> St,
+ St: Stream<Item = Result<Item, Si::Error>>,
+{
+ pub(super) fn new(sink: Si, f: F) -> Self {
+ Self { sink, f, stream: None, buffer: None, _marker: PhantomData }
+ }
+
+ delegate_access_inner!(sink, Si, ());
+
+ fn try_empty_stream(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>> {
+ let mut this = self.project();
+
+ if this.buffer.is_some() {
+ ready!(this.sink.as_mut().poll_ready(cx))?;
+ let item = this.buffer.take().unwrap();
+ this.sink.as_mut().start_send(item)?;
+ }
+ if let Some(mut some_stream) = this.stream.as_mut().as_pin_mut() {
+ while let Some(item) = ready!(some_stream.as_mut().poll_next(cx)?) {
+ match this.sink.as_mut().poll_ready(cx)? {
+ Poll::Ready(()) => this.sink.as_mut().start_send(item)?,
+ Poll::Pending => {
+ *this.buffer = Some(item);
+ return Poll::Pending;
+ }
+ };
+ }
+ }
+ this.stream.set(None);
+ Poll::Ready(Ok(()))
+ }
+}
+
+// Forwarding impl of Stream from the underlying sink
+impl<S, Item, U, St, F> Stream for WithFlatMap<S, Item, U, St, F>
+where
+ S: Stream + Sink<Item>,
+ F: FnMut(U) -> St,
+ St: Stream<Item = Result<Item, S::Error>>,
+{
+ type Item = S::Item;
+
+ delegate_stream!(sink);
+}
+
+impl<S, Item, U, St, F> FusedStream for WithFlatMap<S, Item, U, St, F>
+where
+ S: FusedStream + Sink<Item>,
+ F: FnMut(U) -> St,
+ St: Stream<Item = Result<Item, S::Error>>,
+{
+ fn is_terminated(&self) -> bool {
+ self.sink.is_terminated()
+ }
+}
+
+impl<Si, Item, U, St, F> Sink<U> for WithFlatMap<Si, Item, U, St, F>
+where
+ Si: Sink<Item>,
+ F: FnMut(U) -> St,
+ St: Stream<Item = Result<Item, Si::Error>>,
+{
+ type Error = Si::Error;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.try_empty_stream(cx)
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: U) -> Result<(), Self::Error> {
+ let mut this = self.project();
+
+ assert!(this.stream.is_none());
+ this.stream.set(Some((this.f)(item)));
+ Ok(())
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ ready!(self.as_mut().try_empty_stream(cx)?);
+ self.project().sink.poll_flush(cx)
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ ready!(self.as_mut().try_empty_stream(cx)?);
+ self.project().sink.poll_close(cx)
+ }
+}