diff options
Diffstat (limited to 'vendor/futures-util/src/stream/futures_unordered/mod.rs')
-rw-r--r-- | vendor/futures-util/src/stream/futures_unordered/mod.rs | 61 |
1 files changed, 24 insertions, 37 deletions
diff --git a/vendor/futures-util/src/stream/futures_unordered/mod.rs b/vendor/futures-util/src/stream/futures_unordered/mod.rs index aab2bb446..6b5804dc4 100644 --- a/vendor/futures-util/src/stream/futures_unordered/mod.rs +++ b/vendor/futures-util/src/stream/futures_unordered/mod.rs @@ -6,7 +6,6 @@ use crate::task::AtomicWaker; use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; -use core::cmp; use core::fmt::{self, Debug}; use core::iter::FromIterator; use core::marker::PhantomData; @@ -23,6 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; mod abort; mod iter; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/102352 pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef}; mod task; @@ -31,35 +31,11 @@ use self::task::Task; mod ready_to_run_queue; use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue}; -/// Constant used for a `FuturesUnordered` to determine how many times it is -/// allowed to poll underlying futures without yielding. -/// -/// A single call to `poll_next` may potentially do a lot of work before -/// yielding. This happens in particular if the underlying futures are awoken -/// frequently but continue to return `Pending`. This is problematic if other -/// tasks are waiting on the executor, since they do not get to run. This value -/// caps the number of calls to `poll` on underlying futures a single call to -/// `poll_next` is allowed to make. -/// -/// The value itself is chosen somewhat arbitrarily. It needs to be high enough -/// that amortize wakeup and scheduling costs, but low enough that we do not -/// starve other tasks for long. -/// -/// See also https://github.com/rust-lang/futures-rs/issues/2047. -/// -/// Note that using the length of the `FuturesUnordered` instead of this value -/// may cause problems if the number of futures is large. -/// See also https://github.com/rust-lang/futures-rs/pull/2527. -/// -/// Additionally, polling the same future twice per iteration may cause another -/// problem. So, when using this value, it is necessary to limit the max value -/// based on the length of the `FuturesUnordered`. -/// (e.g., `cmp::min(self.len(), YIELD_EVERY)`) -/// See also https://github.com/rust-lang/futures-rs/pull/2333. -const YIELD_EVERY: usize = 32; - /// A set of futures which may complete in any order. /// +/// See [`FuturesOrdered`](crate::stream::FuturesOrdered) for a version of this +/// type that preserves a FIFO order. +/// /// This structure is optimized to manage a large number of futures. /// Futures managed by [`FuturesUnordered`] will only be polled when they /// generate wake-up notifications. This reduces the required amount of work @@ -149,8 +125,9 @@ impl<Fut> FuturesUnordered<Fut> { next_ready_to_run: AtomicPtr::new(ptr::null_mut()), queued: AtomicBool::new(true), ready_to_run_queue: Weak::new(), + woken: AtomicBool::new(false), }); - let stub_ptr = &*stub as *const Task<Fut>; + let stub_ptr = Arc::as_ptr(&stub); let ready_to_run_queue = Arc::new(ReadyToRunQueue { waker: AtomicWaker::new(), head: AtomicPtr::new(stub_ptr as *mut _), @@ -195,6 +172,7 @@ impl<Fut> FuturesUnordered<Fut> { next_ready_to_run: AtomicPtr::new(ptr::null_mut()), queued: AtomicBool::new(true), ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue), + woken: AtomicBool::new(false), }); // Reset the `is_terminated` flag if we've previously marked ourselves @@ -403,7 +381,7 @@ impl<Fut> FuturesUnordered<Fut> { // The `ReadyToRunQueue` stub is never inserted into the `head_all` // list, and its pointer value will remain valid for the lifetime of // this `FuturesUnordered`, so we can make use of its value here. - &*self.ready_to_run_queue.stub as *const _ as *mut _ + Arc::as_ptr(&self.ready_to_run_queue.stub) as *mut _ } } @@ -411,12 +389,12 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { type Item = Fut::Output; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - // See YIELD_EVERY docs for more. - let yield_every = cmp::min(self.len(), YIELD_EVERY); + let len = self.len(); // Keep track of how many child futures we have polled, // in case we want to forcibly yield. let mut polled = 0; + let mut yielded = 0; // Ensure `parent` is correctly set. self.ready_to_run_queue.waker.register(cx.waker()); @@ -527,7 +505,11 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { // the internal allocation, appropriately accessing fields and // deallocating the task if need be. let res = { - let waker = Task::waker_ref(bomb.task.as_ref().unwrap()); + let task = bomb.task.as_ref().unwrap(); + // We are only interested in whether the future is awoken before it + // finishes polling, so reset the flag here. + task.woken.store(false, Relaxed); + let waker = Task::waker_ref(task); let mut cx = Context::from_waker(&waker); // Safety: We won't move the future ever again @@ -540,12 +522,17 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { match res { Poll::Pending => { let task = bomb.task.take().unwrap(); + // If the future was awoken during polling, we assume + // the future wanted to explicitly yield. + yielded += task.woken.load(Relaxed) as usize; bomb.queue.link(task); - if polled == yield_every { - // We have polled a large number of futures in a row without yielding. - // To ensure we do not starve other tasks waiting on the executor, - // we yield here, but immediately wake ourselves up to continue. + // If a future yields, we respect it and yield here. + // If all futures have been polled, we also yield here to + // avoid starving other tasks waiting on the executor. + // (polling the same future twice per iteration may cause + // the problem: https://github.com/rust-lang/futures-rs/pull/2333) + if yielded >= 2 || polled == len { cx.waker().wake_by_ref(); return Poll::Pending; } |