diff options
Diffstat (limited to 'third_party/rust/futures-util/src/sink')
-rw-r--r-- | third_party/rust/futures-util/src/sink/buffer.rs | 105 | ||||
-rw-r--r-- | third_party/rust/futures-util/src/sink/close.rs | 32 | ||||
-rw-r--r-- | third_party/rust/futures-util/src/sink/drain.rs | 59 | ||||
-rw-r--r-- | third_party/rust/futures-util/src/sink/err_into.rs | 57 | ||||
-rw-r--r-- | third_party/rust/futures-util/src/sink/fanout.rs | 111 | ||||
-rw-r--r-- | third_party/rust/futures-util/src/sink/feed.rs | 43 | ||||
-rw-r--r-- | third_party/rust/futures-util/src/sink/flush.rs | 36 | ||||
-rw-r--r-- | third_party/rust/futures-util/src/sink/map_err.rs | 65 | ||||
-rw-r--r-- | third_party/rust/futures-util/src/sink/mod.rs | 344 | ||||
-rw-r--r-- | third_party/rust/futures-util/src/sink/send.rs | 41 | ||||
-rw-r--r-- | third_party/rust/futures-util/src/sink/send_all.rs | 100 | ||||
-rw-r--r-- | third_party/rust/futures-util/src/sink/unfold.rs | 86 | ||||
-rw-r--r-- | third_party/rust/futures-util/src/sink/with.rs | 134 | ||||
-rw-r--r-- | third_party/rust/futures-util/src/sink/with_flat_map.rs | 127 |
14 files changed, 1340 insertions, 0 deletions
diff --git a/third_party/rust/futures-util/src/sink/buffer.rs b/third_party/rust/futures-util/src/sink/buffer.rs new file mode 100644 index 0000000000..4aa6c36033 --- /dev/null +++ b/third_party/rust/futures-util/src/sink/buffer.rs @@ -0,0 +1,105 @@ +use alloc::collections::VecDeque; +use core::pin::Pin; +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + /// Sink for the [`buffer`](super::SinkExt::buffer) method. + #[derive(Debug)] + #[must_use = "sinks do nothing unless polled"] + pub struct Buffer<Si, Item> { + #[pin] + sink: Si, + buf: VecDeque<Item>, + + // Track capacity separately from the `VecDeque`, which may be rounded up + capacity: usize, + } +} + +impl<Si: Sink<Item>, Item> Buffer<Si, Item> { + pub(super) fn new(sink: Si, capacity: usize) -> Self { + Self { sink, buf: VecDeque::with_capacity(capacity), capacity } + } + + delegate_access_inner!(sink, Si, ()); + + fn try_empty_buffer(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>> { + let mut this = self.project(); + ready!(this.sink.as_mut().poll_ready(cx))?; + while let Some(item) = this.buf.pop_front() { + this.sink.as_mut().start_send(item)?; + if !this.buf.is_empty() { + ready!(this.sink.as_mut().poll_ready(cx))?; + } + } + Poll::Ready(Ok(())) + } +} + +// Forwarding impl of Stream from the underlying sink +impl<S, Item> Stream for Buffer<S, Item> +where + S: Sink<Item> + Stream, +{ + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { + self.project().sink.poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.sink.size_hint() + } +} + +impl<S, Item> FusedStream for Buffer<S, Item> +where + S: Sink<Item> + FusedStream, +{ + fn is_terminated(&self) -> bool { + self.sink.is_terminated() + } +} + +impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> { + type Error = Si::Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + if self.capacity == 0 { + return self.project().sink.poll_ready(cx); + } + + let _ = self.as_mut().try_empty_buffer(cx)?; + + if self.buf.len() >= self.capacity { + Poll::Pending + } else { + Poll::Ready(Ok(())) + } + } + + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + if self.capacity == 0 { + self.project().sink.start_send(item) + } else { + self.project().buf.push_back(item); + Ok(()) + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + ready!(self.as_mut().try_empty_buffer(cx))?; + debug_assert!(self.buf.is_empty()); + self.project().sink.poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + ready!(self.as_mut().try_empty_buffer(cx))?; + debug_assert!(self.buf.is_empty()); + self.project().sink.poll_close(cx) + } +} diff --git a/third_party/rust/futures-util/src/sink/close.rs b/third_party/rust/futures-util/src/sink/close.rs new file mode 100644 index 0000000000..43eea74b0f --- /dev/null +++ b/third_party/rust/futures-util/src/sink/close.rs @@ -0,0 +1,32 @@ +use core::marker::PhantomData; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; + +/// Future for the [`close`](super::SinkExt::close) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Close<'a, Si: ?Sized, Item> { + sink: &'a mut Si, + _phantom: PhantomData<fn(Item)>, +} + +impl<Si: Unpin + ?Sized, Item> Unpin for Close<'_, Si, Item> {} + +/// A future that completes when the sink has finished closing. +/// +/// The sink itself is returned after closing is complete. +impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Close<'a, Si, Item> { + pub(super) fn new(sink: &'a mut Si) -> Self { + Self { sink, _phantom: PhantomData } + } +} + +impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Close<'_, Si, Item> { + type Output = Result<(), Si::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + Pin::new(&mut self.sink).poll_close(cx) + } +} diff --git a/third_party/rust/futures-util/src/sink/drain.rs b/third_party/rust/futures-util/src/sink/drain.rs new file mode 100644 index 0000000000..1a5480c0d6 --- /dev/null +++ b/third_party/rust/futures-util/src/sink/drain.rs @@ -0,0 +1,59 @@ +use super::assert_sink; +use crate::never::Never; +use core::marker::PhantomData; +use core::pin::Pin; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; + +/// Sink for the [`drain`] function. +#[derive(Debug)] +#[must_use = "sinks do nothing unless polled"] +pub struct Drain<T> { + marker: PhantomData<T>, +} + +/// Create a sink that will just discard all items given to it. +/// +/// Similar to [`io::Sink`](::std::io::Sink). +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::sink::{self, SinkExt}; +/// +/// let mut drain = sink::drain(); +/// drain.send(5).await?; +/// # Ok::<(), futures::never::Never>(()) }).unwrap(); +/// ``` +pub fn drain<T>() -> Drain<T> { + assert_sink::<T, Never, _>(Drain { marker: PhantomData }) +} + +impl<T> Unpin for Drain<T> {} + +impl<T> Clone for Drain<T> { + fn clone(&self) -> Self { + drain() + } +} + +impl<T> Sink<T> for Drain<T> { + type Error = Never; + + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn start_send(self: Pin<&mut Self>, _item: T) -> Result<(), Self::Error> { + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } +} diff --git a/third_party/rust/futures-util/src/sink/err_into.rs b/third_party/rust/futures-util/src/sink/err_into.rs new file mode 100644 index 0000000000..a64d1337ba --- /dev/null +++ b/third_party/rust/futures-util/src/sink/err_into.rs @@ -0,0 +1,57 @@ +use crate::sink::{SinkExt, SinkMapErr}; +use futures_core::stream::{FusedStream, Stream}; +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + /// Sink for the [`sink_err_into`](super::SinkExt::sink_err_into) method. + #[derive(Debug)] + #[must_use = "sinks do nothing unless polled"] + pub struct SinkErrInto<Si: Sink<Item>, Item, E> { + #[pin] + sink: SinkMapErr<Si, fn(Si::Error) -> E>, + } +} + +impl<Si, E, Item> SinkErrInto<Si, Item, E> +where + Si: Sink<Item>, + Si::Error: Into<E>, +{ + pub(super) fn new(sink: Si) -> Self { + Self { sink: SinkExt::sink_map_err(sink, Into::into) } + } + + delegate_access_inner!(sink, Si, (.)); +} + +impl<Si, Item, E> Sink<Item> for SinkErrInto<Si, Item, E> +where + Si: Sink<Item>, + Si::Error: Into<E>, +{ + type Error = E; + + delegate_sink!(sink, Item); +} + +// Forwarding impl of Stream from the underlying sink +impl<S, Item, E> Stream for SinkErrInto<S, Item, E> +where + S: Sink<Item> + Stream, + S::Error: Into<E>, +{ + type Item = S::Item; + + delegate_stream!(sink); +} + +impl<S, Item, E> FusedStream for SinkErrInto<S, Item, E> +where + S: Sink<Item> + FusedStream, + S::Error: Into<E>, +{ + fn is_terminated(&self) -> bool { + self.sink.is_terminated() + } +} diff --git a/third_party/rust/futures-util/src/sink/fanout.rs b/third_party/rust/futures-util/src/sink/fanout.rs new file mode 100644 index 0000000000..fe2038f27f --- /dev/null +++ b/third_party/rust/futures-util/src/sink/fanout.rs @@ -0,0 +1,111 @@ +use core::fmt::{Debug, Formatter, Result as FmtResult}; +use core::pin::Pin; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + /// Sink that clones incoming items and forwards them to two sinks at the same time. + /// + /// Backpressure from any downstream sink propagates up, which means that this sink + /// can only process items as fast as its _slowest_ downstream sink. + #[must_use = "sinks do nothing unless polled"] + pub struct Fanout<Si1, Si2> { + #[pin] + sink1: Si1, + #[pin] + sink2: Si2 + } +} + +impl<Si1, Si2> Fanout<Si1, Si2> { + pub(super) fn new(sink1: Si1, sink2: Si2) -> Self { + Self { sink1, sink2 } + } + + /// Get a shared reference to the inner sinks. + pub fn get_ref(&self) -> (&Si1, &Si2) { + (&self.sink1, &self.sink2) + } + + /// Get a mutable reference to the inner sinks. + pub fn get_mut(&mut self) -> (&mut Si1, &mut Si2) { + (&mut self.sink1, &mut self.sink2) + } + + /// Get a pinned mutable reference to the inner sinks. + pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>) { + let this = self.project(); + (this.sink1, this.sink2) + } + + /// Consumes this combinator, returning the underlying sinks. + /// + /// Note that this may discard intermediate state of this combinator, + /// so care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> (Si1, Si2) { + (self.sink1, self.sink2) + } +} + +impl<Si1: Debug, Si2: Debug> Debug for Fanout<Si1, Si2> { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + f.debug_struct("Fanout").field("sink1", &self.sink1).field("sink2", &self.sink2).finish() + } +} + +impl<Si1, Si2, Item> Sink<Item> for Fanout<Si1, Si2> +where + Si1: Sink<Item>, + Item: Clone, + Si2: Sink<Item, Error = Si1::Error>, +{ + type Error = Si1::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + let this = self.project(); + + let sink1_ready = this.sink1.poll_ready(cx)?.is_ready(); + let sink2_ready = this.sink2.poll_ready(cx)?.is_ready(); + let ready = sink1_ready && sink2_ready; + if ready { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + } + + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + let this = self.project(); + + this.sink1.start_send(item.clone())?; + this.sink2.start_send(item)?; + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + let this = self.project(); + + let sink1_ready = this.sink1.poll_flush(cx)?.is_ready(); + let sink2_ready = this.sink2.poll_flush(cx)?.is_ready(); + let ready = sink1_ready && sink2_ready; + if ready { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + let this = self.project(); + + let sink1_ready = this.sink1.poll_close(cx)?.is_ready(); + let sink2_ready = this.sink2.poll_close(cx)?.is_ready(); + let ready = sink1_ready && sink2_ready; + if ready { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + } +} diff --git a/third_party/rust/futures-util/src/sink/feed.rs b/third_party/rust/futures-util/src/sink/feed.rs new file mode 100644 index 0000000000..6701f7a1b4 --- /dev/null +++ b/third_party/rust/futures-util/src/sink/feed.rs @@ -0,0 +1,43 @@ +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::ready; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; + +/// Future for the [`feed`](super::SinkExt::feed) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Feed<'a, Si: ?Sized, Item> { + sink: &'a mut Si, + item: Option<Item>, +} + +// Pinning is never projected to children +impl<Si: Unpin + ?Sized, Item> Unpin for Feed<'_, Si, Item> {} + +impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Feed<'a, Si, Item> { + pub(super) fn new(sink: &'a mut Si, item: Item) -> Self { + Feed { sink, item: Some(item) } + } + + pub(super) fn sink_pin_mut(&mut self) -> Pin<&mut Si> { + Pin::new(self.sink) + } + + pub(super) fn is_item_pending(&self) -> bool { + self.item.is_some() + } +} + +impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Feed<'_, Si, Item> { + type Output = Result<(), Si::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = self.get_mut(); + let mut sink = Pin::new(&mut this.sink); + ready!(sink.as_mut().poll_ready(cx))?; + let item = this.item.take().expect("polled Feed after completion"); + sink.as_mut().start_send(item)?; + Poll::Ready(Ok(())) + } +} diff --git a/third_party/rust/futures-util/src/sink/flush.rs b/third_party/rust/futures-util/src/sink/flush.rs new file mode 100644 index 0000000000..35a8372de7 --- /dev/null +++ b/third_party/rust/futures-util/src/sink/flush.rs @@ -0,0 +1,36 @@ +use core::marker::PhantomData; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; + +/// Future for the [`flush`](super::SinkExt::flush) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Flush<'a, Si: ?Sized, Item> { + sink: &'a mut Si, + _phantom: PhantomData<fn(Item)>, +} + +// Pin is never projected to a field. +impl<Si: Unpin + ?Sized, Item> Unpin for Flush<'_, Si, Item> {} + +/// A future that completes when the sink has finished processing all +/// pending requests. +/// +/// The sink itself is returned after flushing is complete; this adapter is +/// intended to be used when you want to stop sending to the sink until +/// all current requests are processed. +impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Flush<'a, Si, Item> { + pub(super) fn new(sink: &'a mut Si) -> Self { + Self { sink, _phantom: PhantomData } + } +} + +impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Flush<'_, Si, Item> { + type Output = Result<(), Si::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + Pin::new(&mut self.sink).poll_flush(cx) + } +} diff --git a/third_party/rust/futures-util/src/sink/map_err.rs b/third_party/rust/futures-util/src/sink/map_err.rs new file mode 100644 index 0000000000..9d2ab7b24b --- /dev/null +++ b/third_party/rust/futures-util/src/sink/map_err.rs @@ -0,0 +1,65 @@ +use core::pin::Pin; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + /// Sink for the [`sink_map_err`](super::SinkExt::sink_map_err) method. + #[derive(Debug, Clone)] + #[must_use = "sinks do nothing unless polled"] + pub struct SinkMapErr<Si, F> { + #[pin] + sink: Si, + f: Option<F>, + } +} + +impl<Si, F> SinkMapErr<Si, F> { + pub(super) fn new(sink: Si, f: F) -> Self { + Self { sink, f: Some(f) } + } + + delegate_access_inner!(sink, Si, ()); + + fn take_f(self: Pin<&mut Self>) -> F { + self.project().f.take().expect("polled MapErr after completion") + } +} + +impl<Si, F, E, Item> Sink<Item> for SinkMapErr<Si, F> +where + Si: Sink<Item>, + F: FnOnce(Si::Error) -> E, +{ + type Error = E; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.as_mut().project().sink.poll_ready(cx).map_err(|e| self.as_mut().take_f()(e)) + } + + fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + self.as_mut().project().sink.start_send(item).map_err(|e| self.as_mut().take_f()(e)) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.as_mut().project().sink.poll_flush(cx).map_err(|e| self.as_mut().take_f()(e)) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.as_mut().project().sink.poll_close(cx).map_err(|e| self.as_mut().take_f()(e)) + } +} + +// Forwarding impl of Stream from the underlying sink +impl<S: Stream, F> Stream for SinkMapErr<S, F> { + type Item = S::Item; + + delegate_stream!(sink); +} + +impl<S: FusedStream, F> FusedStream for SinkMapErr<S, F> { + fn is_terminated(&self) -> bool { + self.sink.is_terminated() + } +} diff --git a/third_party/rust/futures-util/src/sink/mod.rs b/third_party/rust/futures-util/src/sink/mod.rs new file mode 100644 index 0000000000..147e9adc93 --- /dev/null +++ b/third_party/rust/futures-util/src/sink/mod.rs @@ -0,0 +1,344 @@ +//! Asynchronous sinks. +//! +//! This module contains: +//! +//! - The [`Sink`] trait, which allows you to asynchronously write data. +//! - The [`SinkExt`] trait, which provides adapters for chaining and composing +//! sinks. + +use crate::future::{assert_future, Either}; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::stream::{Stream, TryStream}; +use futures_core::task::{Context, Poll}; + +#[cfg(feature = "compat")] +use crate::compat::CompatSink; + +pub use futures_sink::Sink; + +mod close; +pub use self::close::Close; + +mod drain; +pub use self::drain::{drain, Drain}; + +mod fanout; +pub use self::fanout::Fanout; + +mod feed; +pub use self::feed::Feed; + +mod flush; +pub use self::flush::Flush; + +mod err_into; +pub use self::err_into::SinkErrInto; + +mod map_err; +pub use self::map_err::SinkMapErr; + +mod send; +pub use self::send::Send; + +mod send_all; +pub use self::send_all::SendAll; + +mod unfold; +pub use self::unfold::{unfold, Unfold}; + +mod with; +pub use self::with::With; + +mod with_flat_map; +pub use self::with_flat_map::WithFlatMap; + +#[cfg(feature = "alloc")] +mod buffer; +#[cfg(feature = "alloc")] +pub use self::buffer::Buffer; + +impl<T: ?Sized, Item> SinkExt<Item> for T where T: Sink<Item> {} + +/// An extension trait for `Sink`s that provides a variety of convenient +/// combinator functions. +pub trait SinkExt<Item>: Sink<Item> { + /// Composes a function *in front of* the sink. + /// + /// This adapter produces a new sink that passes each value through the + /// given function `f` before sending it to `self`. + /// + /// To process each value, `f` produces a *future*, which is then polled to + /// completion before passing its result down to the underlying sink. If the + /// future produces an error, that error is returned by the new sink. + /// + /// Note that this function consumes the given sink, returning a wrapped + /// version, much like `Iterator::map`. + fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F> + where + F: FnMut(U) -> Fut, + Fut: Future<Output = Result<Item, E>>, + E: From<Self::Error>, + Self: Sized, + { + assert_sink::<U, E, _>(With::new(self, f)) + } + + /// Composes a function *in front of* the sink. + /// + /// This adapter produces a new sink that passes each value through the + /// given function `f` before sending it to `self`. + /// + /// To process each value, `f` produces a *stream*, of which each value + /// is passed to the underlying sink. A new value will not be accepted until + /// the stream has been drained + /// + /// Note that this function consumes the given sink, returning a wrapped + /// version, much like `Iterator::flat_map`. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::mpsc; + /// use futures::sink::SinkExt; + /// use futures::stream::{self, StreamExt}; + /// + /// let (tx, rx) = mpsc::channel(5); + /// + /// let mut tx = tx.with_flat_map(|x| { + /// stream::iter(vec![Ok(42); x]) + /// }); + /// + /// tx.send(5).await.unwrap(); + /// drop(tx); + /// let received: Vec<i32> = rx.collect().await; + /// assert_eq!(received, vec![42, 42, 42, 42, 42]); + /// # }); + /// ``` + fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F> + where + F: FnMut(U) -> St, + St: Stream<Item = Result<Item, Self::Error>>, + Self: Sized, + { + assert_sink::<U, Self::Error, _>(WithFlatMap::new(self, f)) + } + + /* + fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F> + where F: FnMut(U) -> Self::SinkItem, + Self: Sized; + + fn with_filter<F>(self, f: F) -> WithFilter<Self, F> + where F: FnMut(Self::SinkItem) -> bool, + Self: Sized; + + fn with_filter_map<U, F>(self, f: F) -> WithFilterMap<Self, U, F> + where F: FnMut(U) -> Option<Self::SinkItem>, + Self: Sized; + */ + + /// Transforms the error returned by the sink. + fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F> + where + F: FnOnce(Self::Error) -> E, + Self: Sized, + { + assert_sink::<Item, E, _>(SinkMapErr::new(self, f)) + } + + /// Map this sink's error to a different error type using the `Into` trait. + /// + /// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`. + fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E> + where + Self: Sized, + Self::Error: Into<E>, + { + assert_sink::<Item, E, _>(SinkErrInto::new(self)) + } + + /// Adds a fixed-size buffer to the current sink. + /// + /// The resulting sink will buffer up to `capacity` items when the + /// underlying sink is unwilling to accept additional items. Calling `flush` + /// on the buffered sink will attempt to both empty the buffer and complete + /// processing on the underlying sink. + /// + /// Note that this function consumes the given sink, returning a wrapped + /// version, much like `Iterator::map`. + /// + /// This method is only available when the `std` or `alloc` feature of this + /// library is activated, and it is activated by default. + #[cfg(feature = "alloc")] + fn buffer(self, capacity: usize) -> Buffer<Self, Item> + where + Self: Sized, + { + assert_sink::<Item, Self::Error, _>(Buffer::new(self, capacity)) + } + + /// Close the sink. + fn close(&mut self) -> Close<'_, Self, Item> + where + Self: Unpin, + { + assert_future::<Result<(), Self::Error>, _>(Close::new(self)) + } + + /// Fanout items to multiple sinks. + /// + /// This adapter clones each incoming item and forwards it to both this as well as + /// the other sink at the same time. + fn fanout<Si>(self, other: Si) -> Fanout<Self, Si> + where + Self: Sized, + Item: Clone, + Si: Sink<Item, Error = Self::Error>, + { + assert_sink::<Item, Self::Error, _>(Fanout::new(self, other)) + } + + /// Flush the sink, processing all pending items. + /// + /// This adapter is intended to be used when you want to stop sending to the sink + /// until all current requests are processed. + fn flush(&mut self) -> Flush<'_, Self, Item> + where + Self: Unpin, + { + assert_future::<Result<(), Self::Error>, _>(Flush::new(self)) + } + + /// A future that completes after the given item has been fully processed + /// into the sink, including flushing. + /// + /// Note that, **because of the flushing requirement, it is usually better + /// to batch together items to send via `feed` or `send_all`, + /// rather than flushing between each item.** + fn send(&mut self, item: Item) -> Send<'_, Self, Item> + where + Self: Unpin, + { + assert_future::<Result<(), Self::Error>, _>(Send::new(self, item)) + } + + /// A future that completes after the given item has been received + /// by the sink. + /// + /// Unlike `send`, the returned future does not flush the sink. + /// It is the caller's responsibility to ensure all pending items + /// are processed, which can be done via `flush` or `close`. + fn feed(&mut self, item: Item) -> Feed<'_, Self, Item> + where + Self: Unpin, + { + assert_future::<Result<(), Self::Error>, _>(Feed::new(self, item)) + } + + /// A future that completes after the given stream has been fully processed + /// into the sink, including flushing. + /// + /// This future will drive the stream to keep producing items until it is + /// exhausted, sending each item to the sink. It will complete once both the + /// stream is exhausted, the sink has received all items, and the sink has + /// been flushed. Note that the sink is **not** closed. If the stream produces + /// an error, that error will be returned by this future without flushing the sink. + /// + /// Doing `sink.send_all(stream)` is roughly equivalent to + /// `stream.forward(sink)`. The returned future will exhaust all items from + /// `stream` and send them to `self`. + fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St> + where + St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized, + // St: Stream<Item = Result<Item, Self::Error>> + Unpin + ?Sized, + Self: Unpin, + { + // TODO: type mismatch resolving `<St as Stream>::Item == std::result::Result<Item, <Self as futures_sink::Sink<Item>>::Error>` + // assert_future::<Result<(), Self::Error>, _>(SendAll::new(self, stream)) + SendAll::new(self, stream) + } + + /// Wrap this sink in an `Either` sink, making it the left-hand variant + /// of that `Either`. + /// + /// This can be used in combination with the `right_sink` method to write `if` + /// statements that evaluate to different streams in different branches. + fn left_sink<Si2>(self) -> Either<Self, Si2> + where + Si2: Sink<Item, Error = Self::Error>, + Self: Sized, + { + assert_sink::<Item, Self::Error, _>(Either::Left(self)) + } + + /// Wrap this stream in an `Either` stream, making it the right-hand variant + /// of that `Either`. + /// + /// This can be used in combination with the `left_sink` method to write `if` + /// statements that evaluate to different streams in different branches. + fn right_sink<Si1>(self) -> Either<Si1, Self> + where + Si1: Sink<Item, Error = Self::Error>, + Self: Sized, + { + assert_sink::<Item, Self::Error, _>(Either::Right(self)) + } + + /// Wraps a [`Sink`] into a sink compatible with libraries using + /// futures 0.1 `Sink`. Requires the `compat` feature to be enabled. + #[cfg(feature = "compat")] + #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] + fn compat(self) -> CompatSink<Self, Item> + where + Self: Sized + Unpin, + { + CompatSink::new(self) + } + + /// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`] + /// sink types. + fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> + where + Self: Unpin, + { + Pin::new(self).poll_ready(cx) + } + + /// A convenience method for calling [`Sink::start_send`] on [`Unpin`] + /// sink types. + fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error> + where + Self: Unpin, + { + Pin::new(self).start_send(item) + } + + /// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`] + /// sink types. + fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> + where + Self: Unpin, + { + Pin::new(self).poll_flush(cx) + } + + /// A convenience method for calling [`Sink::poll_close`] on [`Unpin`] + /// sink types. + fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> + where + Self: Unpin, + { + Pin::new(self).poll_close(cx) + } +} + +// Just a helper function to ensure the sinks we're returning all have the +// right implementations. +pub(crate) fn assert_sink<T, E, S>(sink: S) -> S +where + S: Sink<T, Error = E>, +{ + sink +} diff --git a/third_party/rust/futures-util/src/sink/send.rs b/third_party/rust/futures-util/src/sink/send.rs new file mode 100644 index 0000000000..6d21f33fe4 --- /dev/null +++ b/third_party/rust/futures-util/src/sink/send.rs @@ -0,0 +1,41 @@ +use super::Feed; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::ready; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; + +/// Future for the [`send`](super::SinkExt::send) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Send<'a, Si: ?Sized, Item> { + feed: Feed<'a, Si, Item>, +} + +// Pinning is never projected to children +impl<Si: Unpin + ?Sized, Item> Unpin for Send<'_, Si, Item> {} + +impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Send<'a, Si, Item> { + pub(super) fn new(sink: &'a mut Si, item: Item) -> Self { + Self { feed: Feed::new(sink, item) } + } +} + +impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Send<'_, Si, Item> { + type Output = Result<(), Si::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = &mut *self; + + if this.feed.is_item_pending() { + ready!(Pin::new(&mut this.feed).poll(cx))?; + debug_assert!(!this.feed.is_item_pending()); + } + + // we're done sending the item, but want to block on flushing the + // sink + ready!(this.feed.sink_pin_mut().poll_flush(cx))?; + + Poll::Ready(Ok(())) + } +} diff --git a/third_party/rust/futures-util/src/sink/send_all.rs b/third_party/rust/futures-util/src/sink/send_all.rs new file mode 100644 index 0000000000..1302dd2148 --- /dev/null +++ b/third_party/rust/futures-util/src/sink/send_all.rs @@ -0,0 +1,100 @@ +use crate::stream::{Fuse, StreamExt, TryStreamExt}; +use core::fmt; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::ready; +use futures_core::stream::{Stream, TryStream}; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; + +/// Future for the [`send_all`](super::SinkExt::send_all) method. +#[allow(explicit_outlives_requirements)] // https://github.com/rust-lang/rust/issues/60993 +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct SendAll<'a, Si, St> +where + Si: ?Sized, + St: ?Sized + TryStream, +{ + sink: &'a mut Si, + stream: Fuse<&'a mut St>, + buffered: Option<St::Ok>, +} + +impl<Si, St> fmt::Debug for SendAll<'_, Si, St> +where + Si: fmt::Debug + ?Sized, + St: fmt::Debug + ?Sized + TryStream, + St::Ok: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SendAll") + .field("sink", &self.sink) + .field("stream", &self.stream) + .field("buffered", &self.buffered) + .finish() + } +} + +// Pinning is never projected to any fields +impl<Si, St> Unpin for SendAll<'_, Si, St> +where + Si: Unpin + ?Sized, + St: TryStream + Unpin + ?Sized, +{ +} + +impl<'a, Si, St, Ok, Error> SendAll<'a, Si, St> +where + Si: Sink<Ok, Error = Error> + Unpin + ?Sized, + St: TryStream<Ok = Ok, Error = Error> + Stream + Unpin + ?Sized, +{ + pub(super) fn new(sink: &'a mut Si, stream: &'a mut St) -> Self { + Self { sink, stream: stream.fuse(), buffered: None } + } + + fn try_start_send( + &mut self, + cx: &mut Context<'_>, + item: St::Ok, + ) -> Poll<Result<(), Si::Error>> { + debug_assert!(self.buffered.is_none()); + match Pin::new(&mut self.sink).poll_ready(cx)? { + Poll::Ready(()) => Poll::Ready(Pin::new(&mut self.sink).start_send(item)), + Poll::Pending => { + self.buffered = Some(item); + Poll::Pending + } + } + } +} + +impl<Si, St, Ok, Error> Future for SendAll<'_, Si, St> +where + Si: Sink<Ok, Error = Error> + Unpin + ?Sized, + St: Stream<Item = Result<Ok, Error>> + Unpin + ?Sized, +{ + type Output = Result<(), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = &mut *self; + // If we've got an item buffered already, we need to write it to the + // sink before we can do anything else + if let Some(item) = this.buffered.take() { + ready!(this.try_start_send(cx, item))? + } + + loop { + match this.stream.try_poll_next_unpin(cx)? { + Poll::Ready(Some(item)) => ready!(this.try_start_send(cx, item))?, + Poll::Ready(None) => { + ready!(Pin::new(&mut this.sink).poll_flush(cx))?; + return Poll::Ready(Ok(())); + } + Poll::Pending => { + ready!(Pin::new(&mut this.sink).poll_flush(cx))?; + return Poll::Pending; + } + } + } + } +} diff --git a/third_party/rust/futures-util/src/sink/unfold.rs b/third_party/rust/futures-util/src/sink/unfold.rs new file mode 100644 index 0000000000..330a068c31 --- /dev/null +++ b/third_party/rust/futures-util/src/sink/unfold.rs @@ -0,0 +1,86 @@ +use super::assert_sink; +use crate::unfold_state::UnfoldState; +use core::{future::Future, pin::Pin}; +use futures_core::ready; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + /// Sink for the [`unfold`] function. + #[derive(Debug)] + #[must_use = "sinks do nothing unless polled"] + pub struct Unfold<T, F, R> { + function: F, + #[pin] + state: UnfoldState<T, R>, + } +} + +/// Create a sink from a function which processes one item at a time. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::sink::{self, SinkExt}; +/// +/// let unfold = sink::unfold(0, |mut sum, i: i32| { +/// async move { +/// sum += i; +/// eprintln!("{}", i); +/// Ok::<_, futures::never::Never>(sum) +/// } +/// }); +/// futures::pin_mut!(unfold); +/// unfold.send(5).await?; +/// # Ok::<(), futures::never::Never>(()) }).unwrap(); +/// ``` +pub fn unfold<T, F, R, Item, E>(init: T, function: F) -> Unfold<T, F, R> +where + F: FnMut(T, Item) -> R, + R: Future<Output = Result<T, E>>, +{ + assert_sink::<Item, E, _>(Unfold { function, state: UnfoldState::Value { value: init } }) +} + +impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R> +where + F: FnMut(T, Item) -> R, + R: Future<Output = Result<T, E>>, +{ + type Error = E; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.poll_flush(cx) + } + + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + let mut this = self.project(); + let future = match this.state.as_mut().take_value() { + Some(value) => (this.function)(value, item), + None => panic!("start_send called without poll_ready being called first"), + }; + this.state.set(UnfoldState::Future { future }); + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + let mut this = self.project(); + Poll::Ready(if let Some(future) = this.state.as_mut().project_future() { + match ready!(future.poll(cx)) { + Ok(state) => { + this.state.set(UnfoldState::Value { value: state }); + Ok(()) + } + Err(err) => Err(err), + } + } else { + Ok(()) + }) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.poll_flush(cx) + } +} diff --git a/third_party/rust/futures-util/src/sink/with.rs b/third_party/rust/futures-util/src/sink/with.rs new file mode 100644 index 0000000000..86d3dcc7b8 --- /dev/null +++ b/third_party/rust/futures-util/src/sink/with.rs @@ -0,0 +1,134 @@ +use core::fmt; +use core::marker::PhantomData; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::ready; +use futures_core::stream::Stream; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + /// Sink for the [`with`](super::SinkExt::with) method. + #[must_use = "sinks do nothing unless polled"] + pub struct With<Si, Item, U, Fut, F> { + #[pin] + sink: Si, + f: F, + #[pin] + state: Option<Fut>, + _phantom: PhantomData<fn(U) -> Item>, + } +} + +impl<Si, Item, U, Fut, F> fmt::Debug for With<Si, Item, U, Fut, F> +where + Si: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("With").field("sink", &self.sink).field("state", &self.state).finish() + } +} + +impl<Si, Item, U, Fut, F> With<Si, Item, U, Fut, F> +where + Si: Sink<Item>, + F: FnMut(U) -> Fut, + Fut: Future, +{ + pub(super) fn new<E>(sink: Si, f: F) -> Self + where + Fut: Future<Output = Result<Item, E>>, + E: From<Si::Error>, + { + Self { state: None, sink, f, _phantom: PhantomData } + } +} + +impl<Si, Item, U, Fut, F> Clone for With<Si, Item, U, Fut, F> +where + Si: Clone, + F: Clone, + Fut: Clone, +{ + fn clone(&self) -> Self { + Self { + state: self.state.clone(), + sink: self.sink.clone(), + f: self.f.clone(), + _phantom: PhantomData, + } + } +} + +// Forwarding impl of Stream from the underlying sink +impl<S, Item, U, Fut, F> Stream for With<S, Item, U, Fut, F> +where + S: Stream + Sink<Item>, + F: FnMut(U) -> Fut, + Fut: Future, +{ + type Item = S::Item; + + delegate_stream!(sink); +} + +impl<Si, Item, U, Fut, F, E> With<Si, Item, U, Fut, F> +where + Si: Sink<Item>, + F: FnMut(U) -> Fut, + Fut: Future<Output = Result<Item, E>>, + E: From<Si::Error>, +{ + delegate_access_inner!(sink, Si, ()); + + /// Completes the processing of previous item if any. + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), E>> { + let mut this = self.project(); + + let item = match this.state.as_mut().as_pin_mut() { + None => return Poll::Ready(Ok(())), + Some(fut) => ready!(fut.poll(cx))?, + }; + this.state.set(None); + this.sink.start_send(item)?; + Poll::Ready(Ok(())) + } +} + +impl<Si, Item, U, Fut, F, E> Sink<U> for With<Si, Item, U, Fut, F> +where + Si: Sink<Item>, + F: FnMut(U) -> Fut, + Fut: Future<Output = Result<Item, E>>, + E: From<Si::Error>, +{ + type Error = E; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + ready!(self.as_mut().poll(cx))?; + ready!(self.project().sink.poll_ready(cx)?); + Poll::Ready(Ok(())) + } + + fn start_send(self: Pin<&mut Self>, item: U) -> Result<(), Self::Error> { + let mut this = self.project(); + + assert!(this.state.is_none()); + this.state.set(Some((this.f)(item))); + Ok(()) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + ready!(self.as_mut().poll(cx))?; + ready!(self.project().sink.poll_flush(cx)?); + Poll::Ready(Ok(())) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + ready!(self.as_mut().poll(cx))?; + ready!(self.project().sink.poll_close(cx)?); + Poll::Ready(Ok(())) + } +} diff --git a/third_party/rust/futures-util/src/sink/with_flat_map.rs b/third_party/rust/futures-util/src/sink/with_flat_map.rs new file mode 100644 index 0000000000..2ae877a24b --- /dev/null +++ b/third_party/rust/futures-util/src/sink/with_flat_map.rs @@ -0,0 +1,127 @@ +use core::fmt; +use core::marker::PhantomData; +use core::pin::Pin; +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + /// Sink for the [`with_flat_map`](super::SinkExt::with_flat_map) method. + #[must_use = "sinks do nothing unless polled"] + pub struct WithFlatMap<Si, Item, U, St, F> { + #[pin] + sink: Si, + f: F, + #[pin] + stream: Option<St>, + buffer: Option<Item>, + _marker: PhantomData<fn(U)>, + } +} + +impl<Si, Item, U, St, F> fmt::Debug for WithFlatMap<Si, Item, U, St, F> +where + Si: fmt::Debug, + St: fmt::Debug, + Item: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WithFlatMap") + .field("sink", &self.sink) + .field("stream", &self.stream) + .field("buffer", &self.buffer) + .finish() + } +} + +impl<Si, Item, U, St, F> WithFlatMap<Si, Item, U, St, F> +where + Si: Sink<Item>, + F: FnMut(U) -> St, + St: Stream<Item = Result<Item, Si::Error>>, +{ + pub(super) fn new(sink: Si, f: F) -> Self { + Self { sink, f, stream: None, buffer: None, _marker: PhantomData } + } + + delegate_access_inner!(sink, Si, ()); + + fn try_empty_stream(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>> { + let mut this = self.project(); + + if this.buffer.is_some() { + ready!(this.sink.as_mut().poll_ready(cx))?; + let item = this.buffer.take().unwrap(); + this.sink.as_mut().start_send(item)?; + } + if let Some(mut some_stream) = this.stream.as_mut().as_pin_mut() { + while let Some(item) = ready!(some_stream.as_mut().poll_next(cx)?) { + match this.sink.as_mut().poll_ready(cx)? { + Poll::Ready(()) => this.sink.as_mut().start_send(item)?, + Poll::Pending => { + *this.buffer = Some(item); + return Poll::Pending; + } + }; + } + } + this.stream.set(None); + Poll::Ready(Ok(())) + } +} + +// Forwarding impl of Stream from the underlying sink +impl<S, Item, U, St, F> Stream for WithFlatMap<S, Item, U, St, F> +where + S: Stream + Sink<Item>, + F: FnMut(U) -> St, + St: Stream<Item = Result<Item, S::Error>>, +{ + type Item = S::Item; + + delegate_stream!(sink); +} + +impl<S, Item, U, St, F> FusedStream for WithFlatMap<S, Item, U, St, F> +where + S: FusedStream + Sink<Item>, + F: FnMut(U) -> St, + St: Stream<Item = Result<Item, S::Error>>, +{ + fn is_terminated(&self) -> bool { + self.sink.is_terminated() + } +} + +impl<Si, Item, U, St, F> Sink<U> for WithFlatMap<Si, Item, U, St, F> +where + Si: Sink<Item>, + F: FnMut(U) -> St, + St: Stream<Item = Result<Item, Si::Error>>, +{ + type Error = Si::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.try_empty_stream(cx) + } + + fn start_send(self: Pin<&mut Self>, item: U) -> Result<(), Self::Error> { + let mut this = self.project(); + + assert!(this.stream.is_none()); + this.stream.set(Some((this.f)(item))); + Ok(()) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + ready!(self.as_mut().try_empty_stream(cx)?); + self.project().sink.poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + ready!(self.as_mut().try_empty_stream(cx)?); + self.project().sink.poll_close(cx) + } +} |