diff options
Diffstat (limited to 'vendor/futures-util/src/stream')
38 files changed, 1238 insertions, 254 deletions
diff --git a/vendor/futures-util/src/stream/futures_ordered.rs b/vendor/futures-util/src/stream/futures_ordered.rs index f596b3b0e..618bf1b7b 100644 --- a/vendor/futures-util/src/stream/futures_ordered.rs +++ b/vendor/futures-util/src/stream/futures_ordered.rs @@ -19,7 +19,7 @@ pin_project! { struct OrderWrapper<T> { #[pin] data: T, // A future or a future's output - index: usize, + index: isize, } } @@ -58,7 +58,7 @@ where /// An unbounded queue of futures. /// -/// This "combinator" is similar to `FuturesUnordered`, but it imposes an order +/// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO order /// on top of the set of futures. While futures in the set will race to /// completion in parallel, results will only be returned in the order their /// originating futures were added to the queue. @@ -95,8 +95,8 @@ where pub struct FuturesOrdered<T: Future> { in_progress_queue: FuturesUnordered<OrderWrapper<T>>, queued_outputs: BinaryHeap<OrderWrapper<T::Output>>, - next_incoming_index: usize, - next_outgoing_index: usize, + next_incoming_index: isize, + next_outgoing_index: isize, } impl<T: Future> Unpin for FuturesOrdered<T> {} @@ -135,11 +135,35 @@ impl<Fut: Future> FuturesOrdered<Fut> { /// This function will not call `poll` on the submitted future. The caller /// must ensure that `FuturesOrdered::poll` is called in order to receive /// task notifications. + #[deprecated(note = "use `push_back` instead")] pub fn push(&mut self, future: Fut) { + self.push_back(future); + } + + /// Pushes a future to the back of the queue. + /// + /// This function submits the given future to the internal set for managing. + /// This function will not call `poll` on the submitted future. The caller + /// must ensure that `FuturesOrdered::poll` is called in order to receive + /// task notifications. + pub fn push_back(&mut self, future: Fut) { let wrapped = OrderWrapper { data: future, index: self.next_incoming_index }; self.next_incoming_index += 1; self.in_progress_queue.push(wrapped); } + + /// Pushes a future to the front of the queue. + /// + /// This function submits the given future to the internal set for managing. + /// This function will not call `poll` on the submitted future. The caller + /// must ensure that `FuturesOrdered::poll` is called in order to receive + /// task notifications. This future will be the next future to be returned + /// complete. + pub fn push_front(&mut self, future: Fut) { + let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 }; + self.next_outgoing_index -= 1; + self.in_progress_queue.push(wrapped); + } } impl<Fut: Future> Default for FuturesOrdered<Fut> { @@ -196,7 +220,7 @@ impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> { { let acc = Self::new(); iter.into_iter().fold(acc, |mut acc, item| { - acc.push(item); + acc.push_back(item); acc }) } @@ -214,7 +238,7 @@ impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> { I: IntoIterator<Item = Fut>, { for item in iter { - self.push(item); + self.push_back(item); } } } diff --git a/vendor/futures-util/src/stream/futures_unordered/iter.rs b/vendor/futures-util/src/stream/futures_unordered/iter.rs index 04db5ee75..20248c70f 100644 --- a/vendor/futures-util/src/stream/futures_unordered/iter.rs +++ b/vendor/futures-util/src/stream/futures_unordered/iter.rs @@ -2,6 +2,7 @@ use super::task::Task; use super::FuturesUnordered; use core::marker::PhantomData; use core::pin::Pin; +use core::ptr; use core::sync::atomic::Ordering::Relaxed; /// Mutable iterator over all futures in the unordered set. @@ -58,6 +59,9 @@ impl<Fut: Unpin> Iterator for IntoIter<Fut> { // valid `next_all` checks can be skipped. let next = (**task).next_all.load(Relaxed); *task = next; + if !task.is_null() { + *(**task).prev_all.get() = ptr::null_mut(); + } self.len -= 1; Some(future) } diff --git a/vendor/futures-util/src/stream/futures_unordered/mod.rs b/vendor/futures-util/src/stream/futures_unordered/mod.rs index aab2bb446..6b5804dc4 100644 --- a/vendor/futures-util/src/stream/futures_unordered/mod.rs +++ b/vendor/futures-util/src/stream/futures_unordered/mod.rs @@ -6,7 +6,6 @@ use crate::task::AtomicWaker; use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; -use core::cmp; use core::fmt::{self, Debug}; use core::iter::FromIterator; use core::marker::PhantomData; @@ -23,6 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; mod abort; mod iter; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/102352 pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef}; mod task; @@ -31,35 +31,11 @@ use self::task::Task; mod ready_to_run_queue; use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue}; -/// Constant used for a `FuturesUnordered` to determine how many times it is -/// allowed to poll underlying futures without yielding. -/// -/// A single call to `poll_next` may potentially do a lot of work before -/// yielding. This happens in particular if the underlying futures are awoken -/// frequently but continue to return `Pending`. This is problematic if other -/// tasks are waiting on the executor, since they do not get to run. This value -/// caps the number of calls to `poll` on underlying futures a single call to -/// `poll_next` is allowed to make. -/// -/// The value itself is chosen somewhat arbitrarily. It needs to be high enough -/// that amortize wakeup and scheduling costs, but low enough that we do not -/// starve other tasks for long. -/// -/// See also https://github.com/rust-lang/futures-rs/issues/2047. -/// -/// Note that using the length of the `FuturesUnordered` instead of this value -/// may cause problems if the number of futures is large. -/// See also https://github.com/rust-lang/futures-rs/pull/2527. -/// -/// Additionally, polling the same future twice per iteration may cause another -/// problem. So, when using this value, it is necessary to limit the max value -/// based on the length of the `FuturesUnordered`. -/// (e.g., `cmp::min(self.len(), YIELD_EVERY)`) -/// See also https://github.com/rust-lang/futures-rs/pull/2333. -const YIELD_EVERY: usize = 32; - /// A set of futures which may complete in any order. /// +/// See [`FuturesOrdered`](crate::stream::FuturesOrdered) for a version of this +/// type that preserves a FIFO order. +/// /// This structure is optimized to manage a large number of futures. /// Futures managed by [`FuturesUnordered`] will only be polled when they /// generate wake-up notifications. This reduces the required amount of work @@ -149,8 +125,9 @@ impl<Fut> FuturesUnordered<Fut> { next_ready_to_run: AtomicPtr::new(ptr::null_mut()), queued: AtomicBool::new(true), ready_to_run_queue: Weak::new(), + woken: AtomicBool::new(false), }); - let stub_ptr = &*stub as *const Task<Fut>; + let stub_ptr = Arc::as_ptr(&stub); let ready_to_run_queue = Arc::new(ReadyToRunQueue { waker: AtomicWaker::new(), head: AtomicPtr::new(stub_ptr as *mut _), @@ -195,6 +172,7 @@ impl<Fut> FuturesUnordered<Fut> { next_ready_to_run: AtomicPtr::new(ptr::null_mut()), queued: AtomicBool::new(true), ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue), + woken: AtomicBool::new(false), }); // Reset the `is_terminated` flag if we've previously marked ourselves @@ -403,7 +381,7 @@ impl<Fut> FuturesUnordered<Fut> { // The `ReadyToRunQueue` stub is never inserted into the `head_all` // list, and its pointer value will remain valid for the lifetime of // this `FuturesUnordered`, so we can make use of its value here. - &*self.ready_to_run_queue.stub as *const _ as *mut _ + Arc::as_ptr(&self.ready_to_run_queue.stub) as *mut _ } } @@ -411,12 +389,12 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { type Item = Fut::Output; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - // See YIELD_EVERY docs for more. - let yield_every = cmp::min(self.len(), YIELD_EVERY); + let len = self.len(); // Keep track of how many child futures we have polled, // in case we want to forcibly yield. let mut polled = 0; + let mut yielded = 0; // Ensure `parent` is correctly set. self.ready_to_run_queue.waker.register(cx.waker()); @@ -527,7 +505,11 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { // the internal allocation, appropriately accessing fields and // deallocating the task if need be. let res = { - let waker = Task::waker_ref(bomb.task.as_ref().unwrap()); + let task = bomb.task.as_ref().unwrap(); + // We are only interested in whether the future is awoken before it + // finishes polling, so reset the flag here. + task.woken.store(false, Relaxed); + let waker = Task::waker_ref(task); let mut cx = Context::from_waker(&waker); // Safety: We won't move the future ever again @@ -540,12 +522,17 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { match res { Poll::Pending => { let task = bomb.task.take().unwrap(); + // If the future was awoken during polling, we assume + // the future wanted to explicitly yield. + yielded += task.woken.load(Relaxed) as usize; bomb.queue.link(task); - if polled == yield_every { - // We have polled a large number of futures in a row without yielding. - // To ensure we do not starve other tasks waiting on the executor, - // we yield here, but immediately wake ourselves up to continue. + // If a future yields, we respect it and yield here. + // If all futures have been polled, we also yield here to + // avoid starving other tasks waiting on the executor. + // (polling the same future twice per iteration may cause + // the problem: https://github.com/rust-lang/futures-rs/pull/2333) + if yielded >= 2 || polled == len { cx.waker().wake_by_ref(); return Poll::Pending; } diff --git a/vendor/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs b/vendor/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs index 5ef6cde83..451870532 100644 --- a/vendor/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs +++ b/vendor/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs @@ -83,7 +83,7 @@ impl<Fut> ReadyToRunQueue<Fut> { } pub(super) fn stub(&self) -> *const Task<Fut> { - &*self.stub + Arc::as_ptr(&self.stub) } // Clear the queue of tasks. diff --git a/vendor/futures-util/src/stream/futures_unordered/task.rs b/vendor/futures-util/src/stream/futures_unordered/task.rs index da2cd67d9..ec2114eff 100644 --- a/vendor/futures-util/src/stream/futures_unordered/task.rs +++ b/vendor/futures-util/src/stream/futures_unordered/task.rs @@ -1,6 +1,6 @@ use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; -use core::sync::atomic::Ordering::{self, SeqCst}; +use core::sync::atomic::Ordering::{self, Relaxed, SeqCst}; use core::sync::atomic::{AtomicBool, AtomicPtr}; use super::abort::abort; @@ -31,6 +31,11 @@ pub(super) struct Task<Fut> { // Whether or not this task is currently in the ready to run queue pub(super) queued: AtomicBool, + + // Whether the future was awoken during polling + // It is possible for this flag to be set to true after the polling, + // but it will be ignored. + pub(super) woken: AtomicBool, } // `Task` can be sent across threads safely because it ensures that @@ -48,6 +53,8 @@ impl<Fut> ArcWake for Task<Fut> { None => return, }; + arc_self.woken.store(true, Relaxed); + // It's our job to enqueue this task it into the ready to run queue. To // do this we set the `queued` flag, and if successful we then do the // actual queueing operation, ensuring that we're only queued once. @@ -62,7 +69,7 @@ impl<Fut> ArcWake for Task<Fut> { // still. let prev = arc_self.queued.swap(true, SeqCst); if !prev { - inner.enqueue(&**arc_self); + inner.enqueue(Arc::as_ptr(arc_self)); inner.waker.wake(); } } diff --git a/vendor/futures-util/src/stream/mod.rs b/vendor/futures-util/src/stream/mod.rs index ec685b984..bf9506147 100644 --- a/vendor/futures-util/src/stream/mod.rs +++ b/vendor/futures-util/src/stream/mod.rs @@ -18,9 +18,10 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream}; #[allow(clippy::module_inception)] mod stream; pub use self::stream::{ - Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach, - Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome, - Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip, + All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, + Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, + SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, + Unzip, Zip, }; #[cfg(feature = "std")] @@ -38,7 +39,9 @@ pub use self::stream::Forward; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] -pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent}; +pub use self::stream::{ + BufferUnordered, Buffered, FlatMapUnordered, FlattenUnordered, ForEachConcurrent, +}; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "sink")] @@ -60,7 +63,9 @@ pub use self::try_stream::IntoAsyncRead; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] -pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent}; +pub use self::try_stream::{ + TryBufferUnordered, TryBuffered, TryFlattenUnordered, TryForEachConcurrent, +}; #[cfg(feature = "alloc")] pub use self::try_stream::{TryChunks, TryChunksError}; diff --git a/vendor/futures-util/src/stream/select_all.rs b/vendor/futures-util/src/stream/select_all.rs index 3474331ad..121b6a0e5 100644 --- a/vendor/futures-util/src/stream/select_all.rs +++ b/vendor/futures-util/src/stream/select_all.rs @@ -8,29 +8,24 @@ use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_project_lite::pin_project; - use super::assert_stream; use crate::stream::{futures_unordered, FuturesUnordered, StreamExt, StreamFuture}; -pin_project! { - /// An unbounded set of streams - /// - /// This "combinator" provides the ability to maintain a set of streams - /// and drive them all to completion. - /// - /// Streams are pushed into this set and their realized values are - /// yielded as they become ready. Streams will only be polled when they - /// generate notifications. This allows to coordinate a large number of streams. - /// - /// Note that you can create a ready-made `SelectAll` via the - /// `select_all` function in the `stream` module, or you can start with an - /// empty set with the `SelectAll::new` constructor. - #[must_use = "streams do nothing unless polled"] - pub struct SelectAll<St> { - #[pin] - inner: FuturesUnordered<StreamFuture<St>>, - } +/// An unbounded set of streams +/// +/// This "combinator" provides the ability to maintain a set of streams +/// and drive them all to completion. +/// +/// Streams are pushed into this set and their realized values are +/// yielded as they become ready. Streams will only be polled when they +/// generate notifications. This allows to coordinate a large number of streams. +/// +/// Note that you can create a ready-made `SelectAll` via the +/// `select_all` function in the `stream` module, or you can start with an +/// empty set with the `SelectAll::new` constructor. +#[must_use = "streams do nothing unless polled"] +pub struct SelectAll<St> { + inner: FuturesUnordered<StreamFuture<St>>, } impl<St: Debug> Debug for SelectAll<St> { diff --git a/vendor/futures-util/src/stream/select_with_strategy.rs b/vendor/futures-util/src/stream/select_with_strategy.rs index bd86990cd..224d5f821 100644 --- a/vendor/futures-util/src/stream/select_with_strategy.rs +++ b/vendor/futures-util/src/stream/select_with_strategy.rs @@ -1,5 +1,4 @@ use super::assert_stream; -use crate::stream::{Fuse, StreamExt}; use core::{fmt, pin::Pin}; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; @@ -18,13 +17,15 @@ impl PollNext { /// Toggle the value and return the old one. pub fn toggle(&mut self) -> Self { let old = *self; + *self = self.other(); + old + } + fn other(&self) -> PollNext { match self { - PollNext::Left => *self = PollNext::Right, - PollNext::Right => *self = PollNext::Left, + PollNext::Left => PollNext::Right, + PollNext::Right => PollNext::Left, } - - old } } @@ -34,14 +35,41 @@ impl Default for PollNext { } } +enum InternalState { + Start, + LeftFinished, + RightFinished, + BothFinished, +} + +impl InternalState { + fn finish(&mut self, ps: PollNext) { + match (&self, ps) { + (InternalState::Start, PollNext::Left) => { + *self = InternalState::LeftFinished; + } + (InternalState::Start, PollNext::Right) => { + *self = InternalState::RightFinished; + } + (InternalState::LeftFinished, PollNext::Right) + | (InternalState::RightFinished, PollNext::Left) => { + *self = InternalState::BothFinished; + } + _ => {} + } + } +} + pin_project! { /// Stream for the [`select_with_strategy()`] function. See function docs for details. #[must_use = "streams do nothing unless polled"] + #[project = SelectWithStrategyProj] pub struct SelectWithStrategy<St1, St2, Clos, State> { #[pin] - stream1: Fuse<St1>, + stream1: St1, #[pin] - stream2: Fuse<St2>, + stream2: St2, + internal_state: InternalState, state: State, clos: Clos, } @@ -120,9 +148,10 @@ where State: Default, { assert_stream::<St1::Item, _>(SelectWithStrategy { - stream1: stream1.fuse(), - stream2: stream2.fuse(), + stream1, + stream2, state: Default::default(), + internal_state: InternalState::Start, clos: which, }) } @@ -131,7 +160,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> { /// Acquires a reference to the underlying streams that this combinator is /// pulling from. pub fn get_ref(&self) -> (&St1, &St2) { - (self.stream1.get_ref(), self.stream2.get_ref()) + (&self.stream1, &self.stream2) } /// Acquires a mutable reference to the underlying streams that this @@ -140,7 +169,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> { /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. pub fn get_mut(&mut self) -> (&mut St1, &mut St2) { - (self.stream1.get_mut(), self.stream2.get_mut()) + (&mut self.stream1, &mut self.stream2) } /// Acquires a pinned mutable reference to the underlying streams that this @@ -150,7 +179,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> { /// stream which may otherwise confuse this combinator. pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { let this = self.project(); - (this.stream1.get_pin_mut(), this.stream2.get_pin_mut()) + (this.stream1, this.stream2) } /// Consumes this combinator, returning the underlying streams. @@ -158,7 +187,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> { /// Note that this may discard intermediate state of this combinator, so /// care should be taken to avoid losing resources when this is called. pub fn into_inner(self) -> (St1, St2) { - (self.stream1.into_inner(), self.stream2.into_inner()) + (self.stream1, self.stream2) } } @@ -169,47 +198,93 @@ where Clos: FnMut(&mut State) -> PollNext, { fn is_terminated(&self) -> bool { - self.stream1.is_terminated() && self.stream2.is_terminated() + match self.internal_state { + InternalState::BothFinished => true, + _ => false, + } } } -impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State> +#[inline] +fn poll_side<St1, St2, Clos, State>( + select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, + side: PollNext, + cx: &mut Context<'_>, +) -> Poll<Option<St1::Item>> where St1: Stream, St2: Stream<Item = St1::Item>, - Clos: FnMut(&mut State) -> PollNext, { - type Item = St1::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> { - let this = self.project(); - - match (this.clos)(this.state) { - PollNext::Left => poll_inner(this.stream1, this.stream2, cx), - PollNext::Right => poll_inner(this.stream2, this.stream1, cx), - } + match side { + PollNext::Left => select.stream1.as_mut().poll_next(cx), + PollNext::Right => select.stream2.as_mut().poll_next(cx), } } -fn poll_inner<St1, St2>( - a: Pin<&mut St1>, - b: Pin<&mut St2>, +#[inline] +fn poll_inner<St1, St2, Clos, State>( + select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, + side: PollNext, cx: &mut Context<'_>, ) -> Poll<Option<St1::Item>> where St1: Stream, St2: Stream<Item = St1::Item>, { - let a_done = match a.poll_next(cx) { + let first_done = match poll_side(select, side, cx) { Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), - Poll::Ready(None) => true, + Poll::Ready(None) => { + select.internal_state.finish(side); + true + } Poll::Pending => false, }; + let other = side.other(); + match poll_side(select, other, cx) { + Poll::Ready(None) => { + select.internal_state.finish(other); + if first_done { + Poll::Ready(None) + } else { + Poll::Pending + } + } + a => a, + } +} + +impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State> +where + St1: Stream, + St2: Stream<Item = St1::Item>, + Clos: FnMut(&mut State) -> PollNext, +{ + type Item = St1::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> { + let mut this = self.project(); - match b.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) if a_done => Poll::Ready(None), - Poll::Ready(None) | Poll::Pending => Poll::Pending, + match this.internal_state { + InternalState::Start => { + let next_side = (this.clos)(this.state); + poll_inner(&mut this, next_side, cx) + } + InternalState::LeftFinished => match this.stream2.poll_next(cx) { + Poll::Ready(None) => { + *this.internal_state = InternalState::BothFinished; + Poll::Ready(None) + } + a => a, + }, + InternalState::RightFinished => match this.stream1.poll_next(cx) { + Poll::Ready(None) => { + *this.internal_state = InternalState::BothFinished; + Poll::Ready(None) + } + a => a, + }, + InternalState::BothFinished => Poll::Ready(None), + } } } diff --git a/vendor/futures-util/src/stream/stream/buffer_unordered.rs b/vendor/futures-util/src/stream/stream/buffer_unordered.rs index d64c142b4..91b0f6bcc 100644 --- a/vendor/futures-util/src/stream/stream/buffer_unordered.rs +++ b/vendor/futures-util/src/stream/stream/buffer_unordered.rs @@ -41,11 +41,7 @@ where St: Stream, St::Item: Future, { - pub(super) fn new(stream: St, n: usize) -> Self - where - St: Stream, - St::Item: Future, - { + pub(super) fn new(stream: St, n: usize) -> Self { Self { stream: super::Fuse::new(stream), in_progress_queue: FuturesUnordered::new(), diff --git a/vendor/futures-util/src/stream/stream/buffered.rs b/vendor/futures-util/src/stream/stream/buffered.rs index 6052a737b..5854eb7ea 100644 --- a/vendor/futures-util/src/stream/stream/buffered.rs +++ b/vendor/futures-util/src/stream/stream/buffered.rs @@ -1,4 +1,4 @@ -use crate::stream::{Fuse, FuturesOrdered, StreamExt}; +use crate::stream::{Fuse, FusedStream, FuturesOrdered, StreamExt}; use core::fmt; use core::pin::Pin; use futures_core::future::Future; @@ -64,7 +64,7 @@ where // our queue of futures. while this.in_progress_queue.len() < *this.max { match this.stream.as_mut().poll_next(cx) { - Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut), + Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut), Poll::Ready(None) | Poll::Pending => break, } } @@ -95,6 +95,16 @@ where } } +impl<St> FusedStream for Buffered<St> +where + St: Stream, + St::Item: Future, +{ + fn is_terminated(&self) -> bool { + self.stream.is_done() && self.in_progress_queue.is_terminated() + } +} + // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl<S, Item> Sink<Item> for Buffered<S> diff --git a/vendor/futures-util/src/stream/stream/chain.rs b/vendor/futures-util/src/stream/stream/chain.rs index c5da35e25..36ff1e533 100644 --- a/vendor/futures-util/src/stream/stream/chain.rs +++ b/vendor/futures-util/src/stream/stream/chain.rs @@ -50,8 +50,9 @@ where if let Some(item) = ready!(first.poll_next(cx)) { return Poll::Ready(Some(item)); } + + this.first.set(None); } - this.first.set(None); this.second.poll_next(cx) } diff --git a/vendor/futures-util/src/stream/stream/chunks.rs b/vendor/futures-util/src/stream/stream/chunks.rs index 845786999..2a71ebc6c 100644 --- a/vendor/futures-util/src/stream/stream/chunks.rs +++ b/vendor/futures-util/src/stream/stream/chunks.rs @@ -21,10 +21,7 @@ pin_project! { } } -impl<St: Stream> Chunks<St> -where - St: Stream, -{ +impl<St: Stream> Chunks<St> { pub(super) fn new(stream: St, capacity: usize) -> Self { assert!(capacity > 0); @@ -66,7 +63,7 @@ impl<St: Stream> Stream for Chunks<St> { let last = if this.items.is_empty() { None } else { - let full_buf = mem::replace(this.items, Vec::new()); + let full_buf = mem::take(this.items); Some(full_buf) }; @@ -77,9 +74,9 @@ impl<St: Stream> Stream for Chunks<St> { } fn size_hint(&self) -> (usize, Option<usize>) { - let chunk_len = if self.items.is_empty() { 0 } else { 1 }; + let chunk_len = usize::from(!self.items.is_empty()); let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_add(chunk_len); + let lower = (lower / self.cap).saturating_add(chunk_len); let upper = match upper { Some(x) => x.checked_add(chunk_len), None => None, diff --git a/vendor/futures-util/src/stream/stream/collect.rs b/vendor/futures-util/src/stream/stream/collect.rs index b0e81b9ce..970ac26db 100644 --- a/vendor/futures-util/src/stream/stream/collect.rs +++ b/vendor/futures-util/src/stream/stream/collect.rs @@ -19,7 +19,7 @@ pin_project! { impl<St: Stream, C: Default> Collect<St, C> { fn finish(self: Pin<&mut Self>) -> C { - mem::replace(self.project().collection, Default::default()) + mem::take(self.project().collection) } pub(super) fn new(stream: St) -> Self { diff --git a/vendor/futures-util/src/stream/stream/filter.rs b/vendor/futures-util/src/stream/stream/filter.rs index ccf1a5122..997fe9977 100644 --- a/vendor/futures-util/src/stream/stream/filter.rs +++ b/vendor/futures-util/src/stream/stream/filter.rs @@ -93,7 +93,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending_item.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/vendor/futures-util/src/stream/stream/filter_map.rs b/vendor/futures-util/src/stream/stream/filter_map.rs index 02a0a4386..6b7d0070d 100644 --- a/vendor/futures-util/src/stream/stream/filter_map.rs +++ b/vendor/futures-util/src/stream/stream/filter_map.rs @@ -87,7 +87,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let pending_len = if self.pending.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/vendor/futures-util/src/stream/stream/flatten_unordered.rs b/vendor/futures-util/src/stream/stream/flatten_unordered.rs new file mode 100644 index 000000000..44c6ace2f --- /dev/null +++ b/vendor/futures-util/src/stream/stream/flatten_unordered.rs @@ -0,0 +1,536 @@ +use alloc::sync::Arc; +use core::{ + cell::UnsafeCell, + convert::identity, + fmt, + marker::PhantomData, + num::NonZeroUsize, + pin::Pin, + sync::atomic::{AtomicU8, Ordering}, +}; + +use pin_project_lite::pin_project; + +use futures_core::{ + future::Future, + ready, + stream::{FusedStream, Stream}, + task::{Context, Poll, Waker}, +}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use futures_task::{waker, ArcWake}; + +use crate::stream::FuturesUnordered; + +/// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered) +/// method. +pub type FlattenUnordered<St> = FlattenUnorderedWithFlowController<St, ()>; + +/// There is nothing to poll and stream isn't being polled/waking/woken at the moment. +const NONE: u8 = 0; + +/// Inner streams need to be polled. +const NEED_TO_POLL_INNER_STREAMS: u8 = 1; + +/// The base stream needs to be polled. +const NEED_TO_POLL_STREAM: u8 = 0b10; + +/// Both base stream and inner streams need to be polled. +const NEED_TO_POLL_ALL: u8 = NEED_TO_POLL_INNER_STREAMS | NEED_TO_POLL_STREAM; + +/// The current stream is being polled at the moment. +const POLLING: u8 = 0b100; + +/// Stream is being woken at the moment. +const WAKING: u8 = 0b1000; + +/// The stream was waked and will be polled. +const WOKEN: u8 = 0b10000; + +/// Internal polling state of the stream. +#[derive(Clone, Debug)] +struct SharedPollState { + state: Arc<AtomicU8>, +} + +impl SharedPollState { + /// Constructs new `SharedPollState` with the given state. + fn new(value: u8) -> SharedPollState { + SharedPollState { state: Arc::new(AtomicU8::new(value)) } + } + + /// Attempts to start polling, returning stored state in case of success. + /// Returns `None` if either waker is waking at the moment. + fn start_polling( + &self, + ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { + let value = self + .state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { + if value & WAKING == NONE { + Some(POLLING) + } else { + None + } + }) + .ok()?; + let bomb = PollStateBomb::new(self, SharedPollState::reset); + + Some((value, bomb)) + } + + /// Attempts to start the waking process and performs bitwise or with the given value. + /// + /// If some waker is already in progress or stream is already woken/being polled, waking process won't start, however + /// state will be disjuncted with the given value. + fn start_waking( + &self, + to_poll: u8, + ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { + let value = self + .state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { + let mut next_value = value | to_poll; + if value & (WOKEN | POLLING) == NONE { + next_value |= WAKING; + } + + if next_value != value { + Some(next_value) + } else { + None + } + }) + .ok()?; + + // Only start the waking process if we're not in the polling/waking phase and the stream isn't woken already + if value & (WOKEN | POLLING | WAKING) == NONE { + let bomb = PollStateBomb::new(self, SharedPollState::stop_waking); + + Some((value, bomb)) + } else { + None + } + } + + /// Sets current state to + /// - `!POLLING` allowing to use wakers + /// - `WOKEN` if the state was changed during `POLLING` phase as waker will be called, + /// or `will_be_woken` flag supplied + /// - `!WAKING` as + /// * Wakers called during the `POLLING` phase won't propagate their calls + /// * `POLLING` phase can't start if some of the wakers are active + /// So no wrapped waker can touch the inner waker's cell, it's safe to poll again. + fn stop_polling(&self, to_poll: u8, will_be_woken: bool) -> u8 { + self.state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |mut value| { + let mut next_value = to_poll; + + value &= NEED_TO_POLL_ALL; + if value != NONE || will_be_woken { + next_value |= WOKEN; + } + next_value |= value; + + Some(next_value & !POLLING & !WAKING) + }) + .unwrap() + } + + /// Toggles state to non-waking, allowing to start polling. + fn stop_waking(&self) -> u8 { + let value = self + .state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { + let next_value = value & !WAKING | WOKEN; + + if next_value != value { + Some(next_value) + } else { + None + } + }) + .unwrap_or_else(identity); + + debug_assert!(value & (WOKEN | POLLING | WAKING) == WAKING); + value + } + + /// Resets current state allowing to poll the stream and wake up wakers. + fn reset(&self) -> u8 { + self.state.swap(NEED_TO_POLL_ALL, Ordering::SeqCst) + } +} + +/// Used to execute some function on the given state when dropped. +struct PollStateBomb<'a, F: FnOnce(&SharedPollState) -> u8> { + state: &'a SharedPollState, + drop: Option<F>, +} + +impl<'a, F: FnOnce(&SharedPollState) -> u8> PollStateBomb<'a, F> { + /// Constructs new bomb with the given state. + fn new(state: &'a SharedPollState, drop: F) -> Self { + Self { state, drop: Some(drop) } + } + + /// Deactivates bomb, forces it to not call provided function when dropped. + fn deactivate(mut self) { + self.drop.take(); + } +} + +impl<F: FnOnce(&SharedPollState) -> u8> Drop for PollStateBomb<'_, F> { + fn drop(&mut self) { + if let Some(drop) = self.drop.take() { + (drop)(self.state); + } + } +} + +/// Will update state with the provided value on `wake_by_ref` call +/// and then, if there is a need, call `inner_waker`. +struct WrappedWaker { + inner_waker: UnsafeCell<Option<Waker>>, + poll_state: SharedPollState, + need_to_poll: u8, +} + +unsafe impl Send for WrappedWaker {} +unsafe impl Sync for WrappedWaker {} + +impl WrappedWaker { + /// Replaces given waker's inner_waker for polling stream/futures which will + /// update poll state on `wake_by_ref` call. Use only if you need several + /// contexts. + /// + /// ## Safety + /// + /// This function will modify waker's `inner_waker` via `UnsafeCell`, so + /// it should be used only during `POLLING` phase by one thread at the time. + unsafe fn replace_waker(self_arc: &mut Arc<Self>, cx: &Context<'_>) { + *self_arc.inner_waker.get() = cx.waker().clone().into(); + } + + /// Attempts to start the waking process for the waker with the given value. + /// If succeeded, then the stream isn't yet woken and not being polled at the moment. + fn start_waking(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { + self.poll_state.start_waking(self.need_to_poll) + } +} + +impl ArcWake for WrappedWaker { + fn wake_by_ref(self_arc: &Arc<Self>) { + if let Some((_, state_bomb)) = self_arc.start_waking() { + // Safety: now state is not `POLLING` + let waker_opt = unsafe { self_arc.inner_waker.get().as_ref().unwrap() }; + + if let Some(inner_waker) = waker_opt.clone() { + // Stop waking to allow polling stream + drop(state_bomb); + + // Wake up inner waker + inner_waker.wake(); + } + } + } +} + +pin_project! { + /// Future which polls optional inner stream. + /// + /// If it's `Some`, it will attempt to call `poll_next` on it, + /// returning `Some((item, next_item_fut))` in case of `Poll::Ready(Some(...))` + /// or `None` in case of `Poll::Ready(None)`. + /// + /// If `poll_next` will return `Poll::Pending`, it will be forwarded to + /// the future and current task will be notified by waker. + #[must_use = "futures do nothing unless you `.await` or poll them"] + struct PollStreamFut<St> { + #[pin] + stream: Option<St>, + } +} + +impl<St> PollStreamFut<St> { + /// Constructs new `PollStreamFut` using given `stream`. + fn new(stream: impl Into<Option<St>>) -> Self { + Self { stream: stream.into() } + } +} + +impl<St: Stream + Unpin> Future for PollStreamFut<St> { + type Output = Option<(St::Item, PollStreamFut<St>)>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut stream = self.project().stream; + + let item = if let Some(stream) = stream.as_mut().as_pin_mut() { + ready!(stream.poll_next(cx)) + } else { + None + }; + let next_item_fut = PollStreamFut::new(stream.get_mut().take()); + let out = item.map(|item| (item, next_item_fut)); + + Poll::Ready(out) + } +} + +pin_project! { + /// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered) + /// method with ability to specify flow controller. + #[project = FlattenUnorderedWithFlowControllerProj] + #[must_use = "streams do nothing unless polled"] + pub struct FlattenUnorderedWithFlowController<St, Fc> where St: Stream { + #[pin] + inner_streams: FuturesUnordered<PollStreamFut<St::Item>>, + #[pin] + stream: St, + poll_state: SharedPollState, + limit: Option<NonZeroUsize>, + is_stream_done: bool, + inner_streams_waker: Arc<WrappedWaker>, + stream_waker: Arc<WrappedWaker>, + flow_controller: PhantomData<Fc> + } +} + +impl<St, Fc> fmt::Debug for FlattenUnorderedWithFlowController<St, Fc> +where + St: Stream + fmt::Debug, + St::Item: Stream + fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FlattenUnorderedWithFlowController") + .field("poll_state", &self.poll_state) + .field("inner_streams", &self.inner_streams) + .field("limit", &self.limit) + .field("stream", &self.stream) + .field("is_stream_done", &self.is_stream_done) + .field("flow_controller", &self.flow_controller) + .finish() + } +} + +impl<St, Fc> FlattenUnorderedWithFlowController<St, Fc> +where + St: Stream, + Fc: FlowController<St::Item, <St::Item as Stream>::Item>, + St::Item: Stream + Unpin, +{ + pub(crate) fn new( + stream: St, + limit: Option<usize>, + ) -> FlattenUnorderedWithFlowController<St, Fc> { + let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM); + + FlattenUnorderedWithFlowController { + inner_streams: FuturesUnordered::new(), + stream, + is_stream_done: false, + limit: limit.and_then(NonZeroUsize::new), + inner_streams_waker: Arc::new(WrappedWaker { + inner_waker: UnsafeCell::new(None), + poll_state: poll_state.clone(), + need_to_poll: NEED_TO_POLL_INNER_STREAMS, + }), + stream_waker: Arc::new(WrappedWaker { + inner_waker: UnsafeCell::new(None), + poll_state: poll_state.clone(), + need_to_poll: NEED_TO_POLL_STREAM, + }), + poll_state, + flow_controller: PhantomData, + } + } + + delegate_access_inner!(stream, St, ()); +} + +/// Returns the next flow step based on the received item. +pub trait FlowController<I, O> { + /// Handles an item producing `FlowStep` describing the next flow step. + fn next_step(item: I) -> FlowStep<I, O>; +} + +impl<I, O> FlowController<I, O> for () { + fn next_step(item: I) -> FlowStep<I, O> { + FlowStep::Continue(item) + } +} + +/// Describes the next flow step. +#[derive(Debug, Clone)] +pub enum FlowStep<C, R> { + /// Just yields an item and continues standard flow. + Continue(C), + /// Immediately returns an underlying item from the function. + Return(R), +} + +impl<St, Fc> FlattenUnorderedWithFlowControllerProj<'_, St, Fc> +where + St: Stream, +{ + /// Checks if current `inner_streams` bucket size is greater than optional limit. + fn is_exceeded_limit(&self) -> bool { + self.limit.map_or(false, |limit| self.inner_streams.len() >= limit.get()) + } +} + +impl<St, Fc> FusedStream for FlattenUnorderedWithFlowController<St, Fc> +where + St: FusedStream, + Fc: FlowController<St::Item, <St::Item as Stream>::Item>, + St::Item: Stream + Unpin, +{ + fn is_terminated(&self) -> bool { + self.stream.is_terminated() && self.inner_streams.is_empty() + } +} + +impl<St, Fc> Stream for FlattenUnorderedWithFlowController<St, Fc> +where + St: Stream, + Fc: FlowController<St::Item, <St::Item as Stream>::Item>, + St::Item: Stream + Unpin, +{ + type Item = <St::Item as Stream>::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut next_item = None; + let mut need_to_poll_next = NONE; + + let mut this = self.as_mut().project(); + + // Attempt to start polling, in case some waker is holding the lock, wait in loop + let (mut poll_state_value, state_bomb) = loop { + if let Some(value) = this.poll_state.start_polling() { + break value; + } + }; + + // Safety: now state is `POLLING`. + unsafe { + WrappedWaker::replace_waker(this.stream_waker, cx); + WrappedWaker::replace_waker(this.inner_streams_waker, cx) + }; + + if poll_state_value & NEED_TO_POLL_STREAM != NONE { + let mut stream_waker = None; + + // Here we need to poll the base stream. + // + // To improve performance, we will attempt to place as many items as we can + // to the `FuturesUnordered` bucket before polling inner streams + loop { + if this.is_exceeded_limit() || *this.is_stream_done { + // We either exceeded the limit or the stream is exhausted + if !*this.is_stream_done { + // The stream needs to be polled in the next iteration + need_to_poll_next |= NEED_TO_POLL_STREAM; + } + + break; + } else { + let mut cx = Context::from_waker( + stream_waker.get_or_insert_with(|| waker(this.stream_waker.clone())), + ); + + match this.stream.as_mut().poll_next(&mut cx) { + Poll::Ready(Some(item)) => { + let next_item_fut = match Fc::next_step(item) { + // Propagates an item immediately (the main use-case is for errors) + FlowStep::Return(item) => { + need_to_poll_next |= NEED_TO_POLL_STREAM + | (poll_state_value & NEED_TO_POLL_INNER_STREAMS); + poll_state_value &= !NEED_TO_POLL_INNER_STREAMS; + + next_item = Some(item); + + break; + } + // Yields an item and continues processing (normal case) + FlowStep::Continue(inner_stream) => { + PollStreamFut::new(inner_stream) + } + }; + // Add new stream to the inner streams bucket + this.inner_streams.as_mut().push(next_item_fut); + // Inner streams must be polled afterward + poll_state_value |= NEED_TO_POLL_INNER_STREAMS; + } + Poll::Ready(None) => { + // Mark the base stream as done + *this.is_stream_done = true; + } + Poll::Pending => { + break; + } + } + } + } + } + + if poll_state_value & NEED_TO_POLL_INNER_STREAMS != NONE { + let inner_streams_waker = waker(this.inner_streams_waker.clone()); + let mut cx = Context::from_waker(&inner_streams_waker); + + match this.inner_streams.as_mut().poll_next(&mut cx) { + Poll::Ready(Some(Some((item, next_item_fut)))) => { + // Push next inner stream item future to the list of inner streams futures + this.inner_streams.as_mut().push(next_item_fut); + // Take the received item + next_item = Some(item); + // On the next iteration, inner streams must be polled again + need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS; + } + Poll::Ready(Some(None)) => { + // On the next iteration, inner streams must be polled again + need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS; + } + _ => {} + } + } + + // We didn't have any `poll_next` panic, so it's time to deactivate the bomb + state_bomb.deactivate(); + + // Call the waker at the end of polling if + let mut force_wake = + // we need to poll the stream and didn't reach the limit yet + need_to_poll_next & NEED_TO_POLL_STREAM != NONE && !this.is_exceeded_limit() + // or we need to poll the inner streams again + || need_to_poll_next & NEED_TO_POLL_INNER_STREAMS != NONE; + + // Stop polling and swap the latest state + poll_state_value = this.poll_state.stop_polling(need_to_poll_next, force_wake); + // If state was changed during `POLLING` phase, we also need to manually call a waker + force_wake |= poll_state_value & NEED_TO_POLL_ALL != NONE; + + let is_done = *this.is_stream_done && this.inner_streams.is_empty(); + + if next_item.is_some() || is_done { + Poll::Ready(next_item) + } else { + if force_wake { + cx.waker().wake_by_ref(); + } + + Poll::Pending + } + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl<St, Item, Fc> Sink<Item> for FlattenUnorderedWithFlowController<St, Fc> +where + St: Stream + Sink<Item>, +{ + type Error = St::Error; + + delegate_sink!(stream, Item); +} diff --git a/vendor/futures-util/src/stream/stream/mod.rs b/vendor/futures-util/src/stream/stream/mod.rs index 9cfcc09ba..558dc22bd 100644 --- a/vendor/futures-util/src/stream/stream/mod.rs +++ b/vendor/futures-util/src/stream/stream/mod.rs @@ -199,6 +199,25 @@ pub use self::buffered::Buffered; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] +pub(crate) mod flatten_unordered; + +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] +pub use self::flatten_unordered::FlattenUnordered; + +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +delegate_all!( + /// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method. + FlatMapUnordered<St, U, F>( + FlattenUnordered<Map<St, F>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option<usize>, f: F| FlattenUnordered::new(Map::new(x, f), limit)] + where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U +); + +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] mod for_each_concurrent; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] @@ -390,9 +409,9 @@ pub trait StreamExt: Stream { /// use futures::stream::{self, StreamExt}; /// /// let stream = stream::iter(1..=10); - /// let evens = stream.filter(|x| future::ready(x % 2 == 0)); + /// let events = stream.filter(|x| future::ready(x % 2 == 0)); /// - /// assert_eq!(vec![2, 4, 6, 8, 10], evens.collect::<Vec<_>>().await); + /// assert_eq!(vec![2, 4, 6, 8, 10], events.collect::<Vec<_>>().await); /// # }); /// ``` fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> @@ -422,11 +441,11 @@ pub trait StreamExt: Stream { /// use futures::stream::{self, StreamExt}; /// /// let stream = stream::iter(1..=10); - /// let evens = stream.filter_map(|x| async move { + /// let events = stream.filter_map(|x| async move { /// if x % 2 == 0 { Some(x + 1) } else { None } /// }); /// - /// assert_eq!(vec![3, 5, 7, 9, 11], evens.collect::<Vec<_>>().await); + /// assert_eq!(vec![3, 5, 7, 9, 11], events.collect::<Vec<_>>().await); /// # }); /// ``` fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> @@ -754,13 +773,64 @@ pub trait StreamExt: Stream { assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self)) } + /// Flattens a stream of streams into just one continuous stream. Polls + /// inner streams produced by the base stream concurrently. + /// + /// The only argument is an optional limit on the number of concurrently + /// polled streams. If this limit is not `None`, no more than `limit` streams + /// will be polled at the same time. The `limit` argument is of type + /// `Into<Option<usize>>`, and so can be provided as either `None`, + /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as + /// no limit at all, and will have the same result as passing in `None`. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::mpsc; + /// use futures::stream::StreamExt; + /// use std::thread; + /// + /// let (tx1, rx1) = mpsc::unbounded(); + /// let (tx2, rx2) = mpsc::unbounded(); + /// let (tx3, rx3) = mpsc::unbounded(); + /// + /// thread::spawn(move || { + /// tx1.unbounded_send(1).unwrap(); + /// tx1.unbounded_send(2).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx2.unbounded_send(3).unwrap(); + /// tx2.unbounded_send(4).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx3.unbounded_send(rx1).unwrap(); + /// tx3.unbounded_send(rx2).unwrap(); + /// }); + /// + /// let mut output = rx3.flatten_unordered(None).collect::<Vec<i32>>().await; + /// output.sort(); + /// + /// assert_eq!(output, vec![1, 2, 3, 4]); + /// # }); + /// ``` + #[cfg(not(futures_no_atomic_cas))] + #[cfg(feature = "alloc")] + fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self> + where + Self::Item: Stream + Unpin, + Self: Sized, + { + assert_stream::<<Self::Item as Stream>::Item, _>(FlattenUnordered::new(self, limit.into())) + } + /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s. /// /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead, /// you would have to chain combinators like `.map(f).flatten()` while this /// combinator provides ability to write `.flat_map(f)` instead of chaining. /// - /// The provided closure which produce inner streams is executed over all elements + /// The provided closure which produces inner streams is executed over all elements /// of stream as last inner stream is terminated and next stream item is available. /// /// Note that this function consumes the stream passed into it and returns a @@ -788,6 +858,59 @@ pub trait StreamExt: Stream { assert_stream::<U::Item, _>(FlatMap::new(self, f)) } + /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s + /// and polls them concurrently, yielding items in any order, as they made + /// available. + /// + /// [`StreamExt::map`] is very useful, but if it produces `Stream`s + /// instead, and you need to poll all of them concurrently, you would + /// have to use something like `for_each_concurrent` and merge values + /// by hand. This combinator provides ability to collect all values + /// from concurrently polled streams into one stream. + /// + /// The first argument is an optional limit on the number of concurrently + /// polled streams. If this limit is not `None`, no more than `limit` streams + /// will be polled at the same time. The `limit` argument is of type + /// `Into<Option<usize>>`, and so can be provided as either `None`, + /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as + /// no limit at all, and will have the same result as passing in `None`. + /// + /// The provided closure which produces inner streams is executed over + /// all elements of stream as next stream item is available and limit + /// of concurrently processed streams isn't exceeded. + /// + /// Note that this function consumes the stream passed into it and + /// returns a wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(1..5); + /// let stream = stream.flat_map_unordered(1, |x| stream::iter(vec![x; x])); + /// let mut values = stream.collect::<Vec<_>>().await; + /// values.sort(); + /// + /// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values); + /// # }); + /// ``` + #[cfg(not(futures_no_atomic_cas))] + #[cfg(feature = "alloc")] + fn flat_map_unordered<U, F>( + self, + limit: impl Into<Option<usize>>, + f: F, + ) -> FlatMapUnordered<Self, U, F> + where + U: Stream + Unpin, + F: FnMut(Self::Item) -> U, + Self: Sized, + { + assert_stream::<U::Item, _>(FlatMapUnordered::new(self, limit.into(), f)) + } + /// Combinator similar to [`StreamExt::fold`] that holds internal state /// and produces a new stream. /// @@ -1397,8 +1520,7 @@ pub trait StreamExt: Stream { /// be immediately returned. /// /// If the underlying stream ended and only a partial vector was created, - /// it'll be returned. Additionally if an error happens from the underlying - /// stream then the currently buffered items will be yielded. + /// it will be returned. /// /// This method is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. @@ -1558,6 +1680,8 @@ pub trait StreamExt: Stream { /// assert_eq!(total, 6); /// # }); /// ``` + /// + /// [`select!`]: crate::select fn select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Unpin + FusedStream, diff --git a/vendor/futures-util/src/stream/stream/peek.rs b/vendor/futures-util/src/stream/stream/peek.rs index c72dfc366..ea3d6243f 100644 --- a/vendor/futures-util/src/stream/stream/peek.rs +++ b/vendor/futures-util/src/stream/stream/peek.rs @@ -204,7 +204,7 @@ impl<S: Stream> Stream for Peekable<S> { } fn size_hint(&self) -> (usize, Option<usize>) { - let peek_len = if self.peeked.is_some() { 1 } else { 0 }; + let peek_len = usize::from(self.peeked.is_some()); let (lower, upper) = self.stream.size_hint(); let lower = lower.saturating_add(peek_len); let upper = match upper { diff --git a/vendor/futures-util/src/stream/stream/ready_chunks.rs b/vendor/futures-util/src/stream/stream/ready_chunks.rs index 5ebc9582d..192054c4a 100644 --- a/vendor/futures-util/src/stream/stream/ready_chunks.rs +++ b/vendor/futures-util/src/stream/stream/ready_chunks.rs @@ -1,6 +1,5 @@ -use crate::stream::Fuse; +use crate::stream::{Fuse, StreamExt}; use alloc::vec::Vec; -use core::mem; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; @@ -15,23 +14,15 @@ pin_project! { pub struct ReadyChunks<St: Stream> { #[pin] stream: Fuse<St>, - items: Vec<St::Item>, cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 } } -impl<St: Stream> ReadyChunks<St> -where - St: Stream, -{ +impl<St: Stream> ReadyChunks<St> { pub(super) fn new(stream: St, capacity: usize) -> Self { assert!(capacity > 0); - Self { - stream: super::Fuse::new(stream), - items: Vec::with_capacity(capacity), - cap: capacity, - } + Self { stream: stream.fuse(), cap: capacity } } delegate_access_inner!(stream, St, (.)); @@ -43,40 +34,33 @@ impl<St: Stream> Stream for ReadyChunks<St> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); + let mut items: Vec<St::Item> = Vec::new(); + loop { match this.stream.as_mut().poll_next(cx) { // Flush all collected data if underlying stream doesn't contain // more ready values Poll::Pending => { - return if this.items.is_empty() { - Poll::Pending - } else { - Poll::Ready(Some(mem::replace(this.items, Vec::with_capacity(*this.cap)))) - } + return if items.is_empty() { Poll::Pending } else { Poll::Ready(Some(items)) } } // Push the ready item into the buffer and check whether it is full. // If so, replace our buffer with a new and empty one and return // the full one. Poll::Ready(Some(item)) => { - this.items.push(item); - if this.items.len() >= *this.cap { - return Poll::Ready(Some(mem::replace( - this.items, - Vec::with_capacity(*this.cap), - ))); + if items.is_empty() { + items.reserve(*this.cap); + } + items.push(item); + if items.len() >= *this.cap { + return Poll::Ready(Some(items)); } } // Since the underlying stream ran out of values, return what we // have buffered, if we have anything. Poll::Ready(None) => { - let last = if this.items.is_empty() { - None - } else { - let full_buf = mem::replace(this.items, Vec::new()); - Some(full_buf) - }; + let last = if items.is_empty() { None } else { Some(items) }; return Poll::Ready(last); } @@ -85,20 +69,15 @@ impl<St: Stream> Stream for ReadyChunks<St> { } fn size_hint(&self) -> (usize, Option<usize>) { - let chunk_len = if self.items.is_empty() { 0 } else { 1 }; let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_add(chunk_len); - let upper = match upper { - Some(x) => x.checked_add(chunk_len), - None => None, - }; + let lower = lower / self.cap; (lower, upper) } } -impl<St: FusedStream> FusedStream for ReadyChunks<St> { +impl<St: Stream> FusedStream for ReadyChunks<St> { fn is_terminated(&self) -> bool { - self.stream.is_terminated() && self.items.is_empty() + self.stream.is_terminated() } } diff --git a/vendor/futures-util/src/stream/stream/skip_while.rs b/vendor/futures-util/src/stream/stream/skip_while.rs index 50a21a21a..dabd5eefa 100644 --- a/vendor/futures-util/src/stream/stream/skip_while.rs +++ b/vendor/futures-util/src/stream/stream/skip_while.rs @@ -99,7 +99,7 @@ where if self.done_skipping { self.stream.size_hint() } else { - let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending_item.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/vendor/futures-util/src/stream/stream/split.rs b/vendor/futures-util/src/stream/stream/split.rs index 3a72fee30..e2034e0c2 100644 --- a/vendor/futures-util/src/stream/stream/split.rs +++ b/vendor/futures-util/src/stream/stream/split.rs @@ -35,7 +35,7 @@ impl<S: Stream> Stream for SplitStream<S> { } } -#[allow(bad_style)] +#[allow(non_snake_case)] fn SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item> { SplitSink { lock, slot: None } } diff --git a/vendor/futures-util/src/stream/stream/take.rs b/vendor/futures-util/src/stream/stream/take.rs index b1c728e33..29d6c39ee 100644 --- a/vendor/futures-util/src/stream/stream/take.rs +++ b/vendor/futures-util/src/stream/stream/take.rs @@ -54,11 +54,11 @@ where let (lower, upper) = self.stream.size_hint(); - let lower = cmp::min(lower, self.remaining as usize); + let lower = cmp::min(lower, self.remaining); let upper = match upper { - Some(x) if x < self.remaining as usize => Some(x), - _ => Some(self.remaining as usize), + Some(x) if x < self.remaining => Some(x), + _ => Some(self.remaining), }; (lower, upper) diff --git a/vendor/futures-util/src/stream/stream/take_while.rs b/vendor/futures-util/src/stream/stream/take_while.rs index 01b27654b..925694301 100644 --- a/vendor/futures-util/src/stream/stream/take_while.rs +++ b/vendor/futures-util/src/stream/stream/take_while.rs @@ -91,7 +91,7 @@ where return (0, Some(0)); } - let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending_item.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/vendor/futures-util/src/stream/stream/then.rs b/vendor/futures-util/src/stream/stream/then.rs index d4531d4b9..9192c0b0c 100644 --- a/vendor/futures-util/src/stream/stream/then.rs +++ b/vendor/futures-util/src/stream/stream/then.rs @@ -78,7 +78,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let future_len = if self.future.is_some() { 1 } else { 0 }; + let future_len = usize::from(self.future.is_some()); let (lower, upper) = self.stream.size_hint(); let lower = lower.saturating_add(future_len); let upper = match upper { diff --git a/vendor/futures-util/src/stream/stream/unzip.rs b/vendor/futures-util/src/stream/stream/unzip.rs index 15f22e80b..a88cf0326 100644 --- a/vendor/futures-util/src/stream/stream/unzip.rs +++ b/vendor/futures-util/src/stream/stream/unzip.rs @@ -21,7 +21,7 @@ pin_project! { impl<St: Stream, FromA: Default, FromB: Default> Unzip<St, FromA, FromB> { fn finish(self: Pin<&mut Self>) -> (FromA, FromB) { let this = self.project(); - (mem::replace(this.left, Default::default()), mem::replace(this.right, Default::default())) + (mem::take(this.left), mem::take(this.right)) } pub(super) fn new(stream: St) -> Self { diff --git a/vendor/futures-util/src/stream/stream/zip.rs b/vendor/futures-util/src/stream/stream/zip.rs index 360a8b63b..25a47e96b 100644 --- a/vendor/futures-util/src/stream/stream/zip.rs +++ b/vendor/futures-util/src/stream/stream/zip.rs @@ -102,8 +102,8 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let queued1_len = if self.queued1.is_some() { 1 } else { 0 }; - let queued2_len = if self.queued2.is_some() { 1 } else { 0 }; + let queued1_len = usize::from(self.queued1.is_some()); + let queued2_len = usize::from(self.queued2.is_some()); let (stream1_lower, stream1_upper) = self.stream1.size_hint(); let (stream2_lower, stream2_upper) = self.stream2.size_hint(); diff --git a/vendor/futures-util/src/stream/try_stream/and_then.rs b/vendor/futures-util/src/stream/try_stream/and_then.rs index a7b50db0b..2f8b6f258 100644 --- a/vendor/futures-util/src/stream/try_stream/and_then.rs +++ b/vendor/futures-util/src/stream/try_stream/and_then.rs @@ -71,7 +71,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let future_len = if self.future.is_some() { 1 } else { 0 }; + let future_len = usize::from(self.future.is_some()); let (lower, upper) = self.stream.size_hint(); let lower = lower.saturating_add(future_len); let upper = match upper { diff --git a/vendor/futures-util/src/stream/try_stream/into_async_read.rs b/vendor/futures-util/src/stream/try_stream/into_async_read.rs index 914b277a0..ffbfc7eae 100644 --- a/vendor/futures-util/src/stream/try_stream/into_async_read.rs +++ b/vendor/futures-util/src/stream/try_stream/into_async_read.rs @@ -1,30 +1,26 @@ -use crate::stream::TryStreamExt; use core::pin::Pin; use futures_core::ready; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; +use pin_project_lite::pin_project; use std::cmp; use std::io::{Error, Result}; -/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. -#[derive(Debug)] -#[must_use = "readers do nothing unless polled"] -#[cfg_attr(docsrs, doc(cfg(feature = "io")))] -pub struct IntoAsyncRead<St> -where - St: TryStream<Error = Error> + Unpin, - St::Ok: AsRef<[u8]>, -{ - stream: St, - state: ReadState<St::Ok>, -} - -impl<St> Unpin for IntoAsyncRead<St> -where - St: TryStream<Error = Error> + Unpin, - St::Ok: AsRef<[u8]>, -{ +pin_project! { + /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. + #[derive(Debug)] + #[must_use = "readers do nothing unless polled"] + #[cfg_attr(docsrs, doc(cfg(feature = "io")))] + pub struct IntoAsyncRead<St> + where + St: TryStream<Error = Error>, + St::Ok: AsRef<[u8]>, + { + #[pin] + stream: St, + state: ReadState<St::Ok>, + } } #[derive(Debug)] @@ -36,7 +32,7 @@ enum ReadState<T: AsRef<[u8]>> { impl<St> IntoAsyncRead<St> where - St: TryStream<Error = Error> + Unpin, + St: TryStream<Error = Error>, St::Ok: AsRef<[u8]>, { pub(super) fn new(stream: St) -> Self { @@ -46,16 +42,18 @@ where impl<St> AsyncRead for IntoAsyncRead<St> where - St: TryStream<Error = Error> + Unpin, + St: TryStream<Error = Error>, St::Ok: AsRef<[u8]>, { fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize>> { + let mut this = self.project(); + loop { - match &mut self.state { + match this.state { ReadState::Ready { chunk, chunk_start } => { let chunk = chunk.as_ref(); let len = cmp::min(buf.len(), chunk.len() - *chunk_start); @@ -64,23 +62,23 @@ where *chunk_start += len; if chunk.len() == *chunk_start { - self.state = ReadState::PendingChunk; + *this.state = ReadState::PendingChunk; } return Poll::Ready(Ok(len)); } - ReadState::PendingChunk => match ready!(self.stream.try_poll_next_unpin(cx)) { + ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) { Some(Ok(chunk)) => { if !chunk.as_ref().is_empty() { - self.state = ReadState::Ready { chunk, chunk_start: 0 }; + *this.state = ReadState::Ready { chunk, chunk_start: 0 }; } } Some(Err(err)) => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Err(err)); } None => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Ok(0)); } }, @@ -94,51 +92,52 @@ where impl<St> AsyncWrite for IntoAsyncRead<St> where - St: TryStream<Error = Error> + AsyncWrite + Unpin, + St: TryStream<Error = Error> + AsyncWrite, St::Ok: AsRef<[u8]>, { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<Result<usize>> { - Pin::new(&mut self.stream).poll_write(cx, buf) + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { + let this = self.project(); + this.stream.poll_write(cx, buf) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { - Pin::new(&mut self.stream).poll_flush(cx) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + let this = self.project(); + this.stream.poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { - Pin::new(&mut self.stream).poll_close(cx) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + let this = self.project(); + this.stream.poll_close(cx) } } impl<St> AsyncBufRead for IntoAsyncRead<St> where - St: TryStream<Error = Error> + Unpin, + St: TryStream<Error = Error>, St::Ok: AsRef<[u8]>, { - fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> { - while let ReadState::PendingChunk = self.state { - match ready!(self.stream.try_poll_next_unpin(cx)) { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> { + let mut this = self.project(); + + while let ReadState::PendingChunk = this.state { + match ready!(this.stream.as_mut().try_poll_next(cx)) { Some(Ok(chunk)) => { if !chunk.as_ref().is_empty() { - self.state = ReadState::Ready { chunk, chunk_start: 0 }; + *this.state = ReadState::Ready { chunk, chunk_start: 0 }; } } Some(Err(err)) => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Err(err)); } None => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Ok(&[])); } } } - if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state { + if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state { let chunk = chunk.as_ref(); return Poll::Ready(Ok(&chunk[chunk_start..])); } @@ -147,16 +146,18 @@ where Poll::Ready(Ok(&[])) } - fn consume(mut self: Pin<&mut Self>, amount: usize) { + fn consume(self: Pin<&mut Self>, amount: usize) { + let this = self.project(); + // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295 if amount == 0 { return; } - if let ReadState::Ready { chunk, chunk_start } = &mut self.state { + if let ReadState::Ready { chunk, chunk_start } = this.state { *chunk_start += amount; debug_assert!(*chunk_start <= chunk.as_ref().len()); if *chunk_start >= chunk.as_ref().len() { - self.state = ReadState::PendingChunk; + *this.state = ReadState::PendingChunk; } } else { debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk"); diff --git a/vendor/futures-util/src/stream/try_stream/mod.rs b/vendor/futures-util/src/stream/try_stream/mod.rs index 455ddca3f..414a40dbe 100644 --- a/vendor/futures-util/src/stream/try_stream/mod.rs +++ b/vendor/futures-util/src/stream/try_stream/mod.rs @@ -15,6 +15,7 @@ use crate::stream::{Inspect, Map}; #[cfg(feature = "alloc")] use alloc::vec::Vec; use core::pin::Pin; + use futures_core::{ future::{Future, TryFuture}, stream::TryStream, @@ -88,6 +89,14 @@ mod try_flatten; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_flatten::TryFlatten; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +mod try_flatten_unordered; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_flatten_unordered::TryFlattenUnordered; + mod try_collect; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_collect::TryCollect; @@ -711,6 +720,63 @@ pub trait TryStreamExt: TryStream { assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f)) } + /// Flattens a stream of streams into just one continuous stream. Produced streams + /// will be polled concurrently and any errors will be passed through without looking at them. + /// If the underlying base stream returns an error, it will be **immediately** propagated. + /// + /// The only argument is an optional limit on the number of concurrently + /// polled streams. If this limit is not `None`, no more than `limit` streams + /// will be polled at the same time. The `limit` argument is of type + /// `Into<Option<usize>>`, and so can be provided as either `None`, + /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as + /// no limit at all, and will have the same result as passing in `None`. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::mpsc; + /// use futures::stream::{StreamExt, TryStreamExt}; + /// use std::thread; + /// + /// let (tx1, rx1) = mpsc::unbounded(); + /// let (tx2, rx2) = mpsc::unbounded(); + /// let (tx3, rx3) = mpsc::unbounded(); + /// + /// thread::spawn(move || { + /// tx1.unbounded_send(Ok(1)).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx2.unbounded_send(Ok(2)).unwrap(); + /// tx2.unbounded_send(Err(3)).unwrap(); + /// tx2.unbounded_send(Ok(4)).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx3.unbounded_send(Ok(rx1)).unwrap(); + /// tx3.unbounded_send(Ok(rx2)).unwrap(); + /// tx3.unbounded_send(Err(5)).unwrap(); + /// }); + /// + /// let stream = rx3.try_flatten_unordered(None); + /// let mut values: Vec<_> = stream.collect().await; + /// values.sort(); + /// + /// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]); + /// # }); + /// ``` + #[cfg(not(futures_no_atomic_cas))] + #[cfg(feature = "alloc")] + fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self> + where + Self::Ok: TryStream + Unpin, + <Self::Ok as TryStream>::Error: From<Self::Error>, + Self: Sized, + { + assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>( + TryFlattenUnordered::new(self, limit), + ) + } + /// Flattens a stream of streams into just one continuous stream. /// /// If this stream's elements are themselves streams then this combinator @@ -736,17 +802,21 @@ pub trait TryStreamExt: TryStream { /// thread::spawn(move || { /// tx2.unbounded_send(Ok(2)).unwrap(); /// tx2.unbounded_send(Err(3)).unwrap(); + /// tx2.unbounded_send(Ok(4)).unwrap(); /// }); /// thread::spawn(move || { /// tx3.unbounded_send(Ok(rx1)).unwrap(); /// tx3.unbounded_send(Ok(rx2)).unwrap(); - /// tx3.unbounded_send(Err(4)).unwrap(); + /// tx3.unbounded_send(Err(5)).unwrap(); /// }); /// /// let mut stream = rx3.try_flatten(); /// assert_eq!(stream.next().await, Some(Ok(1))); /// assert_eq!(stream.next().await, Some(Ok(2))); /// assert_eq!(stream.next().await, Some(Err(3))); + /// assert_eq!(stream.next().await, Some(Ok(4))); + /// assert_eq!(stream.next().await, Some(Err(5))); + /// assert_eq!(stream.next().await, None); /// # }); /// ``` fn try_flatten(self) -> TryFlatten<Self> @@ -914,7 +984,7 @@ pub trait TryStreamExt: TryStream { /// that matches the stream's `Error` type. /// /// This adaptor will buffer up to `n` futures and then return their - /// outputs in the order. If the underlying stream returns an error, it will + /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will /// be immediately propagated. /// /// The returned stream will be a stream of results, each containing either @@ -1001,6 +1071,7 @@ pub trait TryStreamExt: TryStream { /// Wraps a [`TryStream`] into a stream compatible with libraries using /// futures 0.1 `Stream`. Requires the `compat` feature to be enabled. /// ``` + /// # if cfg!(miri) { return; } // Miri does not support epoll /// use futures::future::{FutureExt, TryFutureExt}; /// # let (tx, rx) = futures::channel::oneshot::channel(); /// @@ -1026,12 +1097,7 @@ pub trait TryStreamExt: TryStream { Compat::new(self) } - /// Adapter that converts this stream into an [`AsyncRead`](crate::io::AsyncRead). - /// - /// Note that because `into_async_read` moves the stream, the [`Stream`](futures_core::stream::Stream) type must be - /// [`Unpin`]. If you want to use `into_async_read` with a [`!Unpin`](Unpin) stream, you'll - /// first have to pin the stream. This can be done by boxing the stream using [`Box::pin`] - /// or pinning it to the stack using the `pin_mut!` macro from the `pin_utils` crate. + /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead). /// /// This method is only available when the `std` feature of this /// library is activated, and it is activated by default. @@ -1043,12 +1109,12 @@ pub trait TryStreamExt: TryStream { /// use futures::stream::{self, TryStreamExt}; /// use futures::io::AsyncReadExt; /// - /// let stream = stream::iter(vec![Ok(vec![1, 2, 3, 4, 5])]); + /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]); /// let mut reader = stream.into_async_read(); - /// let mut buf = Vec::new(); /// - /// assert!(reader.read_to_end(&mut buf).await.is_ok()); - /// assert_eq!(buf, &[1, 2, 3, 4, 5]); + /// let mut buf = Vec::new(); + /// reader.read_to_end(&mut buf).await.unwrap(); + /// assert_eq!(buf, [1, 2, 3, 4, 5]); /// # }) /// ``` #[cfg(feature = "io")] @@ -1056,7 +1122,7 @@ pub trait TryStreamExt: TryStream { #[cfg(feature = "std")] fn into_async_read(self) -> IntoAsyncRead<Self> where - Self: Sized + TryStreamExt<Error = std::io::Error> + Unpin, + Self: Sized + TryStreamExt<Error = std::io::Error>, Self::Ok: AsRef<[u8]>, { crate::io::assert_read(IntoAsyncRead::new(self)) diff --git a/vendor/futures-util/src/stream/try_stream/or_else.rs b/vendor/futures-util/src/stream/try_stream/or_else.rs index cb69e8132..53aceb8e6 100644 --- a/vendor/futures-util/src/stream/try_stream/or_else.rs +++ b/vendor/futures-util/src/stream/try_stream/or_else.rs @@ -75,7 +75,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let future_len = if self.future.is_some() { 1 } else { 0 }; + let future_len = usize::from(self.future.is_some()); let (lower, upper) = self.stream.size_hint(); let lower = lower.saturating_add(future_len); let upper = match upper { diff --git a/vendor/futures-util/src/stream/try_stream/try_buffered.rs b/vendor/futures-util/src/stream/try_stream/try_buffered.rs index 45bd3f8c7..9f48e5c0a 100644 --- a/vendor/futures-util/src/stream/try_stream/try_buffered.rs +++ b/vendor/futures-util/src/stream/try_stream/try_buffered.rs @@ -54,7 +54,7 @@ where // our queue of futures. Propagate errors from the stream immediately. while this.in_progress_queue.len() < *this.max { match this.stream.as_mut().poll_next(cx)? { - Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()), + Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut.into_future()), Poll::Ready(None) | Poll::Pending => break, } } diff --git a/vendor/futures-util/src/stream/try_stream/try_chunks.rs b/vendor/futures-util/src/stream/try_stream/try_chunks.rs index 07d4425a8..ec53f4bd1 100644 --- a/vendor/futures-util/src/stream/try_stream/try_chunks.rs +++ b/vendor/futures-util/src/stream/try_stream/try_chunks.rs @@ -41,9 +41,10 @@ impl<St: TryStream> TryChunks<St> { delegate_access_inner!(stream, St, (. .)); } +type TryChunksStreamError<St> = TryChunksError<<St as TryStream>::Ok, <St as TryStream>::Error>; + impl<St: TryStream> Stream for TryChunks<St> { - #[allow(clippy::type_complexity)] - type Item = Result<Vec<St::Ok>, TryChunksError<St::Ok, St::Error>>; + type Item = Result<Vec<St::Ok>, TryChunksStreamError<St>>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.as_mut().project(); @@ -70,7 +71,7 @@ impl<St: TryStream> Stream for TryChunks<St> { let last = if this.items.is_empty() { None } else { - let full_buf = mem::replace(this.items, Vec::new()); + let full_buf = mem::take(this.items); Some(full_buf) }; @@ -81,9 +82,9 @@ impl<St: TryStream> Stream for TryChunks<St> { } fn size_hint(&self) -> (usize, Option<usize>) { - let chunk_len = if self.items.is_empty() { 0 } else { 1 }; + let chunk_len = usize::from(!self.items.is_empty()); let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_add(chunk_len); + let lower = (lower / self.cap).saturating_add(chunk_len); let upper = match upper { Some(x) => x.checked_add(chunk_len), None => None, diff --git a/vendor/futures-util/src/stream/try_stream/try_collect.rs b/vendor/futures-util/src/stream/try_stream/try_collect.rs index 5d3b3d766..3e5963f03 100644 --- a/vendor/futures-util/src/stream/try_stream/try_collect.rs +++ b/vendor/futures-util/src/stream/try_stream/try_collect.rs @@ -45,7 +45,7 @@ where Poll::Ready(Ok(loop { match ready!(this.stream.as_mut().try_poll_next(cx)?) { Some(x) => this.items.extend(Some(x)), - None => break mem::replace(this.items, Default::default()), + None => break mem::take(this.items), } })) } diff --git a/vendor/futures-util/src/stream/try_stream/try_filter.rs b/vendor/futures-util/src/stream/try_stream/try_filter.rs index 61e6105c3..11d58243f 100644 --- a/vendor/futures-util/src/stream/try_stream/try_filter.rs +++ b/vendor/futures-util/src/stream/try_stream/try_filter.rs @@ -90,7 +90,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let pending_len = if self.pending_fut.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending_fut.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/vendor/futures-util/src/stream/try_stream/try_filter_map.rs b/vendor/futures-util/src/stream/try_stream/try_filter_map.rs index bb1b5b9db..ed1201732 100644 --- a/vendor/futures-util/src/stream/try_stream/try_filter_map.rs +++ b/vendor/futures-util/src/stream/try_stream/try_filter_map.rs @@ -84,7 +84,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let pending_len = if self.pending.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs new file mode 100644 index 000000000..a74dfc451 --- /dev/null +++ b/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs @@ -0,0 +1,176 @@ +use core::marker::PhantomData; +use core::pin::Pin; + +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; + +use pin_project_lite::pin_project; + +use crate::future::Either; +use crate::stream::stream::flatten_unordered::{ + FlattenUnorderedWithFlowController, FlowController, FlowStep, +}; +use crate::stream::IntoStream; +use crate::TryStreamExt; + +delegate_all!( + /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method. + TryFlattenUnordered<St>( + FlattenUnorderedWithFlowController<NestedTryStreamIntoEitherTryStream<St>, PropagateBaseStreamError<St>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + + New[ + |stream: St, limit: impl Into<Option<usize>>| + FlattenUnorderedWithFlowController::new( + NestedTryStreamIntoEitherTryStream::new(stream), + limit.into() + ) + ] + where + St: TryStream, + St::Ok: TryStream, + St::Ok: Unpin, + <St::Ok as TryStream>::Error: From<St::Error> +); + +pin_project! { + /// Emits either successful streams or single-item streams containing the underlying errors. + /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct NestedTryStreamIntoEitherTryStream<St> + where + St: TryStream, + St::Ok: TryStream, + St::Ok: Unpin, + <St::Ok as TryStream>::Error: From<St::Error> + { + #[pin] + stream: St + } +} + +impl<St> NestedTryStreamIntoEitherTryStream<St> +where + St: TryStream, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<St::Error>, +{ + fn new(stream: St) -> Self { + Self { stream } + } + + delegate_access_inner!(stream, St, ()); +} + +/// Emits a single item immediately, then stream will be terminated. +#[derive(Debug, Clone)] +pub struct Single<T>(Option<T>); + +impl<T> Single<T> { + /// Constructs new `Single` with the given value. + fn new(val: T) -> Self { + Self(Some(val)) + } + + /// Attempts to take inner item immediately. Will always succeed if the stream isn't terminated. + fn next_immediate(&mut self) -> Option<T> { + self.0.take() + } +} + +impl<T> Unpin for Single<T> {} + +impl<T> Stream for Single<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { + Poll::Ready(self.0.take()) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.0.as_ref().map_or((0, Some(0)), |_| (1, Some(1))) + } +} + +/// Immediately propagates errors occurred in the base stream. +#[derive(Debug, Clone, Copy)] +pub struct PropagateBaseStreamError<St>(PhantomData<St>); + +type BaseStreamItem<St> = <NestedTryStreamIntoEitherTryStream<St> as Stream>::Item; +type InnerStreamItem<St> = <BaseStreamItem<St> as Stream>::Item; + +impl<St> FlowController<BaseStreamItem<St>, InnerStreamItem<St>> for PropagateBaseStreamError<St> +where + St: TryStream, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<St::Error>, +{ + fn next_step(item: BaseStreamItem<St>) -> FlowStep<BaseStreamItem<St>, InnerStreamItem<St>> { + match item { + // A new successful inner stream received + st @ Either::Left(_) => FlowStep::Continue(st), + // An error encountered + Either::Right(mut err) => FlowStep::Return(err.next_immediate().unwrap()), + } + } +} + +type SingleStreamResult<St> = Single<Result<<St as TryStream>::Ok, <St as TryStream>::Error>>; + +impl<St> Stream for NestedTryStreamIntoEitherTryStream<St> +where + St: TryStream, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<St::Error>, +{ + // Item is either an inner stream or a stream containing a single error. + // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s. + type Item = Either<IntoStream<St::Ok>, SingleStreamResult<St::Ok>>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let item = ready!(self.project().stream.try_poll_next(cx)); + + let out = match item { + Some(res) => match res { + // Emit successful inner stream as is + Ok(stream) => Either::Left(stream.into_stream()), + // Wrap an error into a stream containing a single item + err @ Err(_) => { + let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into); + + Either::Right(Single::new(res)) + } + }, + None => return Poll::Ready(None), + }; + + Poll::Ready(Some(out)) + } +} + +impl<St> FusedStream for NestedTryStreamIntoEitherTryStream<St> +where + St: TryStream + FusedStream, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<St::Error>, +{ + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl<St, Item> Sink<Item> for NestedTryStreamIntoEitherTryStream<St> +where + St: TryStream + Sink<Item>, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<<St as TryStream>::Error>, +{ + type Error = <St as Sink<Item>>::Error; + + delegate_sink!(stream, Item); +} diff --git a/vendor/futures-util/src/stream/try_stream/try_skip_while.rs b/vendor/futures-util/src/stream/try_stream/try_skip_while.rs index a424b6c5b..52aa2d478 100644 --- a/vendor/futures-util/src/stream/try_stream/try_skip_while.rs +++ b/vendor/futures-util/src/stream/try_stream/try_skip_while.rs @@ -87,7 +87,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending_item.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/vendor/futures-util/src/stream/try_stream/try_take_while.rs b/vendor/futures-util/src/stream/try_stream/try_take_while.rs index 3375960ef..4b5ff1ad3 100644 --- a/vendor/futures-util/src/stream/try_stream/try_take_while.rs +++ b/vendor/futures-util/src/stream/try_stream/try_take_while.rs @@ -96,7 +96,7 @@ where return (0, Some(0)); } - let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending_item.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), |