summaryrefslogtreecommitdiffstats
path: root/vendor/futures-util/src/stream/futures_unordered/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/futures-util/src/stream/futures_unordered/mod.rs')
-rw-r--r--vendor/futures-util/src/stream/futures_unordered/mod.rs61
1 files changed, 24 insertions, 37 deletions
diff --git a/vendor/futures-util/src/stream/futures_unordered/mod.rs b/vendor/futures-util/src/stream/futures_unordered/mod.rs
index aab2bb446..6b5804dc4 100644
--- a/vendor/futures-util/src/stream/futures_unordered/mod.rs
+++ b/vendor/futures-util/src/stream/futures_unordered/mod.rs
@@ -6,7 +6,6 @@
use crate::task::AtomicWaker;
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
-use core::cmp;
use core::fmt::{self, Debug};
use core::iter::FromIterator;
use core::marker::PhantomData;
@@ -23,6 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
mod abort;
mod iter;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/102352
pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef};
mod task;
@@ -31,35 +31,11 @@ use self::task::Task;
mod ready_to_run_queue;
use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue};
-/// Constant used for a `FuturesUnordered` to determine how many times it is
-/// allowed to poll underlying futures without yielding.
-///
-/// A single call to `poll_next` may potentially do a lot of work before
-/// yielding. This happens in particular if the underlying futures are awoken
-/// frequently but continue to return `Pending`. This is problematic if other
-/// tasks are waiting on the executor, since they do not get to run. This value
-/// caps the number of calls to `poll` on underlying futures a single call to
-/// `poll_next` is allowed to make.
-///
-/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
-/// that amortize wakeup and scheduling costs, but low enough that we do not
-/// starve other tasks for long.
-///
-/// See also https://github.com/rust-lang/futures-rs/issues/2047.
-///
-/// Note that using the length of the `FuturesUnordered` instead of this value
-/// may cause problems if the number of futures is large.
-/// See also https://github.com/rust-lang/futures-rs/pull/2527.
-///
-/// Additionally, polling the same future twice per iteration may cause another
-/// problem. So, when using this value, it is necessary to limit the max value
-/// based on the length of the `FuturesUnordered`.
-/// (e.g., `cmp::min(self.len(), YIELD_EVERY)`)
-/// See also https://github.com/rust-lang/futures-rs/pull/2333.
-const YIELD_EVERY: usize = 32;
-
/// A set of futures which may complete in any order.
///
+/// See [`FuturesOrdered`](crate::stream::FuturesOrdered) for a version of this
+/// type that preserves a FIFO order.
+///
/// This structure is optimized to manage a large number of futures.
/// Futures managed by [`FuturesUnordered`] will only be polled when they
/// generate wake-up notifications. This reduces the required amount of work
@@ -149,8 +125,9 @@ impl<Fut> FuturesUnordered<Fut> {
next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
ready_to_run_queue: Weak::new(),
+ woken: AtomicBool::new(false),
});
- let stub_ptr = &*stub as *const Task<Fut>;
+ let stub_ptr = Arc::as_ptr(&stub);
let ready_to_run_queue = Arc::new(ReadyToRunQueue {
waker: AtomicWaker::new(),
head: AtomicPtr::new(stub_ptr as *mut _),
@@ -195,6 +172,7 @@ impl<Fut> FuturesUnordered<Fut> {
next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue),
+ woken: AtomicBool::new(false),
});
// Reset the `is_terminated` flag if we've previously marked ourselves
@@ -403,7 +381,7 @@ impl<Fut> FuturesUnordered<Fut> {
// The `ReadyToRunQueue` stub is never inserted into the `head_all`
// list, and its pointer value will remain valid for the lifetime of
// this `FuturesUnordered`, so we can make use of its value here.
- &*self.ready_to_run_queue.stub as *const _ as *mut _
+ Arc::as_ptr(&self.ready_to_run_queue.stub) as *mut _
}
}
@@ -411,12 +389,12 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
type Item = Fut::Output;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- // See YIELD_EVERY docs for more.
- let yield_every = cmp::min(self.len(), YIELD_EVERY);
+ let len = self.len();
// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
let mut polled = 0;
+ let mut yielded = 0;
// Ensure `parent` is correctly set.
self.ready_to_run_queue.waker.register(cx.waker());
@@ -527,7 +505,11 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
// the internal allocation, appropriately accessing fields and
// deallocating the task if need be.
let res = {
- let waker = Task::waker_ref(bomb.task.as_ref().unwrap());
+ let task = bomb.task.as_ref().unwrap();
+ // We are only interested in whether the future is awoken before it
+ // finishes polling, so reset the flag here.
+ task.woken.store(false, Relaxed);
+ let waker = Task::waker_ref(task);
let mut cx = Context::from_waker(&waker);
// Safety: We won't move the future ever again
@@ -540,12 +522,17 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
match res {
Poll::Pending => {
let task = bomb.task.take().unwrap();
+ // If the future was awoken during polling, we assume
+ // the future wanted to explicitly yield.
+ yielded += task.woken.load(Relaxed) as usize;
bomb.queue.link(task);
- if polled == yield_every {
- // We have polled a large number of futures in a row without yielding.
- // To ensure we do not starve other tasks waiting on the executor,
- // we yield here, but immediately wake ourselves up to continue.
+ // If a future yields, we respect it and yield here.
+ // If all futures have been polled, we also yield here to
+ // avoid starving other tasks waiting on the executor.
+ // (polling the same future twice per iteration may cause
+ // the problem: https://github.com/rust-lang/futures-rs/pull/2333)
+ if yielded >= 2 || polled == len {
cx.waker().wake_by_ref();
return Poll::Pending;
}