summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-stream/src/wrappers
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--third_party/rust/tokio-stream/src/wrappers.rs62
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/broadcast.rs79
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/interval.rs50
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/lines.rs60
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/mpsc_bounded.rs65
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/mpsc_unbounded.rs59
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/read_dir.rs47
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/signal_unix.rs46
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/signal_windows.rs88
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/split.rs60
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/tcp_listener.rs54
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/unix_listener.rs54
-rw-r--r--third_party/rust/tokio-stream/src/wrappers/watch.rs132
13 files changed, 856 insertions, 0 deletions
diff --git a/third_party/rust/tokio-stream/src/wrappers.rs b/third_party/rust/tokio-stream/src/wrappers.rs
new file mode 100644
index 0000000000..62cabe4f7d
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers.rs
@@ -0,0 +1,62 @@
+//! Wrappers for Tokio types that implement `Stream`.
+
+/// Error types for the wrappers.
+pub mod errors {
+ cfg_sync! {
+ pub use crate::wrappers::broadcast::BroadcastStreamRecvError;
+ }
+}
+
+mod mpsc_bounded;
+pub use mpsc_bounded::ReceiverStream;
+
+mod mpsc_unbounded;
+pub use mpsc_unbounded::UnboundedReceiverStream;
+
+cfg_sync! {
+ mod broadcast;
+ pub use broadcast::BroadcastStream;
+
+ mod watch;
+ pub use watch::WatchStream;
+}
+
+cfg_signal! {
+ #[cfg(unix)]
+ mod signal_unix;
+ #[cfg(unix)]
+ pub use signal_unix::SignalStream;
+
+ #[cfg(any(windows, docsrs))]
+ mod signal_windows;
+ #[cfg(any(windows, docsrs))]
+ pub use signal_windows::{CtrlCStream, CtrlBreakStream};
+}
+
+cfg_time! {
+ mod interval;
+ pub use interval::IntervalStream;
+}
+
+cfg_net! {
+ mod tcp_listener;
+ pub use tcp_listener::TcpListenerStream;
+
+ #[cfg(unix)]
+ mod unix_listener;
+ #[cfg(unix)]
+ pub use unix_listener::UnixListenerStream;
+}
+
+cfg_io_util! {
+ mod split;
+ pub use split::SplitStream;
+
+ mod lines;
+ pub use lines::LinesStream;
+}
+
+cfg_fs! {
+ mod read_dir;
+ pub use read_dir::ReadDirStream;
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/broadcast.rs b/third_party/rust/tokio-stream/src/wrappers/broadcast.rs
new file mode 100644
index 0000000000..711066466a
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/broadcast.rs
@@ -0,0 +1,79 @@
+use std::pin::Pin;
+use tokio::sync::broadcast::error::RecvError;
+use tokio::sync::broadcast::Receiver;
+
+use futures_core::Stream;
+use tokio_util::sync::ReusableBoxFuture;
+
+use std::fmt;
+use std::task::{Context, Poll};
+
+/// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`].
+///
+/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver
+/// [`Stream`]: trait@crate::Stream
+#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
+pub struct BroadcastStream<T> {
+ inner: ReusableBoxFuture<'static, (Result<T, RecvError>, Receiver<T>)>,
+}
+
+/// An error returned from the inner stream of a [`BroadcastStream`].
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum BroadcastStreamRecvError {
+ /// The receiver lagged too far behind. Attempting to receive again will
+ /// return the oldest message still retained by the channel.
+ ///
+ /// Includes the number of skipped messages.
+ Lagged(u64),
+}
+
+impl fmt::Display for BroadcastStreamRecvError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ BroadcastStreamRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
+ }
+ }
+}
+
+impl std::error::Error for BroadcastStreamRecvError {}
+
+async fn make_future<T: Clone>(mut rx: Receiver<T>) -> (Result<T, RecvError>, Receiver<T>) {
+ let result = rx.recv().await;
+ (result, rx)
+}
+
+impl<T: 'static + Clone + Send> BroadcastStream<T> {
+ /// Create a new `BroadcastStream`.
+ pub fn new(rx: Receiver<T>) -> Self {
+ Self {
+ inner: ReusableBoxFuture::new(make_future(rx)),
+ }
+ }
+}
+
+impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
+ type Item = Result<T, BroadcastStreamRecvError>;
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let (result, rx) = ready!(self.inner.poll(cx));
+ self.inner.set(make_future(rx));
+ match result {
+ Ok(item) => Poll::Ready(Some(Ok(item))),
+ Err(RecvError::Closed) => Poll::Ready(None),
+ Err(RecvError::Lagged(n)) => {
+ Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n))))
+ }
+ }
+ }
+}
+
+impl<T> fmt::Debug for BroadcastStream<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("BroadcastStream").finish()
+ }
+}
+
+impl<T: 'static + Clone + Send> From<Receiver<T>> for BroadcastStream<T> {
+ fn from(recv: Receiver<T>) -> Self {
+ Self::new(recv)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/interval.rs b/third_party/rust/tokio-stream/src/wrappers/interval.rs
new file mode 100644
index 0000000000..2bf0194bd0
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/interval.rs
@@ -0,0 +1,50 @@
+use crate::Stream;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::time::{Instant, Interval};
+
+/// A wrapper around [`Interval`] that implements [`Stream`].
+///
+/// [`Interval`]: struct@tokio::time::Interval
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+pub struct IntervalStream {
+ inner: Interval,
+}
+
+impl IntervalStream {
+ /// Create a new `IntervalStream`.
+ pub fn new(interval: Interval) -> Self {
+ Self { inner: interval }
+ }
+
+ /// Get back the inner `Interval`.
+ pub fn into_inner(self) -> Interval {
+ self.inner
+ }
+}
+
+impl Stream for IntervalStream {
+ type Item = Instant;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Instant>> {
+ self.inner.poll_tick(cx).map(Some)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ (std::usize::MAX, None)
+ }
+}
+
+impl AsRef<Interval> for IntervalStream {
+ fn as_ref(&self) -> &Interval {
+ &self.inner
+ }
+}
+
+impl AsMut<Interval> for IntervalStream {
+ fn as_mut(&mut self) -> &mut Interval {
+ &mut self.inner
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/lines.rs b/third_party/rust/tokio-stream/src/wrappers/lines.rs
new file mode 100644
index 0000000000..ad3c25349f
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/lines.rs
@@ -0,0 +1,60 @@
+use crate::Stream;
+use pin_project_lite::pin_project;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::io::{AsyncBufRead, Lines};
+
+pin_project! {
+ /// A wrapper around [`tokio::io::Lines`] that implements [`Stream`].
+ ///
+ /// [`tokio::io::Lines`]: struct@tokio::io::Lines
+ /// [`Stream`]: trait@crate::Stream
+ #[derive(Debug)]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
+ pub struct LinesStream<R> {
+ #[pin]
+ inner: Lines<R>,
+ }
+}
+
+impl<R> LinesStream<R> {
+ /// Create a new `LinesStream`.
+ pub fn new(lines: Lines<R>) -> Self {
+ Self { inner: lines }
+ }
+
+ /// Get back the inner `Lines`.
+ pub fn into_inner(self) -> Lines<R> {
+ self.inner
+ }
+
+ /// Obtain a pinned reference to the inner `Lines<R>`.
+ #[allow(clippy::wrong_self_convention)] // https://github.com/rust-lang/rust-clippy/issues/4546
+ pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Lines<R>> {
+ self.project().inner
+ }
+}
+
+impl<R: AsyncBufRead> Stream for LinesStream<R> {
+ type Item = io::Result<String>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.project()
+ .inner
+ .poll_next_line(cx)
+ .map(Result::transpose)
+ }
+}
+
+impl<R> AsRef<Lines<R>> for LinesStream<R> {
+ fn as_ref(&self) -> &Lines<R> {
+ &self.inner
+ }
+}
+
+impl<R> AsMut<Lines<R>> for LinesStream<R> {
+ fn as_mut(&mut self) -> &mut Lines<R> {
+ &mut self.inner
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/mpsc_bounded.rs b/third_party/rust/tokio-stream/src/wrappers/mpsc_bounded.rs
new file mode 100644
index 0000000000..b5362680ee
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/mpsc_bounded.rs
@@ -0,0 +1,65 @@
+use crate::Stream;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::sync::mpsc::Receiver;
+
+/// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`].
+///
+/// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+pub struct ReceiverStream<T> {
+ inner: Receiver<T>,
+}
+
+impl<T> ReceiverStream<T> {
+ /// Create a new `ReceiverStream`.
+ pub fn new(recv: Receiver<T>) -> Self {
+ Self { inner: recv }
+ }
+
+ /// Get back the inner `Receiver`.
+ pub fn into_inner(self) -> Receiver<T> {
+ self.inner
+ }
+
+ /// Closes the receiving half of a channel without dropping it.
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered. Any
+ /// outstanding [`Permit`] values will still be able to send messages.
+ ///
+ /// To guarantee no messages are dropped, after calling `close()`, you must
+ /// receive all items from the stream until `None` is returned.
+ ///
+ /// [`Permit`]: struct@tokio::sync::mpsc::Permit
+ pub fn close(&mut self) {
+ self.inner.close()
+ }
+}
+
+impl<T> Stream for ReceiverStream<T> {
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.inner.poll_recv(cx)
+ }
+}
+
+impl<T> AsRef<Receiver<T>> for ReceiverStream<T> {
+ fn as_ref(&self) -> &Receiver<T> {
+ &self.inner
+ }
+}
+
+impl<T> AsMut<Receiver<T>> for ReceiverStream<T> {
+ fn as_mut(&mut self) -> &mut Receiver<T> {
+ &mut self.inner
+ }
+}
+
+impl<T> From<Receiver<T>> for ReceiverStream<T> {
+ fn from(recv: Receiver<T>) -> Self {
+ Self::new(recv)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/mpsc_unbounded.rs b/third_party/rust/tokio-stream/src/wrappers/mpsc_unbounded.rs
new file mode 100644
index 0000000000..54597b7f6f
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/mpsc_unbounded.rs
@@ -0,0 +1,59 @@
+use crate::Stream;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::sync::mpsc::UnboundedReceiver;
+
+/// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`].
+///
+/// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+pub struct UnboundedReceiverStream<T> {
+ inner: UnboundedReceiver<T>,
+}
+
+impl<T> UnboundedReceiverStream<T> {
+ /// Create a new `UnboundedReceiverStream`.
+ pub fn new(recv: UnboundedReceiver<T>) -> Self {
+ Self { inner: recv }
+ }
+
+ /// Get back the inner `UnboundedReceiver`.
+ pub fn into_inner(self) -> UnboundedReceiver<T> {
+ self.inner
+ }
+
+ /// Closes the receiving half of a channel without dropping it.
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered.
+ pub fn close(&mut self) {
+ self.inner.close()
+ }
+}
+
+impl<T> Stream for UnboundedReceiverStream<T> {
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.inner.poll_recv(cx)
+ }
+}
+
+impl<T> AsRef<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
+ fn as_ref(&self) -> &UnboundedReceiver<T> {
+ &self.inner
+ }
+}
+
+impl<T> AsMut<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
+ fn as_mut(&mut self) -> &mut UnboundedReceiver<T> {
+ &mut self.inner
+ }
+}
+
+impl<T> From<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
+ fn from(recv: UnboundedReceiver<T>) -> Self {
+ Self::new(recv)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/read_dir.rs b/third_party/rust/tokio-stream/src/wrappers/read_dir.rs
new file mode 100644
index 0000000000..b5cf54f79e
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/read_dir.rs
@@ -0,0 +1,47 @@
+use crate::Stream;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::fs::{DirEntry, ReadDir};
+
+/// A wrapper around [`tokio::fs::ReadDir`] that implements [`Stream`].
+///
+/// [`tokio::fs::ReadDir`]: struct@tokio::fs::ReadDir
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "fs")))]
+pub struct ReadDirStream {
+ inner: ReadDir,
+}
+
+impl ReadDirStream {
+ /// Create a new `ReadDirStream`.
+ pub fn new(read_dir: ReadDir) -> Self {
+ Self { inner: read_dir }
+ }
+
+ /// Get back the inner `ReadDir`.
+ pub fn into_inner(self) -> ReadDir {
+ self.inner
+ }
+}
+
+impl Stream for ReadDirStream {
+ type Item = io::Result<DirEntry>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.inner.poll_next_entry(cx).map(Result::transpose)
+ }
+}
+
+impl AsRef<ReadDir> for ReadDirStream {
+ fn as_ref(&self) -> &ReadDir {
+ &self.inner
+ }
+}
+
+impl AsMut<ReadDir> for ReadDirStream {
+ fn as_mut(&mut self) -> &mut ReadDir {
+ &mut self.inner
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/signal_unix.rs b/third_party/rust/tokio-stream/src/wrappers/signal_unix.rs
new file mode 100644
index 0000000000..2f74e7d152
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/signal_unix.rs
@@ -0,0 +1,46 @@
+use crate::Stream;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::signal::unix::Signal;
+
+/// A wrapper around [`Signal`] that implements [`Stream`].
+///
+/// [`Signal`]: struct@tokio::signal::unix::Signal
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "signal"))))]
+pub struct SignalStream {
+ inner: Signal,
+}
+
+impl SignalStream {
+ /// Create a new `SignalStream`.
+ pub fn new(interval: Signal) -> Self {
+ Self { inner: interval }
+ }
+
+ /// Get back the inner `Signal`.
+ pub fn into_inner(self) -> Signal {
+ self.inner
+ }
+}
+
+impl Stream for SignalStream {
+ type Item = ();
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.inner.poll_recv(cx)
+ }
+}
+
+impl AsRef<Signal> for SignalStream {
+ fn as_ref(&self) -> &Signal {
+ &self.inner
+ }
+}
+
+impl AsMut<Signal> for SignalStream {
+ fn as_mut(&mut self) -> &mut Signal {
+ &mut self.inner
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/signal_windows.rs b/third_party/rust/tokio-stream/src/wrappers/signal_windows.rs
new file mode 100644
index 0000000000..4631fbad8d
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/signal_windows.rs
@@ -0,0 +1,88 @@
+use crate::Stream;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::signal::windows::{CtrlBreak, CtrlC};
+
+/// A wrapper around [`CtrlC`] that implements [`Stream`].
+///
+/// [`CtrlC`]: struct@tokio::signal::windows::CtrlC
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(all(windows, feature = "signal"))))]
+pub struct CtrlCStream {
+ inner: CtrlC,
+}
+
+impl CtrlCStream {
+ /// Create a new `CtrlCStream`.
+ pub fn new(interval: CtrlC) -> Self {
+ Self { inner: interval }
+ }
+
+ /// Get back the inner `CtrlC`.
+ pub fn into_inner(self) -> CtrlC {
+ self.inner
+ }
+}
+
+impl Stream for CtrlCStream {
+ type Item = ();
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.inner.poll_recv(cx)
+ }
+}
+
+impl AsRef<CtrlC> for CtrlCStream {
+ fn as_ref(&self) -> &CtrlC {
+ &self.inner
+ }
+}
+
+impl AsMut<CtrlC> for CtrlCStream {
+ fn as_mut(&mut self) -> &mut CtrlC {
+ &mut self.inner
+ }
+}
+
+/// A wrapper around [`CtrlBreak`] that implements [`Stream`].
+///
+/// [`CtrlBreak`]: struct@tokio::signal::windows::CtrlBreak
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(all(windows, feature = "signal"))))]
+pub struct CtrlBreakStream {
+ inner: CtrlBreak,
+}
+
+impl CtrlBreakStream {
+ /// Create a new `CtrlBreakStream`.
+ pub fn new(interval: CtrlBreak) -> Self {
+ Self { inner: interval }
+ }
+
+ /// Get back the inner `CtrlBreak`.
+ pub fn into_inner(self) -> CtrlBreak {
+ self.inner
+ }
+}
+
+impl Stream for CtrlBreakStream {
+ type Item = ();
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.inner.poll_recv(cx)
+ }
+}
+
+impl AsRef<CtrlBreak> for CtrlBreakStream {
+ fn as_ref(&self) -> &CtrlBreak {
+ &self.inner
+ }
+}
+
+impl AsMut<CtrlBreak> for CtrlBreakStream {
+ fn as_mut(&mut self) -> &mut CtrlBreak {
+ &mut self.inner
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/split.rs b/third_party/rust/tokio-stream/src/wrappers/split.rs
new file mode 100644
index 0000000000..5a6bb2d408
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/split.rs
@@ -0,0 +1,60 @@
+use crate::Stream;
+use pin_project_lite::pin_project;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::io::{AsyncBufRead, Split};
+
+pin_project! {
+ /// A wrapper around [`tokio::io::Split`] that implements [`Stream`].
+ ///
+ /// [`tokio::io::Split`]: struct@tokio::io::Split
+ /// [`Stream`]: trait@crate::Stream
+ #[derive(Debug)]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
+ pub struct SplitStream<R> {
+ #[pin]
+ inner: Split<R>,
+ }
+}
+
+impl<R> SplitStream<R> {
+ /// Create a new `SplitStream`.
+ pub fn new(split: Split<R>) -> Self {
+ Self { inner: split }
+ }
+
+ /// Get back the inner `Split`.
+ pub fn into_inner(self) -> Split<R> {
+ self.inner
+ }
+
+ /// Obtain a pinned reference to the inner `Split<R>`.
+ #[allow(clippy::wrong_self_convention)] // https://github.com/rust-lang/rust-clippy/issues/4546
+ pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Split<R>> {
+ self.project().inner
+ }
+}
+
+impl<R: AsyncBufRead> Stream for SplitStream<R> {
+ type Item = io::Result<Vec<u8>>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.project()
+ .inner
+ .poll_next_segment(cx)
+ .map(Result::transpose)
+ }
+}
+
+impl<R> AsRef<Split<R>> for SplitStream<R> {
+ fn as_ref(&self) -> &Split<R> {
+ &self.inner
+ }
+}
+
+impl<R> AsMut<Split<R>> for SplitStream<R> {
+ fn as_mut(&mut self) -> &mut Split<R> {
+ &mut self.inner
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/tcp_listener.rs b/third_party/rust/tokio-stream/src/wrappers/tcp_listener.rs
new file mode 100644
index 0000000000..ce7cb16350
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/tcp_listener.rs
@@ -0,0 +1,54 @@
+use crate::Stream;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::net::{TcpListener, TcpStream};
+
+/// A wrapper around [`TcpListener`] that implements [`Stream`].
+///
+/// [`TcpListener`]: struct@tokio::net::TcpListener
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
+pub struct TcpListenerStream {
+ inner: TcpListener,
+}
+
+impl TcpListenerStream {
+ /// Create a new `TcpListenerStream`.
+ pub fn new(listener: TcpListener) -> Self {
+ Self { inner: listener }
+ }
+
+ /// Get back the inner `TcpListener`.
+ pub fn into_inner(self) -> TcpListener {
+ self.inner
+ }
+}
+
+impl Stream for TcpListenerStream {
+ type Item = io::Result<TcpStream>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<io::Result<TcpStream>>> {
+ match self.inner.poll_accept(cx) {
+ Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))),
+ Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
+ Poll::Pending => Poll::Pending,
+ }
+ }
+}
+
+impl AsRef<TcpListener> for TcpListenerStream {
+ fn as_ref(&self) -> &TcpListener {
+ &self.inner
+ }
+}
+
+impl AsMut<TcpListener> for TcpListenerStream {
+ fn as_mut(&mut self) -> &mut TcpListener {
+ &mut self.inner
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/unix_listener.rs b/third_party/rust/tokio-stream/src/wrappers/unix_listener.rs
new file mode 100644
index 0000000000..0beba588c2
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/unix_listener.rs
@@ -0,0 +1,54 @@
+use crate::Stream;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::net::{UnixListener, UnixStream};
+
+/// A wrapper around [`UnixListener`] that implements [`Stream`].
+///
+/// [`UnixListener`]: struct@tokio::net::UnixListener
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "net"))))]
+pub struct UnixListenerStream {
+ inner: UnixListener,
+}
+
+impl UnixListenerStream {
+ /// Create a new `UnixListenerStream`.
+ pub fn new(listener: UnixListener) -> Self {
+ Self { inner: listener }
+ }
+
+ /// Get back the inner `UnixListener`.
+ pub fn into_inner(self) -> UnixListener {
+ self.inner
+ }
+}
+
+impl Stream for UnixListenerStream {
+ type Item = io::Result<UnixStream>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<io::Result<UnixStream>>> {
+ match self.inner.poll_accept(cx) {
+ Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))),
+ Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
+ Poll::Pending => Poll::Pending,
+ }
+ }
+}
+
+impl AsRef<UnixListener> for UnixListenerStream {
+ fn as_ref(&self) -> &UnixListener {
+ &self.inner
+ }
+}
+
+impl AsMut<UnixListener> for UnixListenerStream {
+ fn as_mut(&mut self) -> &mut UnixListener {
+ &mut self.inner
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/wrappers/watch.rs b/third_party/rust/tokio-stream/src/wrappers/watch.rs
new file mode 100644
index 0000000000..ec8ead06da
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/wrappers/watch.rs
@@ -0,0 +1,132 @@
+use std::pin::Pin;
+use tokio::sync::watch::Receiver;
+
+use futures_core::Stream;
+use tokio_util::sync::ReusableBoxFuture;
+
+use std::fmt;
+use std::task::{Context, Poll};
+use tokio::sync::watch::error::RecvError;
+
+/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
+///
+/// This stream will start by yielding the current value when the WatchStream is polled,
+/// regardless of whether it was the initial value or sent afterwards,
+/// unless you use [`WatchStream<T>::from_changes`].
+///
+/// # Examples
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use tokio_stream::{StreamExt, wrappers::WatchStream};
+/// use tokio::sync::watch;
+///
+/// let (tx, rx) = watch::channel("hello");
+/// let mut rx = WatchStream::new(rx);
+///
+/// assert_eq!(rx.next().await, Some("hello"));
+///
+/// tx.send("goodbye").unwrap();
+/// assert_eq!(rx.next().await, Some("goodbye"));
+/// # }
+/// ```
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use tokio_stream::{StreamExt, wrappers::WatchStream};
+/// use tokio::sync::watch;
+///
+/// let (tx, rx) = watch::channel("hello");
+/// let mut rx = WatchStream::new(rx);
+///
+/// // existing rx output with "hello" is ignored here
+///
+/// tx.send("goodbye").unwrap();
+/// assert_eq!(rx.next().await, Some("goodbye"));
+/// # }
+/// ```
+///
+/// Example with [`WatchStream<T>::from_changes`]:
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use futures::future::FutureExt;
+/// use tokio::sync::watch;
+/// use tokio_stream::{StreamExt, wrappers::WatchStream};
+///
+/// let (tx, rx) = watch::channel("hello");
+/// let mut rx = WatchStream::from_changes(rx);
+///
+/// // no output from rx is available at this point - let's check this:
+/// assert!(rx.next().now_or_never().is_none());
+///
+/// tx.send("goodbye").unwrap();
+/// assert_eq!(rx.next().await, Some("goodbye"));
+/// # }
+/// ```
+///
+/// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver
+/// [`Stream`]: trait@crate::Stream
+#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
+pub struct WatchStream<T> {
+ inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<T>)>,
+}
+
+async fn make_future<T: Clone + Send + Sync>(
+ mut rx: Receiver<T>,
+) -> (Result<(), RecvError>, Receiver<T>) {
+ let result = rx.changed().await;
+ (result, rx)
+}
+
+impl<T: 'static + Clone + Send + Sync> WatchStream<T> {
+ /// Create a new `WatchStream`.
+ pub fn new(rx: Receiver<T>) -> Self {
+ Self {
+ inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
+ }
+ }
+
+ /// Create a new `WatchStream` that waits for the value to be changed.
+ pub fn from_changes(rx: Receiver<T>) -> Self {
+ Self {
+ inner: ReusableBoxFuture::new(make_future(rx)),
+ }
+ }
+}
+
+impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> {
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let (result, mut rx) = ready!(self.inner.poll(cx));
+ match result {
+ Ok(_) => {
+ let received = (*rx.borrow_and_update()).clone();
+ self.inner.set(make_future(rx));
+ Poll::Ready(Some(received))
+ }
+ Err(_) => {
+ self.inner.set(make_future(rx));
+ Poll::Ready(None)
+ }
+ }
+ }
+}
+
+impl<T> Unpin for WatchStream<T> {}
+
+impl<T> fmt::Debug for WatchStream<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("WatchStream").finish()
+ }
+}
+
+impl<T: 'static + Clone + Send + Sync> From<Receiver<T>> for WatchStream<T> {
+ fn from(recv: Receiver<T>) -> Self {
+ Self::new(recv)
+ }
+}