diff options
Diffstat (limited to 'third_party/rust/futures-executor')
-rw-r--r-- | third_party/rust/futures-executor/.cargo-checksum.json | 1 | ||||
-rw-r--r-- | third_party/rust/futures-executor/Cargo.toml | 60 | ||||
-rw-r--r-- | third_party/rust/futures-executor/LICENSE-APACHE | 202 | ||||
-rw-r--r-- | third_party/rust/futures-executor/LICENSE-MIT | 26 | ||||
-rw-r--r-- | third_party/rust/futures-executor/README.md | 23 | ||||
-rw-r--r-- | third_party/rust/futures-executor/benches/thread_notify.rs | 109 | ||||
-rw-r--r-- | third_party/rust/futures-executor/src/enter.rs | 80 | ||||
-rw-r--r-- | third_party/rust/futures-executor/src/lib.rs | 76 | ||||
-rw-r--r-- | third_party/rust/futures-executor/src/local_pool.rs | 402 | ||||
-rw-r--r-- | third_party/rust/futures-executor/src/thread_pool.rs | 380 | ||||
-rw-r--r-- | third_party/rust/futures-executor/src/unpark_mutex.rs | 137 | ||||
-rw-r--r-- | third_party/rust/futures-executor/tests/local_pool.rs | 496 |
12 files changed, 1992 insertions, 0 deletions
diff --git a/third_party/rust/futures-executor/.cargo-checksum.json b/third_party/rust/futures-executor/.cargo-checksum.json new file mode 100644 index 0000000000..2d2a531d24 --- /dev/null +++ b/third_party/rust/futures-executor/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"Cargo.toml":"b1cbe0b44fdc7e378dba72a463079f8092860863b0ef904b259f9b88cae09b34","LICENSE-APACHE":"275c491d6d1160553c32fd6127061d7f9606c3ea25abfad6ca3f6ed088785427","LICENSE-MIT":"6652c868f35dfe5e8ef636810a4e576b9d663f3a17fb0f5613ad73583e1b88fd","README.md":"151d3753b1ae87a1e1b1604c001ab8b2a5041b0e90ed09ea18d792081c424370","benches/thread_notify.rs":"e601968527bee85766f32d2d11de5ed8f6b4bd5a29989b5c369a52bd3cd3d024","src/enter.rs":"e3e890a8fa649e76cd2ce915abb11b67d15f3c5ae5e8e374142e0363917b2406","src/lib.rs":"08a25594c789cb4ce1c8929a9ddd745e67fee1db373e011a7ebe135933522614","src/local_pool.rs":"78177af55564fdfcfdc9f3974afe7d9d0682a7e4654761d83a8fc02abb34a7dc","src/thread_pool.rs":"e52f8527bc37c511513d77d183b44e3991a7b324aaed5d17bee0d092cf448a5b","src/unpark_mutex.rs":"e186464d9bdec22a6d1e1d900ed03a1154e6b0d422ede9bd3b768657cdbb6113","tests/local_pool.rs":"9639c9a290e23faab3913c6fec190853f890defaed6ffe67de177eca5d88932a"},"package":"e8de0a35a6ab97ec8869e32a2473f4b1324459e14c29275d14b10cb1fd19b50e"}
\ No newline at end of file diff --git a/third_party/rust/futures-executor/Cargo.toml b/third_party/rust/futures-executor/Cargo.toml new file mode 100644 index 0000000000..6faf14d217 --- /dev/null +++ b/third_party/rust/futures-executor/Cargo.toml @@ -0,0 +1,60 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. +# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +edition = "2018" +rust-version = "1.45" +name = "futures-executor" +version = "0.3.26" +description = """ +Executors for asynchronous tasks based on the futures-rs library. +""" +homepage = "https://rust-lang.github.io/futures-rs" +readme = "README.md" +license = "MIT OR Apache-2.0" +repository = "https://github.com/rust-lang/futures-rs" + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = [ + "--cfg", + "docsrs", +] + +[dependencies.futures-core] +version = "0.3.26" +default-features = false + +[dependencies.futures-task] +version = "0.3.26" +default-features = false + +[dependencies.futures-util] +version = "0.3.26" +default-features = false + +[dependencies.num_cpus] +version = "1.8.0" +optional = true + +[dev-dependencies] + +[features] +default = ["std"] +std = [ + "futures-core/std", + "futures-task/std", + "futures-util/std", +] +thread-pool = [ + "std", + "num_cpus", +] diff --git a/third_party/rust/futures-executor/LICENSE-APACHE b/third_party/rust/futures-executor/LICENSE-APACHE new file mode 100644 index 0000000000..9eb0b097f5 --- /dev/null +++ b/third_party/rust/futures-executor/LICENSE-APACHE @@ -0,0 +1,202 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright (c) 2016 Alex Crichton +Copyright (c) 2017 The Tokio Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/third_party/rust/futures-executor/LICENSE-MIT b/third_party/rust/futures-executor/LICENSE-MIT new file mode 100644 index 0000000000..8ad082ec4f --- /dev/null +++ b/third_party/rust/futures-executor/LICENSE-MIT @@ -0,0 +1,26 @@ +Copyright (c) 2016 Alex Crichton +Copyright (c) 2017 The Tokio Authors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/third_party/rust/futures-executor/README.md b/third_party/rust/futures-executor/README.md new file mode 100644 index 0000000000..67086851e4 --- /dev/null +++ b/third_party/rust/futures-executor/README.md @@ -0,0 +1,23 @@ +# futures-executor + +Executors for asynchronous tasks based on the futures-rs library. + +## Usage + +Add this to your `Cargo.toml`: + +```toml +[dependencies] +futures-executor = "0.3" +``` + +The current `futures-executor` requires Rust 1.45 or later. + +## License + +Licensed under either of [Apache License, Version 2.0](LICENSE-APACHE) or +[MIT license](LICENSE-MIT) at your option. + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall +be dual licensed as above, without any additional terms or conditions. diff --git a/third_party/rust/futures-executor/benches/thread_notify.rs b/third_party/rust/futures-executor/benches/thread_notify.rs new file mode 100644 index 0000000000..88d0447cf6 --- /dev/null +++ b/third_party/rust/futures-executor/benches/thread_notify.rs @@ -0,0 +1,109 @@ +#![feature(test)] + +extern crate test; +use crate::test::Bencher; + +use futures::executor::block_on; +use futures::future::Future; +use futures::task::{Context, Poll, Waker}; +use std::pin::Pin; + +#[bench] +fn thread_yield_single_thread_one_wait(b: &mut Bencher) { + const NUM: usize = 10_000; + + struct Yield { + rem: usize, + } + + impl Future for Yield { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + if self.rem == 0 { + Poll::Ready(()) + } else { + self.rem -= 1; + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } + + b.iter(|| { + let y = Yield { rem: NUM }; + block_on(y); + }); +} + +#[bench] +fn thread_yield_single_thread_many_wait(b: &mut Bencher) { + const NUM: usize = 10_000; + + struct Yield { + rem: usize, + } + + impl Future for Yield { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + if self.rem == 0 { + Poll::Ready(()) + } else { + self.rem -= 1; + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } + + b.iter(|| { + for _ in 0..NUM { + let y = Yield { rem: 1 }; + block_on(y); + } + }); +} + +#[bench] +fn thread_yield_multi_thread(b: &mut Bencher) { + use std::sync::mpsc; + use std::thread; + + const NUM: usize = 1_000; + + let (tx, rx) = mpsc::sync_channel::<Waker>(10_000); + + struct Yield { + rem: usize, + tx: mpsc::SyncSender<Waker>, + } + impl Unpin for Yield {} + + impl Future for Yield { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + if self.rem == 0 { + Poll::Ready(()) + } else { + self.rem -= 1; + self.tx.send(cx.waker().clone()).unwrap(); + Poll::Pending + } + } + } + + thread::spawn(move || { + while let Ok(task) = rx.recv() { + task.wake(); + } + }); + + b.iter(move || { + let y = Yield { rem: NUM, tx: tx.clone() }; + + block_on(y); + }); +} diff --git a/third_party/rust/futures-executor/src/enter.rs b/third_party/rust/futures-executor/src/enter.rs new file mode 100644 index 0000000000..cb58c30bb7 --- /dev/null +++ b/third_party/rust/futures-executor/src/enter.rs @@ -0,0 +1,80 @@ +use std::cell::Cell; +use std::fmt; + +thread_local!(static ENTERED: Cell<bool> = Cell::new(false)); + +/// Represents an executor context. +/// +/// For more details, see [`enter` documentation](enter()). +pub struct Enter { + _priv: (), +} + +/// An error returned by `enter` if an execution scope has already been +/// entered. +pub struct EnterError { + _priv: (), +} + +impl fmt::Debug for EnterError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("EnterError").finish() + } +} + +impl fmt::Display for EnterError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "an execution scope has already been entered") + } +} + +impl std::error::Error for EnterError {} + +/// Marks the current thread as being within the dynamic extent of an +/// executor. +/// +/// Executor implementations should call this function before beginning to +/// execute a task, and drop the returned [`Enter`](Enter) value after +/// completing task execution: +/// +/// ``` +/// use futures::executor::enter; +/// +/// let enter = enter().expect("..."); +/// /* run task */ +/// drop(enter); +/// ``` +/// +/// Doing so ensures that executors aren't +/// accidentally invoked in a nested fashion. +/// +/// # Error +/// +/// Returns an error if the current thread is already marked, in which case the +/// caller should panic with a tailored error message. +pub fn enter() -> Result<Enter, EnterError> { + ENTERED.with(|c| { + if c.get() { + Err(EnterError { _priv: () }) + } else { + c.set(true); + + Ok(Enter { _priv: () }) + } + }) +} + +impl fmt::Debug for Enter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Enter").finish() + } +} + +impl Drop for Enter { + fn drop(&mut self) { + ENTERED.with(|c| { + assert!(c.get()); + c.set(false); + }); + } +} diff --git a/third_party/rust/futures-executor/src/lib.rs b/third_party/rust/futures-executor/src/lib.rs new file mode 100644 index 0000000000..b1af87545f --- /dev/null +++ b/third_party/rust/futures-executor/src/lib.rs @@ -0,0 +1,76 @@ +//! Built-in executors and related tools. +//! +//! All asynchronous computation occurs within an executor, which is +//! capable of spawning futures as tasks. This module provides several +//! built-in executors, as well as tools for building your own. +//! +//! All items are only available when the `std` feature of this +//! library is activated, and it is activated by default. +//! +//! # Using a thread pool (M:N task scheduling) +//! +//! Most of the time tasks should be executed on a [thread pool](ThreadPool). +//! A small set of worker threads can handle a very large set of spawned tasks +//! (which are much lighter weight than threads). Tasks spawned onto the pool +//! with the [`spawn_ok`](ThreadPool::spawn_ok) function will run ambiently on +//! the created threads. +//! +//! # Spawning additional tasks +//! +//! Tasks can be spawned onto a spawner by calling its [`spawn_obj`] method +//! directly. In the case of `!Send` futures, [`spawn_local_obj`] can be used +//! instead. +//! +//! # Single-threaded execution +//! +//! In addition to thread pools, it's possible to run a task (and the tasks +//! it spawns) entirely within a single thread via the [`LocalPool`] executor. +//! Aside from cutting down on synchronization costs, this executor also makes +//! it possible to spawn non-`Send` tasks, via [`spawn_local_obj`]. The +//! [`LocalPool`] is best suited for running I/O-bound tasks that do relatively +//! little work between I/O operations. +//! +//! There is also a convenience function [`block_on`] for simply running a +//! future to completion on the current thread. +//! +//! [`spawn_obj`]: https://docs.rs/futures/0.3/futures/task/trait.Spawn.html#tymethod.spawn_obj +//! [`spawn_local_obj`]: https://docs.rs/futures/0.3/futures/task/trait.LocalSpawn.html#tymethod.spawn_local_obj + +#![cfg_attr(not(feature = "std"), no_std)] +#![warn( + missing_debug_implementations, + missing_docs, + rust_2018_idioms, + single_use_lifetimes, + unreachable_pub +)] +#![doc(test( + no_crate_inject, + attr( + deny(warnings, rust_2018_idioms, single_use_lifetimes), + allow(dead_code, unused_assignments, unused_variables) + ) +))] +#![cfg_attr(docsrs, feature(doc_cfg))] + +#[cfg(feature = "std")] +mod local_pool; +#[cfg(feature = "std")] +pub use crate::local_pool::{block_on, block_on_stream, BlockingStream, LocalPool, LocalSpawner}; + +#[cfg(feature = "thread-pool")] +#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))] +#[cfg(feature = "std")] +mod thread_pool; +#[cfg(feature = "thread-pool")] +#[cfg(feature = "std")] +mod unpark_mutex; +#[cfg(feature = "thread-pool")] +#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))] +#[cfg(feature = "std")] +pub use crate::thread_pool::{ThreadPool, ThreadPoolBuilder}; + +#[cfg(feature = "std")] +mod enter; +#[cfg(feature = "std")] +pub use crate::enter::{enter, Enter, EnterError}; diff --git a/third_party/rust/futures-executor/src/local_pool.rs b/third_party/rust/futures-executor/src/local_pool.rs new file mode 100644 index 0000000000..8a9bc2fc90 --- /dev/null +++ b/third_party/rust/futures-executor/src/local_pool.rs @@ -0,0 +1,402 @@ +use crate::enter; +use futures_core::future::Future; +use futures_core::stream::Stream; +use futures_core::task::{Context, Poll}; +use futures_task::{waker_ref, ArcWake}; +use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; +use futures_util::pin_mut; +use futures_util::stream::FuturesUnordered; +use futures_util::stream::StreamExt; +use std::cell::RefCell; +use std::ops::{Deref, DerefMut}; +use std::rc::{Rc, Weak}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; +use std::thread::{self, Thread}; + +/// A single-threaded task pool for polling futures to completion. +/// +/// This executor allows you to multiplex any number of tasks onto a single +/// thread. It's appropriate to poll strictly I/O-bound futures that do very +/// little work in between I/O actions. +/// +/// To get a handle to the pool that implements +/// [`Spawn`](futures_task::Spawn), use the +/// [`spawner()`](LocalPool::spawner) method. Because the executor is +/// single-threaded, it supports a special form of task spawning for non-`Send` +/// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj). +#[derive(Debug)] +pub struct LocalPool { + pool: FuturesUnordered<LocalFutureObj<'static, ()>>, + incoming: Rc<Incoming>, +} + +/// A handle to a [`LocalPool`](LocalPool) that implements +/// [`Spawn`](futures_task::Spawn). +#[derive(Clone, Debug)] +pub struct LocalSpawner { + incoming: Weak<Incoming>, +} + +type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>; + +pub(crate) struct ThreadNotify { + /// The (single) executor thread. + thread: Thread, + /// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten" + /// before the next `park()`, which may otherwise happen if the code + /// being executed as part of the future(s) being polled makes use of + /// park / unpark calls of its own, i.e. we cannot assume that no other + /// code uses park / unpark on the executing `thread`. + unparked: AtomicBool, +} + +thread_local! { + static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify { + thread: thread::current(), + unparked: AtomicBool::new(false), + }); +} + +impl ArcWake for ThreadNotify { + fn wake_by_ref(arc_self: &Arc<Self>) { + // Make sure the wakeup is remembered until the next `park()`. + let unparked = arc_self.unparked.swap(true, Ordering::Release); + if !unparked { + // If the thread has not been unparked yet, it must be done + // now. If it was actually parked, it will run again, + // otherwise the token made available by `unpark` + // may be consumed before reaching `park()`, but `unparked` + // ensures it is not forgotten. + arc_self.thread.unpark(); + } + } +} + +// Set up and run a basic single-threaded spawner loop, invoking `f` on each +// turn. +fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T { + let _enter = enter().expect( + "cannot execute `LocalPool` executor from within \ + another executor", + ); + + CURRENT_THREAD_NOTIFY.with(|thread_notify| { + let waker = waker_ref(thread_notify); + let mut cx = Context::from_waker(&waker); + loop { + if let Poll::Ready(t) = f(&mut cx) { + return t; + } + + // Wait for a wakeup. + while !thread_notify.unparked.swap(false, Ordering::Acquire) { + // No wakeup occurred. It may occur now, right before parking, + // but in that case the token made available by `unpark()` + // is guaranteed to still be available and `park()` is a no-op. + thread::park(); + } + } + }) +} + +/// Check for a wakeup, but don't consume it. +fn woken() -> bool { + CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::Acquire)) +} + +impl LocalPool { + /// Create a new, empty pool of tasks. + pub fn new() -> Self { + Self { pool: FuturesUnordered::new(), incoming: Default::default() } + } + + /// Get a clonable handle to the pool as a [`Spawn`]. + pub fn spawner(&self) -> LocalSpawner { + LocalSpawner { incoming: Rc::downgrade(&self.incoming) } + } + + /// Run all tasks in the pool to completion. + /// + /// ``` + /// use futures::executor::LocalPool; + /// + /// let mut pool = LocalPool::new(); + /// + /// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()` + /// + /// // run *all* tasks in the pool to completion, including any newly-spawned ones. + /// pool.run(); + /// ``` + /// + /// The function will block the calling thread until *all* tasks in the pool + /// are complete, including any spawned while running existing tasks. + pub fn run(&mut self) { + run_executor(|cx| self.poll_pool(cx)) + } + + /// Runs all the tasks in the pool until the given future completes. + /// + /// ``` + /// use futures::executor::LocalPool; + /// + /// let mut pool = LocalPool::new(); + /// # let my_app = async {}; + /// + /// // run tasks in the pool until `my_app` completes + /// pool.run_until(my_app); + /// ``` + /// + /// The function will block the calling thread *only* until the future `f` + /// completes; there may still be incomplete tasks in the pool, which will + /// be inert after the call completes, but can continue with further use of + /// one of the pool's run or poll methods. While the function is running, + /// however, all tasks in the pool will try to make progress. + pub fn run_until<F: Future>(&mut self, future: F) -> F::Output { + pin_mut!(future); + + run_executor(|cx| { + { + // if our main task is done, so are we + let result = future.as_mut().poll(cx); + if let Poll::Ready(output) = result { + return Poll::Ready(output); + } + } + + let _ = self.poll_pool(cx); + Poll::Pending + }) + } + + /// Runs all tasks and returns after completing one future or until no more progress + /// can be made. Returns `true` if one future was completed, `false` otherwise. + /// + /// ``` + /// use futures::executor::LocalPool; + /// use futures::task::LocalSpawnExt; + /// use futures::future::{ready, pending}; + /// + /// let mut pool = LocalPool::new(); + /// let spawner = pool.spawner(); + /// + /// spawner.spawn_local(ready(())).unwrap(); + /// spawner.spawn_local(ready(())).unwrap(); + /// spawner.spawn_local(pending()).unwrap(); + /// + /// // Run the two ready tasks and return true for them. + /// pool.try_run_one(); // returns true after completing one of the ready futures + /// pool.try_run_one(); // returns true after completing the other ready future + /// + /// // the remaining task can not be completed + /// assert!(!pool.try_run_one()); // returns false + /// ``` + /// + /// This function will not block the calling thread and will return the moment + /// that there are no tasks left for which progress can be made or after exactly one + /// task was completed; Remaining incomplete tasks in the pool can continue with + /// further use of one of the pool's run or poll methods. + /// Though only one task will be completed, progress may be made on multiple tasks. + pub fn try_run_one(&mut self) -> bool { + run_executor(|cx| { + loop { + self.drain_incoming(); + + match self.pool.poll_next_unpin(cx) { + // Success! + Poll::Ready(Some(())) => return Poll::Ready(true), + // The pool was empty. + Poll::Ready(None) => return Poll::Ready(false), + Poll::Pending => (), + } + + if !self.incoming.borrow().is_empty() { + // New tasks were spawned; try again. + continue; + } else if woken() { + // The pool yielded to us, but there's more progress to be made. + return Poll::Pending; + } else { + return Poll::Ready(false); + } + } + }) + } + + /// Runs all tasks in the pool and returns if no more progress can be made + /// on any task. + /// + /// ``` + /// use futures::executor::LocalPool; + /// use futures::task::LocalSpawnExt; + /// use futures::future::{ready, pending}; + /// + /// let mut pool = LocalPool::new(); + /// let spawner = pool.spawner(); + /// + /// spawner.spawn_local(ready(())).unwrap(); + /// spawner.spawn_local(ready(())).unwrap(); + /// spawner.spawn_local(pending()).unwrap(); + /// + /// // Runs the two ready task and returns. + /// // The empty task remains in the pool. + /// pool.run_until_stalled(); + /// ``` + /// + /// This function will not block the calling thread and will return the moment + /// that there are no tasks left for which progress can be made; + /// remaining incomplete tasks in the pool can continue with further use of one + /// of the pool's run or poll methods. While the function is running, all tasks + /// in the pool will try to make progress. + pub fn run_until_stalled(&mut self) { + run_executor(|cx| match self.poll_pool(cx) { + // The pool is empty. + Poll::Ready(()) => Poll::Ready(()), + Poll::Pending => { + if woken() { + Poll::Pending + } else { + // We're stalled for now. + Poll::Ready(()) + } + } + }); + } + + /// Poll `self.pool`, re-filling it with any newly-spawned tasks. + /// Repeat until either the pool is empty, or it returns `Pending`. + /// + /// Returns `Ready` if the pool was empty, and `Pending` otherwise. + /// + /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily + /// mean that the pool can't make progress. + fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> { + loop { + self.drain_incoming(); + + let pool_ret = self.pool.poll_next_unpin(cx); + + // We queued up some new tasks; add them and poll again. + if !self.incoming.borrow().is_empty() { + continue; + } + + match pool_ret { + Poll::Ready(Some(())) => continue, + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => return Poll::Pending, + } + } + } + + /// Empty the incoming queue of newly-spawned tasks. + fn drain_incoming(&mut self) { + let mut incoming = self.incoming.borrow_mut(); + for task in incoming.drain(..) { + self.pool.push(task) + } + } +} + +impl Default for LocalPool { + fn default() -> Self { + Self::new() + } +} + +/// Run a future to completion on the current thread. +/// +/// This function will block the caller until the given future has completed. +/// +/// Use a [`LocalPool`](LocalPool) if you need finer-grained control over +/// spawned tasks. +pub fn block_on<F: Future>(f: F) -> F::Output { + pin_mut!(f); + run_executor(|cx| f.as_mut().poll(cx)) +} + +/// Turn a stream into a blocking iterator. +/// +/// When `next` is called on the resulting `BlockingStream`, the caller +/// will be blocked until the next element of the `Stream` becomes available. +pub fn block_on_stream<S: Stream + Unpin>(stream: S) -> BlockingStream<S> { + BlockingStream { stream } +} + +/// An iterator which blocks on values from a stream until they become available. +#[derive(Debug)] +pub struct BlockingStream<S: Stream + Unpin> { + stream: S, +} + +impl<S: Stream + Unpin> Deref for BlockingStream<S> { + type Target = S; + fn deref(&self) -> &Self::Target { + &self.stream + } +} + +impl<S: Stream + Unpin> DerefMut for BlockingStream<S> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.stream + } +} + +impl<S: Stream + Unpin> BlockingStream<S> { + /// Convert this `BlockingStream` into the inner `Stream` type. + pub fn into_inner(self) -> S { + self.stream + } +} + +impl<S: Stream + Unpin> Iterator for BlockingStream<S> { + type Item = S::Item; + + fn next(&mut self) -> Option<Self::Item> { + LocalPool::new().run_until(self.stream.next()) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.stream.size_hint() + } +} + +impl Spawn for LocalSpawner { + fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { + if let Some(incoming) = self.incoming.upgrade() { + incoming.borrow_mut().push(future.into()); + Ok(()) + } else { + Err(SpawnError::shutdown()) + } + } + + fn status(&self) -> Result<(), SpawnError> { + if self.incoming.upgrade().is_some() { + Ok(()) + } else { + Err(SpawnError::shutdown()) + } + } +} + +impl LocalSpawn for LocalSpawner { + fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { + if let Some(incoming) = self.incoming.upgrade() { + incoming.borrow_mut().push(future); + Ok(()) + } else { + Err(SpawnError::shutdown()) + } + } + + fn status_local(&self) -> Result<(), SpawnError> { + if self.incoming.upgrade().is_some() { + Ok(()) + } else { + Err(SpawnError::shutdown()) + } + } +} diff --git a/third_party/rust/futures-executor/src/thread_pool.rs b/third_party/rust/futures-executor/src/thread_pool.rs new file mode 100644 index 0000000000..5371008953 --- /dev/null +++ b/third_party/rust/futures-executor/src/thread_pool.rs @@ -0,0 +1,380 @@ +use crate::enter; +use crate::unpark_mutex::UnparkMutex; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_task::{waker_ref, ArcWake}; +use futures_task::{FutureObj, Spawn, SpawnError}; +use futures_util::future::FutureExt; +use std::cmp; +use std::fmt; +use std::io; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc; +use std::sync::{Arc, Mutex}; +use std::thread; + +/// A general-purpose thread pool for scheduling tasks that poll futures to +/// completion. +/// +/// The thread pool multiplexes any number of tasks onto a fixed number of +/// worker threads. +/// +/// This type is a clonable handle to the threadpool itself. +/// Cloning it will only create a new reference, not a new threadpool. +/// +/// This type is only available when the `thread-pool` feature of this +/// library is activated. +#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))] +pub struct ThreadPool { + state: Arc<PoolState>, +} + +/// Thread pool configuration object. +/// +/// This type is only available when the `thread-pool` feature of this +/// library is activated. +#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))] +pub struct ThreadPoolBuilder { + pool_size: usize, + stack_size: usize, + name_prefix: Option<String>, + after_start: Option<Arc<dyn Fn(usize) + Send + Sync>>, + before_stop: Option<Arc<dyn Fn(usize) + Send + Sync>>, +} + +trait AssertSendSync: Send + Sync {} +impl AssertSendSync for ThreadPool {} + +struct PoolState { + tx: Mutex<mpsc::Sender<Message>>, + rx: Mutex<mpsc::Receiver<Message>>, + cnt: AtomicUsize, + size: usize, +} + +impl fmt::Debug for ThreadPool { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ThreadPool").field("size", &self.state.size).finish() + } +} + +impl fmt::Debug for ThreadPoolBuilder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ThreadPoolBuilder") + .field("pool_size", &self.pool_size) + .field("name_prefix", &self.name_prefix) + .finish() + } +} + +enum Message { + Run(Task), + Close, +} + +impl ThreadPool { + /// Creates a new thread pool with the default configuration. + /// + /// See documentation for the methods in + /// [`ThreadPoolBuilder`](ThreadPoolBuilder) for details on the default + /// configuration. + pub fn new() -> Result<Self, io::Error> { + ThreadPoolBuilder::new().create() + } + + /// Create a default thread pool configuration, which can then be customized. + /// + /// See documentation for the methods in + /// [`ThreadPoolBuilder`](ThreadPoolBuilder) for details on the default + /// configuration. + pub fn builder() -> ThreadPoolBuilder { + ThreadPoolBuilder::new() + } + + /// Spawns a future that will be run to completion. + /// + /// > **Note**: This method is similar to `Spawn::spawn_obj`, except that + /// > it is guaranteed to always succeed. + pub fn spawn_obj_ok(&self, future: FutureObj<'static, ()>) { + let task = Task { + future, + wake_handle: Arc::new(WakeHandle { exec: self.clone(), mutex: UnparkMutex::new() }), + exec: self.clone(), + }; + self.state.send(Message::Run(task)); + } + + /// Spawns a task that polls the given future with output `()` to + /// completion. + /// + /// ``` + /// # { + /// use futures::executor::ThreadPool; + /// + /// let pool = ThreadPool::new().unwrap(); + /// + /// let future = async { /* ... */ }; + /// pool.spawn_ok(future); + /// # } + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 + /// ``` + /// + /// > **Note**: This method is similar to `SpawnExt::spawn`, except that + /// > it is guaranteed to always succeed. + pub fn spawn_ok<Fut>(&self, future: Fut) + where + Fut: Future<Output = ()> + Send + 'static, + { + self.spawn_obj_ok(FutureObj::new(Box::new(future))) + } +} + +impl Spawn for ThreadPool { + fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { + self.spawn_obj_ok(future); + Ok(()) + } +} + +impl PoolState { + fn send(&self, msg: Message) { + self.tx.lock().unwrap().send(msg).unwrap(); + } + + fn work( + &self, + idx: usize, + after_start: Option<Arc<dyn Fn(usize) + Send + Sync>>, + before_stop: Option<Arc<dyn Fn(usize) + Send + Sync>>, + ) { + let _scope = enter().unwrap(); + if let Some(after_start) = after_start { + after_start(idx); + } + loop { + let msg = self.rx.lock().unwrap().recv().unwrap(); + match msg { + Message::Run(task) => task.run(), + Message::Close => break, + } + } + if let Some(before_stop) = before_stop { + before_stop(idx); + } + } +} + +impl Clone for ThreadPool { + fn clone(&self) -> Self { + self.state.cnt.fetch_add(1, Ordering::Relaxed); + Self { state: self.state.clone() } + } +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + if self.state.cnt.fetch_sub(1, Ordering::Relaxed) == 1 { + for _ in 0..self.state.size { + self.state.send(Message::Close); + } + } + } +} + +impl ThreadPoolBuilder { + /// Create a default thread pool configuration. + /// + /// See the other methods on this type for details on the defaults. + pub fn new() -> Self { + Self { + pool_size: cmp::max(1, num_cpus::get()), + stack_size: 0, + name_prefix: None, + after_start: None, + before_stop: None, + } + } + + /// Set size of a future ThreadPool + /// + /// The size of a thread pool is the number of worker threads spawned. By + /// default, this is equal to the number of CPU cores. + /// + /// # Panics + /// + /// Panics if `pool_size == 0`. + pub fn pool_size(&mut self, size: usize) -> &mut Self { + assert!(size > 0); + self.pool_size = size; + self + } + + /// Set stack size of threads in the pool, in bytes. + /// + /// By default, worker threads use Rust's standard stack size. + pub fn stack_size(&mut self, stack_size: usize) -> &mut Self { + self.stack_size = stack_size; + self + } + + /// Set thread name prefix of a future ThreadPool. + /// + /// Thread name prefix is used for generating thread names. For example, if prefix is + /// `my-pool-`, then threads in the pool will get names like `my-pool-1` etc. + /// + /// By default, worker threads are assigned Rust's standard thread name. + pub fn name_prefix<S: Into<String>>(&mut self, name_prefix: S) -> &mut Self { + self.name_prefix = Some(name_prefix.into()); + self + } + + /// Execute the closure `f` immediately after each worker thread is started, + /// but before running any tasks on it. + /// + /// This hook is intended for bookkeeping and monitoring. + /// The closure `f` will be dropped after the `builder` is dropped + /// and all worker threads in the pool have executed it. + /// + /// The closure provided will receive an index corresponding to the worker + /// thread it's running on. + pub fn after_start<F>(&mut self, f: F) -> &mut Self + where + F: Fn(usize) + Send + Sync + 'static, + { + self.after_start = Some(Arc::new(f)); + self + } + + /// Execute closure `f` just prior to shutting down each worker thread. + /// + /// This hook is intended for bookkeeping and monitoring. + /// The closure `f` will be dropped after the `builder` is dropped + /// and all threads in the pool have executed it. + /// + /// The closure provided will receive an index corresponding to the worker + /// thread it's running on. + pub fn before_stop<F>(&mut self, f: F) -> &mut Self + where + F: Fn(usize) + Send + Sync + 'static, + { + self.before_stop = Some(Arc::new(f)); + self + } + + /// Create a [`ThreadPool`](ThreadPool) with the given configuration. + pub fn create(&mut self) -> Result<ThreadPool, io::Error> { + let (tx, rx) = mpsc::channel(); + let pool = ThreadPool { + state: Arc::new(PoolState { + tx: Mutex::new(tx), + rx: Mutex::new(rx), + cnt: AtomicUsize::new(1), + size: self.pool_size, + }), + }; + + for counter in 0..self.pool_size { + let state = pool.state.clone(); + let after_start = self.after_start.clone(); + let before_stop = self.before_stop.clone(); + let mut thread_builder = thread::Builder::new(); + if let Some(ref name_prefix) = self.name_prefix { + thread_builder = thread_builder.name(format!("{}{}", name_prefix, counter)); + } + if self.stack_size > 0 { + thread_builder = thread_builder.stack_size(self.stack_size); + } + thread_builder.spawn(move || state.work(counter, after_start, before_stop))?; + } + Ok(pool) + } +} + +impl Default for ThreadPoolBuilder { + fn default() -> Self { + Self::new() + } +} + +/// A task responsible for polling a future to completion. +struct Task { + future: FutureObj<'static, ()>, + exec: ThreadPool, + wake_handle: Arc<WakeHandle>, +} + +struct WakeHandle { + mutex: UnparkMutex<Task>, + exec: ThreadPool, +} + +impl Task { + /// Actually run the task (invoking `poll` on the future) on the current + /// thread. + fn run(self) { + let Self { mut future, wake_handle, mut exec } = self; + let waker = waker_ref(&wake_handle); + let mut cx = Context::from_waker(&waker); + + // Safety: The ownership of this `Task` object is evidence that + // we are in the `POLLING`/`REPOLL` state for the mutex. + unsafe { + wake_handle.mutex.start_poll(); + + loop { + let res = future.poll_unpin(&mut cx); + match res { + Poll::Pending => {} + Poll::Ready(()) => return wake_handle.mutex.complete(), + } + let task = Self { future, wake_handle: wake_handle.clone(), exec }; + match wake_handle.mutex.wait(task) { + Ok(()) => return, // we've waited + Err(task) => { + // someone's notified us + future = task.future; + exec = task.exec; + } + } + } + } + } +} + +impl fmt::Debug for Task { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Task").field("contents", &"...").finish() + } +} + +impl ArcWake for WakeHandle { + fn wake_by_ref(arc_self: &Arc<Self>) { + if let Ok(task) = arc_self.mutex.notify() { + arc_self.exec.state.send(Message::Run(task)) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::mpsc; + + #[test] + fn test_drop_after_start() { + { + let (tx, rx) = mpsc::sync_channel(2); + let _cpu_pool = ThreadPoolBuilder::new() + .pool_size(2) + .after_start(move |_| tx.send(1).unwrap()) + .create() + .unwrap(); + + // After ThreadPoolBuilder is deconstructed, the tx should be dropped + // so that we can use rx as an iterator. + let count = rx.into_iter().count(); + assert_eq!(count, 2); + } + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 + } +} diff --git a/third_party/rust/futures-executor/src/unpark_mutex.rs b/third_party/rust/futures-executor/src/unpark_mutex.rs new file mode 100644 index 0000000000..ac5112cfa2 --- /dev/null +++ b/third_party/rust/futures-executor/src/unpark_mutex.rs @@ -0,0 +1,137 @@ +use std::cell::UnsafeCell; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; + +/// A "lock" around data `D`, which employs a *helping* strategy. +/// +/// Used to ensure that concurrent `unpark` invocations lead to (1) `poll` being +/// invoked on only a single thread at a time (2) `poll` being invoked at least +/// once after each `unpark` (unless the future has completed). +pub(crate) struct UnparkMutex<D> { + // The state of task execution (state machine described below) + status: AtomicUsize, + + // The actual task data, accessible only in the POLLING state + inner: UnsafeCell<Option<D>>, +} + +// `UnparkMutex<D>` functions in many ways like a `Mutex<D>`, except that on +// acquisition failure, the current lock holder performs the desired work -- +// re-polling. +// +// As such, these impls mirror those for `Mutex<D>`. In particular, a reference +// to `UnparkMutex` can be used to gain `&mut` access to the inner data, which +// must therefore be `Send`. +unsafe impl<D: Send> Send for UnparkMutex<D> {} +unsafe impl<D: Send> Sync for UnparkMutex<D> {} + +// There are four possible task states, listed below with their possible +// transitions: + +// The task is blocked, waiting on an event +const WAITING: usize = 0; // --> POLLING + +// The task is actively being polled by a thread; arrival of additional events +// of interest should move it to the REPOLL state +const POLLING: usize = 1; // --> WAITING, REPOLL, or COMPLETE + +// The task is actively being polled, but will need to be re-polled upon +// completion to ensure that all events were observed. +const REPOLL: usize = 2; // --> POLLING + +// The task has finished executing (either successfully or with an error/panic) +const COMPLETE: usize = 3; // No transitions out + +impl<D> UnparkMutex<D> { + pub(crate) fn new() -> Self { + Self { status: AtomicUsize::new(WAITING), inner: UnsafeCell::new(None) } + } + + /// Attempt to "notify" the mutex that a poll should occur. + /// + /// An `Ok` result indicates that the `POLLING` state has been entered, and + /// the caller can proceed to poll the future. An `Err` result indicates + /// that polling is not necessary (because the task is finished or the + /// polling has been delegated). + pub(crate) fn notify(&self) -> Result<D, ()> { + let mut status = self.status.load(SeqCst); + loop { + match status { + // The task is idle, so try to run it immediately. + WAITING => { + match self.status.compare_exchange(WAITING, POLLING, SeqCst, SeqCst) { + Ok(_) => { + let data = unsafe { + // SAFETY: we've ensured mutual exclusion via + // the status protocol; we are the only thread + // that has transitioned to the POLLING state, + // and we won't transition back to QUEUED until + // the lock is "released" by this thread. See + // the protocol diagram above. + (*self.inner.get()).take().unwrap() + }; + return Ok(data); + } + Err(cur) => status = cur, + } + } + + // The task is being polled, so we need to record that it should + // be *repolled* when complete. + POLLING => match self.status.compare_exchange(POLLING, REPOLL, SeqCst, SeqCst) { + Ok(_) => return Err(()), + Err(cur) => status = cur, + }, + + // The task is already scheduled for polling, or is complete, so + // we've got nothing to do. + _ => return Err(()), + } + } + } + + /// Alert the mutex that polling is about to begin, clearing any accumulated + /// re-poll requests. + /// + /// # Safety + /// + /// Callable only from the `POLLING`/`REPOLL` states, i.e. between + /// successful calls to `notify` and `wait`/`complete`. + pub(crate) unsafe fn start_poll(&self) { + self.status.store(POLLING, SeqCst); + } + + /// Alert the mutex that polling completed with `Pending`. + /// + /// # Safety + /// + /// Callable only from the `POLLING`/`REPOLL` states, i.e. between + /// successful calls to `notify` and `wait`/`complete`. + pub(crate) unsafe fn wait(&self, data: D) -> Result<(), D> { + *self.inner.get() = Some(data); + + match self.status.compare_exchange(POLLING, WAITING, SeqCst, SeqCst) { + // no unparks came in while we were running + Ok(_) => Ok(()), + + // guaranteed to be in REPOLL state; just clobber the + // state and run again. + Err(status) => { + assert_eq!(status, REPOLL); + self.status.store(POLLING, SeqCst); + Err((*self.inner.get()).take().unwrap()) + } + } + } + + /// Alert the mutex that the task has completed execution and should not be + /// notified again. + /// + /// # Safety + /// + /// Callable only from the `POLLING`/`REPOLL` states, i.e. between + /// successful calls to `notify` and `wait`/`complete`. + pub(crate) unsafe fn complete(&self) { + self.status.store(COMPLETE, SeqCst); + } +} diff --git a/third_party/rust/futures-executor/tests/local_pool.rs b/third_party/rust/futures-executor/tests/local_pool.rs new file mode 100644 index 0000000000..72ce74b744 --- /dev/null +++ b/third_party/rust/futures-executor/tests/local_pool.rs @@ -0,0 +1,496 @@ +use futures::channel::oneshot; +use futures::executor::LocalPool; +use futures::future::{self, lazy, poll_fn, Future}; +use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker}; +use std::cell::{Cell, RefCell}; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::thread; +use std::time::Duration; + +struct Pending(Rc<()>); + +impl Future for Pending { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { + Poll::Pending + } +} + +fn pending() -> Pending { + Pending(Rc::new(())) +} + +#[test] +fn run_until_single_future() { + let mut cnt = 0; + + { + let mut pool = LocalPool::new(); + let fut = lazy(|_| { + cnt += 1; + }); + pool.run_until(fut); + } + + assert_eq!(cnt, 1); +} + +#[test] +fn run_until_ignores_spawned() { + let mut pool = LocalPool::new(); + let spawn = pool.spawner(); + spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap(); + pool.run_until(lazy(|_| ())); +} + +#[test] +fn run_until_executes_spawned() { + let (tx, rx) = oneshot::channel(); + let mut pool = LocalPool::new(); + let spawn = pool.spawner(); + spawn + .spawn_local_obj( + Box::pin(lazy(move |_| { + tx.send(()).unwrap(); + })) + .into(), + ) + .unwrap(); + pool.run_until(rx).unwrap(); +} + +#[test] +fn run_returns_if_empty() { + let mut pool = LocalPool::new(); + pool.run(); + pool.run(); +} + +#[test] +fn run_executes_spawned() { + let cnt = Rc::new(Cell::new(0)); + let cnt2 = cnt.clone(); + + let mut pool = LocalPool::new(); + let spawn = pool.spawner(); + let spawn2 = pool.spawner(); + + spawn + .spawn_local_obj( + Box::pin(lazy(move |_| { + spawn2 + .spawn_local_obj( + Box::pin(lazy(move |_| { + cnt2.set(cnt2.get() + 1); + })) + .into(), + ) + .unwrap(); + })) + .into(), + ) + .unwrap(); + + pool.run(); + + assert_eq!(cnt.get(), 1); +} + +#[test] +fn run_spawn_many() { + const ITER: usize = 200; + + let cnt = Rc::new(Cell::new(0)); + + let mut pool = LocalPool::new(); + let spawn = pool.spawner(); + + for _ in 0..ITER { + let cnt = cnt.clone(); + spawn + .spawn_local_obj( + Box::pin(lazy(move |_| { + cnt.set(cnt.get() + 1); + })) + .into(), + ) + .unwrap(); + } + + pool.run(); + + assert_eq!(cnt.get(), ITER); +} + +#[test] +fn try_run_one_returns_if_empty() { + let mut pool = LocalPool::new(); + assert!(!pool.try_run_one()); +} + +#[test] +fn try_run_one_executes_one_ready() { + const ITER: usize = 200; + + let cnt = Rc::new(Cell::new(0)); + + let mut pool = LocalPool::new(); + let spawn = pool.spawner(); + + for _ in 0..ITER { + spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap(); + + let cnt = cnt.clone(); + spawn + .spawn_local_obj( + Box::pin(lazy(move |_| { + cnt.set(cnt.get() + 1); + })) + .into(), + ) + .unwrap(); + + spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap(); + } + + for i in 0..ITER { + assert_eq!(cnt.get(), i); + assert!(pool.try_run_one()); + assert_eq!(cnt.get(), i + 1); + } + assert!(!pool.try_run_one()); +} + +#[test] +fn try_run_one_returns_on_no_progress() { + const ITER: usize = 10; + + let cnt = Rc::new(Cell::new(0)); + + let mut pool = LocalPool::new(); + let spawn = pool.spawner(); + + let waker: Rc<Cell<Option<Waker>>> = Rc::new(Cell::new(None)); + { + let cnt = cnt.clone(); + let waker = waker.clone(); + spawn + .spawn_local_obj( + Box::pin(poll_fn(move |ctx| { + cnt.set(cnt.get() + 1); + waker.set(Some(ctx.waker().clone())); + if cnt.get() == ITER { + Poll::Ready(()) + } else { + Poll::Pending + } + })) + .into(), + ) + .unwrap(); + } + + for i in 0..ITER - 1 { + assert_eq!(cnt.get(), i); + assert!(!pool.try_run_one()); + assert_eq!(cnt.get(), i + 1); + let w = waker.take(); + assert!(w.is_some()); + w.unwrap().wake(); + } + assert!(pool.try_run_one()); + assert_eq!(cnt.get(), ITER); +} + +#[test] +fn try_run_one_runs_sub_futures() { + let mut pool = LocalPool::new(); + let spawn = pool.spawner(); + let cnt = Rc::new(Cell::new(0)); + + let inner_spawner = spawn.clone(); + let cnt1 = cnt.clone(); + spawn + .spawn_local_obj( + Box::pin(poll_fn(move |_| { + cnt1.set(cnt1.get() + 1); + + let cnt2 = cnt1.clone(); + inner_spawner + .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into()) + .unwrap(); + + Poll::Pending + })) + .into(), + ) + .unwrap(); + + pool.try_run_one(); + assert_eq!(cnt.get(), 2); +} + +#[test] +fn run_until_stalled_returns_if_empty() { + let mut pool = LocalPool::new(); + pool.run_until_stalled(); + pool.run_until_stalled(); +} + +#[test] +fn run_until_stalled_returns_multiple_times() { + let mut pool = LocalPool::new(); + let spawn = pool.spawner(); + let cnt = Rc::new(Cell::new(0)); + + let cnt1 = cnt.clone(); + spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt1.set(cnt1.get() + 1))).into()).unwrap(); + pool.run_until_stalled(); + assert_eq!(cnt.get(), 1); + + let cnt2 = cnt.clone(); + spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into()).unwrap(); + pool.run_until_stalled(); + assert_eq!(cnt.get(), 2); +} + +#[test] +fn run_until_stalled_runs_spawned_sub_futures() { + let mut pool = LocalPool::new(); + let spawn = pool.spawner(); + let cnt = Rc::new(Cell::new(0)); + + let inner_spawner = spawn.clone(); + let cnt1 = cnt.clone(); + spawn + .spawn_local_obj( + Box::pin(poll_fn(move |_| { + cnt1.set(cnt1.get() + 1); + + let cnt2 = cnt1.clone(); + inner_spawner + .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into()) + .unwrap(); + + Poll::Pending + })) + .into(), + ) + .unwrap(); + + pool.run_until_stalled(); + assert_eq!(cnt.get(), 2); +} + +#[test] +fn run_until_stalled_executes_all_ready() { + const ITER: usize = if cfg!(miri) { 50 } else { 200 }; + const PER_ITER: usize = 3; + + let cnt = Rc::new(Cell::new(0)); + + let mut pool = LocalPool::new(); + let spawn = pool.spawner(); + + for i in 0..ITER { + for _ in 0..PER_ITER { + spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap(); + + let cnt = cnt.clone(); + spawn + .spawn_local_obj( + Box::pin(lazy(move |_| { + cnt.set(cnt.get() + 1); + })) + .into(), + ) + .unwrap(); + + // also add some pending tasks to test if they are ignored + spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap(); + } + assert_eq!(cnt.get(), i * PER_ITER); + pool.run_until_stalled(); + assert_eq!(cnt.get(), (i + 1) * PER_ITER); + } +} + +#[test] +#[should_panic] +fn nesting_run() { + let mut pool = LocalPool::new(); + let spawn = pool.spawner(); + + spawn + .spawn_obj( + Box::pin(lazy(|_| { + let mut pool = LocalPool::new(); + pool.run(); + })) + .into(), + ) + .unwrap(); + + pool.run(); +} + +#[test] +#[should_panic] +fn nesting_run_run_until_stalled() { + let mut pool = LocalPool::new(); + let spawn = pool.spawner(); + + spawn + .spawn_obj( + Box::pin(lazy(|_| { + let mut pool = LocalPool::new(); + pool.run_until_stalled(); + })) + .into(), + ) + .unwrap(); + + pool.run(); +} + +#[test] +fn tasks_are_scheduled_fairly() { + let state = Rc::new(RefCell::new([0, 0])); + + struct Spin { + state: Rc<RefCell<[i32; 2]>>, + idx: usize, + } + + impl Future for Spin { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + let mut state = self.state.borrow_mut(); + + if self.idx == 0 { + let diff = state[0] - state[1]; + + assert!(diff.abs() <= 1); + + if state[0] >= 50 { + return Poll::Ready(()); + } + } + + state[self.idx] += 1; + + if state[self.idx] >= 100 { + return Poll::Ready(()); + } + + cx.waker().wake_by_ref(); + Poll::Pending + } + } + + let mut pool = LocalPool::new(); + let spawn = pool.spawner(); + + spawn.spawn_local_obj(Box::pin(Spin { state: state.clone(), idx: 0 }).into()).unwrap(); + + spawn.spawn_local_obj(Box::pin(Spin { state, idx: 1 }).into()).unwrap(); + + pool.run(); +} + +// Tests that the use of park/unpark in user-code has no +// effect on the expected behavior of the executor. +#[test] +fn park_unpark_independence() { + let mut done = false; + + let future = future::poll_fn(move |cx| { + if done { + return Poll::Ready(()); + } + done = true; + cx.waker().clone().wake(); // (*) + // some user-code that temporarily parks the thread + let test = thread::current(); + let latch = Arc::new(AtomicBool::new(false)); + let signal = latch.clone(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(10)); + signal.store(true, Ordering::SeqCst); + test.unpark() + }); + while !latch.load(Ordering::Relaxed) { + thread::park(); + } + Poll::Pending // Expect to be called again due to (*). + }); + + futures::executor::block_on(future) +} + +struct SelfWaking { + wakeups_remaining: Rc<RefCell<usize>>, +} + +impl Future for SelfWaking { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + if *self.wakeups_remaining.borrow() != 0 { + *self.wakeups_remaining.borrow_mut() -= 1; + cx.waker().wake_by_ref(); + } + + Poll::Pending + } +} + +/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593 +/// +/// The issue was that self-waking futures could cause `run_until_stalled` +/// to exit early, even when progress could still be made. +#[test] +fn self_waking_run_until_stalled() { + let wakeups_remaining = Rc::new(RefCell::new(10)); + + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + for _ in 0..3 { + let wakeups_remaining = Rc::clone(&wakeups_remaining); + spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap(); + } + + // This should keep polling until there are no more wakeups. + pool.run_until_stalled(); + + assert_eq!(*wakeups_remaining.borrow(), 0); +} + +/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593 +/// +/// The issue was that self-waking futures could cause `try_run_one` +/// to exit early, even when progress could still be made. +#[test] +fn self_waking_try_run_one() { + let wakeups_remaining = Rc::new(RefCell::new(10)); + + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + for _ in 0..3 { + let wakeups_remaining = Rc::clone(&wakeups_remaining); + spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap(); + } + + spawner.spawn(future::ready(())).unwrap(); + + // The `ready` future should complete. + assert!(pool.try_run_one()); + + // The self-waking futures are each polled once. + assert_eq!(*wakeups_remaining.borrow(), 7); +} |