summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-stream/src/stream_ext
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-stream/src/stream_ext')
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/all.rs58
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/any.rs58
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/chain.rs50
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs86
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/collect.rs229
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/filter.rs58
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/filter_map.rs58
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/fold.rs57
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/fuse.rs53
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/map.rs51
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/map_while.rs52
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/merge.rs90
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/next.rs44
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/skip.rs63
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/skip_while.rs73
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/take.rs76
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/take_while.rs79
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/then.rs83
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/throttle.rs96
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/timeout.rs107
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/try_next.rs45
21 files changed, 1566 insertions, 0 deletions
diff --git a/third_party/rust/tokio-stream/src/stream_ext/all.rs b/third_party/rust/tokio-stream/src/stream_ext/all.rs
new file mode 100644
index 0000000000..b4dbc1e97c
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/all.rs
@@ -0,0 +1,58 @@
+use crate::Stream;
+
+use core::future::Future;
+use core::marker::PhantomPinned;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`all`](super::StreamExt::all) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct AllFuture<'a, St: ?Sized, F> {
+ stream: &'a mut St,
+ f: F,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
+ }
+}
+
+impl<'a, St: ?Sized, F> AllFuture<'a, St, F> {
+ pub(super) fn new(stream: &'a mut St, f: F) -> Self {
+ Self {
+ stream,
+ f,
+ _pin: PhantomPinned,
+ }
+ }
+}
+
+impl<St, F> Future for AllFuture<'_, St, F>
+where
+ St: ?Sized + Stream + Unpin,
+ F: FnMut(St::Item) -> bool,
+{
+ type Output = bool;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+ let mut stream = Pin::new(me.stream);
+
+ // Take a maximum of 32 items from the stream before yielding.
+ for _ in 0..32 {
+ match futures_core::ready!(stream.as_mut().poll_next(cx)) {
+ Some(v) => {
+ if !(me.f)(v) {
+ return Poll::Ready(false);
+ }
+ }
+ None => return Poll::Ready(true),
+ }
+ }
+
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/any.rs b/third_party/rust/tokio-stream/src/stream_ext/any.rs
new file mode 100644
index 0000000000..31394f249b
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/any.rs
@@ -0,0 +1,58 @@
+use crate::Stream;
+
+use core::future::Future;
+use core::marker::PhantomPinned;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`any`](super::StreamExt::any) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct AnyFuture<'a, St: ?Sized, F> {
+ stream: &'a mut St,
+ f: F,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
+ }
+}
+
+impl<'a, St: ?Sized, F> AnyFuture<'a, St, F> {
+ pub(super) fn new(stream: &'a mut St, f: F) -> Self {
+ Self {
+ stream,
+ f,
+ _pin: PhantomPinned,
+ }
+ }
+}
+
+impl<St, F> Future for AnyFuture<'_, St, F>
+where
+ St: ?Sized + Stream + Unpin,
+ F: FnMut(St::Item) -> bool,
+{
+ type Output = bool;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+ let mut stream = Pin::new(me.stream);
+
+ // Take a maximum of 32 items from the stream before yielding.
+ for _ in 0..32 {
+ match futures_core::ready!(stream.as_mut().poll_next(cx)) {
+ Some(v) => {
+ if (me.f)(v) {
+ return Poll::Ready(true);
+ }
+ }
+ None => return Poll::Ready(false),
+ }
+ }
+
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/chain.rs b/third_party/rust/tokio-stream/src/stream_ext/chain.rs
new file mode 100644
index 0000000000..bd64f33ce4
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/chain.rs
@@ -0,0 +1,50 @@
+use crate::stream_ext::Fuse;
+use crate::Stream;
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream returned by the [`chain`](super::StreamExt::chain) method.
+ pub struct Chain<T, U> {
+ #[pin]
+ a: Fuse<T>,
+ #[pin]
+ b: U,
+ }
+}
+
+impl<T, U> Chain<T, U> {
+ pub(super) fn new(a: T, b: U) -> Chain<T, U>
+ where
+ T: Stream,
+ U: Stream,
+ {
+ Chain { a: Fuse::new(a), b }
+ }
+}
+
+impl<T, U> Stream for Chain<T, U>
+where
+ T: Stream,
+ U: Stream<Item = T::Item>,
+{
+ type Item = T::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
+ use Poll::Ready;
+
+ let me = self.project();
+
+ if let Some(v) = ready!(me.a.poll_next(cx)) {
+ return Ready(Some(v));
+ }
+
+ me.b.poll_next(cx)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ super::merge_size_hints(self.a.size_hint(), self.b.size_hint())
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs b/third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs
new file mode 100644
index 0000000000..48acd9328b
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs
@@ -0,0 +1,86 @@
+use crate::stream_ext::Fuse;
+use crate::Stream;
+use tokio::time::{sleep, Sleep};
+
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+use std::time::Duration;
+
+pin_project! {
+ /// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method.
+ #[must_use = "streams do nothing unless polled"]
+ #[derive(Debug)]
+ pub struct ChunksTimeout<S: Stream> {
+ #[pin]
+ stream: Fuse<S>,
+ #[pin]
+ deadline: Option<Sleep>,
+ duration: Duration,
+ items: Vec<S::Item>,
+ cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
+ }
+}
+
+impl<S: Stream> ChunksTimeout<S> {
+ pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self {
+ ChunksTimeout {
+ stream: Fuse::new(stream),
+ deadline: None,
+ duration,
+ items: Vec::with_capacity(max_size),
+ cap: max_size,
+ }
+ }
+}
+
+impl<S: Stream> Stream for ChunksTimeout<S> {
+ type Item = Vec<S::Item>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut me = self.as_mut().project();
+ loop {
+ match me.stream.as_mut().poll_next(cx) {
+ Poll::Pending => break,
+ Poll::Ready(Some(item)) => {
+ if me.items.is_empty() {
+ me.deadline.set(Some(sleep(*me.duration)));
+ me.items.reserve_exact(*me.cap);
+ }
+ me.items.push(item);
+ if me.items.len() >= *me.cap {
+ return Poll::Ready(Some(std::mem::take(me.items)));
+ }
+ }
+ Poll::Ready(None) => {
+ // Returning Some here is only correct because we fuse the inner stream.
+ let last = if me.items.is_empty() {
+ None
+ } else {
+ Some(std::mem::take(me.items))
+ };
+
+ return Poll::Ready(last);
+ }
+ }
+ }
+
+ if !me.items.is_empty() {
+ if let Some(deadline) = me.deadline.as_pin_mut() {
+ ready!(deadline.poll(cx));
+ }
+ return Poll::Ready(Some(std::mem::take(me.items)));
+ }
+
+ Poll::Pending
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let chunk_len = if self.items.is_empty() { 0 } else { 1 };
+ let (lower, upper) = self.stream.size_hint();
+ let lower = (lower / self.cap).saturating_add(chunk_len);
+ let upper = upper.and_then(|x| x.checked_add(chunk_len));
+ (lower, upper)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/collect.rs b/third_party/rust/tokio-stream/src/stream_ext/collect.rs
new file mode 100644
index 0000000000..8548b74556
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/collect.rs
@@ -0,0 +1,229 @@
+use crate::Stream;
+
+use core::future::Future;
+use core::marker::PhantomPinned;
+use core::mem;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+// Do not export this struct until `FromStream` can be unsealed.
+pin_project! {
+ /// Future returned by the [`collect`](super::StreamExt::collect) method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ #[derive(Debug)]
+ pub struct Collect<T, U>
+ where
+ T: Stream,
+ U: FromStream<T::Item>,
+ {
+ #[pin]
+ stream: T,
+ collection: U::InternalCollection,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
+ }
+}
+
+/// Convert from a [`Stream`](crate::Stream).
+///
+/// This trait is not intended to be used directly. Instead, call
+/// [`StreamExt::collect()`](super::StreamExt::collect).
+///
+/// # Implementing
+///
+/// Currently, this trait may not be implemented by third parties. The trait is
+/// sealed in order to make changes in the future. Stabilization is pending
+/// enhancements to the Rust language.
+pub trait FromStream<T>: sealed::FromStreamPriv<T> {}
+
+impl<T, U> Collect<T, U>
+where
+ T: Stream,
+ U: FromStream<T::Item>,
+{
+ pub(super) fn new(stream: T) -> Collect<T, U> {
+ let (lower, upper) = stream.size_hint();
+ let collection = U::initialize(sealed::Internal, lower, upper);
+
+ Collect {
+ stream,
+ collection,
+ _pin: PhantomPinned,
+ }
+ }
+}
+
+impl<T, U> Future for Collect<T, U>
+where
+ T: Stream,
+ U: FromStream<T::Item>,
+{
+ type Output = U;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> {
+ use Poll::Ready;
+
+ loop {
+ let me = self.as_mut().project();
+
+ let item = match ready!(me.stream.poll_next(cx)) {
+ Some(item) => item,
+ None => {
+ return Ready(U::finalize(sealed::Internal, me.collection));
+ }
+ };
+
+ if !U::extend(sealed::Internal, me.collection, item) {
+ return Ready(U::finalize(sealed::Internal, me.collection));
+ }
+ }
+ }
+}
+
+// ===== FromStream implementations
+
+impl FromStream<()> for () {}
+
+impl sealed::FromStreamPriv<()> for () {
+ type InternalCollection = ();
+
+ fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) {}
+
+ fn extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool {
+ true
+ }
+
+ fn finalize(_: sealed::Internal, _collection: &mut ()) {}
+}
+
+impl<T: AsRef<str>> FromStream<T> for String {}
+
+impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String {
+ type InternalCollection = String;
+
+ fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String {
+ String::new()
+ }
+
+ fn extend(_: sealed::Internal, collection: &mut String, item: T) -> bool {
+ collection.push_str(item.as_ref());
+ true
+ }
+
+ fn finalize(_: sealed::Internal, collection: &mut String) -> String {
+ mem::take(collection)
+ }
+}
+
+impl<T> FromStream<T> for Vec<T> {}
+
+impl<T> sealed::FromStreamPriv<T> for Vec<T> {
+ type InternalCollection = Vec<T>;
+
+ fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T> {
+ Vec::with_capacity(lower)
+ }
+
+ fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
+ collection.push(item);
+ true
+ }
+
+ fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> {
+ mem::take(collection)
+ }
+}
+
+impl<T> FromStream<T> for Box<[T]> {}
+
+impl<T> sealed::FromStreamPriv<T> for Box<[T]> {
+ type InternalCollection = Vec<T>;
+
+ fn initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T> {
+ <Vec<T> as sealed::FromStreamPriv<T>>::initialize(sealed::Internal, lower, upper)
+ }
+
+ fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
+ <Vec<T> as sealed::FromStreamPriv<T>>::extend(sealed::Internal, collection, item)
+ }
+
+ fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]> {
+ <Vec<T> as sealed::FromStreamPriv<T>>::finalize(sealed::Internal, collection)
+ .into_boxed_slice()
+ }
+}
+
+impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {}
+
+impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E>
+where
+ U: FromStream<T>,
+{
+ type InternalCollection = Result<U::InternalCollection, E>;
+
+ fn initialize(
+ _: sealed::Internal,
+ lower: usize,
+ upper: Option<usize>,
+ ) -> Result<U::InternalCollection, E> {
+ Ok(U::initialize(sealed::Internal, lower, upper))
+ }
+
+ fn extend(
+ _: sealed::Internal,
+ collection: &mut Self::InternalCollection,
+ item: Result<T, E>,
+ ) -> bool {
+ assert!(collection.is_ok());
+ match item {
+ Ok(item) => {
+ let collection = collection.as_mut().ok().expect("invalid state");
+ U::extend(sealed::Internal, collection, item)
+ }
+ Err(err) => {
+ *collection = Err(err);
+ false
+ }
+ }
+ }
+
+ fn finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E> {
+ if let Ok(collection) = collection.as_mut() {
+ Ok(U::finalize(sealed::Internal, collection))
+ } else {
+ let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0))));
+
+ Err(res.map(drop).unwrap_err())
+ }
+ }
+}
+
+pub(crate) mod sealed {
+ #[doc(hidden)]
+ pub trait FromStreamPriv<T> {
+ /// Intermediate type used during collection process
+ ///
+ /// The name of this type is internal and cannot be relied upon.
+ type InternalCollection;
+
+ /// Initialize the collection
+ fn initialize(
+ internal: Internal,
+ lower: usize,
+ upper: Option<usize>,
+ ) -> Self::InternalCollection;
+
+ /// Extend the collection with the received item
+ ///
+ /// Return `true` to continue streaming, `false` complete collection.
+ fn extend(internal: Internal, collection: &mut Self::InternalCollection, item: T) -> bool;
+
+ /// Finalize collection into target type.
+ fn finalize(internal: Internal, collection: &mut Self::InternalCollection) -> Self;
+ }
+
+ #[allow(missing_debug_implementations)]
+ pub struct Internal;
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/filter.rs b/third_party/rust/tokio-stream/src/stream_ext/filter.rs
new file mode 100644
index 0000000000..f3dd8716b4
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/filter.rs
@@ -0,0 +1,58 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream returned by the [`filter`](super::StreamExt::filter) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Filter<St, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ }
+}
+
+impl<St, F> fmt::Debug for Filter<St, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Filter")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St, F> Filter<St, F> {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self { stream, f }
+ }
+}
+
+impl<St, F> Stream for Filter<St, F>
+where
+ St: Stream,
+ F: FnMut(&St::Item) -> bool,
+{
+ type Item = St::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
+ loop {
+ match ready!(self.as_mut().project().stream.poll_next(cx)) {
+ Some(e) => {
+ if (self.as_mut().project().f)(&e) {
+ return Poll::Ready(Some(e));
+ }
+ }
+ None => return Poll::Ready(None),
+ }
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ (0, self.stream.size_hint().1) // can't know a lower bound, due to the predicate
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/filter_map.rs b/third_party/rust/tokio-stream/src/stream_ext/filter_map.rs
new file mode 100644
index 0000000000..fe604a6f4b
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/filter_map.rs
@@ -0,0 +1,58 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream returned by the [`filter_map`](super::StreamExt::filter_map) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct FilterMap<St, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ }
+}
+
+impl<St, F> fmt::Debug for FilterMap<St, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("FilterMap")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St, F> FilterMap<St, F> {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self { stream, f }
+ }
+}
+
+impl<St, F, T> Stream for FilterMap<St, F>
+where
+ St: Stream,
+ F: FnMut(St::Item) -> Option<T>,
+{
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ loop {
+ match ready!(self.as_mut().project().stream.poll_next(cx)) {
+ Some(e) => {
+ if let Some(e) = (self.as_mut().project().f)(e) {
+ return Poll::Ready(Some(e));
+ }
+ }
+ None => return Poll::Ready(None),
+ }
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ (0, self.stream.size_hint().1) // can't know a lower bound, due to the predicate
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/fold.rs b/third_party/rust/tokio-stream/src/stream_ext/fold.rs
new file mode 100644
index 0000000000..e2e97d8f37
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/fold.rs
@@ -0,0 +1,57 @@
+use crate::Stream;
+
+use core::future::Future;
+use core::marker::PhantomPinned;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future returned by the [`fold`](super::StreamExt::fold) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct FoldFuture<St, B, F> {
+ #[pin]
+ stream: St,
+ acc: Option<B>,
+ f: F,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
+ }
+}
+
+impl<St, B, F> FoldFuture<St, B, F> {
+ pub(super) fn new(stream: St, init: B, f: F) -> Self {
+ Self {
+ stream,
+ acc: Some(init),
+ f,
+ _pin: PhantomPinned,
+ }
+ }
+}
+
+impl<St, B, F> Future for FoldFuture<St, B, F>
+where
+ St: Stream,
+ F: FnMut(B, St::Item) -> B,
+{
+ type Output = B;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut me = self.project();
+ loop {
+ let next = ready!(me.stream.as_mut().poll_next(cx));
+
+ match next {
+ Some(v) => {
+ let old = me.acc.take().unwrap();
+ let new = (me.f)(old, v);
+ *me.acc = Some(new);
+ }
+ None => return Poll::Ready(me.acc.take().unwrap()),
+ }
+ }
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/fuse.rs b/third_party/rust/tokio-stream/src/stream_ext/fuse.rs
new file mode 100644
index 0000000000..2500641d95
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/fuse.rs
@@ -0,0 +1,53 @@
+use crate::Stream;
+
+use pin_project_lite::pin_project;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+ /// Stream returned by [`fuse()`][super::StreamExt::fuse].
+ #[derive(Debug)]
+ pub struct Fuse<T> {
+ #[pin]
+ stream: Option<T>,
+ }
+}
+
+impl<T> Fuse<T>
+where
+ T: Stream,
+{
+ pub(crate) fn new(stream: T) -> Fuse<T> {
+ Fuse {
+ stream: Some(stream),
+ }
+ }
+}
+
+impl<T> Stream for Fuse<T>
+where
+ T: Stream,
+{
+ type Item = T::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
+ let res = match Option::as_pin_mut(self.as_mut().project().stream) {
+ Some(stream) => ready!(stream.poll_next(cx)),
+ None => return Poll::Ready(None),
+ };
+
+ if res.is_none() {
+ // Do not poll the stream anymore
+ self.as_mut().project().stream.set(None);
+ }
+
+ Poll::Ready(res)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ match self.stream {
+ Some(ref stream) => stream.size_hint(),
+ None => (0, Some(0)),
+ }
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/map.rs b/third_party/rust/tokio-stream/src/stream_ext/map.rs
new file mode 100644
index 0000000000..e6b47cd258
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/map.rs
@@ -0,0 +1,51 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`map`](super::StreamExt::map) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Map<St, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ }
+}
+
+impl<St, F> fmt::Debug for Map<St, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Map").field("stream", &self.stream).finish()
+ }
+}
+
+impl<St, F> Map<St, F> {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Map { stream, f }
+ }
+}
+
+impl<St, F, T> Stream for Map<St, F>
+where
+ St: Stream,
+ F: FnMut(St::Item) -> T,
+{
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.as_mut()
+ .project()
+ .stream
+ .poll_next(cx)
+ .map(|opt| opt.map(|x| (self.as_mut().project().f)(x)))
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.stream.size_hint()
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/map_while.rs b/third_party/rust/tokio-stream/src/stream_ext/map_while.rs
new file mode 100644
index 0000000000..d4fd825656
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/map_while.rs
@@ -0,0 +1,52 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`map_while`](super::StreamExt::map_while) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct MapWhile<St, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ }
+}
+
+impl<St, F> fmt::Debug for MapWhile<St, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("MapWhile")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St, F> MapWhile<St, F> {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ MapWhile { stream, f }
+ }
+}
+
+impl<St, F, T> Stream for MapWhile<St, F>
+where
+ St: Stream,
+ F: FnMut(St::Item) -> Option<T>,
+{
+ type Item = T;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ let me = self.project();
+ let f = me.f;
+ me.stream.poll_next(cx).map(|opt| opt.and_then(f))
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let (_, upper) = self.stream.size_hint();
+ (0, upper)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/merge.rs b/third_party/rust/tokio-stream/src/stream_ext/merge.rs
new file mode 100644
index 0000000000..9d5123c85a
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/merge.rs
@@ -0,0 +1,90 @@
+use crate::stream_ext::Fuse;
+use crate::Stream;
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream returned by the [`merge`](super::StreamExt::merge) method.
+ pub struct Merge<T, U> {
+ #[pin]
+ a: Fuse<T>,
+ #[pin]
+ b: Fuse<U>,
+ // When `true`, poll `a` first, otherwise, `poll` b`.
+ a_first: bool,
+ }
+}
+
+impl<T, U> Merge<T, U> {
+ pub(super) fn new(a: T, b: U) -> Merge<T, U>
+ where
+ T: Stream,
+ U: Stream,
+ {
+ Merge {
+ a: Fuse::new(a),
+ b: Fuse::new(b),
+ a_first: true,
+ }
+ }
+}
+
+impl<T, U> Stream for Merge<T, U>
+where
+ T: Stream,
+ U: Stream<Item = T::Item>,
+{
+ type Item = T::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
+ let me = self.project();
+ let a_first = *me.a_first;
+
+ // Toggle the flag
+ *me.a_first = !a_first;
+
+ if a_first {
+ poll_next(me.a, me.b, cx)
+ } else {
+ poll_next(me.b, me.a, cx)
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ super::merge_size_hints(self.a.size_hint(), self.b.size_hint())
+ }
+}
+
+fn poll_next<T, U>(
+ first: Pin<&mut T>,
+ second: Pin<&mut U>,
+ cx: &mut Context<'_>,
+) -> Poll<Option<T::Item>>
+where
+ T: Stream,
+ U: Stream<Item = T::Item>,
+{
+ use Poll::*;
+
+ let mut done = true;
+
+ match first.poll_next(cx) {
+ Ready(Some(val)) => return Ready(Some(val)),
+ Ready(None) => {}
+ Pending => done = false,
+ }
+
+ match second.poll_next(cx) {
+ Ready(Some(val)) => return Ready(Some(val)),
+ Ready(None) => {}
+ Pending => done = false,
+ }
+
+ if done {
+ Ready(None)
+ } else {
+ Pending
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/next.rs b/third_party/rust/tokio-stream/src/stream_ext/next.rs
new file mode 100644
index 0000000000..706069fa6e
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/next.rs
@@ -0,0 +1,44 @@
+use crate::Stream;
+
+use core::future::Future;
+use core::marker::PhantomPinned;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`next`](super::StreamExt::next) method.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. It only
+ /// holds onto a reference to the underlying stream,
+ /// so dropping it will never lose a value.
+ ///
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Next<'a, St: ?Sized> {
+ stream: &'a mut St,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
+ }
+}
+
+impl<'a, St: ?Sized> Next<'a, St> {
+ pub(super) fn new(stream: &'a mut St) -> Self {
+ Next {
+ stream,
+ _pin: PhantomPinned,
+ }
+ }
+}
+
+impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
+ type Output = Option<St::Item>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+ Pin::new(me.stream).poll_next(cx)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/skip.rs b/third_party/rust/tokio-stream/src/stream_ext/skip.rs
new file mode 100644
index 0000000000..80a0a0aff0
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/skip.rs
@@ -0,0 +1,63 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`skip`](super::StreamExt::skip) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Skip<St> {
+ #[pin]
+ stream: St,
+ remaining: usize,
+ }
+}
+
+impl<St> fmt::Debug for Skip<St>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Skip")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St> Skip<St> {
+ pub(super) fn new(stream: St, remaining: usize) -> Self {
+ Self { stream, remaining }
+ }
+}
+
+impl<St> Stream for Skip<St>
+where
+ St: Stream,
+{
+ type Item = St::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ loop {
+ match ready!(self.as_mut().project().stream.poll_next(cx)) {
+ Some(e) => {
+ if self.remaining == 0 {
+ return Poll::Ready(Some(e));
+ }
+ *self.as_mut().project().remaining -= 1;
+ }
+ None => return Poll::Ready(None),
+ }
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let (lower, upper) = self.stream.size_hint();
+
+ let lower = lower.saturating_sub(self.remaining);
+ let upper = upper.map(|x| x.saturating_sub(self.remaining));
+
+ (lower, upper)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/skip_while.rs b/third_party/rust/tokio-stream/src/stream_ext/skip_while.rs
new file mode 100644
index 0000000000..985a92666e
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/skip_while.rs
@@ -0,0 +1,73 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`skip_while`](super::StreamExt::skip_while) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct SkipWhile<St, F> {
+ #[pin]
+ stream: St,
+ predicate: Option<F>,
+ }
+}
+
+impl<St, F> fmt::Debug for SkipWhile<St, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("SkipWhile")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St, F> SkipWhile<St, F> {
+ pub(super) fn new(stream: St, predicate: F) -> Self {
+ Self {
+ stream,
+ predicate: Some(predicate),
+ }
+ }
+}
+
+impl<St, F> Stream for SkipWhile<St, F>
+where
+ St: Stream,
+ F: FnMut(&St::Item) -> bool,
+{
+ type Item = St::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+ if let Some(predicate) = this.predicate {
+ loop {
+ match ready!(this.stream.as_mut().poll_next(cx)) {
+ Some(item) => {
+ if !(predicate)(&item) {
+ *this.predicate = None;
+ return Poll::Ready(Some(item));
+ }
+ }
+ None => return Poll::Ready(None),
+ }
+ }
+ } else {
+ this.stream.poll_next(cx)
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let (lower, upper) = self.stream.size_hint();
+
+ if self.predicate.is_some() {
+ return (0, upper);
+ }
+
+ (lower, upper)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/take.rs b/third_party/rust/tokio-stream/src/stream_ext/take.rs
new file mode 100644
index 0000000000..c75648f606
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/take.rs
@@ -0,0 +1,76 @@
+use crate::Stream;
+
+use core::cmp;
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`take`](super::StreamExt::take) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Take<St> {
+ #[pin]
+ stream: St,
+ remaining: usize,
+ }
+}
+
+impl<St> fmt::Debug for Take<St>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Take")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St> Take<St> {
+ pub(super) fn new(stream: St, remaining: usize) -> Self {
+ Self { stream, remaining }
+ }
+}
+
+impl<St> Stream for Take<St>
+where
+ St: Stream,
+{
+ type Item = St::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ if *self.as_mut().project().remaining > 0 {
+ self.as_mut().project().stream.poll_next(cx).map(|ready| {
+ match &ready {
+ Some(_) => {
+ *self.as_mut().project().remaining -= 1;
+ }
+ None => {
+ *self.as_mut().project().remaining = 0;
+ }
+ }
+ ready
+ })
+ } else {
+ Poll::Ready(None)
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ if self.remaining == 0 {
+ return (0, Some(0));
+ }
+
+ let (lower, upper) = self.stream.size_hint();
+
+ let lower = cmp::min(lower, self.remaining as usize);
+
+ let upper = match upper {
+ Some(x) if x < self.remaining as usize => Some(x),
+ _ => Some(self.remaining as usize),
+ };
+
+ (lower, upper)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/take_while.rs b/third_party/rust/tokio-stream/src/stream_ext/take_while.rs
new file mode 100644
index 0000000000..5ce4dd98a9
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/take_while.rs
@@ -0,0 +1,79 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`take_while`](super::StreamExt::take_while) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TakeWhile<St, F> {
+ #[pin]
+ stream: St,
+ predicate: F,
+ done: bool,
+ }
+}
+
+impl<St, F> fmt::Debug for TakeWhile<St, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TakeWhile")
+ .field("stream", &self.stream)
+ .field("done", &self.done)
+ .finish()
+ }
+}
+
+impl<St, F> TakeWhile<St, F> {
+ pub(super) fn new(stream: St, predicate: F) -> Self {
+ Self {
+ stream,
+ predicate,
+ done: false,
+ }
+ }
+}
+
+impl<St, F> Stream for TakeWhile<St, F>
+where
+ St: Stream,
+ F: FnMut(&St::Item) -> bool,
+{
+ type Item = St::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ if !*self.as_mut().project().done {
+ self.as_mut().project().stream.poll_next(cx).map(|ready| {
+ let ready = ready.and_then(|item| {
+ if !(self.as_mut().project().predicate)(&item) {
+ None
+ } else {
+ Some(item)
+ }
+ });
+
+ if ready.is_none() {
+ *self.as_mut().project().done = true;
+ }
+
+ ready
+ })
+ } else {
+ Poll::Ready(None)
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ if self.done {
+ return (0, Some(0));
+ }
+
+ let (_, upper) = self.stream.size_hint();
+
+ (0, upper)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/then.rs b/third_party/rust/tokio-stream/src/stream_ext/then.rs
new file mode 100644
index 0000000000..cc7caa721e
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/then.rs
@@ -0,0 +1,83 @@
+use crate::Stream;
+
+use core::fmt;
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`then`](super::StreamExt::then) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Then<St, Fut, F> {
+ #[pin]
+ stream: St,
+ #[pin]
+ future: Option<Fut>,
+ f: F,
+ }
+}
+
+impl<St, Fut, F> fmt::Debug for Then<St, Fut, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Then")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St, Fut, F> Then<St, Fut, F> {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Then {
+ stream,
+ future: None,
+ f,
+ }
+ }
+}
+
+impl<St, F, Fut> Stream for Then<St, Fut, F>
+where
+ St: Stream,
+ Fut: Future,
+ F: FnMut(St::Item) -> Fut,
+{
+ type Item = Fut::Output;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Fut::Output>> {
+ let mut me = self.project();
+
+ loop {
+ if let Some(future) = me.future.as_mut().as_pin_mut() {
+ match future.poll(cx) {
+ Poll::Ready(item) => {
+ me.future.set(None);
+ return Poll::Ready(Some(item));
+ }
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+
+ match me.stream.as_mut().poll_next(cx) {
+ Poll::Ready(Some(item)) => {
+ me.future.set(Some((me.f)(item)));
+ }
+ Poll::Ready(None) => return Poll::Ready(None),
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let future_len = usize::from(self.future.is_some());
+ let (lower, upper) = self.stream.size_hint();
+
+ let lower = lower.saturating_add(future_len);
+ let upper = upper.and_then(|upper| upper.checked_add(future_len));
+
+ (lower, upper)
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/throttle.rs b/third_party/rust/tokio-stream/src/stream_ext/throttle.rs
new file mode 100644
index 0000000000..50001392ee
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/throttle.rs
@@ -0,0 +1,96 @@
+//! Slow down a stream by enforcing a delay between items.
+
+use crate::Stream;
+use tokio::time::{Duration, Instant, Sleep};
+
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{self, Poll};
+
+use pin_project_lite::pin_project;
+
+pub(super) fn throttle<T>(duration: Duration, stream: T) -> Throttle<T>
+where
+ T: Stream,
+{
+ Throttle {
+ delay: tokio::time::sleep_until(Instant::now() + duration),
+ duration,
+ has_delayed: true,
+ stream,
+ }
+}
+
+pin_project! {
+ /// Stream for the [`throttle`](throttle) function. This object is `!Unpin`. If you need it to
+ /// implement `Unpin` you can pin your throttle like this: `Box::pin(your_throttle)`.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Throttle<T> {
+ #[pin]
+ delay: Sleep,
+ duration: Duration,
+
+ // Set to true when `delay` has returned ready, but `stream` hasn't.
+ has_delayed: bool,
+
+ // The stream to throttle
+ #[pin]
+ stream: T,
+ }
+}
+
+impl<T> Throttle<T> {
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &T {
+ &self.stream
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this combinator
+ /// is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the stream
+ /// which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.stream
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so care
+ /// should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> T {
+ self.stream
+ }
+}
+
+impl<T: Stream> Stream for Throttle<T> {
+ type Item = T::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut me = self.project();
+ let dur = *me.duration;
+
+ if !*me.has_delayed && !is_zero(dur) {
+ ready!(me.delay.as_mut().poll(cx));
+ *me.has_delayed = true;
+ }
+
+ let value = ready!(me.stream.poll_next(cx));
+
+ if value.is_some() {
+ if !is_zero(dur) {
+ me.delay.reset(Instant::now() + dur);
+ }
+
+ *me.has_delayed = false;
+ }
+
+ Poll::Ready(value)
+ }
+}
+
+fn is_zero(dur: Duration) -> bool {
+ dur == Duration::from_millis(0)
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/timeout.rs b/third_party/rust/tokio-stream/src/stream_ext/timeout.rs
new file mode 100644
index 0000000000..a440d203ec
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/timeout.rs
@@ -0,0 +1,107 @@
+use crate::stream_ext::Fuse;
+use crate::Stream;
+use tokio::time::{Instant, Sleep};
+
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+use std::fmt;
+use std::time::Duration;
+
+pin_project! {
+ /// Stream returned by the [`timeout`](super::StreamExt::timeout) method.
+ #[must_use = "streams do nothing unless polled"]
+ #[derive(Debug)]
+ pub struct Timeout<S> {
+ #[pin]
+ stream: Fuse<S>,
+ #[pin]
+ deadline: Sleep,
+ duration: Duration,
+ poll_deadline: bool,
+ }
+}
+
+/// Error returned by `Timeout`.
+#[derive(Debug, PartialEq, Eq)]
+pub struct Elapsed(());
+
+impl<S: Stream> Timeout<S> {
+ pub(super) fn new(stream: S, duration: Duration) -> Self {
+ let next = Instant::now() + duration;
+ let deadline = tokio::time::sleep_until(next);
+
+ Timeout {
+ stream: Fuse::new(stream),
+ deadline,
+ duration,
+ poll_deadline: true,
+ }
+ }
+}
+
+impl<S: Stream> Stream for Timeout<S> {
+ type Item = Result<S::Item, Elapsed>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let me = self.project();
+
+ match me.stream.poll_next(cx) {
+ Poll::Ready(v) => {
+ if v.is_some() {
+ let next = Instant::now() + *me.duration;
+ me.deadline.reset(next);
+ *me.poll_deadline = true;
+ }
+ return Poll::Ready(v.map(Ok));
+ }
+ Poll::Pending => {}
+ };
+
+ if *me.poll_deadline {
+ ready!(me.deadline.poll(cx));
+ *me.poll_deadline = false;
+ return Poll::Ready(Some(Err(Elapsed::new())));
+ }
+
+ Poll::Pending
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let (lower, upper) = self.stream.size_hint();
+
+ // The timeout stream may insert an error before and after each message
+ // from the underlying stream, but no more than one error between each
+ // message. Hence the upper bound is computed as 2x+1.
+
+ // Using a helper function to enable use of question mark operator.
+ fn twice_plus_one(value: Option<usize>) -> Option<usize> {
+ value?.checked_mul(2)?.checked_add(1)
+ }
+
+ (lower, twice_plus_one(upper))
+ }
+}
+
+// ===== impl Elapsed =====
+
+impl Elapsed {
+ pub(crate) fn new() -> Self {
+ Elapsed(())
+ }
+}
+
+impl fmt::Display for Elapsed {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ "deadline has elapsed".fmt(fmt)
+ }
+}
+
+impl std::error::Error for Elapsed {}
+
+impl From<Elapsed> for std::io::Error {
+ fn from(_err: Elapsed) -> std::io::Error {
+ std::io::ErrorKind::TimedOut.into()
+ }
+}
diff --git a/third_party/rust/tokio-stream/src/stream_ext/try_next.rs b/third_party/rust/tokio-stream/src/stream_ext/try_next.rs
new file mode 100644
index 0000000000..93aa3bc15f
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/try_next.rs
@@ -0,0 +1,45 @@
+use crate::stream_ext::Next;
+use crate::Stream;
+
+use core::future::Future;
+use core::marker::PhantomPinned;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`try_next`](super::StreamExt::try_next) method.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. It only
+ /// holds onto a reference to the underlying stream,
+ /// so dropping it will never lose a value.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct TryNext<'a, St: ?Sized> {
+ #[pin]
+ inner: Next<'a, St>,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
+ }
+}
+
+impl<'a, St: ?Sized> TryNext<'a, St> {
+ pub(super) fn new(stream: &'a mut St) -> Self {
+ Self {
+ inner: Next::new(stream),
+ _pin: PhantomPinned,
+ }
+ }
+}
+
+impl<T, E, St: ?Sized + Stream<Item = Result<T, E>> + Unpin> Future for TryNext<'_, St> {
+ type Output = Result<Option<T>, E>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+ me.inner.poll(cx).map(Option::transpose)
+ }
+}