diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio/src/stream/merge.rs | |
parent | Initial commit. (diff) | |
download | firefox-upstream.tar.xz firefox-upstream.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/stream/merge.rs')
-rw-r--r-- | third_party/rust/tokio/src/stream/merge.rs | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/stream/merge.rs b/third_party/rust/tokio/src/stream/merge.rs new file mode 100644 index 0000000000..4850cd40c7 --- /dev/null +++ b/third_party/rust/tokio/src/stream/merge.rs @@ -0,0 +1,97 @@ +use crate::stream::{Fuse, Stream}; + +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream returned by the [`merge`](super::StreamExt::merge) method. + pub struct Merge<T, U> { + #[pin] + a: Fuse<T>, + #[pin] + b: Fuse<U>, + // When `true`, poll `a` first, otherwise, `poll` b`. + a_first: bool, + } +} + +impl<T, U> Merge<T, U> { + pub(super) fn new(a: T, b: U) -> Merge<T, U> + where + T: Stream, + U: Stream, + { + Merge { + a: Fuse::new(a), + b: Fuse::new(b), + a_first: true, + } + } +} + +impl<T, U> Stream for Merge<T, U> +where + T: Stream, + U: Stream<Item = T::Item>, +{ + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { + let me = self.project(); + let a_first = *me.a_first; + + // Toggle the flag + *me.a_first = !a_first; + + if a_first { + poll_next(me.a, me.b, cx) + } else { + poll_next(me.b, me.a, cx) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (a_lower, a_upper) = self.a.size_hint(); + let (b_lower, b_upper) = self.b.size_hint(); + + let upper = match (a_upper, b_upper) { + (Some(a_upper), Some(b_upper)) => Some(a_upper + b_upper), + _ => None, + }; + + (a_lower + b_lower, upper) + } +} + +fn poll_next<T, U>( + first: Pin<&mut T>, + second: Pin<&mut U>, + cx: &mut Context<'_>, +) -> Poll<Option<T::Item>> +where + T: Stream, + U: Stream<Item = T::Item>, +{ + use Poll::*; + + let mut done = true; + + match first.poll_next(cx) { + Ready(Some(val)) => return Ready(Some(val)), + Ready(None) => {} + Pending => done = false, + } + + match second.poll_next(cx) { + Ready(Some(val)) => return Ready(Some(val)), + Ready(None) => {} + Pending => done = false, + } + + if done { + Ready(None) + } else { + Pending + } +} |