author      Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-19 00:47:55 +0000
committer   Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-19 00:47:55 +0000
commit      26a029d407be480d791972afb5975cf62c9360a6 (patch)
tree        f435a8308119effd964b339f76abb83a57c29483  /third_party/rust/futures-executor
parent      Initial commit. (diff)
download    firefox-26a029d407be480d791972afb5975cf62c9360a6.tar.xz
            firefox-26a029d407be480d791972afb5975cf62c9360a6.zip
Adding upstream version 124.0.1. (upstream/124.0.1)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures-executor')
-rw-r--r--  third_party/rust/futures-executor/.cargo-checksum.json       1
-rw-r--r--  third_party/rust/futures-executor/Cargo.toml                 60
-rw-r--r--  third_party/rust/futures-executor/LICENSE-APACHE             202
-rw-r--r--  third_party/rust/futures-executor/LICENSE-MIT                26
-rw-r--r--  third_party/rust/futures-executor/README.md                  23
-rw-r--r--  third_party/rust/futures-executor/benches/thread_notify.rs   109
-rw-r--r--  third_party/rust/futures-executor/src/enter.rs               80
-rw-r--r--  third_party/rust/futures-executor/src/lib.rs                 76
-rw-r--r--  third_party/rust/futures-executor/src/local_pool.rs          402
-rw-r--r--  third_party/rust/futures-executor/src/thread_pool.rs         380
-rw-r--r--  third_party/rust/futures-executor/src/unpark_mutex.rs        137
-rw-r--r--  third_party/rust/futures-executor/tests/local_pool.rs        496
12 files changed, 1992 insertions, 0 deletions
diff --git a/third_party/rust/futures-executor/.cargo-checksum.json b/third_party/rust/futures-executor/.cargo-checksum.json
new file mode 100644
index 0000000000..00c487fc07
--- /dev/null
+++ b/third_party/rust/futures-executor/.cargo-checksum.json
@@ -0,0 +1 @@
+{"files":{"Cargo.toml":"dac1d16ebb659583c1092ed30905ea278db9b6a291a4f44e40bc25bd19997b70","LICENSE-APACHE":"275c491d6d1160553c32fd6127061d7f9606c3ea25abfad6ca3f6ed088785427","LICENSE-MIT":"6652c868f35dfe5e8ef636810a4e576b9d663f3a17fb0f5613ad73583e1b88fd","README.md":"05ba6a5490962c4df45b78e9ad928a29dd5c3fad749284d5b812ca7e765feb6d","benches/thread_notify.rs":"e601968527bee85766f32d2d11de5ed8f6b4bd5a29989b5c369a52bd3cd3d024","src/enter.rs":"e3e890a8fa649e76cd2ce915abb11b67d15f3c5ae5e8e374142e0363917b2406","src/lib.rs":"08a25594c789cb4ce1c8929a9ddd745e67fee1db373e011a7ebe135933522614","src/local_pool.rs":"78177af55564fdfcfdc9f3974afe7d9d0682a7e4654761d83a8fc02abb34a7dc","src/thread_pool.rs":"e52f8527bc37c511513d77d183b44e3991a7b324aaed5d17bee0d092cf448a5b","src/unpark_mutex.rs":"e186464d9bdec22a6d1e1d900ed03a1154e6b0d422ede9bd3b768657cdbb6113","tests/local_pool.rs":"9639c9a290e23faab3913c6fec190853f890defaed6ffe67de177eca5d88932a"},"package":"ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"} \ No newline at end of file
diff --git a/third_party/rust/futures-executor/Cargo.toml b/third_party/rust/futures-executor/Cargo.toml
new file mode 100644
index 0000000000..c254b8ba46
--- /dev/null
+++ b/third_party/rust/futures-executor/Cargo.toml
@@ -0,0 +1,60 @@
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g., crates.io) dependencies.
+#
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
+
+[package]
+edition = "2018"
+rust-version = "1.56"
+name = "futures-executor"
+version = "0.3.28"
+description = """
+Executors for asynchronous tasks based on the futures-rs library.
+"""
+homepage = "https://rust-lang.github.io/futures-rs"
+readme = "README.md"
+license = "MIT OR Apache-2.0"
+repository = "https://github.com/rust-lang/futures-rs"
+
+[package.metadata.docs.rs]
+all-features = true
+rustdoc-args = [
+ "--cfg",
+ "docsrs",
+]
+
+[dependencies.futures-core]
+version = "0.3.28"
+default-features = false
+
+[dependencies.futures-task]
+version = "0.3.28"
+default-features = false
+
+[dependencies.futures-util]
+version = "0.3.28"
+default-features = false
+
+[dependencies.num_cpus]
+version = "1.8.0"
+optional = true
+
+[dev-dependencies]
+
+[features]
+default = ["std"]
+std = [
+ "futures-core/std",
+ "futures-task/std",
+ "futures-util/std",
+]
+thread-pool = [
+ "std",
+ "num_cpus",
+]
diff --git a/third_party/rust/futures-executor/LICENSE-APACHE b/third_party/rust/futures-executor/LICENSE-APACHE
new file mode 100644
index 0000000000..9eb0b097f5
--- /dev/null
+++ b/third_party/rust/futures-executor/LICENSE-APACHE
@@ -0,0 +1,202 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+Copyright (c) 2016 Alex Crichton
+Copyright (c) 2017 The Tokio Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff --git a/third_party/rust/futures-executor/LICENSE-MIT b/third_party/rust/futures-executor/LICENSE-MIT
new file mode 100644
index 0000000000..8ad082ec4f
--- /dev/null
+++ b/third_party/rust/futures-executor/LICENSE-MIT
@@ -0,0 +1,26 @@
+Copyright (c) 2016 Alex Crichton
+Copyright (c) 2017 The Tokio Authors
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/third_party/rust/futures-executor/README.md b/third_party/rust/futures-executor/README.md
new file mode 100644
index 0000000000..724ff5bb33
--- /dev/null
+++ b/third_party/rust/futures-executor/README.md
@@ -0,0 +1,23 @@
+# futures-executor
+
+Executors for asynchronous tasks based on the futures-rs library.
+
+## Usage
+
+Add this to your `Cargo.toml`:
+
+```toml
+[dependencies]
+futures-executor = "0.3"
+```
+
+The current `futures-executor` requires Rust 1.56 or later.
+
+## License
+
+Licensed under either of [Apache License, Version 2.0](LICENSE-APACHE) or
+[MIT license](LICENSE-MIT) at your option.
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in the work by you, as defined in the Apache-2.0 license, shall
+be dual licensed as above, without any additional terms or conditions.
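
The README's usage section stops at the manifest snippet. As a minimal sketch of what driving a future with this crate looks like (assuming the `futures` facade crate, which re-exports `futures-executor` as `futures::executor`):

```rust
// Minimal sketch: run a future to completion on the calling thread.
// Assumes the `futures` facade crate is a dependency.
use futures::executor::block_on;

fn main() {
    let answer = block_on(async { 6 * 7 });
    assert_eq!(answer, 42);
}
```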
diff --git a/third_party/rust/futures-executor/benches/thread_notify.rs b/third_party/rust/futures-executor/benches/thread_notify.rs
new file mode 100644
index 0000000000..88d0447cf6
--- /dev/null
+++ b/third_party/rust/futures-executor/benches/thread_notify.rs
@@ -0,0 +1,109 @@
+#![feature(test)]
+
+extern crate test;
+use crate::test::Bencher;
+
+use futures::executor::block_on;
+use futures::future::Future;
+use futures::task::{Context, Poll, Waker};
+use std::pin::Pin;
+
+#[bench]
+fn thread_yield_single_thread_one_wait(b: &mut Bencher) {
+ const NUM: usize = 10_000;
+
+ struct Yield {
+ rem: usize,
+ }
+
+ impl Future for Yield {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ if self.rem == 0 {
+ Poll::Ready(())
+ } else {
+ self.rem -= 1;
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+ }
+
+ b.iter(|| {
+ let y = Yield { rem: NUM };
+ block_on(y);
+ });
+}
+
+#[bench]
+fn thread_yield_single_thread_many_wait(b: &mut Bencher) {
+ const NUM: usize = 10_000;
+
+ struct Yield {
+ rem: usize,
+ }
+
+ impl Future for Yield {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ if self.rem == 0 {
+ Poll::Ready(())
+ } else {
+ self.rem -= 1;
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+ }
+
+ b.iter(|| {
+ for _ in 0..NUM {
+ let y = Yield { rem: 1 };
+ block_on(y);
+ }
+ });
+}
+
+#[bench]
+fn thread_yield_multi_thread(b: &mut Bencher) {
+ use std::sync::mpsc;
+ use std::thread;
+
+ const NUM: usize = 1_000;
+
+ let (tx, rx) = mpsc::sync_channel::<Waker>(10_000);
+
+ struct Yield {
+ rem: usize,
+ tx: mpsc::SyncSender<Waker>,
+ }
+ impl Unpin for Yield {}
+
+ impl Future for Yield {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ if self.rem == 0 {
+ Poll::Ready(())
+ } else {
+ self.rem -= 1;
+ self.tx.send(cx.waker().clone()).unwrap();
+ Poll::Pending
+ }
+ }
+ }
+
+ thread::spawn(move || {
+ while let Ok(task) = rx.recv() {
+ task.wake();
+ }
+ });
+
+ b.iter(move || {
+ let y = Yield { rem: NUM, tx: tx.clone() };
+
+ block_on(y);
+ });
+}
diff --git a/third_party/rust/futures-executor/src/enter.rs b/third_party/rust/futures-executor/src/enter.rs
new file mode 100644
index 0000000000..cb58c30bb7
--- /dev/null
+++ b/third_party/rust/futures-executor/src/enter.rs
@@ -0,0 +1,80 @@
+use std::cell::Cell;
+use std::fmt;
+
+thread_local!(static ENTERED: Cell<bool> = Cell::new(false));
+
+/// Represents an executor context.
+///
+/// For more details, see [`enter` documentation](enter()).
+pub struct Enter {
+ _priv: (),
+}
+
+/// An error returned by `enter` if an execution scope has already been
+/// entered.
+pub struct EnterError {
+ _priv: (),
+}
+
+impl fmt::Debug for EnterError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("EnterError").finish()
+ }
+}
+
+impl fmt::Display for EnterError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "an execution scope has already been entered")
+ }
+}
+
+impl std::error::Error for EnterError {}
+
+/// Marks the current thread as being within the dynamic extent of an
+/// executor.
+///
+/// Executor implementations should call this function before beginning to
+/// execute a task, and drop the returned [`Enter`](Enter) value after
+/// completing task execution:
+///
+/// ```
+/// use futures::executor::enter;
+///
+/// let enter = enter().expect("...");
+/// /* run task */
+/// drop(enter);
+/// ```
+///
+/// Doing so ensures that executors aren't
+/// accidentally invoked in a nested fashion.
+///
+/// # Error
+///
+/// Returns an error if the current thread is already marked, in which case the
+/// caller should panic with a tailored error message.
+pub fn enter() -> Result<Enter, EnterError> {
+ ENTERED.with(|c| {
+ if c.get() {
+ Err(EnterError { _priv: () })
+ } else {
+ c.set(true);
+
+ Ok(Enter { _priv: () })
+ }
+ })
+}
+
+impl fmt::Debug for Enter {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Enter").finish()
+ }
+}
+
+impl Drop for Enter {
+ fn drop(&mut self) {
+ ENTERED.with(|c| {
+ assert!(c.get());
+ c.set(false);
+ });
+ }
+}
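
To make the guard behaviour documented above concrete, here is a small sketch (assuming the `futures` facade re-export of `enter`, as used in the doc example): a second `enter()` on the same thread fails until the first `Enter` guard is dropped.

```rust
// Sketch of the nesting guard: the second call fails while the first
// guard is alive, which is how accidental nested executors are caught.
use futures::executor::enter;

fn main() {
    let guard = enter().expect("no executor is running on this thread yet");
    assert!(enter().is_err()); // already inside an execution scope
    drop(guard);
    assert!(enter().is_ok()); // the thread can be entered again
}
```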
diff --git a/third_party/rust/futures-executor/src/lib.rs b/third_party/rust/futures-executor/src/lib.rs
new file mode 100644
index 0000000000..b1af87545f
--- /dev/null
+++ b/third_party/rust/futures-executor/src/lib.rs
@@ -0,0 +1,76 @@
+//! Built-in executors and related tools.
+//!
+//! All asynchronous computation occurs within an executor, which is
+//! capable of spawning futures as tasks. This module provides several
+//! built-in executors, as well as tools for building your own.
+//!
+//! All items are only available when the `std` feature of this
+//! library is activated, and it is activated by default.
+//!
+//! # Using a thread pool (M:N task scheduling)
+//!
+//! Most of the time tasks should be executed on a [thread pool](ThreadPool).
+//! A small set of worker threads can handle a very large set of spawned tasks
+//! (which are much lighter weight than threads). Tasks spawned onto the pool
+//! with the [`spawn_ok`](ThreadPool::spawn_ok) function will run ambiently on
+//! the created threads.
+//!
+//! # Spawning additional tasks
+//!
+//! Tasks can be spawned onto a spawner by calling its [`spawn_obj`] method
+//! directly. In the case of `!Send` futures, [`spawn_local_obj`] can be used
+//! instead.
+//!
+//! # Single-threaded execution
+//!
+//! In addition to thread pools, it's possible to run a task (and the tasks
+//! it spawns) entirely within a single thread via the [`LocalPool`] executor.
+//! Aside from cutting down on synchronization costs, this executor also makes
+//! it possible to spawn non-`Send` tasks, via [`spawn_local_obj`]. The
+//! [`LocalPool`] is best suited for running I/O-bound tasks that do relatively
+//! little work between I/O operations.
+//!
+//! There is also a convenience function [`block_on`] for simply running a
+//! future to completion on the current thread.
+//!
+//! [`spawn_obj`]: https://docs.rs/futures/0.3/futures/task/trait.Spawn.html#tymethod.spawn_obj
+//! [`spawn_local_obj`]: https://docs.rs/futures/0.3/futures/task/trait.LocalSpawn.html#tymethod.spawn_local_obj
+
+#![cfg_attr(not(feature = "std"), no_std)]
+#![warn(
+ missing_debug_implementations,
+ missing_docs,
+ rust_2018_idioms,
+ single_use_lifetimes,
+ unreachable_pub
+)]
+#![doc(test(
+ no_crate_inject,
+ attr(
+ deny(warnings, rust_2018_idioms, single_use_lifetimes),
+ allow(dead_code, unused_assignments, unused_variables)
+ )
+))]
+#![cfg_attr(docsrs, feature(doc_cfg))]
+
+#[cfg(feature = "std")]
+mod local_pool;
+#[cfg(feature = "std")]
+pub use crate::local_pool::{block_on, block_on_stream, BlockingStream, LocalPool, LocalSpawner};
+
+#[cfg(feature = "thread-pool")]
+#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
+#[cfg(feature = "std")]
+mod thread_pool;
+#[cfg(feature = "thread-pool")]
+#[cfg(feature = "std")]
+mod unpark_mutex;
+#[cfg(feature = "thread-pool")]
+#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
+#[cfg(feature = "std")]
+pub use crate::thread_pool::{ThreadPool, ThreadPoolBuilder};
+
+#[cfg(feature = "std")]
+mod enter;
+#[cfg(feature = "std")]
+pub use crate::enter::{enter, Enter, EnterError};
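
The module docs above describe single-threaded execution of `!Send` tasks via `LocalPool`. A minimal sketch of that path, assuming the `futures` facade crate for the `LocalSpawnExt::spawn_local` convenience method:

```rust
// Sketch: a LocalPool running a non-Send task spawned through its LocalSpawner.
use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;
use std::rc::Rc;

fn main() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    // Rc is !Send, so this future could not be spawned onto a thread pool.
    let value = Rc::new(10);
    spawner
        .spawn_local(async move {
            assert_eq!(*value, 10);
        })
        .unwrap();

    // Run every spawned task to completion on the current thread.
    pool.run();
}
```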
diff --git a/third_party/rust/futures-executor/src/local_pool.rs b/third_party/rust/futures-executor/src/local_pool.rs
new file mode 100644
index 0000000000..8a9bc2fc90
--- /dev/null
+++ b/third_party/rust/futures-executor/src/local_pool.rs
@@ -0,0 +1,402 @@
+use crate::enter;
+use futures_core::future::Future;
+use futures_core::stream::Stream;
+use futures_core::task::{Context, Poll};
+use futures_task::{waker_ref, ArcWake};
+use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
+use futures_util::pin_mut;
+use futures_util::stream::FuturesUnordered;
+use futures_util::stream::StreamExt;
+use std::cell::RefCell;
+use std::ops::{Deref, DerefMut};
+use std::rc::{Rc, Weak};
+use std::sync::{
+ atomic::{AtomicBool, Ordering},
+ Arc,
+};
+use std::thread::{self, Thread};
+
+/// A single-threaded task pool for polling futures to completion.
+///
+/// This executor allows you to multiplex any number of tasks onto a single
+/// thread. It's appropriate to poll strictly I/O-bound futures that do very
+/// little work in between I/O actions.
+///
+/// To get a handle to the pool that implements
+/// [`Spawn`](futures_task::Spawn), use the
+/// [`spawner()`](LocalPool::spawner) method. Because the executor is
+/// single-threaded, it supports a special form of task spawning for non-`Send`
+/// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj).
+#[derive(Debug)]
+pub struct LocalPool {
+ pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
+ incoming: Rc<Incoming>,
+}
+
+/// A handle to a [`LocalPool`](LocalPool) that implements
+/// [`Spawn`](futures_task::Spawn).
+#[derive(Clone, Debug)]
+pub struct LocalSpawner {
+ incoming: Weak<Incoming>,
+}
+
+type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
+
+pub(crate) struct ThreadNotify {
+ /// The (single) executor thread.
+ thread: Thread,
+ /// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten"
+ /// before the next `park()`, which may otherwise happen if the code
+ /// being executed as part of the future(s) being polled makes use of
+ /// park / unpark calls of its own, i.e. we cannot assume that no other
+ /// code uses park / unpark on the executing `thread`.
+ unparked: AtomicBool,
+}
+
+thread_local! {
+ static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
+ thread: thread::current(),
+ unparked: AtomicBool::new(false),
+ });
+}
+
+impl ArcWake for ThreadNotify {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ // Make sure the wakeup is remembered until the next `park()`.
+ let unparked = arc_self.unparked.swap(true, Ordering::Release);
+ if !unparked {
+ // If the thread has not been unparked yet, it must be done
+ // now. If it was actually parked, it will run again,
+ // otherwise the token made available by `unpark`
+ // may be consumed before reaching `park()`, but `unparked`
+ // ensures it is not forgotten.
+ arc_self.thread.unpark();
+ }
+ }
+}
+
+// Set up and run a basic single-threaded spawner loop, invoking `f` on each
+// turn.
+fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
+ let _enter = enter().expect(
+ "cannot execute `LocalPool` executor from within \
+ another executor",
+ );
+
+ CURRENT_THREAD_NOTIFY.with(|thread_notify| {
+ let waker = waker_ref(thread_notify);
+ let mut cx = Context::from_waker(&waker);
+ loop {
+ if let Poll::Ready(t) = f(&mut cx) {
+ return t;
+ }
+
+ // Wait for a wakeup.
+ while !thread_notify.unparked.swap(false, Ordering::Acquire) {
+ // No wakeup occurred. It may occur now, right before parking,
+ // but in that case the token made available by `unpark()`
+ // is guaranteed to still be available and `park()` is a no-op.
+ thread::park();
+ }
+ }
+ })
+}
+
+/// Check for a wakeup, but don't consume it.
+fn woken() -> bool {
+ CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::Acquire))
+}
+
+impl LocalPool {
+ /// Create a new, empty pool of tasks.
+ pub fn new() -> Self {
+ Self { pool: FuturesUnordered::new(), incoming: Default::default() }
+ }
+
+ /// Get a clonable handle to the pool as a [`Spawn`].
+ pub fn spawner(&self) -> LocalSpawner {
+ LocalSpawner { incoming: Rc::downgrade(&self.incoming) }
+ }
+
+ /// Run all tasks in the pool to completion.
+ ///
+ /// ```
+ /// use futures::executor::LocalPool;
+ ///
+ /// let mut pool = LocalPool::new();
+ ///
+ /// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()`
+ ///
+ /// // run *all* tasks in the pool to completion, including any newly-spawned ones.
+ /// pool.run();
+ /// ```
+ ///
+ /// The function will block the calling thread until *all* tasks in the pool
+ /// are complete, including any spawned while running existing tasks.
+ pub fn run(&mut self) {
+ run_executor(|cx| self.poll_pool(cx))
+ }
+
+ /// Runs all the tasks in the pool until the given future completes.
+ ///
+ /// ```
+ /// use futures::executor::LocalPool;
+ ///
+ /// let mut pool = LocalPool::new();
+ /// # let my_app = async {};
+ ///
+ /// // run tasks in the pool until `my_app` completes
+ /// pool.run_until(my_app);
+ /// ```
+ ///
+ /// The function will block the calling thread *only* until the future `f`
+ /// completes; there may still be incomplete tasks in the pool, which will
+ /// be inert after the call completes, but can continue with further use of
+ /// one of the pool's run or poll methods. While the function is running,
+ /// however, all tasks in the pool will try to make progress.
+ pub fn run_until<F: Future>(&mut self, future: F) -> F::Output {
+ pin_mut!(future);
+
+ run_executor(|cx| {
+ {
+ // if our main task is done, so are we
+ let result = future.as_mut().poll(cx);
+ if let Poll::Ready(output) = result {
+ return Poll::Ready(output);
+ }
+ }
+
+ let _ = self.poll_pool(cx);
+ Poll::Pending
+ })
+ }
+
+ /// Runs all tasks and returns after completing one future or until no more progress
+ /// can be made. Returns `true` if one future was completed, `false` otherwise.
+ ///
+ /// ```
+ /// use futures::executor::LocalPool;
+ /// use futures::task::LocalSpawnExt;
+ /// use futures::future::{ready, pending};
+ ///
+ /// let mut pool = LocalPool::new();
+ /// let spawner = pool.spawner();
+ ///
+ /// spawner.spawn_local(ready(())).unwrap();
+ /// spawner.spawn_local(ready(())).unwrap();
+ /// spawner.spawn_local(pending()).unwrap();
+ ///
+ /// // Run the two ready tasks and return true for them.
+ /// pool.try_run_one(); // returns true after completing one of the ready futures
+ /// pool.try_run_one(); // returns true after completing the other ready future
+ ///
+ /// // the remaining task cannot be completed
+ /// assert!(!pool.try_run_one()); // returns false
+ /// ```
+ ///
+ /// This function will not block the calling thread and will return the moment
+ /// that there are no tasks left for which progress can be made or after exactly one
+ /// task was completed; remaining incomplete tasks in the pool can continue with
+ /// further use of one of the pool's run or poll methods.
+ /// Though only one task will be completed, progress may be made on multiple tasks.
+ pub fn try_run_one(&mut self) -> bool {
+ run_executor(|cx| {
+ loop {
+ self.drain_incoming();
+
+ match self.pool.poll_next_unpin(cx) {
+ // Success!
+ Poll::Ready(Some(())) => return Poll::Ready(true),
+ // The pool was empty.
+ Poll::Ready(None) => return Poll::Ready(false),
+ Poll::Pending => (),
+ }
+
+ if !self.incoming.borrow().is_empty() {
+ // New tasks were spawned; try again.
+ continue;
+ } else if woken() {
+ // The pool yielded to us, but there's more progress to be made.
+ return Poll::Pending;
+ } else {
+ return Poll::Ready(false);
+ }
+ }
+ })
+ }
+
+ /// Runs all tasks in the pool and returns if no more progress can be made
+ /// on any task.
+ ///
+ /// ```
+ /// use futures::executor::LocalPool;
+ /// use futures::task::LocalSpawnExt;
+ /// use futures::future::{ready, pending};
+ ///
+ /// let mut pool = LocalPool::new();
+ /// let spawner = pool.spawner();
+ ///
+ /// spawner.spawn_local(ready(())).unwrap();
+ /// spawner.spawn_local(ready(())).unwrap();
+ /// spawner.spawn_local(pending()).unwrap();
+ ///
+ /// // Runs the two ready tasks and returns.
+ /// // The pending task remains in the pool.
+ /// pool.run_until_stalled();
+ /// ```
+ ///
+ /// This function will not block the calling thread and will return the moment
+ /// that there are no tasks left for which progress can be made;
+ /// remaining incomplete tasks in the pool can continue with further use of one
+ /// of the pool's run or poll methods. While the function is running, all tasks
+ /// in the pool will try to make progress.
+ pub fn run_until_stalled(&mut self) {
+ run_executor(|cx| match self.poll_pool(cx) {
+ // The pool is empty.
+ Poll::Ready(()) => Poll::Ready(()),
+ Poll::Pending => {
+ if woken() {
+ Poll::Pending
+ } else {
+ // We're stalled for now.
+ Poll::Ready(())
+ }
+ }
+ });
+ }
+
+ /// Poll `self.pool`, re-filling it with any newly-spawned tasks.
+ /// Repeat until either the pool is empty, or it returns `Pending`.
+ ///
+ /// Returns `Ready` if the pool was empty, and `Pending` otherwise.
+ ///
+ /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily
+ /// mean that the pool can't make progress.
+ fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+ loop {
+ self.drain_incoming();
+
+ let pool_ret = self.pool.poll_next_unpin(cx);
+
+ // We queued up some new tasks; add them and poll again.
+ if !self.incoming.borrow().is_empty() {
+ continue;
+ }
+
+ match pool_ret {
+ Poll::Ready(Some(())) => continue,
+ Poll::Ready(None) => return Poll::Ready(()),
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+ }
+
+ /// Empty the incoming queue of newly-spawned tasks.
+ fn drain_incoming(&mut self) {
+ let mut incoming = self.incoming.borrow_mut();
+ for task in incoming.drain(..) {
+ self.pool.push(task)
+ }
+ }
+}
+
+impl Default for LocalPool {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// Run a future to completion on the current thread.
+///
+/// This function will block the caller until the given future has completed.
+///
+/// Use a [`LocalPool`](LocalPool) if you need finer-grained control over
+/// spawned tasks.
+pub fn block_on<F: Future>(f: F) -> F::Output {
+ pin_mut!(f);
+ run_executor(|cx| f.as_mut().poll(cx))
+}
+
+/// Turn a stream into a blocking iterator.
+///
+/// When `next` is called on the resulting `BlockingStream`, the caller
+/// will be blocked until the next element of the `Stream` becomes available.
+pub fn block_on_stream<S: Stream + Unpin>(stream: S) -> BlockingStream<S> {
+ BlockingStream { stream }
+}
+
+/// An iterator which blocks on values from a stream until they become available.
+#[derive(Debug)]
+pub struct BlockingStream<S: Stream + Unpin> {
+ stream: S,
+}
+
+impl<S: Stream + Unpin> Deref for BlockingStream<S> {
+ type Target = S;
+ fn deref(&self) -> &Self::Target {
+ &self.stream
+ }
+}
+
+impl<S: Stream + Unpin> DerefMut for BlockingStream<S> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.stream
+ }
+}
+
+impl<S: Stream + Unpin> BlockingStream<S> {
+ /// Convert this `BlockingStream` into the inner `Stream` type.
+ pub fn into_inner(self) -> S {
+ self.stream
+ }
+}
+
+impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
+ type Item = S::Item;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ LocalPool::new().run_until(self.stream.next())
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.stream.size_hint()
+ }
+}
+
+impl Spawn for LocalSpawner {
+ fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
+ if let Some(incoming) = self.incoming.upgrade() {
+ incoming.borrow_mut().push(future.into());
+ Ok(())
+ } else {
+ Err(SpawnError::shutdown())
+ }
+ }
+
+ fn status(&self) -> Result<(), SpawnError> {
+ if self.incoming.upgrade().is_some() {
+ Ok(())
+ } else {
+ Err(SpawnError::shutdown())
+ }
+ }
+}
+
+impl LocalSpawn for LocalSpawner {
+ fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
+ if let Some(incoming) = self.incoming.upgrade() {
+ incoming.borrow_mut().push(future);
+ Ok(())
+ } else {
+ Err(SpawnError::shutdown())
+ }
+ }
+
+ fn status_local(&self) -> Result<(), SpawnError> {
+ if self.incoming.upgrade().is_some() {
+ Ok(())
+ } else {
+ Err(SpawnError::shutdown())
+ }
+ }
+}
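
`block_on_stream` above is the one public item in this file without a doc example. A minimal sketch, assuming the `futures` facade crate:

```rust
// Sketch: turn a stream into a blocking iterator via block_on_stream.
use futures::executor::block_on_stream;
use futures::stream;

fn main() {
    let blocking_iter = block_on_stream(stream::iter(vec![1, 2, 3]));
    let collected: Vec<i32> = blocking_iter.collect();
    assert_eq!(collected, vec![1, 2, 3]);
}
```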
diff --git a/third_party/rust/futures-executor/src/thread_pool.rs b/third_party/rust/futures-executor/src/thread_pool.rs
new file mode 100644
index 0000000000..5371008953
--- /dev/null
+++ b/third_party/rust/futures-executor/src/thread_pool.rs
@@ -0,0 +1,380 @@
+use crate::enter;
+use crate::unpark_mutex::UnparkMutex;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_task::{waker_ref, ArcWake};
+use futures_task::{FutureObj, Spawn, SpawnError};
+use futures_util::future::FutureExt;
+use std::cmp;
+use std::fmt;
+use std::io;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::mpsc;
+use std::sync::{Arc, Mutex};
+use std::thread;
+
+/// A general-purpose thread pool for scheduling tasks that poll futures to
+/// completion.
+///
+/// The thread pool multiplexes any number of tasks onto a fixed number of
+/// worker threads.
+///
+/// This type is a clonable handle to the threadpool itself.
+/// Cloning it will only create a new reference, not a new threadpool.
+///
+/// This type is only available when the `thread-pool` feature of this
+/// library is activated.
+#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
+pub struct ThreadPool {
+ state: Arc<PoolState>,
+}
+
+/// Thread pool configuration object.
+///
+/// This type is only available when the `thread-pool` feature of this
+/// library is activated.
+#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
+pub struct ThreadPoolBuilder {
+ pool_size: usize,
+ stack_size: usize,
+ name_prefix: Option<String>,
+ after_start: Option<Arc<dyn Fn(usize) + Send + Sync>>,
+ before_stop: Option<Arc<dyn Fn(usize) + Send + Sync>>,
+}
+
+trait AssertSendSync: Send + Sync {}
+impl AssertSendSync for ThreadPool {}
+
+struct PoolState {
+ tx: Mutex<mpsc::Sender<Message>>,
+ rx: Mutex<mpsc::Receiver<Message>>,
+ cnt: AtomicUsize,
+ size: usize,
+}
+
+impl fmt::Debug for ThreadPool {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ThreadPool").field("size", &self.state.size).finish()
+ }
+}
+
+impl fmt::Debug for ThreadPoolBuilder {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ThreadPoolBuilder")
+ .field("pool_size", &self.pool_size)
+ .field("name_prefix", &self.name_prefix)
+ .finish()
+ }
+}
+
+enum Message {
+ Run(Task),
+ Close,
+}
+
+impl ThreadPool {
+ /// Creates a new thread pool with the default configuration.
+ ///
+ /// See documentation for the methods in
+ /// [`ThreadPoolBuilder`](ThreadPoolBuilder) for details on the default
+ /// configuration.
+ pub fn new() -> Result<Self, io::Error> {
+ ThreadPoolBuilder::new().create()
+ }
+
+ /// Create a default thread pool configuration, which can then be customized.
+ ///
+ /// See documentation for the methods in
+ /// [`ThreadPoolBuilder`](ThreadPoolBuilder) for details on the default
+ /// configuration.
+ pub fn builder() -> ThreadPoolBuilder {
+ ThreadPoolBuilder::new()
+ }
+
+ /// Spawns a future that will be run to completion.
+ ///
+ /// > **Note**: This method is similar to `Spawn::spawn_obj`, except that
+ /// > it is guaranteed to always succeed.
+ pub fn spawn_obj_ok(&self, future: FutureObj<'static, ()>) {
+ let task = Task {
+ future,
+ wake_handle: Arc::new(WakeHandle { exec: self.clone(), mutex: UnparkMutex::new() }),
+ exec: self.clone(),
+ };
+ self.state.send(Message::Run(task));
+ }
+
+ /// Spawns a task that polls the given future with output `()` to
+ /// completion.
+ ///
+ /// ```
+ /// # {
+ /// use futures::executor::ThreadPool;
+ ///
+ /// let pool = ThreadPool::new().unwrap();
+ ///
+ /// let future = async { /* ... */ };
+ /// pool.spawn_ok(future);
+ /// # }
+ /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
+ /// ```
+ ///
+ /// > **Note**: This method is similar to `SpawnExt::spawn`, except that
+ /// > it is guaranteed to always succeed.
+ pub fn spawn_ok<Fut>(&self, future: Fut)
+ where
+ Fut: Future<Output = ()> + Send + 'static,
+ {
+ self.spawn_obj_ok(FutureObj::new(Box::new(future)))
+ }
+}
+
+impl Spawn for ThreadPool {
+ fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
+ self.spawn_obj_ok(future);
+ Ok(())
+ }
+}
+
+impl PoolState {
+ fn send(&self, msg: Message) {
+ self.tx.lock().unwrap().send(msg).unwrap();
+ }
+
+ fn work(
+ &self,
+ idx: usize,
+ after_start: Option<Arc<dyn Fn(usize) + Send + Sync>>,
+ before_stop: Option<Arc<dyn Fn(usize) + Send + Sync>>,
+ ) {
+ let _scope = enter().unwrap();
+ if let Some(after_start) = after_start {
+ after_start(idx);
+ }
+ loop {
+ let msg = self.rx.lock().unwrap().recv().unwrap();
+ match msg {
+ Message::Run(task) => task.run(),
+ Message::Close => break,
+ }
+ }
+ if let Some(before_stop) = before_stop {
+ before_stop(idx);
+ }
+ }
+}
+
+impl Clone for ThreadPool {
+ fn clone(&self) -> Self {
+ self.state.cnt.fetch_add(1, Ordering::Relaxed);
+ Self { state: self.state.clone() }
+ }
+}
+
+impl Drop for ThreadPool {
+ fn drop(&mut self) {
+ if self.state.cnt.fetch_sub(1, Ordering::Relaxed) == 1 {
+ for _ in 0..self.state.size {
+ self.state.send(Message::Close);
+ }
+ }
+ }
+}
+
+impl ThreadPoolBuilder {
+ /// Create a default thread pool configuration.
+ ///
+ /// See the other methods on this type for details on the defaults.
+ pub fn new() -> Self {
+ Self {
+ pool_size: cmp::max(1, num_cpus::get()),
+ stack_size: 0,
+ name_prefix: None,
+ after_start: None,
+ before_stop: None,
+ }
+ }
+
+ /// Set the size of the future ThreadPool.
+ ///
+ /// The size of a thread pool is the number of worker threads spawned. By
+ /// default, this is equal to the number of CPU cores.
+ ///
+ /// # Panics
+ ///
+ /// Panics if `pool_size == 0`.
+ pub fn pool_size(&mut self, size: usize) -> &mut Self {
+ assert!(size > 0);
+ self.pool_size = size;
+ self
+ }
+
+ /// Set stack size of threads in the pool, in bytes.
+ ///
+ /// By default, worker threads use Rust's standard stack size.
+ pub fn stack_size(&mut self, stack_size: usize) -> &mut Self {
+ self.stack_size = stack_size;
+ self
+ }
+
+ /// Set the thread name prefix of the future ThreadPool.
+ ///
+ /// Thread name prefix is used for generating thread names. For example, if prefix is
+ /// `my-pool-`, then threads in the pool will get names like `my-pool-1` etc.
+ ///
+ /// By default, worker threads are assigned Rust's standard thread name.
+ pub fn name_prefix<S: Into<String>>(&mut self, name_prefix: S) -> &mut Self {
+ self.name_prefix = Some(name_prefix.into());
+ self
+ }
+
+ /// Execute the closure `f` immediately after each worker thread is started,
+ /// but before running any tasks on it.
+ ///
+ /// This hook is intended for bookkeeping and monitoring.
+ /// The closure `f` will be dropped after the `builder` is dropped
+ /// and all worker threads in the pool have executed it.
+ ///
+ /// The closure provided will receive an index corresponding to the worker
+ /// thread it's running on.
+ pub fn after_start<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn(usize) + Send + Sync + 'static,
+ {
+ self.after_start = Some(Arc::new(f));
+ self
+ }
+
+ /// Execute closure `f` just prior to shutting down each worker thread.
+ ///
+ /// This hook is intended for bookkeeping and monitoring.
+ /// The closure `f` will be dropped after the `builder` is dropped
+ /// and all threads in the pool have executed it.
+ ///
+ /// The closure provided will receive an index corresponding to the worker
+ /// thread it's running on.
+ pub fn before_stop<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn(usize) + Send + Sync + 'static,
+ {
+ self.before_stop = Some(Arc::new(f));
+ self
+ }
+
+ /// Create a [`ThreadPool`](ThreadPool) with the given configuration.
+ pub fn create(&mut self) -> Result<ThreadPool, io::Error> {
+ let (tx, rx) = mpsc::channel();
+ let pool = ThreadPool {
+ state: Arc::new(PoolState {
+ tx: Mutex::new(tx),
+ rx: Mutex::new(rx),
+ cnt: AtomicUsize::new(1),
+ size: self.pool_size,
+ }),
+ };
+
+ for counter in 0..self.pool_size {
+ let state = pool.state.clone();
+ let after_start = self.after_start.clone();
+ let before_stop = self.before_stop.clone();
+ let mut thread_builder = thread::Builder::new();
+ if let Some(ref name_prefix) = self.name_prefix {
+ thread_builder = thread_builder.name(format!("{}{}", name_prefix, counter));
+ }
+ if self.stack_size > 0 {
+ thread_builder = thread_builder.stack_size(self.stack_size);
+ }
+ thread_builder.spawn(move || state.work(counter, after_start, before_stop))?;
+ }
+ Ok(pool)
+ }
+}
+
+impl Default for ThreadPoolBuilder {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// A task responsible for polling a future to completion.
+struct Task {
+ future: FutureObj<'static, ()>,
+ exec: ThreadPool,
+ wake_handle: Arc<WakeHandle>,
+}
+
+struct WakeHandle {
+ mutex: UnparkMutex<Task>,
+ exec: ThreadPool,
+}
+
+impl Task {
+ /// Actually run the task (invoking `poll` on the future) on the current
+ /// thread.
+ fn run(self) {
+ let Self { mut future, wake_handle, mut exec } = self;
+ let waker = waker_ref(&wake_handle);
+ let mut cx = Context::from_waker(&waker);
+
+ // Safety: The ownership of this `Task` object is evidence that
+ // we are in the `POLLING`/`REPOLL` state for the mutex.
+ unsafe {
+ wake_handle.mutex.start_poll();
+
+ loop {
+ let res = future.poll_unpin(&mut cx);
+ match res {
+ Poll::Pending => {}
+ Poll::Ready(()) => return wake_handle.mutex.complete(),
+ }
+ let task = Self { future, wake_handle: wake_handle.clone(), exec };
+ match wake_handle.mutex.wait(task) {
+ Ok(()) => return, // we've waited
+ Err(task) => {
+ // someone's notified us
+ future = task.future;
+ exec = task.exec;
+ }
+ }
+ }
+ }
+ }
+}
+
+impl fmt::Debug for Task {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Task").field("contents", &"...").finish()
+ }
+}
+
+impl ArcWake for WakeHandle {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ if let Ok(task) = arc_self.mutex.notify() {
+ arc_self.exec.state.send(Message::Run(task))
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::sync::mpsc;
+
+ #[test]
+ fn test_drop_after_start() {
+ {
+ let (tx, rx) = mpsc::sync_channel(2);
+ let _cpu_pool = ThreadPoolBuilder::new()
+ .pool_size(2)
+ .after_start(move |_| tx.send(1).unwrap())
+ .create()
+ .unwrap();
+
+ // After ThreadPoolBuilder is deconstructed, the tx should be dropped
+ // so that we can use rx as an iterator.
+ let count = rx.into_iter().count();
+ assert_eq!(count, 2);
+ }
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
+ }
+}
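
The builder hooks documented above (`pool_size`, `name_prefix`, `after_start`, `before_stop`) compose as in the following sketch, which assumes the `futures` facade crate built with its `thread-pool` feature enabled:

```rust
// Sketch: configure a small pool, spawn a task onto it, and wait for the
// result on the calling thread via a oneshot channel.
use futures::channel::oneshot;
use futures::executor::{block_on, ThreadPoolBuilder};

fn main() {
    let pool = ThreadPoolBuilder::new()
        .pool_size(2)
        .name_prefix("demo-worker-")
        .after_start(|idx| println!("worker {} started", idx))
        .create()
        .expect("failed to spawn worker threads");

    let (tx, rx) = oneshot::channel();
    pool.spawn_ok(async move {
        tx.send(2 + 2).ok();
    });

    // Block the current thread until the pooled task reports back.
    assert_eq!(block_on(rx), Ok(4));
}
```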
diff --git a/third_party/rust/futures-executor/src/unpark_mutex.rs b/third_party/rust/futures-executor/src/unpark_mutex.rs
new file mode 100644
index 0000000000..ac5112cfa2
--- /dev/null
+++ b/third_party/rust/futures-executor/src/unpark_mutex.rs
@@ -0,0 +1,137 @@
+use std::cell::UnsafeCell;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+
+/// A "lock" around data `D`, which employs a *helping* strategy.
+///
+/// Used to ensure that concurrent `unpark` invocations lead to (1) `poll` being
+/// invoked on only a single thread at a time (2) `poll` being invoked at least
+/// once after each `unpark` (unless the future has completed).
+pub(crate) struct UnparkMutex<D> {
+ // The state of task execution (state machine described below)
+ status: AtomicUsize,
+
+ // The actual task data, accessible only in the POLLING state
+ inner: UnsafeCell<Option<D>>,
+}
+
+// `UnparkMutex<D>` functions in many ways like a `Mutex<D>`, except that on
+// acquisition failure, the current lock holder performs the desired work --
+// re-polling.
+//
+// As such, these impls mirror those for `Mutex<D>`. In particular, a reference
+// to `UnparkMutex` can be used to gain `&mut` access to the inner data, which
+// must therefore be `Send`.
+unsafe impl<D: Send> Send for UnparkMutex<D> {}
+unsafe impl<D: Send> Sync for UnparkMutex<D> {}
+
+// There are four possible task states, listed below with their possible
+// transitions:
+
+// The task is blocked, waiting on an event
+const WAITING: usize = 0; // --> POLLING
+
+// The task is actively being polled by a thread; arrival of additional events
+// of interest should move it to the REPOLL state
+const POLLING: usize = 1; // --> WAITING, REPOLL, or COMPLETE
+
+// The task is actively being polled, but will need to be re-polled upon
+// completion to ensure that all events were observed.
+const REPOLL: usize = 2; // --> POLLING
+
+// The task has finished executing (either successfully or with an error/panic)
+const COMPLETE: usize = 3; // No transitions out
+
+impl<D> UnparkMutex<D> {
+ pub(crate) fn new() -> Self {
+ Self { status: AtomicUsize::new(WAITING), inner: UnsafeCell::new(None) }
+ }
+
+ /// Attempt to "notify" the mutex that a poll should occur.
+ ///
+ /// An `Ok` result indicates that the `POLLING` state has been entered, and
+ /// the caller can proceed to poll the future. An `Err` result indicates
+ /// that polling is not necessary (because the task is finished or the
+ /// polling has been delegated).
+ pub(crate) fn notify(&self) -> Result<D, ()> {
+ let mut status = self.status.load(SeqCst);
+ loop {
+ match status {
+ // The task is idle, so try to run it immediately.
+ WAITING => {
+ match self.status.compare_exchange(WAITING, POLLING, SeqCst, SeqCst) {
+ Ok(_) => {
+ let data = unsafe {
+ // SAFETY: we've ensured mutual exclusion via
+ // the status protocol; we are the only thread
+ // that has transitioned to the POLLING state,
+ // and we won't transition back to WAITING until
+ // the lock is "released" by this thread. See
+ // the protocol diagram above.
+ (*self.inner.get()).take().unwrap()
+ };
+ return Ok(data);
+ }
+ Err(cur) => status = cur,
+ }
+ }
+
+ // The task is being polled, so we need to record that it should
+ // be *repolled* when complete.
+ POLLING => match self.status.compare_exchange(POLLING, REPOLL, SeqCst, SeqCst) {
+ Ok(_) => return Err(()),
+ Err(cur) => status = cur,
+ },
+
+ // The task is already scheduled for polling, or is complete, so
+ // we've got nothing to do.
+ _ => return Err(()),
+ }
+ }
+ }
+
+ /// Alert the mutex that polling is about to begin, clearing any accumulated
+ /// re-poll requests.
+ ///
+ /// # Safety
+ ///
+ /// Callable only from the `POLLING`/`REPOLL` states, i.e. between
+ /// successful calls to `notify` and `wait`/`complete`.
+ pub(crate) unsafe fn start_poll(&self) {
+ self.status.store(POLLING, SeqCst);
+ }
+
+ /// Alert the mutex that polling completed with `Pending`.
+ ///
+ /// # Safety
+ ///
+ /// Callable only from the `POLLING`/`REPOLL` states, i.e. between
+ /// successful calls to `notify` and `wait`/`complete`.
+ pub(crate) unsafe fn wait(&self, data: D) -> Result<(), D> {
+ *self.inner.get() = Some(data);
+
+ match self.status.compare_exchange(POLLING, WAITING, SeqCst, SeqCst) {
+ // no unparks came in while we were running
+ Ok(_) => Ok(()),
+
+ // guaranteed to be in REPOLL state; just clobber the
+ // state and run again.
+ Err(status) => {
+ assert_eq!(status, REPOLL);
+ self.status.store(POLLING, SeqCst);
+ Err((*self.inner.get()).take().unwrap())
+ }
+ }
+ }
+
+ /// Alert the mutex that the task has completed execution and should not be
+ /// notified again.
+ ///
+ /// # Safety
+ ///
+ /// Callable only from the `POLLING`/`REPOLL` states, i.e. between
+ /// successful calls to `notify` and `wait`/`complete`.
+ pub(crate) unsafe fn complete(&self) {
+ self.status.store(COMPLETE, SeqCst);
+ }
+}
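
The WAITING/POLLING/REPOLL/COMPLETE protocol above is easiest to see in isolation. The following standalone sketch models only the `notify` transition with a plain `AtomicUsize`; it deliberately does not use the crate-private `UnparkMutex` type and omits the retry loop and the COMPLETE state for brevity:

```rust
// Standalone model of the notify() transition: the first notifier wins the
// WAITING -> POLLING race and polls; a concurrent notifier merely requests a
// re-poll by moving POLLING -> REPOLL.
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};

const WAITING: usize = 0;
const POLLING: usize = 1;
const REPOLL: usize = 2;

fn notify(status: &AtomicUsize) -> bool {
    match status.compare_exchange(WAITING, POLLING, SeqCst, SeqCst) {
        // We took the "lock" and get to poll the task ourselves.
        Ok(_) => true,
        // Someone else is polling: delegate by flagging a re-poll.
        Err(POLLING) => {
            let _ = status.compare_exchange(POLLING, REPOLL, SeqCst, SeqCst);
            false
        }
        // Already flagged for re-poll (or complete): nothing to do.
        Err(_) => false,
    }
}

fn main() {
    let status = AtomicUsize::new(WAITING);
    assert!(notify(&status)); // first notifier polls
    assert!(!notify(&status)); // second notifier only flags a re-poll
    assert_eq!(status.load(SeqCst), REPOLL);
}
```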
diff --git a/third_party/rust/futures-executor/tests/local_pool.rs b/third_party/rust/futures-executor/tests/local_pool.rs
new file mode 100644
index 0000000000..72ce74b744
--- /dev/null
+++ b/third_party/rust/futures-executor/tests/local_pool.rs
@@ -0,0 +1,496 @@
+use futures::channel::oneshot;
+use futures::executor::LocalPool;
+use futures::future::{self, lazy, poll_fn, Future};
+use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker};
+use std::cell::{Cell, RefCell};
+use std::pin::Pin;
+use std::rc::Rc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+
+struct Pending(Rc<()>);
+
+impl Future for Pending {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
+ Poll::Pending
+ }
+}
+
+fn pending() -> Pending {
+ Pending(Rc::new(()))
+}
+
+#[test]
+fn run_until_single_future() {
+ let mut cnt = 0;
+
+ {
+ let mut pool = LocalPool::new();
+ let fut = lazy(|_| {
+ cnt += 1;
+ });
+ pool.run_until(fut);
+ }
+
+ assert_eq!(cnt, 1);
+}
+
+#[test]
+fn run_until_ignores_spawned() {
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+ spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
+ pool.run_until(lazy(|_| ()));
+}
+
+#[test]
+fn run_until_executes_spawned() {
+ let (tx, rx) = oneshot::channel();
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+ spawn
+ .spawn_local_obj(
+ Box::pin(lazy(move |_| {
+ tx.send(()).unwrap();
+ }))
+ .into(),
+ )
+ .unwrap();
+ pool.run_until(rx).unwrap();
+}
+
+#[test]
+fn run_returns_if_empty() {
+ let mut pool = LocalPool::new();
+ pool.run();
+ pool.run();
+}
+
+#[test]
+fn run_executes_spawned() {
+ let cnt = Rc::new(Cell::new(0));
+ let cnt2 = cnt.clone();
+
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+ let spawn2 = pool.spawner();
+
+ spawn
+ .spawn_local_obj(
+ Box::pin(lazy(move |_| {
+ spawn2
+ .spawn_local_obj(
+ Box::pin(lazy(move |_| {
+ cnt2.set(cnt2.get() + 1);
+ }))
+ .into(),
+ )
+ .unwrap();
+ }))
+ .into(),
+ )
+ .unwrap();
+
+ pool.run();
+
+ assert_eq!(cnt.get(), 1);
+}
+
+#[test]
+fn run_spawn_many() {
+ const ITER: usize = 200;
+
+ let cnt = Rc::new(Cell::new(0));
+
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+
+ for _ in 0..ITER {
+ let cnt = cnt.clone();
+ spawn
+ .spawn_local_obj(
+ Box::pin(lazy(move |_| {
+ cnt.set(cnt.get() + 1);
+ }))
+ .into(),
+ )
+ .unwrap();
+ }
+
+ pool.run();
+
+ assert_eq!(cnt.get(), ITER);
+}
+
+#[test]
+fn try_run_one_returns_if_empty() {
+ let mut pool = LocalPool::new();
+ assert!(!pool.try_run_one());
+}
+
+#[test]
+fn try_run_one_executes_one_ready() {
+ const ITER: usize = 200;
+
+ let cnt = Rc::new(Cell::new(0));
+
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+
+ for _ in 0..ITER {
+ spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
+
+ let cnt = cnt.clone();
+ spawn
+ .spawn_local_obj(
+ Box::pin(lazy(move |_| {
+ cnt.set(cnt.get() + 1);
+ }))
+ .into(),
+ )
+ .unwrap();
+
+ spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
+ }
+
+ for i in 0..ITER {
+ assert_eq!(cnt.get(), i);
+ assert!(pool.try_run_one());
+ assert_eq!(cnt.get(), i + 1);
+ }
+ assert!(!pool.try_run_one());
+}
+
+#[test]
+fn try_run_one_returns_on_no_progress() {
+ const ITER: usize = 10;
+
+ let cnt = Rc::new(Cell::new(0));
+
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+
+ let waker: Rc<Cell<Option<Waker>>> = Rc::new(Cell::new(None));
+ {
+ let cnt = cnt.clone();
+ let waker = waker.clone();
+ spawn
+ .spawn_local_obj(
+ Box::pin(poll_fn(move |ctx| {
+ cnt.set(cnt.get() + 1);
+ waker.set(Some(ctx.waker().clone()));
+ if cnt.get() == ITER {
+ Poll::Ready(())
+ } else {
+ Poll::Pending
+ }
+ }))
+ .into(),
+ )
+ .unwrap();
+ }
+
+ for i in 0..ITER - 1 {
+ assert_eq!(cnt.get(), i);
+ assert!(!pool.try_run_one());
+ assert_eq!(cnt.get(), i + 1);
+ let w = waker.take();
+ assert!(w.is_some());
+ w.unwrap().wake();
+ }
+ assert!(pool.try_run_one());
+ assert_eq!(cnt.get(), ITER);
+}
+
+#[test]
+fn try_run_one_runs_sub_futures() {
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+ let cnt = Rc::new(Cell::new(0));
+
+ let inner_spawner = spawn.clone();
+ let cnt1 = cnt.clone();
+ spawn
+ .spawn_local_obj(
+ Box::pin(poll_fn(move |_| {
+ cnt1.set(cnt1.get() + 1);
+
+ let cnt2 = cnt1.clone();
+ inner_spawner
+ .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into())
+ .unwrap();
+
+ Poll::Pending
+ }))
+ .into(),
+ )
+ .unwrap();
+
+ pool.try_run_one();
+ assert_eq!(cnt.get(), 2);
+}
+
+#[test]
+fn run_until_stalled_returns_if_empty() {
+ let mut pool = LocalPool::new();
+ pool.run_until_stalled();
+ pool.run_until_stalled();
+}
+
+#[test]
+fn run_until_stalled_returns_multiple_times() {
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+ let cnt = Rc::new(Cell::new(0));
+
+ let cnt1 = cnt.clone();
+ spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt1.set(cnt1.get() + 1))).into()).unwrap();
+ pool.run_until_stalled();
+ assert_eq!(cnt.get(), 1);
+
+ let cnt2 = cnt.clone();
+ spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into()).unwrap();
+ pool.run_until_stalled();
+ assert_eq!(cnt.get(), 2);
+}
+
+#[test]
+fn run_until_stalled_runs_spawned_sub_futures() {
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+ let cnt = Rc::new(Cell::new(0));
+
+ let inner_spawner = spawn.clone();
+ let cnt1 = cnt.clone();
+ spawn
+ .spawn_local_obj(
+ Box::pin(poll_fn(move |_| {
+ cnt1.set(cnt1.get() + 1);
+
+ let cnt2 = cnt1.clone();
+ inner_spawner
+ .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into())
+ .unwrap();
+
+ Poll::Pending
+ }))
+ .into(),
+ )
+ .unwrap();
+
+ pool.run_until_stalled();
+ assert_eq!(cnt.get(), 2);
+}
+
+#[test]
+fn run_until_stalled_executes_all_ready() {
+ const ITER: usize = if cfg!(miri) { 50 } else { 200 };
+ const PER_ITER: usize = 3;
+
+ let cnt = Rc::new(Cell::new(0));
+
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+
+ for i in 0..ITER {
+ for _ in 0..PER_ITER {
+ spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
+
+ let cnt = cnt.clone();
+ spawn
+ .spawn_local_obj(
+ Box::pin(lazy(move |_| {
+ cnt.set(cnt.get() + 1);
+ }))
+ .into(),
+ )
+ .unwrap();
+
+ // also add some pending tasks to test if they are ignored
+ spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
+ }
+ assert_eq!(cnt.get(), i * PER_ITER);
+ pool.run_until_stalled();
+ assert_eq!(cnt.get(), (i + 1) * PER_ITER);
+ }
+}
+
+#[test]
+#[should_panic]
+fn nesting_run() {
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+
+ spawn
+ .spawn_obj(
+ Box::pin(lazy(|_| {
+ let mut pool = LocalPool::new();
+ pool.run();
+ }))
+ .into(),
+ )
+ .unwrap();
+
+ pool.run();
+}
+
+#[test]
+#[should_panic]
+fn nesting_run_run_until_stalled() {
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+
+ spawn
+ .spawn_obj(
+ Box::pin(lazy(|_| {
+ let mut pool = LocalPool::new();
+ pool.run_until_stalled();
+ }))
+ .into(),
+ )
+ .unwrap();
+
+ pool.run();
+}
+
+#[test]
+fn tasks_are_scheduled_fairly() {
+ let state = Rc::new(RefCell::new([0, 0]));
+
+ struct Spin {
+ state: Rc<RefCell<[i32; 2]>>,
+ idx: usize,
+ }
+
+ impl Future for Spin {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ let mut state = self.state.borrow_mut();
+
+ if self.idx == 0 {
+ let diff = state[0] - state[1];
+
+ assert!(diff.abs() <= 1);
+
+ if state[0] >= 50 {
+ return Poll::Ready(());
+ }
+ }
+
+ state[self.idx] += 1;
+
+ if state[self.idx] >= 100 {
+ return Poll::Ready(());
+ }
+
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+
+ spawn.spawn_local_obj(Box::pin(Spin { state: state.clone(), idx: 0 }).into()).unwrap();
+
+ spawn.spawn_local_obj(Box::pin(Spin { state, idx: 1 }).into()).unwrap();
+
+ pool.run();
+}
+
+// Tests that the use of park/unpark in user-code has no
+// effect on the expected behavior of the executor.
+#[test]
+fn park_unpark_independence() {
+ let mut done = false;
+
+ let future = future::poll_fn(move |cx| {
+ if done {
+ return Poll::Ready(());
+ }
+ done = true;
+ cx.waker().clone().wake(); // (*)
+ // some user-code that temporarily parks the thread
+ let test = thread::current();
+ let latch = Arc::new(AtomicBool::new(false));
+ let signal = latch.clone();
+ thread::spawn(move || {
+ thread::sleep(Duration::from_millis(10));
+ signal.store(true, Ordering::SeqCst);
+ test.unpark()
+ });
+ while !latch.load(Ordering::Relaxed) {
+ thread::park();
+ }
+ Poll::Pending // Expect to be called again due to (*).
+ });
+
+ futures::executor::block_on(future)
+}
+
+struct SelfWaking {
+ wakeups_remaining: Rc<RefCell<usize>>,
+}
+
+impl Future for SelfWaking {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ if *self.wakeups_remaining.borrow() != 0 {
+ *self.wakeups_remaining.borrow_mut() -= 1;
+ cx.waker().wake_by_ref();
+ }
+
+ Poll::Pending
+ }
+}
+
+/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
+///
+/// The issue was that self-waking futures could cause `run_until_stalled`
+/// to exit early, even when progress could still be made.
+#[test]
+fn self_waking_run_until_stalled() {
+ let wakeups_remaining = Rc::new(RefCell::new(10));
+
+ let mut pool = LocalPool::new();
+ let spawner = pool.spawner();
+ for _ in 0..3 {
+ let wakeups_remaining = Rc::clone(&wakeups_remaining);
+ spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
+ }
+
+ // This should keep polling until there are no more wakeups.
+ pool.run_until_stalled();
+
+ assert_eq!(*wakeups_remaining.borrow(), 0);
+}
+
+/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
+///
+/// The issue was that self-waking futures could cause `try_run_one`
+/// to exit early, even when progress could still be made.
+#[test]
+fn self_waking_try_run_one() {
+ let wakeups_remaining = Rc::new(RefCell::new(10));
+
+ let mut pool = LocalPool::new();
+ let spawner = pool.spawner();
+ for _ in 0..3 {
+ let wakeups_remaining = Rc::clone(&wakeups_remaining);
+ spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
+ }
+
+ spawner.spawn(future::ready(())).unwrap();
+
+ // The `ready` future should complete.
+ assert!(pool.try_run_one());
+
+ // The self-waking futures are each polled once.
+ assert_eq!(*wakeups_remaining.borrow(), 7);
+}