summaryrefslogtreecommitdiffstats
path: root/vendor/futures-executor
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:20:39 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:20:39 +0000
commit1376c5a617be5c25655d0d7cb63e3beaa5a6e026 (patch)
tree3bb8d61aee02bc7a15eab3f36e3b921afc2075d0 /vendor/futures-executor
parentReleasing progress-linux version 1.69.0+dfsg1-1~progress7.99u1. (diff)
downloadrustc-1376c5a617be5c25655d0d7cb63e3beaa5a6e026.tar.xz
rustc-1376c5a617be5c25655d0d7cb63e3beaa5a6e026.zip
Merging upstream version 1.70.0+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/futures-executor')
-rw-r--r--vendor/futures-executor/.cargo-checksum.json2
-rw-r--r--vendor/futures-executor/Cargo.toml33
-rw-r--r--vendor/futures-executor/README.md2
-rw-r--r--vendor/futures-executor/src/enter.rs2
-rw-r--r--vendor/futures-executor/src/local_pool.rs106
-rw-r--r--vendor/futures-executor/src/thread_pool.rs33
-rw-r--r--vendor/futures-executor/tests/local_pool.rs66
7 files changed, 164 insertions, 80 deletions
diff --git a/vendor/futures-executor/.cargo-checksum.json b/vendor/futures-executor/.cargo-checksum.json
index a8ad6b92b..00c487fc0 100644
--- a/vendor/futures-executor/.cargo-checksum.json
+++ b/vendor/futures-executor/.cargo-checksum.json
@@ -1 +1 @@
-{"files":{"Cargo.toml":"c6d60a83b1a88b21d0173ac269f6811d42618d8a216e14bfb32d56347871747a","LICENSE-APACHE":"275c491d6d1160553c32fd6127061d7f9606c3ea25abfad6ca3f6ed088785427","LICENSE-MIT":"6652c868f35dfe5e8ef636810a4e576b9d663f3a17fb0f5613ad73583e1b88fd","README.md":"151d3753b1ae87a1e1b1604c001ab8b2a5041b0e90ed09ea18d792081c424370","benches/thread_notify.rs":"e601968527bee85766f32d2d11de5ed8f6b4bd5a29989b5c369a52bd3cd3d024","src/enter.rs":"c1a771f373b469d98e2599d8e37da7d7a7083c30332d643f37867f86406ab1e2","src/lib.rs":"08a25594c789cb4ce1c8929a9ddd745e67fee1db373e011a7ebe135933522614","src/local_pool.rs":"1661a58468491d714a358b6382df88bbd7557e19506009763f841cbcf85781f5","src/thread_pool.rs":"206d5c9d16857d6b2cc9aecb63cd1c9859177b2eaea9b1d7055f5c42bd1ce33f","src/unpark_mutex.rs":"e186464d9bdec22a6d1e1d900ed03a1154e6b0d422ede9bd3b768657cdbb6113","tests/local_pool.rs":"c7f870582a29cdb6ebbb3a325ddb8485c61efac80fb96656003162294f4ec923"},"package":"29d6d2ff5bb10fb95c85b8ce46538a2e5f5e7fdc755623a7d4529ab8a4ed9d2a"} \ No newline at end of file
+{"files":{"Cargo.toml":"dac1d16ebb659583c1092ed30905ea278db9b6a291a4f44e40bc25bd19997b70","LICENSE-APACHE":"275c491d6d1160553c32fd6127061d7f9606c3ea25abfad6ca3f6ed088785427","LICENSE-MIT":"6652c868f35dfe5e8ef636810a4e576b9d663f3a17fb0f5613ad73583e1b88fd","README.md":"05ba6a5490962c4df45b78e9ad928a29dd5c3fad749284d5b812ca7e765feb6d","benches/thread_notify.rs":"e601968527bee85766f32d2d11de5ed8f6b4bd5a29989b5c369a52bd3cd3d024","src/enter.rs":"e3e890a8fa649e76cd2ce915abb11b67d15f3c5ae5e8e374142e0363917b2406","src/lib.rs":"08a25594c789cb4ce1c8929a9ddd745e67fee1db373e011a7ebe135933522614","src/local_pool.rs":"78177af55564fdfcfdc9f3974afe7d9d0682a7e4654761d83a8fc02abb34a7dc","src/thread_pool.rs":"e52f8527bc37c511513d77d183b44e3991a7b324aaed5d17bee0d092cf448a5b","src/unpark_mutex.rs":"e186464d9bdec22a6d1e1d900ed03a1154e6b0d422ede9bd3b768657cdbb6113","tests/local_pool.rs":"9639c9a290e23faab3913c6fec190853f890defaed6ffe67de177eca5d88932a"},"package":"ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"} \ No newline at end of file
diff --git a/vendor/futures-executor/Cargo.toml b/vendor/futures-executor/Cargo.toml
index 50632b1e4..c254b8ba4 100644
--- a/vendor/futures-executor/Cargo.toml
+++ b/vendor/futures-executor/Cargo.toml
@@ -11,26 +11,34 @@
[package]
edition = "2018"
-rust-version = "1.45"
+rust-version = "1.56"
name = "futures-executor"
-version = "0.3.19"
-description = "Executors for asynchronous tasks based on the futures-rs library.\n"
+version = "0.3.28"
+description = """
+Executors for asynchronous tasks based on the futures-rs library.
+"""
homepage = "https://rust-lang.github.io/futures-rs"
+readme = "README.md"
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
+
[package.metadata.docs.rs]
all-features = true
-rustdoc-args = ["--cfg", "docsrs"]
+rustdoc-args = [
+ "--cfg",
+ "docsrs",
+]
+
[dependencies.futures-core]
-version = "0.3.19"
+version = "0.3.28"
default-features = false
[dependencies.futures-task]
-version = "0.3.19"
+version = "0.3.28"
default-features = false
[dependencies.futures-util]
-version = "0.3.19"
+version = "0.3.28"
default-features = false
[dependencies.num_cpus]
@@ -41,5 +49,12 @@ optional = true
[features]
default = ["std"]
-std = ["futures-core/std", "futures-task/std", "futures-util/std"]
-thread-pool = ["std", "num_cpus"]
+std = [
+ "futures-core/std",
+ "futures-task/std",
+ "futures-util/std",
+]
+thread-pool = [
+ "std",
+ "num_cpus",
+]
diff --git a/vendor/futures-executor/README.md b/vendor/futures-executor/README.md
index 67086851e..724ff5bb3 100644
--- a/vendor/futures-executor/README.md
+++ b/vendor/futures-executor/README.md
@@ -11,7 +11,7 @@ Add this to your `Cargo.toml`:
futures-executor = "0.3"
```
-The current `futures-executor` requires Rust 1.45 or later.
+The current `futures-executor` requires Rust 1.56 or later.
## License
diff --git a/vendor/futures-executor/src/enter.rs b/vendor/futures-executor/src/enter.rs
index 5895a9efb..cb58c30bb 100644
--- a/vendor/futures-executor/src/enter.rs
+++ b/vendor/futures-executor/src/enter.rs
@@ -34,7 +34,7 @@ impl std::error::Error for EnterError {}
/// executor.
///
/// Executor implementations should call this function before beginning to
-/// execute a tasks, and drop the returned [`Enter`](Enter) value after
+/// execute a task, and drop the returned [`Enter`](Enter) value after
/// completing task execution:
///
/// ```
diff --git a/vendor/futures-executor/src/local_pool.rs b/vendor/futures-executor/src/local_pool.rs
index bee96d8db..8a9bc2fc9 100644
--- a/vendor/futures-executor/src/local_pool.rs
+++ b/vendor/futures-executor/src/local_pool.rs
@@ -63,7 +63,7 @@ thread_local! {
impl ArcWake for ThreadNotify {
fn wake_by_ref(arc_self: &Arc<Self>) {
// Make sure the wakeup is remembered until the next `park()`.
- let unparked = arc_self.unparked.swap(true, Ordering::Relaxed);
+ let unparked = arc_self.unparked.swap(true, Ordering::Release);
if !unparked {
// If the thread has not been unparked yet, it must be done
// now. If it was actually parked, it will run again,
@@ -90,33 +90,21 @@ fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
if let Poll::Ready(t) = f(&mut cx) {
return t;
}
- // Consume the wakeup that occurred while executing `f`, if any.
- let unparked = thread_notify.unparked.swap(false, Ordering::Acquire);
- if !unparked {
+
+ // Wait for a wakeup.
+ while !thread_notify.unparked.swap(false, Ordering::Acquire) {
// No wakeup occurred. It may occur now, right before parking,
// but in that case the token made available by `unpark()`
// is guaranteed to still be available and `park()` is a no-op.
thread::park();
- // When the thread is unparked, `unparked` will have been set
- // and needs to be unset before the next call to `f` to avoid
- // a redundant loop iteration.
- thread_notify.unparked.store(false, Ordering::Release);
}
}
})
}
-fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
- let _enter = enter().expect(
- "cannot execute `LocalPool` executor from within \
- another executor",
- );
-
- CURRENT_THREAD_NOTIFY.with(|thread_notify| {
- let waker = waker_ref(thread_notify);
- let mut cx = Context::from_waker(&waker);
- f(&mut cx)
- })
+/// Check for a wakeup, but don't consume it.
+fn woken() -> bool {
+ CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::Acquire))
}
impl LocalPool {
@@ -212,20 +200,26 @@ impl LocalPool {
/// further use of one of the pool's run or poll methods.
/// Though only one task will be completed, progress may be made on multiple tasks.
pub fn try_run_one(&mut self) -> bool {
- poll_executor(|ctx| {
+ run_executor(|cx| {
loop {
- let ret = self.poll_pool_once(ctx);
-
- // return if we have executed a future
- if let Poll::Ready(Some(_)) = ret {
- return true;
+ self.drain_incoming();
+
+ match self.pool.poll_next_unpin(cx) {
+ // Success!
+ Poll::Ready(Some(())) => return Poll::Ready(true),
+ // The pool was empty.
+ Poll::Ready(None) => return Poll::Ready(false),
+ Poll::Pending => (),
}
- // if there are no new incoming futures
- // then there is no feature that can make progress
- // and we can return without having completed a single future
- if self.incoming.borrow().is_empty() {
- return false;
+ if !self.incoming.borrow().is_empty() {
+ // New tasks were spawned; try again.
+ continue;
+ } else if woken() {
+ // The pool yielded to us, but there's more progress to be made.
+ return Poll::Pending;
+ } else {
+ return Poll::Ready(false);
}
}
})
@@ -257,44 +251,52 @@ impl LocalPool {
/// of the pool's run or poll methods. While the function is running, all tasks
/// in the pool will try to make progress.
pub fn run_until_stalled(&mut self) {
- poll_executor(|ctx| {
- let _ = self.poll_pool(ctx);
+ run_executor(|cx| match self.poll_pool(cx) {
+ // The pool is empty.
+ Poll::Ready(()) => Poll::Ready(()),
+ Poll::Pending => {
+ if woken() {
+ Poll::Pending
+ } else {
+ // We're stalled for now.
+ Poll::Ready(())
+ }
+ }
});
}
- // Make maximal progress on the entire pool of spawned task, returning `Ready`
- // if the pool is empty and `Pending` if no further progress can be made.
+ /// Poll `self.pool`, re-filling it with any newly-spawned tasks.
+ /// Repeat until either the pool is empty, or it returns `Pending`.
+ ///
+ /// Returns `Ready` if the pool was empty, and `Pending` otherwise.
+ ///
+ /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily
+ /// mean that the pool can't make progress.
fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
- // state for the FuturesUnordered, which will never be used
loop {
- let ret = self.poll_pool_once(cx);
+ self.drain_incoming();
- // we queued up some new tasks; add them and poll again
+ let pool_ret = self.pool.poll_next_unpin(cx);
+
+ // We queued up some new tasks; add them and poll again.
if !self.incoming.borrow().is_empty() {
continue;
}
- // no queued tasks; we may be done
- match ret {
- Poll::Pending => return Poll::Pending,
+ match pool_ret {
+ Poll::Ready(Some(())) => continue,
Poll::Ready(None) => return Poll::Ready(()),
- _ => {}
+ Poll::Pending => return Poll::Pending,
}
}
}
- // Try make minimal progress on the pool of spawned tasks
- fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
- // empty the incoming queue of newly-spawned tasks
- {
- let mut incoming = self.incoming.borrow_mut();
- for task in incoming.drain(..) {
- self.pool.push(task)
- }
+ /// Empty the incoming queue of newly-spawned tasks.
+ fn drain_incoming(&mut self) {
+ let mut incoming = self.incoming.borrow_mut();
+ for task in incoming.drain(..) {
+ self.pool.push(task)
}
-
- // try to execute the next ready future
- self.pool.poll_next_unpin(cx)
}
}
diff --git a/vendor/futures-executor/src/thread_pool.rs b/vendor/futures-executor/src/thread_pool.rs
index 5e1f586eb..537100895 100644
--- a/vendor/futures-executor/src/thread_pool.rs
+++ b/vendor/futures-executor/src/thread_pool.rs
@@ -108,12 +108,15 @@ impl ThreadPool {
/// completion.
///
/// ```
+ /// # {
/// use futures::executor::ThreadPool;
///
/// let pool = ThreadPool::new().unwrap();
///
/// let future = async { /* ... */ };
/// pool.spawn_ok(future);
+ /// # }
+ /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
/// ```
///
/// > **Note**: This method is similar to `SpawnExt::spawn`, except that
@@ -346,9 +349,8 @@ impl fmt::Debug for Task {
impl ArcWake for WakeHandle {
fn wake_by_ref(arc_self: &Arc<Self>) {
- match arc_self.mutex.notify() {
- Ok(task) => arc_self.exec.state.send(Message::Run(task)),
- Err(()) => {}
+ if let Ok(task) = arc_self.mutex.notify() {
+ arc_self.exec.state.send(Message::Run(task))
}
}
}
@@ -360,16 +362,19 @@ mod tests {
#[test]
fn test_drop_after_start() {
- let (tx, rx) = mpsc::sync_channel(2);
- let _cpu_pool = ThreadPoolBuilder::new()
- .pool_size(2)
- .after_start(move |_| tx.send(1).unwrap())
- .create()
- .unwrap();
-
- // After ThreadPoolBuilder is deconstructed, the tx should be dropped
- // so that we can use rx as an iterator.
- let count = rx.into_iter().count();
- assert_eq!(count, 2);
+ {
+ let (tx, rx) = mpsc::sync_channel(2);
+ let _cpu_pool = ThreadPoolBuilder::new()
+ .pool_size(2)
+ .after_start(move |_| tx.send(1).unwrap())
+ .create()
+ .unwrap();
+
+ // After ThreadPoolBuilder is deconstructed, the tx should be dropped
+ // so that we can use rx as an iterator.
+ let count = rx.into_iter().count();
+ assert_eq!(count, 2);
+ }
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
}
diff --git a/vendor/futures-executor/tests/local_pool.rs b/vendor/futures-executor/tests/local_pool.rs
index 9b1316b99..72ce74b74 100644
--- a/vendor/futures-executor/tests/local_pool.rs
+++ b/vendor/futures-executor/tests/local_pool.rs
@@ -1,7 +1,7 @@
use futures::channel::oneshot;
use futures::executor::LocalPool;
use futures::future::{self, lazy, poll_fn, Future};
-use futures::task::{Context, LocalSpawn, Poll, Spawn, Waker};
+use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker};
use std::cell::{Cell, RefCell};
use std::pin::Pin;
use std::rc::Rc;
@@ -288,7 +288,7 @@ fn run_until_stalled_runs_spawned_sub_futures() {
#[test]
fn run_until_stalled_executes_all_ready() {
- const ITER: usize = 200;
+ const ITER: usize = if cfg!(miri) { 50 } else { 200 };
const PER_ITER: usize = 3;
let cnt = Rc::new(Cell::new(0));
@@ -432,3 +432,65 @@ fn park_unpark_independence() {
futures::executor::block_on(future)
}
+
+struct SelfWaking {
+ wakeups_remaining: Rc<RefCell<usize>>,
+}
+
+impl Future for SelfWaking {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ if *self.wakeups_remaining.borrow() != 0 {
+ *self.wakeups_remaining.borrow_mut() -= 1;
+ cx.waker().wake_by_ref();
+ }
+
+ Poll::Pending
+ }
+}
+
+/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
+///
+/// The issue was that self-waking futures could cause `run_until_stalled`
+/// to exit early, even when progress could still be made.
+#[test]
+fn self_waking_run_until_stalled() {
+ let wakeups_remaining = Rc::new(RefCell::new(10));
+
+ let mut pool = LocalPool::new();
+ let spawner = pool.spawner();
+ for _ in 0..3 {
+ let wakeups_remaining = Rc::clone(&wakeups_remaining);
+ spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
+ }
+
+ // This should keep polling until there are no more wakeups.
+ pool.run_until_stalled();
+
+ assert_eq!(*wakeups_remaining.borrow(), 0);
+}
+
+/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
+///
+/// The issue was that self-waking futures could cause `try_run_one`
+/// to exit early, even when progress could still be made.
+#[test]
+fn self_waking_try_run_one() {
+ let wakeups_remaining = Rc::new(RefCell::new(10));
+
+ let mut pool = LocalPool::new();
+ let spawner = pool.spawner();
+ for _ in 0..3 {
+ let wakeups_remaining = Rc::clone(&wakeups_remaining);
+ spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
+ }
+
+ spawner.spawn(future::ready(())).unwrap();
+
+ // The `ready` future should complete.
+ assert!(pool.try_run_one());
+
+ // The self-waking futures are each polled once.
+ assert_eq!(*wakeups_remaining.borrow(), 7);
+}