summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-stream/src/stream_ext/fuse.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-stream/src/stream_ext/fuse.rs')
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/fuse.rs53
1 files changed, 53 insertions, 0 deletions
diff --git a/third_party/rust/tokio-stream/src/stream_ext/fuse.rs b/third_party/rust/tokio-stream/src/stream_ext/fuse.rs
new file mode 100644
index 0000000000..2500641d95
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/fuse.rs
@@ -0,0 +1,53 @@
+use crate::Stream;
+
+use pin_project_lite::pin_project;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+ /// Stream returned by [`fuse()`][super::StreamExt::fuse].
+ #[derive(Debug)]
+ pub struct Fuse<T> {
+ #[pin]
+ stream: Option<T>,
+ }
+}
+
+impl<T> Fuse<T>
+where
+ T: Stream,
+{
+ pub(crate) fn new(stream: T) -> Fuse<T> {
+ Fuse {
+ stream: Some(stream),
+ }
+ }
+}
+
+impl<T> Stream for Fuse<T>
+where
+ T: Stream,
+{
+ type Item = T::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
+ let res = match Option::as_pin_mut(self.as_mut().project().stream) {
+ Some(stream) => ready!(stream.poll_next(cx)),
+ None => return Poll::Ready(None),
+ };
+
+ if res.is_none() {
+ // Do not poll the stream anymore
+ self.as_mut().project().stream.set(None);
+ }
+
+ Poll::Ready(res)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ match self.stream {
+ Some(ref stream) => stream.size_hint(),
+ None => (0, Some(0)),
+ }
+ }
+}