summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-threadpool
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-threadpool')
-rw-r--r--third_party/rust/tokio-threadpool/.cargo-checksum.json1
-rw-r--r--third_party/rust/tokio-threadpool/CHANGELOG.md98
-rw-r--r--third_party/rust/tokio-threadpool/Cargo.lock290
-rw-r--r--third_party/rust/tokio-threadpool/Cargo.toml61
-rw-r--r--third_party/rust/tokio-threadpool/LICENSE25
-rw-r--r--third_party/rust/tokio-threadpool/README.md56
-rw-r--r--third_party/rust/tokio-threadpool/benches/basic.rs165
-rw-r--r--third_party/rust/tokio-threadpool/benches/blocking.rs137
-rw-r--r--third_party/rust/tokio-threadpool/benches/depth.rs76
-rw-r--r--third_party/rust/tokio-threadpool/examples/depth.rs48
-rw-r--r--third_party/rust/tokio-threadpool/examples/hello.rs24
-rw-r--r--third_party/rust/tokio-threadpool/src/blocking/global.rs218
-rw-r--r--third_party/rust/tokio-threadpool/src/blocking/mod.rs88
-rw-r--r--third_party/rust/tokio-threadpool/src/builder.rs476
-rw-r--r--third_party/rust/tokio-threadpool/src/callback.rs30
-rw-r--r--third_party/rust/tokio-threadpool/src/config.rs34
-rw-r--r--third_party/rust/tokio-threadpool/src/lib.rs164
-rw-r--r--third_party/rust/tokio-threadpool/src/notifier.rs92
-rw-r--r--third_party/rust/tokio-threadpool/src/park/boxed.rs45
-rw-r--r--third_party/rust/tokio-threadpool/src/park/default_park.rs97
-rw-r--r--third_party/rust/tokio-threadpool/src/park/mod.rs8
-rw-r--r--third_party/rust/tokio-threadpool/src/pool/backup.rs308
-rw-r--r--third_party/rust/tokio-threadpool/src/pool/backup_stack.rs191
-rw-r--r--third_party/rust/tokio-threadpool/src/pool/mod.rs475
-rw-r--r--third_party/rust/tokio-threadpool/src/pool/state.rs132
-rw-r--r--third_party/rust/tokio-threadpool/src/sender.rs218
-rw-r--r--third_party/rust/tokio-threadpool/src/shutdown.rs103
-rw-r--r--third_party/rust/tokio-threadpool/src/task/blocking.rs496
-rw-r--r--third_party/rust/tokio-threadpool/src/task/blocking_state.rs89
-rw-r--r--third_party/rust/tokio-threadpool/src/task/mod.rs308
-rw-r--r--third_party/rust/tokio-threadpool/src/task/state.rs57
-rw-r--r--third_party/rust/tokio-threadpool/src/thread_pool.rs217
-rw-r--r--third_party/rust/tokio-threadpool/src/worker/entry.rs330
-rw-r--r--third_party/rust/tokio-threadpool/src/worker/mod.rs797
-rw-r--r--third_party/rust/tokio-threadpool/src/worker/stack.rs260
-rw-r--r--third_party/rust/tokio-threadpool/src/worker/state.rs153
-rw-r--r--third_party/rust/tokio-threadpool/tests/blocking.rs421
-rw-r--r--third_party/rust/tokio-threadpool/tests/hammer.rs101
-rw-r--r--third_party/rust/tokio-threadpool/tests/threadpool.rs555
39 files changed, 7444 insertions, 0 deletions
diff --git a/third_party/rust/tokio-threadpool/.cargo-checksum.json b/third_party/rust/tokio-threadpool/.cargo-checksum.json
new file mode 100644
index 0000000000..8f235ccfd3
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/.cargo-checksum.json
@@ -0,0 +1 @@
+{"files":{"CHANGELOG.md":"6c6384e964eb407567182ff80e9eea32b597df599f387cd1c5991479716bf68f","Cargo.lock":"557a52b0b51573b515fceb1ff91ccf838a010fae7d5d840671ef03c35453ffd6","Cargo.toml":"50a9f8055b0c10352ba0212ff719d45e4f0f71aaa95eda2df1a3e1f6e3ac40bd","LICENSE":"898b1ae9821e98daf8964c8d6c7f61641f5f5aa78ad500020771c0939ee0dea1","README.md":"87c45bc5ea9337e0048e1ecbc6fd31d485396c12ca041816d728fee7df8dba3a","benches/basic.rs":"2ddbbba8fe38ded4ff2bcf57d0440905dfccafb56e3ef3e1ca31f5ce2f752fd1","benches/blocking.rs":"5f0b14d8fdfadd9e8503fe73a9616112b5eb274ccf10fc5343164648ddc28c6b","benches/depth.rs":"4c5b7a616273591b0dd68860d939be5383f7abf9fc1a3b06954ecbb32106d4a4","examples/depth.rs":"66e1548e3b22c4f4db231e1f56e7d760fbb2afba631022d934adc2843b089a6d","examples/hello.rs":"3dcd089b9103abf96b65ae1e86c02af14a10924aae1a5bf0772229360c9f9a61","src/blocking/global.rs":"9397f8bb44bb651619ad74fce96f44c6d7b4bde86e0f5478f4948212c4b8e501","src/blocking/mod.rs":"25e30ba6d263d9c51117d760ca53da8b0ceedc4d23638e236389c57789088b8b","src/builder.rs":"5ef2077f34b793641a3e72ac739f8f8d49fb72f6028faf8f9c436214474ae95b","src/callback.rs":"1de9db5541545d0a53245c928706715d34f9c5931c91223ad5de4b691be00ab7","src/config.rs":"6e7c470e8639df7eab03b5f504e3cede5787920c87011e63dc4497390a618406","src/lib.rs":"a00bd8fbfdac2067d57d6b3759352282d48529cc5a7d6417ed31fef3b11fe925","src/notifier.rs":"3f127a0495958f47ef747f537923da49adf080e74577dcd209da7db8a8549406","src/park/boxed.rs":"b1b98737f2ff20f1cfda44c42de14e5caf9bfe436ac85a116368fd1d7327c77a","src/park/default_park.rs":"b215774f69cdf141f4795906b35b5b76f5b2516ff6edbe382d490c940a7dfcca","src/park/mod.rs":"63b391ff690d98088bf586278e0b9c735cb551472c0fc85bc0238433bd0aad76","src/pool/backup.rs":"5782cb9c3513a2b4302aa586390e898e32f10bdd7717bc282a77e95ef8ad225f","src/pool/backup_stack.rs":"e7e8a1bdf1040965b7d95c1b04e0d1959480729d82dee3bd2fea2f2190063203","src/pool/mod.rs":"ce5ea1beed38bcef52be7070a3dac9888f4805bd090ab679396ea2e7e2d5f248","src/pool/state.rs":"474e8b30328f386fe6bda36b469c9f8590d1eb6dc525325254f1a1b3cf2923f3","src/sender.rs":"c5563a2d81f2ebe7dc77cd4caed2c1b0b95b0774d4b5dff3a20c4ad6d2107a7a","src/shutdown.rs":"94cd086cffb0dc86a338226f47be549e148a6a44f707ed3fc5ed1334a0d2cb7d","src/task/blocking.rs":"04a24d2d5505593f1172aab5a4ba903736d6630afd658c15a9f0b3252599a6c0","src/task/blocking_state.rs":"4116e314417b53608947374ce243e116596383b2f0c8728a5814f7d73fcc574d","src/task/mod.rs":"f3d854026cd1f6c8d1b6c116f79945a14554f10da4300fdc6acf31fa9a7b99b1","src/task/state.rs":"8f90e419e50ab862be6d7c0ef34c6b1cc50c0e020b4fc8297ac1680b630db7ed","src/thread_pool.rs":"fc039328a70800570421b68d9f4790b30120e5b6629a1a4a7e47218b7cb8b7eb","src/worker/entry.rs":"126bc91b41ef728a423b9d6b0728ae135be15256e03106566908e3b7f158c956","src/worker/mod.rs":"5b80dedb308503bb6007d77d1f10f0889d05d15aa3a26d68f2d7a92d31629d15","src/worker/stack.rs":"7e40d40fdef023ea0dfdf076a1b68e3aa56eaf32f6bf3de546d557321650787f","src/worker/state.rs":"0aba1be55c52f0e69db249ef986b2db50b493de65b7a8ff8dd2af29b192c8ade","tests/blocking.rs":"00467e3bba6c0d0a66ed82d876ffadded022b6fde5dabd9d7c92b9243cc9968c","tests/hammer.rs":"fe3371ca3b1a7f5eeedb4bca619090b0a90fe0dca46fda8aadd761b847ed6a0a","tests/threadpool.rs":"5bb7989ea4ca4f596938f7bacaf3dd3ebf54349dbc6d0d03da1f003867f11206"},"package":"f0c32ffea4827978e9aa392d2f743d973c1dfa3730a2ed3f22ce1e6984da848c"} \ No newline at end of file
diff --git a/third_party/rust/tokio-threadpool/CHANGELOG.md b/third_party/rust/tokio-threadpool/CHANGELOG.md
new file mode 100644
index 0000000000..f99b9f90e0
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/CHANGELOG.md
@@ -0,0 +1,98 @@
+# 0.1.17 (December 3, 2019)
+
+### Added
+- Internal APIs for overriding blocking behavior (#1752)
+
+# 0.1.16 (September 25, 2019)
+
+### Changed
+- Remove last non-dev dependency on rand crate by seeding PRNG via libstd
+ `RandomState` (#1324 backport)
+- Upgrade (dev-only dependency) rand to 0.7.0 (#1302 backport)
+- The minimum supported rust version (MSRV) is now 1.31.0 (#1358)
+
+# 0.1.15 (June 2, 2019)
+
+### Changed
+- Allow other executors inside `threadpool::blocking` (#1155).
+
+# 0.1.14 (April 22, 2019)
+
+### Added
+- Add `panic_handler` for customizing action taken on panic (#1052).
+
+# 0.1.13 (March 22, 2019)
+
+### Added
+- `TypedExecutor` implementations (#993)
+
+# 0.1.12 (March 1, 2019)
+
+### Fixed
+- Documentation typos (#915).
+
+### Changed
+- Update crossbeam dependencies (#874).
+
+# 0.1.11 (January 24, 2019)
+
+### Fixed
+- Drop incomplete tasks when threadpool is dropped (#722).
+
+# 0.1.10 (January 6, 2019)
+
+* Fix deadlock bug in `blocking` (#795).
+* Introduce global task queue (#798).
+* Use crossbeam's Parker / Unparker (#529).
+* Panic if worker thread cannot be spawned (#826).
+* Improve `blocking` API documentation (#789).
+
+# 0.1.9 (November 21, 2018)
+
+* Bump internal dependency versions (#746, #753).
+* Internal refactors (#768, #769).
+
+# 0.1.8 (October 23, 2018)
+
+* Assign spawned tasks to random worker (#660).
+* Worker threads no longer shutdown (#692).
+* Reduce atomic ops in notifier (#702).
+
+# 0.1.7 (September 27, 2018)
+
+* Add ThreadPool::spawn_handle (#602, #604).
+* Fix spawned future leak (#649).
+
+# 0.1.6 (August 23, 2018)
+
+* Misc performance improvements (#466, #468, #470, #475, #534)
+* Documentation improvements (#450)
+* Shutdown backup threads when idle (#489)
+* Implement std::error::Error for error types (#511)
+* Bugfix: handle num_cpus returning zero (#530).
+
+# 0.1.5 (July 3, 2018)
+
+* Fix race condition bug when threads are woken up (#459).
+* Improve `BlockingError` message (#451).
+
+# 0.1.4 (June 6, 2018)
+
+* Fix bug that can occur with multiple pools in a process (#375).
+
+# 0.1.3 (May 2, 2018)
+
+* Add `blocking` annotation (#317).
+
+# 0.1.2 (March 30, 2018)
+
+* Add the ability to specify a custom thread parker.
+
+# 0.1.1 (March 22, 2018)
+
+* Handle futures that panic on the threadpool.
+* Optionally support futures 0.2.
+
+# 0.1.0 (March 09, 2018)
+
+* Initial release
diff --git a/third_party/rust/tokio-threadpool/Cargo.lock b/third_party/rust/tokio-threadpool/Cargo.lock
new file mode 100644
index 0000000000..22ee369b6b
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/Cargo.lock
@@ -0,0 +1,290 @@
+# This file is automatically @generated by Cargo.
+# It is not intended for manual editing.
+[[package]]
+name = "autocfg"
+version = "0.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "c2-chacha"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "ppv-lite86 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "cfg-if"
+version = "0.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "crossbeam-deque"
+version = "0.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "crossbeam-epoch"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
+ "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "memoffset 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "scopeguard 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "crossbeam-queue"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "crossbeam-utils"
+version = "0.6.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
+ "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "crossbeam-utils"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
+ "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
+ "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "env_logger"
+version = "0.6.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "futures"
+version = "0.1.29"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "futures-cpupool"
+version = "0.1.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
+ "num_cpus 1.11.1 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "getrandom"
+version = "0.1.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
+ "wasi 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "hermit-abi"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "lazy_static"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "libc"
+version = "0.2.66"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "log"
+version = "0.4.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "memoffset"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "num_cpus"
+version = "1.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "hermit-abi 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "ppv-lite86"
+version = "0.2.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "rand"
+version = "0.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "getrandom 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand_chacha 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand_hc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "rand_chacha"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "c2-chacha 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "rand_core"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "getrandom 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "rand_hc"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "rustc_version"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "scopeguard"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "semver"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "semver-parser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "semver-parser"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "slab"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "threadpool"
+version = "1.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "num_cpus 1.11.1 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "tokio-executor"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "tokio-threadpool"
+version = "0.1.17"
+dependencies = [
+ "crossbeam-deque 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-queue 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "num_cpus 1.11.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-executor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "wasi"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[metadata]
+"checksum autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "1d49d90015b3c36167a20fe2810c5cd875ad504b39cff3d4eae7977e6b7c1cb2"
+"checksum c2-chacha 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "214238caa1bf3a496ec3392968969cab8549f96ff30652c9e56885329315f6bb"
+"checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
+"checksum crossbeam-deque 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3aa945d63861bfe624b55d153a39684da1e8c0bc8fba932f7ee3a3c16cea3ca"
+"checksum crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5064ebdbf05ce3cb95e45c8b086f72263f4166b29b97f6baff7ef7fe047b55ac"
+"checksum crossbeam-queue 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b"
+"checksum crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)" = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6"
+"checksum crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce446db02cdc3165b94ae73111e570793400d0794e46125cc4056c81cbb039f4"
+"checksum env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "aafcde04e90a5226a6443b7aabdb016ba2f8307c847d524724bd9b346dd1a2d3"
+"checksum futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)" = "1b980f2816d6ee8673b6517b52cb0e808a180efc92e5c19d02cdda79066703ef"
+"checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4"
+"checksum getrandom 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "e7db7ca94ed4cd01190ceee0d8a8052f08a247aa1b469a7f68c6a3b71afcf407"
+"checksum hermit-abi 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "307c3c9f937f38e3534b1d6447ecf090cafcc9744e4a6360e8b037b2cf5af120"
+"checksum lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
+"checksum libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)" = "d515b1f41455adea1313a4a2ac8a8a477634fbae63cc6100e3aebb207ce61558"
+"checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7"
+"checksum memoffset 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "75189eb85871ea5c2e2c15abbdd541185f63b408415e5051f5cac122d8c774b9"
+"checksum num_cpus 1.11.1 (registry+https://github.com/rust-lang/crates.io-index)" = "76dac5ed2a876980778b8b85f75a71b6cbf0db0b1232ee12f826bccb00d09d72"
+"checksum ppv-lite86 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "74490b50b9fbe561ac330df47c08f3f33073d2d00c150f719147d7c54522fa1b"
+"checksum rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3ae1b169243eaf61759b8475a998f0a385e42042370f3a7dbaf35246eacc8412"
+"checksum rand_chacha 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "03a2a90da8c7523f554344f921aa97283eadf6ac484a6d2a7d0212fa7f8d6853"
+"checksum rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
+"checksum rand_hc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
+"checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a"
+"checksum scopeguard 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b42e15e59b18a828bbf5c58ea01debb36b9b096346de35d941dcb89009f24a0d"
+"checksum semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403"
+"checksum semver-parser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
+"checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
+"checksum threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e2f0c90a5f3459330ac8bc0d2f879c693bb7a2f59689c1083fc4ef83834da865"
+"checksum tokio-executor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "ca6df436c42b0c3330a82d855d2ef017cd793090ad550a6bc2184f4b933532ab"
+"checksum wasi 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b89c3ce4ce14bdc6fb6beaf9ec7928ca331de5df7e5ea278375642a2f478570d"
diff --git a/third_party/rust/tokio-threadpool/Cargo.toml b/third_party/rust/tokio-threadpool/Cargo.toml
new file mode 100644
index 0000000000..5541360ec6
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/Cargo.toml
@@ -0,0 +1,61 @@
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g., crates.io) dependencies
+#
+# If you believe there's an error in this file please file an
+# issue against the rust-lang/cargo repository. If you're
+# editing this file be aware that the upstream Cargo.toml
+# will likely look very different (and much more reasonable)
+
+[package]
+name = "tokio-threadpool"
+version = "0.1.17"
+authors = ["Carl Lerche <me@carllerche.com>"]
+description = "A task scheduler backed by a work-stealing thread pool.\n"
+homepage = "https://github.com/tokio-rs/tokio"
+documentation = "https://docs.rs/tokio-threadpool/0.1.17/tokio_threadpool"
+keywords = ["futures", "tokio"]
+categories = ["concurrency", "asynchronous"]
+license = "MIT"
+repository = "https://github.com/tokio-rs/tokio"
+[dependencies.crossbeam-deque]
+version = "0.7.0"
+
+[dependencies.crossbeam-queue]
+version = "0.1.0"
+
+[dependencies.crossbeam-utils]
+version = "0.6.4"
+
+[dependencies.futures]
+version = "0.1.19"
+
+[dependencies.lazy_static]
+version = "1"
+
+[dependencies.log]
+version = "0.4"
+
+[dependencies.num_cpus]
+version = "1.2"
+
+[dependencies.slab]
+version = "0.4.1"
+
+[dependencies.tokio-executor]
+version = "0.1.8"
+[dev-dependencies.env_logger]
+version = "0.6"
+default-features = false
+
+[dev-dependencies.futures-cpupool]
+version = "0.1.7"
+
+[dev-dependencies.rand]
+version = "0.7"
+
+[dev-dependencies.threadpool]
+version = "1.7.1"
diff --git a/third_party/rust/tokio-threadpool/LICENSE b/third_party/rust/tokio-threadpool/LICENSE
new file mode 100644
index 0000000000..cdb28b4b56
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/LICENSE
@@ -0,0 +1,25 @@
+Copyright (c) 2019 Tokio Contributors
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/third_party/rust/tokio-threadpool/README.md b/third_party/rust/tokio-threadpool/README.md
new file mode 100644
index 0000000000..2a98585a89
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/README.md
@@ -0,0 +1,56 @@
+# Tokio Thread Pool
+
+A library for scheduling execution of futures concurrently across a pool of
+threads.
+
+[Documentation](https://docs.rs/tokio-threadpool/0.1.17/tokio_threadpool)
+
+### Why not Rayon?
+
+Rayon is designed to handle parallelizing single computations by breaking them
+into smaller chunks. The scheduling for each individual chunk doesn't matter as
+long as the root computation completes in a timely fashion. In other words,
+Rayon does not provide any guarantees of fairness with regards to how each task
+gets scheduled.
+
+On the other hand, `tokio-threadpool` is a general purpose scheduler and
+attempts to schedule each task fairly. This is the ideal behavior when
+scheduling a set of unrelated tasks.
+
+### Why not futures-cpupool?
+
+It's 10x slower.
+
+## Examples
+
+```rust
+extern crate tokio_threadpool;
+extern crate futures;
+
+use tokio_threadpool::ThreadPool;
+use futures::{Future, lazy};
+use futures::sync::oneshot;
+
+pub fn main() {
+ let pool = ThreadPool::new();
+ let (tx, rx) = oneshot::channel();
+
+ pool.spawn(lazy(|| {
+ println!("Running on the pool");
+ tx.send("complete").map_err(|e| println!("send error, {}", e))
+ }));
+
+ println!("Result: {:?}", rx.wait());
+ pool.shutdown().wait().unwrap();
+}
+```
+
+## License
+
+This project is licensed under the [MIT license](LICENSE).
+
+### Contribution
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in Tokio by you, shall be licensed as MIT, without any additional
+terms or conditions.
diff --git a/third_party/rust/tokio-threadpool/benches/basic.rs b/third_party/rust/tokio-threadpool/benches/basic.rs
new file mode 100644
index 0000000000..cd1f5673c0
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/benches/basic.rs
@@ -0,0 +1,165 @@
+#![feature(test)]
+
+extern crate futures;
+extern crate futures_cpupool;
+extern crate num_cpus;
+extern crate test;
+extern crate tokio_threadpool;
+
+const NUM_SPAWN: usize = 10_000;
+const NUM_YIELD: usize = 1_000;
+const TASKS_PER_CPU: usize = 50;
+
+mod threadpool {
+ use futures::{future, task, Async};
+ use num_cpus;
+ use std::sync::atomic::AtomicUsize;
+ use std::sync::atomic::Ordering::SeqCst;
+ use std::sync::{mpsc, Arc};
+ use test;
+ use tokio_threadpool::*;
+
+ #[bench]
+ fn spawn_many(b: &mut test::Bencher) {
+ let threadpool = ThreadPool::new();
+
+ let (tx, rx) = mpsc::sync_channel(10);
+ let rem = Arc::new(AtomicUsize::new(0));
+
+ b.iter(move || {
+ rem.store(super::NUM_SPAWN, SeqCst);
+
+ for _ in 0..super::NUM_SPAWN {
+ let tx = tx.clone();
+ let rem = rem.clone();
+
+ threadpool.spawn(future::lazy(move || {
+ if 1 == rem.fetch_sub(1, SeqCst) {
+ tx.send(()).unwrap();
+ }
+
+ Ok(())
+ }));
+ }
+
+ let _ = rx.recv().unwrap();
+ });
+ }
+
+ #[bench]
+ fn yield_many(b: &mut test::Bencher) {
+ let threadpool = ThreadPool::new();
+ let tasks = super::TASKS_PER_CPU * num_cpus::get();
+
+ let (tx, rx) = mpsc::sync_channel(tasks);
+
+ b.iter(move || {
+ for _ in 0..tasks {
+ let mut rem = super::NUM_YIELD;
+ let tx = tx.clone();
+
+ threadpool.spawn(future::poll_fn(move || {
+ rem -= 1;
+
+ if rem == 0 {
+ tx.send(()).unwrap();
+ Ok(Async::Ready(()))
+ } else {
+ // Notify the current task
+ task::current().notify();
+
+ // Not ready
+ Ok(Async::NotReady)
+ }
+ }));
+ }
+
+ for _ in 0..tasks {
+ let _ = rx.recv().unwrap();
+ }
+ });
+ }
+}
+
+// In this case, CPU pool completes the benchmark faster, but this is due to how
+// CpuPool currently behaves, starving other futures. This completes the
+// benchmark quickly but results in poor runtime characteristics for a thread
+// pool.
+//
+// See rust-lang-nursery/futures-rs#617
+//
+mod cpupool {
+ use futures::future::{self, Executor};
+ use futures::{task, Async};
+ use futures_cpupool::*;
+ use num_cpus;
+ use std::sync::atomic::AtomicUsize;
+ use std::sync::atomic::Ordering::SeqCst;
+ use std::sync::{mpsc, Arc};
+ use test;
+
+ #[bench]
+ fn spawn_many(b: &mut test::Bencher) {
+ let pool = CpuPool::new(num_cpus::get());
+
+ let (tx, rx) = mpsc::sync_channel(10);
+ let rem = Arc::new(AtomicUsize::new(0));
+
+ b.iter(move || {
+ rem.store(super::NUM_SPAWN, SeqCst);
+
+ for _ in 0..super::NUM_SPAWN {
+ let tx = tx.clone();
+ let rem = rem.clone();
+
+ pool.execute(future::lazy(move || {
+ if 1 == rem.fetch_sub(1, SeqCst) {
+ tx.send(()).unwrap();
+ }
+
+ Ok(())
+ }))
+ .ok()
+ .unwrap();
+ }
+
+ let _ = rx.recv().unwrap();
+ });
+ }
+
+ #[bench]
+ fn yield_many(b: &mut test::Bencher) {
+ let pool = CpuPool::new(num_cpus::get());
+ let tasks = super::TASKS_PER_CPU * num_cpus::get();
+
+ let (tx, rx) = mpsc::sync_channel(tasks);
+
+ b.iter(move || {
+ for _ in 0..tasks {
+ let mut rem = super::NUM_YIELD;
+ let tx = tx.clone();
+
+ pool.execute(future::poll_fn(move || {
+ rem -= 1;
+
+ if rem == 0 {
+ tx.send(()).unwrap();
+ Ok(Async::Ready(()))
+ } else {
+ // Notify the current task
+ task::current().notify();
+
+ // Not ready
+ Ok(Async::NotReady)
+ }
+ }))
+ .ok()
+ .unwrap();
+ }
+
+ for _ in 0..tasks {
+ let _ = rx.recv().unwrap();
+ }
+ });
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/benches/blocking.rs b/third_party/rust/tokio-threadpool/benches/blocking.rs
new file mode 100644
index 0000000000..bc5121545a
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/benches/blocking.rs
@@ -0,0 +1,137 @@
+#![feature(test)]
+
+extern crate futures;
+extern crate rand;
+extern crate test;
+extern crate threadpool;
+extern crate tokio_threadpool;
+
+const ITER: usize = 1_000;
+
+mod blocking {
+ use super::*;
+
+ use futures::future::*;
+ use tokio_threadpool::{blocking, Builder};
+
+ #[bench]
+ fn cpu_bound(b: &mut test::Bencher) {
+ let pool = Builder::new().pool_size(2).max_blocking(20).build();
+
+ b.iter(|| {
+ let count_down = Arc::new(CountDown::new(::ITER));
+
+ for _ in 0..::ITER {
+ let count_down = count_down.clone();
+
+ pool.spawn(lazy(move || {
+ poll_fn(|| blocking(|| perform_complex_computation()).map_err(|_| panic!()))
+ .and_then(move |_| {
+ // Do something with the value
+ count_down.dec();
+ Ok(())
+ })
+ }));
+ }
+
+ count_down.wait();
+ })
+ }
+}
+
+mod message_passing {
+ use super::*;
+
+ use futures::future::*;
+ use futures::sync::oneshot;
+ use tokio_threadpool::Builder;
+
+ #[bench]
+ fn cpu_bound(b: &mut test::Bencher) {
+ let pool = Builder::new().pool_size(2).max_blocking(20).build();
+
+ let blocking = threadpool::ThreadPool::new(20);
+
+ b.iter(|| {
+ let count_down = Arc::new(CountDown::new(::ITER));
+
+ for _ in 0..::ITER {
+ let count_down = count_down.clone();
+ let blocking = blocking.clone();
+
+ pool.spawn(lazy(move || {
+ // Create a channel to receive the return value.
+ let (tx, rx) = oneshot::channel();
+
+ // Spawn a task on the blocking thread pool to process the
+ // computation.
+ blocking.execute(move || {
+ let res = perform_complex_computation();
+ tx.send(res).unwrap();
+ });
+
+ rx.and_then(move |_| {
+ count_down.dec();
+ Ok(())
+ })
+ .map_err(|_| panic!())
+ }));
+ }
+
+ count_down.wait();
+ })
+ }
+}
+
+fn perform_complex_computation() -> usize {
+ use rand::*;
+
+ // Simulate a CPU heavy computation
+ let mut rng = rand::thread_rng();
+ rng.gen()
+}
+
+// Util for waiting until the tasks complete
+
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::*;
+use std::sync::*;
+
+struct CountDown {
+ rem: AtomicUsize,
+ mutex: Mutex<()>,
+ condvar: Condvar,
+}
+
+impl CountDown {
+ fn new(rem: usize) -> Self {
+ CountDown {
+ rem: AtomicUsize::new(rem),
+ mutex: Mutex::new(()),
+ condvar: Condvar::new(),
+ }
+ }
+
+ fn dec(&self) {
+ let prev = self.rem.fetch_sub(1, AcqRel);
+
+ if prev != 1 {
+ return;
+ }
+
+ let _lock = self.mutex.lock().unwrap();
+ self.condvar.notify_all();
+ }
+
+ fn wait(&self) {
+ let mut lock = self.mutex.lock().unwrap();
+
+ loop {
+ if self.rem.load(Acquire) == 0 {
+ return;
+ }
+
+ lock = self.condvar.wait(lock).unwrap();
+ }
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/benches/depth.rs b/third_party/rust/tokio-threadpool/benches/depth.rs
new file mode 100644
index 0000000000..b4f2bcb095
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/benches/depth.rs
@@ -0,0 +1,76 @@
+#![feature(test)]
+
+extern crate futures;
+extern crate futures_cpupool;
+extern crate num_cpus;
+extern crate test;
+extern crate tokio_threadpool;
+
+const ITER: usize = 20_000;
+
+mod us {
+ use futures::future;
+ use std::sync::mpsc;
+ use test;
+ use tokio_threadpool::*;
+
+ #[bench]
+ fn chained_spawn(b: &mut test::Bencher) {
+ let threadpool = ThreadPool::new();
+
+ fn spawn(pool_tx: Sender, res_tx: mpsc::Sender<()>, n: usize) {
+ if n == 0 {
+ res_tx.send(()).unwrap();
+ } else {
+ let pool_tx2 = pool_tx.clone();
+ pool_tx
+ .spawn(future::lazy(move || {
+ spawn(pool_tx2, res_tx, n - 1);
+ Ok(())
+ }))
+ .unwrap();
+ }
+ }
+
+ b.iter(move || {
+ let (res_tx, res_rx) = mpsc::channel();
+
+ spawn(threadpool.sender().clone(), res_tx, super::ITER);
+ res_rx.recv().unwrap();
+ });
+ }
+}
+
+mod cpupool {
+ use futures::future::{self, Executor};
+ use futures_cpupool::*;
+ use num_cpus;
+ use std::sync::mpsc;
+ use test;
+
+ #[bench]
+ fn chained_spawn(b: &mut test::Bencher) {
+ let pool = CpuPool::new(num_cpus::get());
+
+ fn spawn(pool: CpuPool, res_tx: mpsc::Sender<()>, n: usize) {
+ if n == 0 {
+ res_tx.send(()).unwrap();
+ } else {
+ let pool2 = pool.clone();
+ pool.execute(future::lazy(move || {
+ spawn(pool2, res_tx, n - 1);
+ Ok(())
+ }))
+ .ok()
+ .unwrap();
+ }
+ }
+
+ b.iter(move || {
+ let (res_tx, res_rx) = mpsc::channel();
+
+ spawn(pool.clone(), res_tx, super::ITER);
+ res_rx.recv().unwrap();
+ });
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/examples/depth.rs b/third_party/rust/tokio-threadpool/examples/depth.rs
new file mode 100644
index 0000000000..3d376dd38a
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/examples/depth.rs
@@ -0,0 +1,48 @@
+extern crate env_logger;
+extern crate futures;
+extern crate tokio_threadpool;
+
+use futures::future::{self, Executor};
+use tokio_threadpool::*;
+
+use std::sync::mpsc;
+
+const ITER: usize = 2_000_000;
+// const ITER: usize = 30;
+
+fn chained_spawn() {
+ let pool = ThreadPool::new();
+ let tx = pool.sender().clone();
+
+ fn spawn(tx: Sender, res_tx: mpsc::Sender<()>, n: usize) {
+ if n == 0 {
+ res_tx.send(()).unwrap();
+ } else {
+ let tx2 = tx.clone();
+ tx.execute(future::lazy(move || {
+ spawn(tx2, res_tx, n - 1);
+ Ok(())
+ }))
+ .ok()
+ .unwrap();
+ }
+ }
+
+ loop {
+ println!("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
+ let (res_tx, res_rx) = mpsc::channel();
+
+ for _ in 0..10 {
+ spawn(tx.clone(), res_tx.clone(), ITER);
+ }
+
+ for _ in 0..10 {
+ res_rx.recv().unwrap();
+ }
+ }
+}
+
+pub fn main() {
+ let _ = ::env_logger::init();
+ chained_spawn();
+}
diff --git a/third_party/rust/tokio-threadpool/examples/hello.rs b/third_party/rust/tokio-threadpool/examples/hello.rs
new file mode 100644
index 0000000000..87eb688c2d
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/examples/hello.rs
@@ -0,0 +1,24 @@
+extern crate env_logger;
+extern crate futures;
+extern crate tokio_threadpool;
+
+use futures::sync::oneshot;
+use futures::*;
+use tokio_threadpool::*;
+
+pub fn main() {
+ let _ = ::env_logger::init();
+
+ let pool = ThreadPool::new();
+ let tx = pool.sender().clone();
+
+ let res = oneshot::spawn(
+ future::lazy(|| {
+ println!("Running on the pool");
+ Ok::<_, ()>("complete")
+ }),
+ &tx,
+ );
+
+ println!("Result: {:?}", res.wait());
+}
diff --git a/third_party/rust/tokio-threadpool/src/blocking/global.rs b/third_party/rust/tokio-threadpool/src/blocking/global.rs
new file mode 100644
index 0000000000..c732a58335
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/blocking/global.rs
@@ -0,0 +1,218 @@
+use super::{BlockingError, BlockingImpl};
+use futures::Poll;
+use std::cell::Cell;
+use std::fmt;
+use std::marker::PhantomData;
+use tokio_executor::Enter;
+
+thread_local! {
+ static CURRENT: Cell<BlockingImpl> = Cell::new(super::default_blocking);
+}
+
+/// Ensures that the executor is removed from the thread-local context
+/// when leaving the scope. This handles cases that involve panicking.
+///
+/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
+/// backwards-compatibility layer. In general, user code should not override the
+/// blocking implementation. If you use this, make sure you know what you're
+/// doing.
+pub struct DefaultGuard<'a> {
+ prior: BlockingImpl,
+ _lifetime: PhantomData<&'a ()>,
+}
+
+/// Set the default blocking implementation, returning a guard that resets the
+/// blocking implementation when dropped.
+///
+/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
+/// backwards-compatibility layer. In general, user code should not override the
+/// blocking implementation. If you use this, make sure you know what you're
+/// doing.
+pub fn set_default<'a>(blocking: BlockingImpl) -> DefaultGuard<'a> {
+ CURRENT.with(|cell| {
+ let prior = cell.replace(blocking);
+ DefaultGuard {
+ prior,
+ _lifetime: PhantomData,
+ }
+ })
+}
+
+/// Set the default blocking implementation for the duration of the closure.
+///
+/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
+/// backwards-compatibility layer. In general, user code should not override the
+/// blocking implementation. If you use this, make sure you know what you're
+/// doing.
+pub fn with_default<F, R>(blocking: BlockingImpl, enter: &mut Enter, f: F) -> R
+where
+ F: FnOnce(&mut Enter) -> R,
+{
+ let _guard = set_default(blocking);
+ f(enter)
+}
+
+/// Enter a blocking section of code.
+///
+/// The `blocking` function annotates a section of code that performs a blocking
+/// operation, either by issuing a blocking syscall or by performing a long
+/// running CPU-bound computation.
+///
+/// When the `blocking` function enters, it hands off the responsibility of
+/// processing the current work queue to another thread. Then, it calls the
+/// supplied closure. The closure is permitted to block indefinitely.
+///
+/// If the maximum number of concurrent `blocking` calls has been reached, then
+/// `NotReady` is returned and the task is notified once existing `blocking`
+/// calls complete. The maximum value is specified when creating a thread pool
+/// using [`Builder::max_blocking`][build]
+///
+/// NB: The entire task that called `blocking` is blocked whenever the supplied
+/// closure blocks, even if you have used future combinators such as `select` -
+/// the other futures in this task will not make progress until the closure
+/// returns.
+/// If this is not desired, ensure that `blocking` runs in its own task (e.g.
+/// using `futures::sync::oneshot::spawn`).
+///
+/// [build]: struct.Builder.html#method.max_blocking
+///
+/// # Return
+///
+/// When the blocking closure is executed, `Ok(Ready(T))` is returned, where
+/// `T` is the closure's return value.
+///
+/// If the thread pool has shutdown, `Err` is returned.
+///
+/// If the number of concurrent `blocking` calls has reached the maximum,
+/// `Ok(NotReady)` is returned and the current task is notified when a call to
+/// `blocking` will succeed.
+///
+/// If `blocking` is called from outside the context of a Tokio thread pool,
+/// `Err` is returned.
+///
+/// # Background
+///
+/// By default, the Tokio thread pool expects that tasks will only run for short
+/// periods at a time before yielding back to the thread pool. This is the basic
+/// premise of cooperative multitasking.
+///
+/// However, it is common to want to perform a blocking operation while
+/// processing an asynchronous computation. Examples of blocking operation
+/// include:
+///
+/// * Performing synchronous file operations (reading and writing).
+/// * Blocking on acquiring a mutex.
+/// * Performing a CPU bound computation, like cryptographic encryption or
+/// decryption.
+///
+/// One option for dealing with blocking operations in an asynchronous context
+/// is to use a thread pool dedicated to performing these operations. This not
+/// ideal as it requires bidirectional message passing as well as a channel to
+/// communicate which adds a level of buffering.
+///
+/// Instead, `blocking` hands off the responsibility of processing the work queue
+/// to another thread. This hand off is light compared to a channel and does not
+/// require buffering.
+///
+/// # Examples
+///
+/// Block on receiving a message from a `std` channel. This example is a little
+/// silly as using the non-blocking channel from the `futures` crate would make
+/// more sense. The blocking receive can be replaced with any blocking operation
+/// that needs to be performed.
+///
+/// ```rust
+/// # extern crate futures;
+/// # extern crate tokio_threadpool;
+///
+/// use tokio_threadpool::{ThreadPool, blocking};
+///
+/// use futures::Future;
+/// use futures::future::{lazy, poll_fn};
+///
+/// use std::sync::mpsc;
+/// use std::thread;
+/// use std::time::Duration;
+///
+/// pub fn main() {
+/// // This is a *blocking* channel
+/// let (tx, rx) = mpsc::channel();
+///
+/// // Spawn a thread to send a message
+/// thread::spawn(move || {
+/// thread::sleep(Duration::from_millis(500));
+/// tx.send("hello").unwrap();
+/// });
+///
+/// let pool = ThreadPool::new();
+///
+/// pool.spawn(lazy(move || {
+/// // Because `blocking` returns `Poll`, it is intended to be used
+/// // from the context of a `Future` implementation. Since we don't
+/// // have a complicated requirement, we can use `poll_fn` in this
+/// // case.
+/// poll_fn(move || {
+/// blocking(|| {
+/// let msg = rx.recv().unwrap();
+/// println!("message = {}", msg);
+/// }).map_err(|_| panic!("the threadpool shut down"))
+/// })
+/// }));
+///
+/// // Wait for the task we just spawned to complete.
+/// pool.shutdown_on_idle().wait().unwrap();
+/// }
+/// ```
+pub fn blocking<F, T>(f: F) -> Poll<T, BlockingError>
+where
+ F: FnOnce() -> T,
+{
+ CURRENT.with(|cell| {
+ let blocking = cell.get();
+
+ // Object-safety workaround: the `Blocking` trait must be object-safe,
+ // since we use a trait object in the thread-local. However, a blocking
+ // _operation_ will be generic over the return type of the blocking
+ // function. Therefore, rather than passing a function with a return
+ // type to `Blocking::run_blocking`, we pass a _new_ closure which
+ // doesn't have a return value. That closure invokes the blocking
+ // function and assigns its value to `ret`, which we then unpack when
+ // the blocking call finishes.
+ let mut f = Some(f);
+ let mut ret = None;
+ {
+ let ret2 = &mut ret;
+ let mut run = move || {
+ let f = f
+ .take()
+ .expect("blocking closure invoked twice; this is a bug!");
+ *ret2 = Some((f)());
+ };
+
+ try_ready!((blocking)(&mut run));
+ }
+
+ // Return the result
+ let ret =
+ ret.expect("blocking function finished, but return value was unset; this is a bug!");
+ Ok(ret.into())
+ })
+}
+
+// === impl DefaultGuard ===
+
+impl<'a> fmt::Debug for DefaultGuard<'a> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.pad("DefaultGuard { .. }")
+ }
+}
+
+impl<'a> Drop for DefaultGuard<'a> {
+ fn drop(&mut self) {
+ // if the TLS value has already been torn down, there's nothing else we
+ // can do. we're almost certainly panicking anyway.
+ let _ = CURRENT.try_with(|cell| {
+ cell.set(self.prior);
+ });
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/blocking/mod.rs b/third_party/rust/tokio-threadpool/src/blocking/mod.rs
new file mode 100644
index 0000000000..27151d44b8
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/blocking/mod.rs
@@ -0,0 +1,88 @@
+use worker::Worker;
+
+use futures::{Async, Poll};
+use tokio_executor;
+
+use std::error::Error;
+use std::fmt;
+
+mod global;
+pub use self::global::blocking;
+#[doc(hidden)]
+pub use self::global::{set_default, with_default, DefaultGuard};
+
+/// Error raised by `blocking`.
+pub struct BlockingError {
+ _p: (),
+}
+
+/// A function implementing the behavior run on calls to `blocking`.
+///
+/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
+/// backwards-compatibility layer. In general, user code should not override the
+/// blocking implementation. If you use this, make sure you know what you're
+/// doing.
+#[doc(hidden)]
+pub type BlockingImpl = fn(&mut dyn FnMut()) -> Poll<(), BlockingError>;
+
+fn default_blocking(f: &mut dyn FnMut()) -> Poll<(), BlockingError> {
+ let res = Worker::with_current(|worker| {
+ let worker = match worker {
+ Some(worker) => worker,
+ None => {
+ return Err(BlockingError::new());
+ }
+ };
+
+ // Transition the worker state to blocking. This will exit the fn early
+ // with `NotReady` if the pool does not have enough capacity to enter
+ // blocking mode.
+ worker.transition_to_blocking()
+ });
+
+ // If the transition cannot happen, exit early
+ try_ready!(res);
+
+ // Currently in blocking mode, so call the inner closure.
+ //
+ // "Exit" the current executor in case the blocking function wants
+ // to call a different executor.
+ tokio_executor::exit(move || (f)());
+
+ // Try to transition out of blocking mode. This is a fast path that takes
+ // back ownership of the worker if the worker handoff didn't complete yet.
+ Worker::with_current(|worker| {
+ // Worker must be set since it was above.
+ worker.unwrap().transition_from_blocking();
+ });
+
+ Ok(Async::Ready(()))
+}
+
+impl BlockingError {
+ /// Returns a new `BlockingError`.
+ #[doc(hidden)]
+ pub fn new() -> Self {
+ Self { _p: () }
+ }
+}
+
+impl fmt::Display for BlockingError {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "{}", self.description())
+ }
+}
+
+impl fmt::Debug for BlockingError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("BlockingError")
+ .field("reason", &self.description())
+ .finish()
+ }
+}
+
+impl Error for BlockingError {
+ fn description(&self) -> &str {
+ "`blocking` annotation used from outside the context of a thread pool"
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/builder.rs b/third_party/rust/tokio-threadpool/src/builder.rs
new file mode 100644
index 0000000000..b06568e6ae
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/builder.rs
@@ -0,0 +1,476 @@
+use callback::Callback;
+use config::{Config, MAX_WORKERS};
+use park::{BoxPark, BoxedPark, DefaultPark};
+use pool::{Pool, MAX_BACKUP};
+use shutdown::ShutdownTrigger;
+use thread_pool::ThreadPool;
+use worker::{self, Worker, WorkerId};
+
+use std::any::Any;
+use std::cmp::max;
+use std::error::Error;
+use std::fmt;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crossbeam_deque::Injector;
+use num_cpus;
+use tokio_executor::park::Park;
+use tokio_executor::Enter;
+
+/// Builds a thread pool with custom configuration values.
+///
+/// Methods can be chained in order to set the configuration values. The thread
+/// pool is constructed by calling [`build`].
+///
+/// New instances of `Builder` are obtained via [`Builder::new`].
+///
+/// See function level documentation for details on the various configuration
+/// settings.
+///
+/// [`build`]: #method.build
+/// [`Builder::new`]: #method.new
+///
+/// # Examples
+///
+/// ```
+/// # extern crate tokio_threadpool;
+/// # extern crate futures;
+/// # use tokio_threadpool::Builder;
+/// use futures::future::{Future, lazy};
+/// use std::time::Duration;
+///
+/// # pub fn main() {
+/// let thread_pool = Builder::new()
+/// .pool_size(4)
+/// .keep_alive(Some(Duration::from_secs(30)))
+/// .build();
+///
+/// thread_pool.spawn(lazy(|| {
+/// println!("called from a worker thread");
+/// Ok(())
+/// }));
+///
+/// // Gracefully shutdown the threadpool
+/// thread_pool.shutdown().wait().unwrap();
+/// # }
+/// ```
+pub struct Builder {
+ /// Thread pool specific configuration values
+ config: Config,
+
+ /// Number of workers to spawn
+ pool_size: usize,
+
+ /// Maximum number of futures that can be in a blocking section
+ /// concurrently.
+ max_blocking: usize,
+
+ /// Generates the `Park` instances
+ new_park: Box<dyn Fn(&WorkerId) -> BoxPark>,
+}
+
+impl Builder {
+ /// Returns a new thread pool builder initialized with default configuration
+ /// values.
+ ///
+ /// Configuration methods can be chained on the return value.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ /// use std::time::Duration;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .pool_size(4)
+ /// .keep_alive(Some(Duration::from_secs(30)))
+ /// .build();
+ /// # }
+ /// ```
+ pub fn new() -> Builder {
+ let num_cpus = max(1, num_cpus::get());
+
+ let new_park =
+ Box::new(|_: &WorkerId| Box::new(BoxedPark::new(DefaultPark::new())) as BoxPark);
+
+ Builder {
+ pool_size: num_cpus,
+ max_blocking: 100,
+ config: Config {
+ keep_alive: None,
+ name_prefix: None,
+ stack_size: None,
+ around_worker: None,
+ after_start: None,
+ before_stop: None,
+ panic_handler: None,
+ },
+ new_park,
+ }
+ }
+
+ /// Set the maximum number of worker threads for the thread pool instance.
+ ///
+ /// This must be a number between 1 and 32,768 though it is advised to keep
+ /// this value on the smaller side.
+ ///
+ /// The default value is the number of cores available to the system.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .pool_size(4)
+ /// .build();
+ /// # }
+ /// ```
+ pub fn pool_size(&mut self, val: usize) -> &mut Self {
+ assert!(val >= 1, "at least one thread required");
+ assert!(val <= MAX_WORKERS, "max value is {}", MAX_WORKERS);
+
+ self.pool_size = val;
+ self
+ }
+
+ /// Set the maximum number of concurrent blocking sections.
+ ///
+ /// When the maximum concurrent `blocking` calls is reached, any further
+ /// calls to `blocking` will return `NotReady` and the task is notified once
+ /// previously in-flight calls to `blocking` return.
+ ///
+ /// This must be a number between 1 and 32,768 though it is advised to keep
+ /// this value on the smaller side.
+ ///
+ /// The default value is 100.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .max_blocking(200)
+ /// .build();
+ /// # }
+ /// ```
+ pub fn max_blocking(&mut self, val: usize) -> &mut Self {
+ assert!(val <= MAX_BACKUP, "max value is {}", MAX_BACKUP);
+ self.max_blocking = val;
+ self
+ }
+
+ /// Set the thread keep alive duration
+ ///
+ /// If set, a thread that has completed a `blocking` call will wait for up
+ /// to the specified duration to become a worker thread again. Once the
+ /// duration elapses, the thread will shutdown.
+ ///
+ /// When the value is `None`, the thread will wait to become a worker
+ /// thread forever.
+ ///
+ /// The default value is `None`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ /// use std::time::Duration;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .keep_alive(Some(Duration::from_secs(30)))
+ /// .build();
+ /// # }
+ /// ```
+ pub fn keep_alive(&mut self, val: Option<Duration>) -> &mut Self {
+ self.config.keep_alive = val;
+ self
+ }
+
+ /// Sets a callback to be triggered when a panic during a future bubbles up
+ /// to Tokio. By default Tokio catches these panics, and they will be
+ /// ignored. The parameter passed to this callback is the same error value
+ /// returned from std::panic::catch_unwind(). To abort the process on
+ /// panics, use std::panic::resume_unwind() in this callback as shown
+ /// below.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .panic_handler(|err| std::panic::resume_unwind(err))
+ /// .build();
+ /// # }
+ /// ```
+ pub fn panic_handler<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
+ {
+ self.config.panic_handler = Some(Arc::new(f));
+ self
+ }
+
+ /// Set name prefix of threads spawned by the scheduler
+ ///
+ /// Thread name prefix is used for generating thread names. For example, if
+ /// prefix is `my-pool-`, then threads in the pool will get names like
+ /// `my-pool-1` etc.
+ ///
+ /// If this configuration is not set, then the thread will use the system
+ /// default naming scheme.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .name_prefix("my-pool-")
+ /// .build();
+ /// # }
+ /// ```
+ pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self {
+ self.config.name_prefix = Some(val.into());
+ self
+ }
+
+ /// Set the stack size (in bytes) for worker threads.
+ ///
+ /// The actual stack size may be greater than this value if the platform
+ /// specifies minimal stack size.
+ ///
+ /// The default stack size for spawned threads is 2 MiB, though this
+ /// particular stack size is subject to change in the future.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .stack_size(32 * 1024)
+ /// .build();
+ /// # }
+ /// ```
+ pub fn stack_size(&mut self, val: usize) -> &mut Self {
+ self.config.stack_size = Some(val);
+ self
+ }
+
+ /// Execute function `f` on each worker thread.
+ ///
+ /// This function is provided a handle to the worker and is expected to call
+ /// [`Worker::run`], otherwise the worker thread will shutdown without doing
+ /// any work.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .around_worker(|worker, _| {
+ /// println!("worker is starting up");
+ /// worker.run();
+ /// println!("worker is shutting down");
+ /// })
+ /// .build();
+ /// # }
+ /// ```
+ ///
+ /// [`Worker::run`]: struct.Worker.html#method.run
+ pub fn around_worker<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn(&Worker, &mut Enter) + Send + Sync + 'static,
+ {
+ self.config.around_worker = Some(Callback::new(f));
+ self
+ }
+
+ /// Execute function `f` after each thread is started but before it starts
+ /// doing work.
+ ///
+ /// This is intended for bookkeeping and monitoring use cases.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .after_start(|| {
+ /// println!("thread started");
+ /// })
+ /// .build();
+ /// # }
+ /// ```
+ pub fn after_start<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn() + Send + Sync + 'static,
+ {
+ self.config.after_start = Some(Arc::new(f));
+ self
+ }
+
+ /// Execute function `f` before each thread stops.
+ ///
+ /// This is intended for bookkeeping and monitoring use cases.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .before_stop(|| {
+ /// println!("thread stopping");
+ /// })
+ /// .build();
+ /// # }
+ /// ```
+ pub fn before_stop<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn() + Send + Sync + 'static,
+ {
+ self.config.before_stop = Some(Arc::new(f));
+ self
+ }
+
+ /// Customize the `park` instance used by each worker thread.
+ ///
+ /// The provided closure `f` is called once per worker and returns a `Park`
+ /// instance that is used by the worker to put itself to sleep.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ /// # fn decorate<F>(f: F) -> F { f }
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .custom_park(|_| {
+ /// use tokio_threadpool::park::DefaultPark;
+ ///
+ /// // This is the default park type that the worker would use if we
+ /// // did not customize it.
+ /// let park = DefaultPark::new();
+ ///
+ /// // Decorate the `park` instance, allowing us to customize work
+ /// // that happens when a worker thread goes to sleep.
+ /// decorate(park)
+ /// })
+ /// .build();
+ /// # }
+ /// ```
+ pub fn custom_park<F, P>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn(&WorkerId) -> P + 'static,
+ P: Park + Send + 'static,
+ P::Error: Error,
+ {
+ self.new_park = Box::new(move |id| Box::new(BoxedPark::new(f(id))));
+
+ self
+ }
+
+ /// Create the configured `ThreadPool`.
+ ///
+ /// The returned `ThreadPool` instance is ready to spawn tasks.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::Builder;
+ ///
+ /// # pub fn main() {
+ /// let thread_pool = Builder::new()
+ /// .build();
+ /// # }
+ /// ```
+ pub fn build(&self) -> ThreadPool {
+ trace!("build; num-workers={}", self.pool_size);
+
+ // Create the worker entry list
+ let workers: Arc<[worker::Entry]> = {
+ let mut workers = vec![];
+
+ for i in 0..self.pool_size {
+ let id = WorkerId::new(i);
+ let park = (self.new_park)(&id);
+ let unpark = park.unpark();
+
+ workers.push(worker::Entry::new(park, unpark));
+ }
+
+ workers.into()
+ };
+
+ let queue = Arc::new(Injector::new());
+
+ // Create a trigger that will clean up resources on shutdown.
+ //
+ // The `Pool` contains a weak reference to it, while `Worker`s and the `ThreadPool` contain
+ // strong references.
+ let trigger = Arc::new(ShutdownTrigger::new(workers.clone(), queue.clone()));
+
+ // Create the pool
+ let pool = Arc::new(Pool::new(
+ workers,
+ Arc::downgrade(&trigger),
+ self.max_blocking,
+ self.config.clone(),
+ queue,
+ ));
+
+ ThreadPool::new2(pool, trigger)
+ }
+}
+
+impl fmt::Debug for Builder {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Builder")
+ .field("config", &self.config)
+ .field("pool_size", &self.pool_size)
+ .field("new_park", &"Box<Fn() -> BoxPark>")
+ .finish()
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/callback.rs b/third_party/rust/tokio-threadpool/src/callback.rs
new file mode 100644
index 0000000000..4fea4cac8e
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/callback.rs
@@ -0,0 +1,30 @@
+use worker::Worker;
+
+use std::fmt;
+use std::sync::Arc;
+
+use tokio_executor::Enter;
+
+#[derive(Clone)]
+pub(crate) struct Callback {
+ f: Arc<dyn Fn(&Worker, &mut Enter) + Send + Sync>,
+}
+
+impl Callback {
+ pub fn new<F>(f: F) -> Self
+ where
+ F: Fn(&Worker, &mut Enter) + Send + Sync + 'static,
+ {
+ Callback { f: Arc::new(f) }
+ }
+
+ pub fn call(&self, worker: &Worker, enter: &mut Enter) {
+ (self.f)(worker, enter)
+ }
+}
+
+impl fmt::Debug for Callback {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "Fn")
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/config.rs b/third_party/rust/tokio-threadpool/src/config.rs
new file mode 100644
index 0000000000..94b3c06a0b
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/config.rs
@@ -0,0 +1,34 @@
+use callback::Callback;
+
+use std::any::Any;
+use std::fmt;
+use std::sync::Arc;
+use std::time::Duration;
+
+/// Thread pool specific configuration values
+#[derive(Clone)]
+pub(crate) struct Config {
+ pub keep_alive: Option<Duration>,
+ // Used to configure a worker thread
+ pub name_prefix: Option<String>,
+ pub stack_size: Option<usize>,
+ pub around_worker: Option<Callback>,
+ pub after_start: Option<Arc<dyn Fn() + Send + Sync>>,
+ pub before_stop: Option<Arc<dyn Fn() + Send + Sync>>,
+ pub panic_handler: Option<Arc<dyn Fn(Box<dyn Any + Send>) + Send + Sync>>,
+}
+
+/// Max number of workers that can be part of a pool. This is the most that can
+/// fit in the scheduler state. Note, that this is the max number of **active**
+/// threads. There can be more standby threads.
+pub(crate) const MAX_WORKERS: usize = 1 << 15;
+
+impl fmt::Debug for Config {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Config")
+ .field("keep_alive", &self.keep_alive)
+ .field("name_prefix", &self.name_prefix)
+ .field("stack_size", &self.stack_size)
+ .finish()
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/lib.rs b/third_party/rust/tokio-threadpool/src/lib.rs
new file mode 100644
index 0000000000..a879af1771
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/lib.rs
@@ -0,0 +1,164 @@
+#![doc(html_root_url = "https://docs.rs/tokio-threadpool/0.1.17")]
+#![deny(missing_docs, missing_debug_implementations)]
+
+//! A work-stealing based thread pool for executing futures.
+//!
+//! The Tokio thread pool supports scheduling futures and processing them on
+//! multiple CPU cores. It is optimized for the primary Tokio use case of many
+//! independent tasks with limited computation and with most tasks waiting on
+//! I/O. Usually, users will not create a `ThreadPool` instance directly, but
+//! will use one via a [`runtime`].
+//!
+//! The `ThreadPool` structure manages two sets of threads:
+//!
+//! * Worker threads.
+//! * Backup threads.
+//!
+//! Worker threads are used to schedule futures using a work-stealing strategy.
+//! Backup threads, on the other hand, are intended only to support the
+//! `blocking` API. Threads will transition between the two sets.
+//!
+//! The advantage of the work-stealing strategy is minimal cross-thread
+//! coordination. The thread pool attempts to make as much progress as possible
+//! without communicating across threads.
+//!
+//! ## Worker overview
+//!
+//! Each worker has two queues: a deque and a mpsc channel. The deque is the
+//! primary queue for tasks that are scheduled to run on the worker thread. Tasks
+//! can only be pushed onto the deque by the worker, but other workers may
+//! "steal" from that deque. The mpsc channel is used to submit futures while
+//! external to the pool.
+//!
+//! As long as the thread pool has not been shutdown, a worker will run in a
+//! loop. Each loop, it consumes all tasks on its mpsc channel and pushes it onto
+//! the deque. It then pops tasks off of the deque and executes them.
+//!
+//! If a worker has no work, i.e., both queues are empty. It attempts to steal.
+//! To do this, it randomly scans other workers' deques and tries to pop a task.
+//! If it finds no work to steal, the thread goes to sleep.
+//!
+//! When the worker detects that the pool has been shut down, it exits the loop,
+//! cleans up its state, and shuts the thread down.
+//!
+//! ## Thread pool initialization
+//!
+//! Note, users normally will use the threadpool created by a [`runtime`].
+//!
+//! By default, no threads are spawned on creation. Instead, when new futures are
+//! spawned, the pool first checks if there are enough active worker threads. If
+//! not, a new worker thread is spawned.
+//!
+//! ## Spawning futures
+//!
+//! The spawning behavior depends on whether a future was spawned from within a
+//! worker or thread or if it was spawned from an external handle.
+//!
+//! When spawning a future while external to the thread pool, the current
+//! strategy is to randomly pick a worker to submit the task to. The task is then
+//! pushed onto that worker's mpsc channel.
+//!
+//! When spawning a future while on a worker thread, the task is pushed onto the
+//! back of the current worker's deque.
+//!
+//! ## Blocking annotation strategy
+//!
+//! The [`blocking`] function is used to annotate a section of code that
+//! performs a blocking operation, either by issuing a blocking syscall or
+//! performing any long running CPU-bound computation.
+//!
+//! The strategy for handling blocking closures is to hand off the worker to a
+//! new thread. This implies handing off the `deque` and `mpsc`. Once this is
+//! done, the new thread continues to process the work queue and the original
+//! thread is able to block. Once it finishes processing the blocking future, the
+//! thread has no additional work and is inserted into the backup pool. This
+//! makes it available to other workers that encounter a [`blocking`] call.
+//!
+//! [`blocking`]: fn.blocking.html
+//! [`runtime`]: https://docs.rs/tokio/0.1/tokio/runtime/
+
+extern crate tokio_executor;
+
+extern crate crossbeam_deque;
+extern crate crossbeam_queue;
+extern crate crossbeam_utils;
+#[macro_use]
+extern crate futures;
+#[macro_use]
+extern crate lazy_static;
+extern crate num_cpus;
+extern crate slab;
+
+#[macro_use]
+extern crate log;
+
+// ## Crate layout
+//
+// The primary type, `Pool`, holds the majority of a thread pool's state,
+// including the state for each worker. Each worker's state is maintained in an
+// instance of `worker::Entry`.
+//
+// `Worker` contains the logic that runs on each worker thread. It holds an
+// `Arc` to `Pool` and is able to access its state from `Pool`.
+//
+// `Task` is a harness around an individual future. It manages polling and
+// scheduling that future.
+//
+// ## Sleeping workers
+//
+// Sleeping workers are tracked using a [Treiber stack]. This results in the
+// thread that most recently went to sleep getting woken up first. When the pool
+// is not under load, this helps threads shutdown faster.
+//
+// Sleeping is done by using `tokio_executor::Park` implementations. This allows
+// the user of the thread pool to customize the work that is performed to sleep.
+// This is how injecting timers and other functionality into the thread pool is
+// done.
+//
+// ## Notifying workers
+//
+// When there is work to be done, workers must be notified. However, notifying a
+// worker requires cross thread coordination. Ideally, a worker would only be
+// notified when it is sleeping, but there is no way to know if a worker is
+// sleeping without cross thread communication.
+//
+// The two cases when a worker might need to be notified are:
+//
+// 1. A task is externally submitted to a worker via the mpsc channel.
+// 2. A worker has a back log of work and needs other workers to steal from it.
+//
+// In the first case, the worker will always be notified. However, it could be
+// possible to avoid the notification if the mpsc channel has two or greater
+// number of tasks *after* the task is submitted. In this case, we are able to
+// assume that the worker has previously been notified.
+//
+// The second case is trickier. Currently, whenever a worker spawns a new future
+// (pushing it onto its deque) and when it pops a future from its mpsc, it tries
+// to notify a sleeping worker to wake up and start stealing. This is a lot of
+// notification and it **might** be possible to reduce it.
+//
+// Also, whenever a worker is woken up via a signal and it does find work, it,
+// in turn, will try to wake up a new worker.
+//
+// [Treiber stack]: https://en.wikipedia.org/wiki/Treiber_Stack
+
+#[doc(hidden)]
+pub mod blocking;
+mod builder;
+mod callback;
+mod config;
+mod notifier;
+pub mod park;
+mod pool;
+mod sender;
+mod shutdown;
+mod task;
+mod thread_pool;
+mod worker;
+
+pub use blocking::{blocking, BlockingError};
+pub use builder::Builder;
+pub use sender::Sender;
+pub use shutdown::Shutdown;
+pub use thread_pool::{SpawnHandle, ThreadPool};
+pub use worker::{Worker, WorkerId};
diff --git a/third_party/rust/tokio-threadpool/src/notifier.rs b/third_party/rust/tokio-threadpool/src/notifier.rs
new file mode 100644
index 0000000000..fe8e8f4ad0
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/notifier.rs
@@ -0,0 +1,92 @@
+use pool::Pool;
+use task::Task;
+
+use std::mem;
+use std::ops;
+use std::sync::Arc;
+
+use futures::executor::Notify;
+
+/// Implements the future `Notify` API.
+///
+/// This is how external events are able to signal the task, informing it to try
+/// to poll the future again.
+#[derive(Debug)]
+pub(crate) struct Notifier {
+ pub pool: Arc<Pool>,
+}
+
+/// A guard that ensures that the inner value gets forgotten.
+#[derive(Debug)]
+struct Forget<T>(Option<T>);
+
+impl Notify for Notifier {
+ fn notify(&self, id: usize) {
+ trace!("Notifier::notify; id=0x{:x}", id);
+
+ unsafe {
+ let ptr = id as *const Task;
+
+ // We did not actually take ownership of the `Arc` in this function
+ // so we must ensure that the Arc is forgotten.
+ let task = Forget::new(Arc::from_raw(ptr));
+
+ // TODO: Unify this with Task::notify
+ if task.schedule() {
+ // TODO: Check if the pool is still running
+ //
+ // Bump the ref count
+ let task = task.clone();
+
+ let _ = self.pool.submit(task, &self.pool);
+ }
+ }
+ }
+
+ fn clone_id(&self, id: usize) -> usize {
+ let ptr = id as *const Task;
+
+ // This function doesn't actually get a strong ref to the task here.
+ // However, the only method we have to convert a raw pointer -> &Arc<T>
+ // is to call `Arc::from_raw` which returns a strong ref. So, to
+ // maintain the invariants, `t1` has to be forgotten. This prevents the
+ // ref count from being decremented.
+ let t1 = Forget::new(unsafe { Arc::from_raw(ptr) });
+
+ // The clone is forgotten so that the fn exits without decrementing the ref
+ // count. The caller of `clone_id` ensures that `drop_id` is called when
+ // the ref count needs to be decremented.
+ let _ = Forget::new(t1.clone());
+
+ id
+ }
+
+ fn drop_id(&self, id: usize) {
+ unsafe {
+ let ptr = id as *const Task;
+ let _ = Arc::from_raw(ptr);
+ }
+ }
+}
+
+// ===== impl Forget =====
+
+impl<T> Forget<T> {
+ fn new(t: T) -> Self {
+ Forget(Some(t))
+ }
+}
+
+impl<T> ops::Deref for Forget<T> {
+ type Target = T;
+
+ fn deref(&self) -> &T {
+ self.0.as_ref().unwrap()
+ }
+}
+
+impl<T> Drop for Forget<T> {
+ fn drop(&mut self) {
+ mem::forget(self.0.take());
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/park/boxed.rs b/third_party/rust/tokio-threadpool/src/park/boxed.rs
new file mode 100644
index 0000000000..030866450d
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/park/boxed.rs
@@ -0,0 +1,45 @@
+use tokio_executor::park::{Park, Unpark};
+
+use std::error::Error;
+use std::time::Duration;
+
+pub(crate) type BoxPark = Box<dyn Park<Unpark = BoxUnpark, Error = ()> + Send>;
+pub(crate) type BoxUnpark = Box<dyn Unpark>;
+
+pub(crate) struct BoxedPark<T>(T);
+
+impl<T> BoxedPark<T> {
+ pub fn new(inner: T) -> Self {
+ BoxedPark(inner)
+ }
+}
+
+impl<T: Park + Send> Park for BoxedPark<T>
+where
+ T::Error: Error,
+{
+ type Unpark = BoxUnpark;
+ type Error = ();
+
+ fn unpark(&self) -> Self::Unpark {
+ Box::new(self.0.unpark())
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.0.park().map_err(|e| {
+ warn!(
+ "calling `park` on worker thread errored -- shutting down thread: {}",
+ e
+ );
+ })
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.0.park_timeout(duration).map_err(|e| {
+ warn!(
+ "calling `park` on worker thread errored -- shutting down thread: {}",
+ e
+ );
+ })
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/park/default_park.rs b/third_party/rust/tokio-threadpool/src/park/default_park.rs
new file mode 100644
index 0000000000..ecc22350f0
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/park/default_park.rs
@@ -0,0 +1,97 @@
+use tokio_executor::park::{Park, Unpark};
+
+use std::error::Error;
+use std::fmt;
+use std::time::Duration;
+
+use crossbeam_utils::sync::{Parker, Unparker};
+
+/// Parks the thread.
+#[derive(Debug)]
+pub struct DefaultPark {
+ inner: Parker,
+}
+
+/// Unparks threads that were parked by `DefaultPark`.
+#[derive(Debug)]
+pub struct DefaultUnpark {
+ inner: Unparker,
+}
+
+/// Error returned by [`ParkThread`]
+///
+/// This currently is never returned, but might at some point in the future.
+///
+/// [`ParkThread`]: struct.ParkThread.html
+#[derive(Debug)]
+pub struct ParkError {
+ _p: (),
+}
+
+// ===== impl DefaultPark =====
+
+impl DefaultPark {
+ /// Creates a new `DefaultPark` instance.
+ pub fn new() -> DefaultPark {
+ DefaultPark {
+ inner: Parker::new(),
+ }
+ }
+
+ /// Unpark the thread without having to clone the unpark handle.
+ ///
+ /// Named `notify` to avoid conflicting with the `unpark` fn.
+ pub(crate) fn notify(&self) {
+ self.inner.unparker().unpark();
+ }
+
+ pub(crate) fn park_sync(&self, duration: Option<Duration>) {
+ match duration {
+ None => self.inner.park(),
+ Some(duration) => self.inner.park_timeout(duration),
+ }
+ }
+}
+
+impl Park for DefaultPark {
+ type Unpark = DefaultUnpark;
+ type Error = ParkError;
+
+ fn unpark(&self) -> Self::Unpark {
+ DefaultUnpark {
+ inner: self.inner.unparker().clone(),
+ }
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.inner.park();
+ Ok(())
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.inner.park_timeout(duration);
+ Ok(())
+ }
+}
+
+// ===== impl DefaultUnpark =====
+
+impl Unpark for DefaultUnpark {
+ fn unpark(&self) {
+ self.inner.unpark();
+ }
+}
+
+// ===== impl ParkError =====
+
+impl fmt::Display for ParkError {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ self.description().fmt(fmt)
+ }
+}
+
+impl Error for ParkError {
+ fn description(&self) -> &str {
+ "unknown park error"
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/park/mod.rs b/third_party/rust/tokio-threadpool/src/park/mod.rs
new file mode 100644
index 0000000000..e7c5f40d36
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/park/mod.rs
@@ -0,0 +1,8 @@
+//! Thread parking utilities.
+
+mod boxed;
+mod default_park;
+
+pub use self::default_park::{DefaultPark, DefaultUnpark, ParkError};
+
+pub(crate) use self::boxed::{BoxPark, BoxUnpark, BoxedPark};
diff --git a/third_party/rust/tokio-threadpool/src/pool/backup.rs b/third_party/rust/tokio-threadpool/src/pool/backup.rs
new file mode 100644
index 0000000000..e94e95d6f2
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/pool/backup.rs
@@ -0,0 +1,308 @@
+use park::DefaultPark;
+use worker::WorkerId;
+
+use std::cell::UnsafeCell;
+use std::fmt;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed};
+use std::time::{Duration, Instant};
+
+/// State associated with a thread in the thread pool.
+///
+/// The pool manages a number of threads. Some of those threads are considered
+/// "primary" threads and process the work queue. When a task being run on a
+/// primary thread enters a blocking context, the responsibility of processing
+/// the work queue must be handed off to another thread. This is done by first
+/// checking for idle threads on the backup stack. If one is found, the worker
+/// token (`WorkerId`) is handed off to that running thread. If none are found,
+/// a new thread is spawned.
+///
+/// This state manages the exchange. A thread that is idle, not assigned to a
+/// work queue, sits around for a specified amount of time. When the worker
+/// token is handed off, it is first stored in `handoff`. The backup thread is
+/// then signaled. At this point, the backup thread wakes up from sleep and
+/// reads `handoff`. At that point, it has been promoted to a primary thread and
+/// will begin processing inbound work on the work queue.
+///
+/// The name `Backup` isn't really great for what the type does, but I have not
+/// come up with a better name... Maybe it should just be named `Thread`.
+#[derive(Debug)]
+pub(crate) struct Backup {
+ /// Worker ID that is being handed to this thread.
+ handoff: UnsafeCell<Option<WorkerId>>,
+
+ /// Thread state.
+ ///
+ /// This tracks:
+ ///
+ /// * Is queued flag
+ /// * If the pool is shutting down.
+ /// * If the thread is running
+ state: AtomicUsize,
+
+ /// Next entry in the Treiber stack.
+ next_sleeper: UnsafeCell<BackupId>,
+
+ /// Used to put the thread to sleep
+ park: DefaultPark,
+}
+
+#[derive(Debug, Eq, PartialEq, Copy, Clone)]
+pub(crate) struct BackupId(pub(crate) usize);
+
+#[derive(Debug)]
+pub(crate) enum Handoff {
+ Worker(WorkerId),
+ Idle,
+ Terminated,
+}
+
+/// Tracks thread state.
+#[derive(Clone, Copy, Eq, PartialEq)]
+struct State(usize);
+
+/// Set when the worker is pushed onto the scheduler's stack of sleeping
+/// threads.
+///
+/// This flag also serves as a "notification" bit. If another thread is
+/// attempting to hand off a worker to the backup thread, then the pushed bit
+/// will not be set when the thread tries to shutdown.
+pub const PUSHED: usize = 0b001;
+
+/// Set when the thread is running
+pub const RUNNING: usize = 0b010;
+
+/// Set when the thread pool has terminated
+pub const TERMINATED: usize = 0b100;
+
+// ===== impl Backup =====
+
+impl Backup {
+ pub fn new() -> Backup {
+ Backup {
+ handoff: UnsafeCell::new(None),
+ state: AtomicUsize::new(State::new().into()),
+ next_sleeper: UnsafeCell::new(BackupId(0)),
+ park: DefaultPark::new(),
+ }
+ }
+
+ /// Called when the thread is starting
+ pub fn start(&self, worker_id: &WorkerId) {
+ debug_assert!({
+ let state: State = self.state.load(Relaxed).into();
+
+ debug_assert!(!state.is_pushed());
+ debug_assert!(state.is_running());
+ debug_assert!(!state.is_terminated());
+
+ true
+ });
+
+ // The handoff value is equal to `worker_id`
+ debug_assert_eq!(unsafe { (*self.handoff.get()).as_ref() }, Some(worker_id));
+
+ unsafe {
+ *self.handoff.get() = None;
+ }
+ }
+
+ pub fn is_running(&self) -> bool {
+ let state: State = self.state.load(Relaxed).into();
+ state.is_running()
+ }
+
+ /// Hands off the worker to a thread.
+ ///
+ /// Returns `true` if the thread needs to be spawned.
+ pub fn worker_handoff(&self, worker_id: WorkerId) -> bool {
+ unsafe {
+ // The backup worker should not already have been handoff a worker.
+ debug_assert!((*self.handoff.get()).is_none());
+
+ // Set the handoff
+ *self.handoff.get() = Some(worker_id);
+ }
+
+ // This *probably* can just be `Release`... memory orderings, how do
+ // they work?
+ let prev = State::worker_handoff(&self.state);
+ debug_assert!(prev.is_pushed());
+
+ if prev.is_running() {
+ // Wakeup the backup thread
+ self.park.notify();
+ false
+ } else {
+ true
+ }
+ }
+
+ /// Terminate the worker
+ pub fn signal_stop(&self) {
+ let prev: State = self.state.fetch_xor(TERMINATED | PUSHED, AcqRel).into();
+
+ debug_assert!(!prev.is_terminated());
+ debug_assert!(prev.is_pushed());
+
+ if prev.is_running() {
+ self.park.notify();
+ }
+ }
+
+ /// Release the worker
+ pub fn release(&self) {
+ let prev: State = self.state.fetch_xor(RUNNING, AcqRel).into();
+
+ debug_assert!(prev.is_running());
+ }
+
+ /// Wait for a worker handoff
+ pub fn wait_for_handoff(&self, timeout: Option<Duration>) -> Handoff {
+ let sleep_until = timeout.map(|dur| Instant::now() + dur);
+ let mut state: State = self.state.load(Acquire).into();
+
+ // Run in a loop since there can be spurious wakeups
+ loop {
+ if !state.is_pushed() {
+ if state.is_terminated() {
+ return Handoff::Terminated;
+ }
+
+ let worker_id = unsafe { (*self.handoff.get()).take().expect("no worker handoff") };
+ return Handoff::Worker(worker_id);
+ }
+
+ match sleep_until {
+ None => {
+ self.park.park_sync(None);
+ state = self.state.load(Acquire).into();
+ }
+ Some(when) => {
+ let now = Instant::now();
+
+ if now < when {
+ self.park.park_sync(Some(when - now));
+ state = self.state.load(Acquire).into();
+ } else {
+ debug_assert!(state.is_running());
+
+ // Transition out of running
+ let mut next = state;
+ next.unset_running();
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ debug_assert!(!next.is_running());
+ return Handoff::Idle;
+ }
+
+ state = actual;
+ }
+ }
+ }
+ }
+ }
+
+ pub fn is_pushed(&self) -> bool {
+ let state: State = self.state.load(Relaxed).into();
+ state.is_pushed()
+ }
+
+ pub fn set_pushed(&self, ordering: Ordering) {
+ let prev: State = self.state.fetch_or(PUSHED, ordering).into();
+ debug_assert!(!prev.is_pushed());
+ }
+
+ #[inline]
+ pub fn next_sleeper(&self) -> BackupId {
+ unsafe { *self.next_sleeper.get() }
+ }
+
+ #[inline]
+ pub fn set_next_sleeper(&self, val: BackupId) {
+ unsafe {
+ *self.next_sleeper.get() = val;
+ }
+ }
+}
+
+// ===== impl State =====
+
+impl State {
+ /// Returns a new, default, thread `State`
+ pub fn new() -> State {
+ State(0)
+ }
+
+ /// Returns true if the thread entry is pushed in the sleeper stack
+ pub fn is_pushed(&self) -> bool {
+ self.0 & PUSHED == PUSHED
+ }
+
+ fn unset_pushed(&mut self) {
+ self.0 &= !PUSHED;
+ }
+
+ pub fn is_running(&self) -> bool {
+ self.0 & RUNNING == RUNNING
+ }
+
+ pub fn set_running(&mut self) {
+ self.0 |= RUNNING;
+ }
+
+ pub fn unset_running(&mut self) {
+ self.0 &= !RUNNING;
+ }
+
+ pub fn is_terminated(&self) -> bool {
+ self.0 & TERMINATED == TERMINATED
+ }
+
+ fn worker_handoff(state: &AtomicUsize) -> State {
+ let mut curr: State = state.load(Acquire).into();
+
+ loop {
+ let mut next = curr;
+ next.set_running();
+ next.unset_pushed();
+
+ let actual = state
+ .compare_and_swap(curr.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == curr {
+ return curr;
+ }
+
+ curr = actual;
+ }
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> State {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> usize {
+ src.0
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("backup::State")
+ .field("is_pushed", &self.is_pushed())
+ .field("is_running", &self.is_running())
+ .field("is_terminated", &self.is_terminated())
+ .finish()
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/pool/backup_stack.rs b/third_party/rust/tokio-threadpool/src/pool/backup_stack.rs
new file mode 100644
index 0000000000..b9a46d08ef
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/pool/backup_stack.rs
@@ -0,0 +1,191 @@
+use pool::{Backup, BackupId};
+
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{AcqRel, Acquire};
+
+#[derive(Debug)]
+pub(crate) struct BackupStack {
+ state: AtomicUsize,
+}
+
+#[derive(Debug, Eq, PartialEq, Clone, Copy)]
+struct State(usize);
+
+pub(crate) const MAX_BACKUP: usize = 1 << 15;
+
+/// Extracts the head of the backup stack from the state
+const STACK_MASK: usize = ((1 << 16) - 1);
+
+/// Used to mark the stack as empty
+pub(crate) const EMPTY: BackupId = BackupId(MAX_BACKUP);
+
+/// Used to mark the stack as terminated
+pub(crate) const TERMINATED: BackupId = BackupId(EMPTY.0 + 1);
+
+/// How many bits the Treiber ABA guard is offset by
+const ABA_GUARD_SHIFT: usize = 16;
+
+#[cfg(target_pointer_width = "64")]
+const ABA_GUARD_MASK: usize = (1 << (64 - ABA_GUARD_SHIFT)) - 1;
+
+#[cfg(target_pointer_width = "32")]
+const ABA_GUARD_MASK: usize = (1 << (32 - ABA_GUARD_SHIFT)) - 1;
+
+// ===== impl BackupStack =====
+
+impl BackupStack {
+ pub fn new() -> BackupStack {
+ let state = AtomicUsize::new(State::new().into());
+ BackupStack { state }
+ }
+
+ /// Push a backup thread onto the stack
+ ///
+ /// # Return
+ ///
+ /// Returns `Ok` on success.
+ ///
+ /// Returns `Err` if the pool has transitioned to the `TERMINATED` state.
+ /// When terminated, pushing new entries is no longer permitted.
+ pub fn push(&self, entries: &[Backup], id: BackupId) -> Result<(), ()> {
+ let mut state: State = self.state.load(Acquire).into();
+
+ entries[id.0].set_pushed(AcqRel);
+
+ loop {
+ let mut next = state;
+
+ let head = state.head();
+
+ if head == TERMINATED {
+ // The pool is terminated, cannot push the sleeper.
+ return Err(());
+ }
+
+ entries[id.0].set_next_sleeper(head);
+ next.set_head(id);
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if state == actual {
+ return Ok(());
+ }
+
+ state = actual;
+ }
+ }
+
+ /// Pop a backup thread off the stack.
+ ///
+ /// If `terminate` is set and the stack is empty when this function is
+ /// called, the state of the stack is transitioned to "terminated". At this
+ /// point, no further entries can be pushed onto the stack.
+ ///
+ /// # Return
+ ///
+ /// * Returns the index of the popped worker and the worker's observed
+ /// state.
+ ///
+ /// * `Ok(None)` if the stack is empty.
+ /// * `Err(_)` is returned if the pool has been shutdown.
+ pub fn pop(&self, entries: &[Backup], terminate: bool) -> Result<Option<BackupId>, ()> {
+ // Figure out the empty value
+ let terminal = match terminate {
+ true => TERMINATED,
+ false => EMPTY,
+ };
+
+ let mut state: State = self.state.load(Acquire).into();
+
+ loop {
+ let head = state.head();
+
+ if head == EMPTY {
+ let mut next = state;
+ next.set_head(terminal);
+
+ if next == state {
+ debug_assert!(terminal == EMPTY);
+ return Ok(None);
+ }
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual != state {
+ state = actual;
+ continue;
+ }
+
+ return Ok(None);
+ } else if head == TERMINATED {
+ return Err(());
+ }
+
+ debug_assert!(head.0 < MAX_BACKUP);
+
+ let mut next = state;
+
+ let next_head = entries[head.0].next_sleeper();
+
+ // TERMINATED can never be set as the "next pointer" on a worker.
+ debug_assert!(next_head != TERMINATED);
+
+ if next_head == EMPTY {
+ next.set_head(terminal);
+ } else {
+ next.set_head(next_head);
+ }
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ debug_assert!(entries[head.0].is_pushed());
+ return Ok(Some(head));
+ }
+
+ state = actual;
+ }
+ }
+}
+
+// ===== impl State =====
+
+impl State {
+ fn new() -> State {
+ State(EMPTY.0)
+ }
+
+ fn head(&self) -> BackupId {
+ BackupId(self.0 & STACK_MASK)
+ }
+
+ fn set_head(&mut self, val: BackupId) {
+ let val = val.0;
+
+ // The ABA guard protects against the ABA problem w/ Treiber stacks
+ let aba_guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK;
+
+ self.0 = (aba_guard << ABA_GUARD_SHIFT) | val;
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> Self {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> Self {
+ src.0
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/pool/mod.rs b/third_party/rust/tokio-threadpool/src/pool/mod.rs
new file mode 100644
index 0000000000..0a42359b3c
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/pool/mod.rs
@@ -0,0 +1,475 @@
+mod backup;
+mod backup_stack;
+mod state;
+
+pub(crate) use self::backup::{Backup, BackupId};
+pub(crate) use self::backup_stack::MAX_BACKUP;
+pub(crate) use self::state::{Lifecycle, State, MAX_FUTURES};
+
+use self::backup::Handoff;
+use self::backup_stack::BackupStack;
+
+use config::Config;
+use shutdown::ShutdownTrigger;
+use task::{Blocking, Task};
+use worker::{self, Worker, WorkerId};
+
+use futures::Poll;
+
+use std::cell::Cell;
+use std::collections::hash_map::RandomState;
+use std::hash::{BuildHasher, Hash, Hasher};
+use std::num::Wrapping;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{AcqRel, Acquire};
+use std::sync::{Arc, Weak};
+use std::thread;
+
+use crossbeam_deque::Injector;
+use crossbeam_utils::CachePadded;
+
+#[derive(Debug)]
+pub(crate) struct Pool {
+ // Tracks the state of the thread pool (running, shutting down, ...).
+ //
+ // While workers check this field as a hint to detect shutdown, it is
+ // **not** used as a primary point of coordination for workers. The sleep
+ // stack is used as the primary point of coordination for workers.
+ //
+ // The value of this atomic is deserialized into a `pool::State` instance.
+ // See comments for that type.
+ pub state: CachePadded<AtomicUsize>,
+
+ // Stack tracking sleeping workers.
+ sleep_stack: CachePadded<worker::Stack>,
+
+ // Worker state
+ //
+ // A worker is a thread that is processing the work queue and polling
+ // futures.
+ //
+ // The number of workers will *usually* be small.
+ pub workers: Arc<[worker::Entry]>,
+
+ // The global MPMC queue of tasks.
+ //
+ // Spawned tasks are pushed into this queue. Although worker threads have their own dedicated
+ // task queues, they periodically steal tasks from this global queue, too.
+ pub queue: Arc<Injector<Arc<Task>>>,
+
+ // Completes the shutdown process when the `ThreadPool` and all `Worker`s get dropped.
+ //
+ // When spawning a new `Worker`, this weak reference is upgraded and handed out to the new
+ // thread.
+ pub trigger: Weak<ShutdownTrigger>,
+
+ // Backup thread state
+ //
+ // In order to efficiently support `blocking`, a pool of backup threads is
+ // needed. These backup threads are ready to take over a worker if the
+ // future being processed requires blocking.
+ backup: Box<[Backup]>,
+
+ // Stack of sleeping backup threads
+ pub backup_stack: BackupStack,
+
+ // State regarding coordinating blocking sections and tracking tasks that
+ // are pending blocking capacity.
+ blocking: Blocking,
+
+ // Configuration
+ pub config: Config,
+}
+
+impl Pool {
+ /// Create a new `Pool`
+ pub fn new(
+ workers: Arc<[worker::Entry]>,
+ trigger: Weak<ShutdownTrigger>,
+ max_blocking: usize,
+ config: Config,
+ queue: Arc<Injector<Arc<Task>>>,
+ ) -> Pool {
+ let pool_size = workers.len();
+ let total_size = max_blocking + pool_size;
+
+ // Create the set of backup entries
+ //
+ // This is `backup + pool_size` because the core thread pool running the
+ // workers is spawned from backup as well.
+ let backup = (0..total_size)
+ .map(|_| Backup::new())
+ .collect::<Vec<_>>()
+ .into_boxed_slice();
+
+ let backup_stack = BackupStack::new();
+
+ for i in (0..backup.len()).rev() {
+ backup_stack.push(&backup, BackupId(i)).unwrap();
+ }
+
+ // Initialize the blocking state
+ let blocking = Blocking::new(max_blocking);
+
+ let ret = Pool {
+ state: CachePadded::new(AtomicUsize::new(State::new().into())),
+ sleep_stack: CachePadded::new(worker::Stack::new()),
+ workers,
+ queue,
+ trigger,
+ backup,
+ backup_stack,
+ blocking,
+ config,
+ };
+
+ // Now, we prime the sleeper stack
+ for i in 0..pool_size {
+ ret.sleep_stack.push(&ret.workers, i).unwrap();
+ }
+
+ ret
+ }
+
+ /// Start shutting down the pool. This means that no new futures will be
+ /// accepted.
+ pub fn shutdown(&self, now: bool, purge_queue: bool) {
+ let mut state: State = self.state.load(Acquire).into();
+
+ trace!("shutdown; state={:?}", state);
+
+ // For now, this must be true
+ debug_assert!(!purge_queue || now);
+
+ // Start by setting the shutdown flag
+ loop {
+ let mut next = state;
+
+ let num_futures = next.num_futures();
+
+ if next.lifecycle() == Lifecycle::ShutdownNow {
+ // Already transitioned to shutting down state
+
+ if !purge_queue || num_futures == 0 {
+ // Nothing more to do
+ return;
+ }
+
+ // The queue must be purged
+ debug_assert!(purge_queue);
+ next.clear_num_futures();
+ } else {
+ next.set_lifecycle(if now || num_futures == 0 {
+ // If already idle, always transition to shutdown now.
+ Lifecycle::ShutdownNow
+ } else {
+ Lifecycle::ShutdownOnIdle
+ });
+
+ if purge_queue {
+ next.clear_num_futures();
+ }
+ }
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if state == actual {
+ state = next;
+ break;
+ }
+
+ state = actual;
+ }
+
+ trace!(" -> transitioned to shutdown");
+
+ // Only transition to terminate if there are no futures currently on the
+ // pool
+ if state.num_futures() != 0 {
+ return;
+ }
+
+ self.terminate_sleeping_workers();
+ }
+
+ /// Called by `Worker` as it tries to enter a sleeping state. Before it
+ /// sleeps, it must push itself onto the sleep stack. This enables other
+ /// threads to see it when signaling work.
+ pub fn push_sleeper(&self, idx: usize) -> Result<(), ()> {
+ self.sleep_stack.push(&self.workers, idx)
+ }
+
+ pub fn terminate_sleeping_workers(&self) {
+ use worker::Lifecycle::Signaled;
+
+ trace!(" -> shutting down workers");
+ // Wakeup all sleeping workers. They will wake up, see the state
+ // transition, and terminate.
+ while let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, true) {
+ self.workers[idx].signal_stop(worker_state);
+ }
+
+ // Now terminate any backup threads
+ //
+ // The call to `pop` must be successful because shutting down the pool
+ // is coordinated and at this point, this is the only thread that will
+ // attempt to transition the backup stack to "terminated".
+ while let Ok(Some(backup_id)) = self.backup_stack.pop(&self.backup, true) {
+ self.backup[backup_id.0].signal_stop();
+ }
+ }
+
+ pub fn poll_blocking_capacity(&self, task: &Arc<Task>) -> Poll<(), ::BlockingError> {
+ self.blocking.poll_blocking_capacity(task)
+ }
+
+ /// Submit a task to the scheduler.
+ ///
+ /// Called from either inside or outside of the scheduler. If currently on
+ /// the scheduler, then a fast path is taken.
+ pub fn submit(&self, task: Arc<Task>, pool: &Arc<Pool>) {
+ debug_assert_eq!(*self, **pool);
+
+ Worker::with_current(|worker| {
+ if let Some(worker) = worker {
+ // If the worker is in blocking mode, then even though the
+ // thread-local variable is set, the current thread does not
+ // have ownership of that worker entry. This is because the
+ // worker entry has already been handed off to another thread.
+ //
+ // The second check handles the case where the current thread is
+ // part of a different threadpool than the one being submitted
+ // to.
+ if !worker.is_blocking() && *self == *worker.pool {
+ let idx = worker.id.0;
+
+ trace!(" -> submit internal; idx={}", idx);
+
+ worker.pool.workers[idx].submit_internal(task);
+ worker.pool.signal_work(pool);
+ return;
+ }
+ }
+
+ self.submit_external(task, pool);
+ });
+ }
+
+ /// Submit a task to the scheduler from off worker
+ ///
+ /// Called from outside of the scheduler, this function is how new tasks
+ /// enter the system.
+ pub fn submit_external(&self, task: Arc<Task>, pool: &Arc<Pool>) {
+ debug_assert_eq!(*self, **pool);
+
+ trace!(" -> submit external");
+
+ self.queue.push(task);
+ self.signal_work(pool);
+ }
+
+ pub fn release_backup(&self, backup_id: BackupId) -> Result<(), ()> {
+ // First update the state, this cannot fail because the caller must have
+ // exclusive access to the backup token.
+ self.backup[backup_id.0].release();
+
+ // Push the backup entry back on the stack
+ self.backup_stack.push(&self.backup, backup_id)
+ }
+
+ pub fn notify_blocking_task(&self, pool: &Arc<Pool>) {
+ debug_assert_eq!(*self, **pool);
+ self.blocking.notify_task(&pool);
+ }
+
+ /// Provision a thread to run a worker
+ pub fn spawn_thread(&self, id: WorkerId, pool: &Arc<Pool>) {
+ debug_assert_eq!(*self, **pool);
+
+ let backup_id = match self.backup_stack.pop(&self.backup, false) {
+ Ok(Some(backup_id)) => backup_id,
+ Ok(None) => panic!("no thread available"),
+ Err(_) => {
+ debug!("failed to spawn worker thread due to the thread pool shutting down");
+ return;
+ }
+ };
+
+ let need_spawn = self.backup[backup_id.0].worker_handoff(id.clone());
+
+ if !need_spawn {
+ return;
+ }
+
+ let trigger = match self.trigger.upgrade() {
+ None => {
+ // The pool is shutting down.
+ return;
+ }
+ Some(t) => t,
+ };
+
+ let mut th = thread::Builder::new();
+
+ if let Some(ref prefix) = pool.config.name_prefix {
+ th = th.name(format!("{}{}", prefix, backup_id.0));
+ }
+
+ if let Some(stack) = pool.config.stack_size {
+ th = th.stack_size(stack);
+ }
+
+ let pool = pool.clone();
+
+ let res = th.spawn(move || {
+ if let Some(ref f) = pool.config.after_start {
+ f();
+ }
+
+ let mut worker_id = id;
+
+ pool.backup[backup_id.0].start(&worker_id);
+
+ loop {
+ // The backup token should be in the running state.
+ debug_assert!(pool.backup[backup_id.0].is_running());
+
+ // TODO: Avoid always cloning
+ let worker = Worker::new(worker_id, backup_id, pool.clone(), trigger.clone());
+
+ // Run the worker. If the worker transitioned to a "blocking"
+ // state, then `is_blocking` will be true.
+ if !worker.do_run() {
+ // The worker shutdown, so exit the thread.
+ break;
+ }
+
+ debug_assert!(!pool.backup[backup_id.0].is_pushed());
+
+ // Push the thread back onto the backup stack. This makes it
+ // available for future handoffs.
+ //
+ // This **must** happen before notifying the task.
+ let res = pool.backup_stack.push(&pool.backup, backup_id);
+
+ if res.is_err() {
+ // The pool is being shutdown.
+ break;
+ }
+
+ // The task switched the current thread to blocking mode.
+ // Now that the blocking task completed, any tasks
+ pool.notify_blocking_task(&pool);
+
+ debug_assert!(pool.backup[backup_id.0].is_running());
+
+ // Wait for a handoff
+ let handoff = pool.backup[backup_id.0].wait_for_handoff(pool.config.keep_alive);
+
+ match handoff {
+ Handoff::Worker(id) => {
+ debug_assert!(pool.backup[backup_id.0].is_running());
+ worker_id = id;
+ }
+ Handoff::Idle | Handoff::Terminated => {
+ break;
+ }
+ }
+ }
+
+ if let Some(ref f) = pool.config.before_stop {
+ f();
+ }
+ });
+
+ if let Err(e) = res {
+ error!("failed to spawn worker thread; err={:?}", e);
+ panic!("failed to spawn worker thread: {:?}", e);
+ }
+ }
+
+ /// If there are any other workers currently relaxing, signal them that work
+ /// is available so that they can try to find more work to process.
+ pub fn signal_work(&self, pool: &Arc<Pool>) {
+ debug_assert_eq!(*self, **pool);
+
+ use worker::Lifecycle::Signaled;
+
+ if let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, false) {
+ let entry = &self.workers[idx];
+
+ debug_assert!(
+ worker_state.lifecycle() != Signaled,
+ "actual={:?}",
+ worker_state.lifecycle(),
+ );
+
+ trace!("signal_work -- notify; idx={}", idx);
+
+ if !entry.notify(worker_state) {
+ trace!("signal_work -- spawn; idx={}", idx);
+ self.spawn_thread(WorkerId(idx), pool);
+ }
+ }
+ }
+
+ /// Generates a random number
+ ///
+ /// Uses a thread-local random number generator based on XorShift.
+ pub fn rand_usize(&self) -> usize {
+ thread_local! {
+ static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(prng_seed()));
+ }
+
+ RNG.with(|rng| {
+ // This is the 32-bit variant of Xorshift.
+ // https://en.wikipedia.org/wiki/Xorshift
+ let mut x = rng.get();
+ x ^= x << 13;
+ x ^= x >> 17;
+ x ^= x << 5;
+ rng.set(x);
+ x.0 as usize
+ })
+ }
+}
+
+impl PartialEq for Pool {
+ fn eq(&self, other: &Pool) -> bool {
+ self as *const _ == other as *const _
+ }
+}
+
+unsafe impl Send for Pool {}
+unsafe impl Sync for Pool {}
+
+// Return a thread-specific, 32-bit, non-zero seed value suitable for a 32-bit
+// PRNG. This uses one libstd RandomState for a default hasher and hashes on
+// the current thread ID to obtain an unpredictable, collision resistant seed.
+fn prng_seed() -> u32 {
+ // This obtains a small number of random bytes from the host system (for
+ // example, on unix via getrandom(2)) in order to seed an unpredictable and
+ // HashDoS resistant 64-bit hash function (currently: `SipHasher13` with
+ // 128-bit state). We only need one of these, to make the seeds for all
+ // process threads different via hashed IDs, collision resistant, and
+ // unpredictable.
+ lazy_static! {
+ static ref RND_STATE: RandomState = RandomState::new();
+ }
+
+ // Hash the current thread ID to produce a u32 value
+ let mut hasher = RND_STATE.build_hasher();
+ thread::current().id().hash(&mut hasher);
+ let hash: u64 = hasher.finish();
+ let seed = (hash as u32) ^ ((hash >> 32) as u32);
+
+ // Ensure non-zero seed (Xorshift yields only zero's for that seed)
+ if seed == 0 {
+ 0x9b4e_6d25 // misc bits, could be any non-zero
+ } else {
+ seed
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/pool/state.rs b/third_party/rust/tokio-threadpool/src/pool/state.rs
new file mode 100644
index 0000000000..5ecb514e5c
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/pool/state.rs
@@ -0,0 +1,132 @@
+use std::{fmt, usize};
+
+/// ThreadPool state.
+///
+/// The two least significant bits are the shutdown flags. (0 for active, 1 for
+/// shutdown on idle, 2 for shutting down). The remaining bits represent the
+/// number of futures that still need to complete.
+#[derive(Eq, PartialEq, Clone, Copy)]
+pub(crate) struct State(usize);
+
+#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone, Copy)]
+#[repr(usize)]
+pub(crate) enum Lifecycle {
+ /// The thread pool is currently running
+ Running = 0,
+
+ /// The thread pool should shutdown once it reaches an idle state.
+ ShutdownOnIdle = 1,
+
+ /// The thread pool should start the process of shutting down.
+ ShutdownNow = 2,
+}
+
+/// Mask used to extract the number of futures from the state
+const LIFECYCLE_MASK: usize = 0b11;
+const NUM_FUTURES_MASK: usize = !LIFECYCLE_MASK;
+const NUM_FUTURES_OFFSET: usize = 2;
+
+/// Max number of futures the pool can handle.
+pub(crate) const MAX_FUTURES: usize = usize::MAX >> NUM_FUTURES_OFFSET;
+
+// ===== impl State =====
+
+impl State {
+ #[inline]
+ pub fn new() -> State {
+ State(0)
+ }
+
+ /// Returns the number of futures still pending completion.
+ pub fn num_futures(&self) -> usize {
+ self.0 >> NUM_FUTURES_OFFSET
+ }
+
+ /// Increment the number of futures pending completion.
+ ///
+ /// Returns false on failure.
+ pub fn inc_num_futures(&mut self) {
+ debug_assert!(self.num_futures() < MAX_FUTURES);
+ debug_assert!(self.lifecycle() < Lifecycle::ShutdownNow);
+
+ self.0 += 1 << NUM_FUTURES_OFFSET;
+ }
+
+ /// Decrement the number of futures pending completion.
+ pub fn dec_num_futures(&mut self) {
+ let num_futures = self.num_futures();
+
+ if num_futures == 0 {
+ // Already zero
+ return;
+ }
+
+ self.0 -= 1 << NUM_FUTURES_OFFSET;
+
+ if self.lifecycle() == Lifecycle::ShutdownOnIdle && num_futures == 1 {
+ self.set_lifecycle(Lifecycle::ShutdownNow);
+ }
+ }
+
+ /// Set the number of futures pending completion to zero
+ pub fn clear_num_futures(&mut self) {
+ self.0 = self.0 & LIFECYCLE_MASK;
+ }
+
+ pub fn lifecycle(&self) -> Lifecycle {
+ (self.0 & LIFECYCLE_MASK).into()
+ }
+
+ pub fn set_lifecycle(&mut self, val: Lifecycle) {
+ self.0 = (self.0 & NUM_FUTURES_MASK) | (val as usize);
+ }
+
+ pub fn is_terminated(&self) -> bool {
+ self.lifecycle() == Lifecycle::ShutdownNow && self.num_futures() == 0
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> Self {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> Self {
+ src.0
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("pool::State")
+ .field("lifecycle", &self.lifecycle())
+ .field("num_futures", &self.num_futures())
+ .finish()
+ }
+}
+
+// ===== impl Lifecycle =====
+
+impl From<usize> for Lifecycle {
+ fn from(src: usize) -> Lifecycle {
+ use self::Lifecycle::*;
+
+ debug_assert!(
+ src == Running as usize
+ || src == ShutdownOnIdle as usize
+ || src == ShutdownNow as usize
+ );
+
+ unsafe { ::std::mem::transmute(src) }
+ }
+}
+
+impl From<Lifecycle> for usize {
+ fn from(src: Lifecycle) -> usize {
+ let v = src as usize;
+ debug_assert!(v & LIFECYCLE_MASK == v);
+ v
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/sender.rs b/third_party/rust/tokio-threadpool/src/sender.rs
new file mode 100644
index 0000000000..3ae3deca4d
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/sender.rs
@@ -0,0 +1,218 @@
+use pool::{self, Lifecycle, Pool, MAX_FUTURES};
+use task::Task;
+
+use std::sync::atomic::Ordering::{AcqRel, Acquire};
+use std::sync::Arc;
+
+use futures::{future, Future};
+use tokio_executor::{self, SpawnError};
+
+/// Submit futures to the associated thread pool for execution.
+///
+/// A `Sender` instance is a handle to a single thread pool, allowing the owner
+/// of the handle to spawn futures onto the thread pool. New futures are spawned
+/// using [`Sender::spawn`].
+///
+/// The `Sender` handle is *only* used for spawning new futures. It does not
+/// impact the lifecycle of the thread pool in any way.
+///
+/// `Sender` instances are obtained by calling [`ThreadPool::sender`]. The
+/// `Sender` struct implements the `Executor` trait.
+///
+/// [`Sender::spawn`]: #method.spawn
+/// [`ThreadPool::sender`]: struct.ThreadPool.html#method.sender
+#[derive(Debug)]
+pub struct Sender {
+ pub(crate) pool: Arc<Pool>,
+}
+
+impl Sender {
+ /// Spawn a future onto the thread pool
+ ///
+ /// This function takes ownership of the future and spawns it onto the
+ /// thread pool, assigning it to a worker thread. The exact strategy used to
+ /// assign a future to a worker depends on if the caller is already on a
+ /// worker thread or external to the thread pool.
+ ///
+ /// If the caller is currently on the thread pool, the spawned future will
+ /// be assigned to the same worker that the caller is on. If the caller is
+ /// external to the thread pool, the future will be assigned to a random
+ /// worker.
+ ///
+ /// If `spawn` returns `Ok`, this does not mean that the future will be
+ /// executed. The thread pool can be forcibly shutdown between the time
+ /// `spawn` is called and the future has a chance to execute.
+ ///
+ /// If `spawn` returns `Err`, then the future failed to be spawned. There
+ /// are two possible causes:
+ ///
+ /// * The thread pool is at capacity and is unable to spawn a new future.
+ /// This is a temporary failure. At some point in the future, the thread
+ /// pool might be able to spawn new futures.
+ /// * The thread pool is shutdown. This is a permanent failure indicating
+ /// that the handle will never be able to spawn new futures.
+ ///
+ /// The status of the thread pool can be queried before calling `spawn`
+ /// using the `status` function (part of the `Executor` trait).
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::ThreadPool;
+ /// use futures::future::{Future, lazy};
+ ///
+ /// # pub fn main() {
+ /// // Create a thread pool with default configuration values
+ /// let thread_pool = ThreadPool::new();
+ ///
+ /// thread_pool.sender().spawn(lazy(|| {
+ /// println!("called from a worker thread");
+ /// Ok(())
+ /// })).unwrap();
+ ///
+ /// // Gracefully shutdown the threadpool
+ /// thread_pool.shutdown().wait().unwrap();
+ /// # }
+ /// ```
+ pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError>
+ where
+ F: Future<Item = (), Error = ()> + Send + 'static,
+ {
+ let mut s = self;
+ tokio_executor::Executor::spawn(&mut s, Box::new(future))
+ }
+
+ /// Logic to prepare for spawning
+ fn prepare_for_spawn(&self) -> Result<(), SpawnError> {
+ let mut state: pool::State = self.pool.state.load(Acquire).into();
+
+ // Increment the number of futures spawned on the pool as well as
+ // validate that the pool is still running/
+ loop {
+ let mut next = state;
+
+ if next.num_futures() == MAX_FUTURES {
+ // No capacity
+ return Err(SpawnError::at_capacity());
+ }
+
+ if next.lifecycle() == Lifecycle::ShutdownNow {
+ // Cannot execute the future, executor is shutdown.
+ return Err(SpawnError::shutdown());
+ }
+
+ next.inc_num_futures();
+
+ let actual = self
+ .pool
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ trace!("execute; count={:?}", next.num_futures());
+ break;
+ }
+
+ state = actual;
+ }
+
+ Ok(())
+ }
+}
+
+impl tokio_executor::Executor for Sender {
+ fn status(&self) -> Result<(), tokio_executor::SpawnError> {
+ let s = self;
+ tokio_executor::Executor::status(&s)
+ }
+
+ fn spawn(
+ &mut self,
+ future: Box<dyn Future<Item = (), Error = ()> + Send>,
+ ) -> Result<(), SpawnError> {
+ let mut s = &*self;
+ tokio_executor::Executor::spawn(&mut s, future)
+ }
+}
+
+impl<'a> tokio_executor::Executor for &'a Sender {
+ fn status(&self) -> Result<(), tokio_executor::SpawnError> {
+ let state: pool::State = self.pool.state.load(Acquire).into();
+
+ if state.num_futures() == MAX_FUTURES {
+ // No capacity
+ return Err(SpawnError::at_capacity());
+ }
+
+ if state.lifecycle() == Lifecycle::ShutdownNow {
+ // Cannot execute the future, executor is shutdown.
+ return Err(SpawnError::shutdown());
+ }
+
+ Ok(())
+ }
+
+ fn spawn(
+ &mut self,
+ future: Box<dyn Future<Item = (), Error = ()> + Send>,
+ ) -> Result<(), SpawnError> {
+ self.prepare_for_spawn()?;
+
+ // At this point, the pool has accepted the future, so schedule it for
+ // execution.
+
+ // Create a new task for the future
+ let task = Arc::new(Task::new(future));
+
+ // Call `submit_external()` in order to place the task into the global
+ // queue. This way all workers have equal chance of running this task,
+ // which means IO handles will be assigned to reactors more evenly.
+ self.pool.submit_external(task, &self.pool);
+
+ Ok(())
+ }
+}
+
+impl<T> tokio_executor::TypedExecutor<T> for Sender
+where
+ T: Future<Item = (), Error = ()> + Send + 'static,
+{
+ fn status(&self) -> Result<(), tokio_executor::SpawnError> {
+ tokio_executor::Executor::status(self)
+ }
+
+ fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
+ tokio_executor::Executor::spawn(self, Box::new(future))
+ }
+}
+
+impl<T> future::Executor<T> for Sender
+where
+ T: Future<Item = (), Error = ()> + Send + 'static,
+{
+ fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> {
+ if let Err(e) = tokio_executor::Executor::status(self) {
+ let kind = if e.is_at_capacity() {
+ future::ExecuteErrorKind::NoCapacity
+ } else {
+ future::ExecuteErrorKind::Shutdown
+ };
+
+ return Err(future::ExecuteError::new(kind, future));
+ }
+
+ let _ = self.spawn(future);
+ Ok(())
+ }
+}
+
+impl Clone for Sender {
+ #[inline]
+ fn clone(&self) -> Sender {
+ let pool = self.pool.clone();
+ Sender { pool }
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/shutdown.rs b/third_party/rust/tokio-threadpool/src/shutdown.rs
new file mode 100644
index 0000000000..c3d04a002d
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/shutdown.rs
@@ -0,0 +1,103 @@
+use task::Task;
+use worker;
+
+use crossbeam_deque::Injector;
+use futures::task::AtomicTask;
+use futures::{Async, Future, Poll};
+
+use std::sync::{Arc, Mutex};
+
+/// Future that resolves when the thread pool is shutdown.
+///
+/// A `ThreadPool` is shutdown once all the worker have drained their queues and
+/// shutdown their threads.
+///
+/// `Shutdown` is returned by [`shutdown`], [`shutdown_on_idle`], and
+/// [`shutdown_now`].
+///
+/// [`shutdown`]: struct.ThreadPool.html#method.shutdown
+/// [`shutdown_on_idle`]: struct.ThreadPool.html#method.shutdown_on_idle
+/// [`shutdown_now`]: struct.ThreadPool.html#method.shutdown_now
+#[derive(Debug)]
+pub struct Shutdown {
+ inner: Arc<Mutex<Inner>>,
+}
+
+/// Shared state between `Shutdown` and `ShutdownTrigger`.
+///
+/// This is used for notifying the `Shutdown` future when `ShutdownTrigger` gets dropped.
+#[derive(Debug)]
+struct Inner {
+ /// The task to notify when the threadpool completes the shutdown process.
+ task: AtomicTask,
+ /// `true` if the threadpool has been shut down.
+ completed: bool,
+}
+
+impl Shutdown {
+ pub(crate) fn new(trigger: &ShutdownTrigger) -> Shutdown {
+ Shutdown {
+ inner: trigger.inner.clone(),
+ }
+ }
+}
+
+impl Future for Shutdown {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ let inner = self.inner.lock().unwrap();
+
+ if !inner.completed {
+ inner.task.register();
+ Ok(Async::NotReady)
+ } else {
+ Ok(().into())
+ }
+ }
+}
+
+/// When dropped, cleans up threadpool's resources and completes the shutdown process.
+#[derive(Debug)]
+pub(crate) struct ShutdownTrigger {
+ inner: Arc<Mutex<Inner>>,
+ workers: Arc<[worker::Entry]>,
+ queue: Arc<Injector<Arc<Task>>>,
+}
+
+unsafe impl Send for ShutdownTrigger {}
+unsafe impl Sync for ShutdownTrigger {}
+
+impl ShutdownTrigger {
+ pub(crate) fn new(
+ workers: Arc<[worker::Entry]>,
+ queue: Arc<Injector<Arc<Task>>>,
+ ) -> ShutdownTrigger {
+ ShutdownTrigger {
+ inner: Arc::new(Mutex::new(Inner {
+ task: AtomicTask::new(),
+ completed: false,
+ })),
+ workers,
+ queue,
+ }
+ }
+}
+
+impl Drop for ShutdownTrigger {
+ fn drop(&mut self) {
+ // Drain the global task queue.
+ while !self.queue.steal().is_empty() {}
+
+ // Drop the remaining incomplete tasks and parkers assosicated with workers.
+ for worker in self.workers.iter() {
+ worker.shutdown();
+ }
+
+ // Notify the task interested in shutdown.
+ let mut inner = self.inner.lock().unwrap();
+ inner.completed = true;
+ inner.task.notify();
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/task/blocking.rs b/third_party/rust/tokio-threadpool/src/task/blocking.rs
new file mode 100644
index 0000000000..ded59edfef
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/task/blocking.rs
@@ -0,0 +1,496 @@
+use pool::Pool;
+use task::{BlockingState, Task};
+
+use futures::{Async, Poll};
+
+use std::cell::UnsafeCell;
+use std::fmt;
+use std::ptr;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
+use std::sync::Arc;
+use std::thread;
+
+/// Manages the state around entering a blocking section and tasks that are
+/// queued pending the ability to block.
+///
+/// This is a hybrid counter and intrusive mpsc channel (like `Queue`).
+#[derive(Debug)]
+pub(crate) struct Blocking {
+ /// Queue head.
+ ///
+ /// This is either the current remaining capacity for blocking sections
+ /// **or** if the max has been reached, the head of a pending blocking
+ /// capacity channel of tasks.
+ ///
+ /// When this points to a task, it represents a strong reference, i.e.
+ /// `Arc<Task>`.
+ state: AtomicUsize,
+
+ /// Tail pointer. This is `Arc<Task>` unless it points to `stub`.
+ tail: UnsafeCell<*mut Task>,
+
+ /// Stub pointer, used as part of the intrusive mpsc channel algorithm
+ /// described by 1024cores.
+ stub: Box<Task>,
+
+ /// The channel algorithm is MPSC. This means that, in order to pop tasks,
+ /// coordination is required.
+ ///
+ /// Since it doesn't matter *which* task pops & notifies the queued task, we
+ /// can avoid a full mutex and make the "lock" lock free.
+ ///
+ /// Instead, threads race to set the "entered" bit. When the transition is
+ /// successfully made, the thread has permission to pop tasks off of the
+ /// queue. If a thread loses the race, instead of waiting to pop a task, it
+ /// signals to the winning thread that it should pop an additional task.
+ lock: AtomicUsize,
+}
+
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub(crate) enum CanBlock {
+ /// Blocking capacity has been allocated to this task.
+ ///
+ /// The capacity allocation is initially checked before a task is polled. If
+ /// capacity has been allocated, it is consumed and tracked as `Allocated`.
+ Allocated,
+
+ /// Allocation capacity must be either available to the task when it is
+ /// polled or not available. This means that a task can only ask for
+ /// capacity once. This state is used to track a task that has not yet asked
+ /// for blocking capacity. When a task needs blocking capacity, if it is in
+ /// this state, it can immediately try to get an allocation.
+ CanRequest,
+
+ /// The task has requested blocking capacity, but none is available.
+ NoCapacity,
+}
+
+/// Decorates the `usize` value of `Blocking::state`, providing fns to
+/// manipulate the state instead of requiring bit ops.
+#[derive(Copy, Clone, Eq, PartialEq)]
+struct State(usize);
+
+/// Flag differentiating between remaining capacity and task pointers.
+///
+/// If we assume pointers are properly aligned, then the least significant bit
+/// will always be zero. So, we use that bit to track if the value represents a
+/// number.
+const NUM_FLAG: usize = 1;
+
+/// When representing "numbers", the state has to be shifted this much (to get
+/// rid of the flag bit).
+const NUM_SHIFT: usize = 1;
+
+// ====== impl Blocking =====
+//
+impl Blocking {
+ /// Create a new `Blocking`.
+ pub fn new(capacity: usize) -> Blocking {
+ assert!(capacity > 0, "blocking capacity must be greater than zero");
+
+ let stub = Box::new(Task::stub());
+ let ptr = &*stub as *const _ as *mut _;
+
+ // Allocations are aligned
+ debug_assert!(ptr as usize & NUM_FLAG == 0);
+
+ // The initial state value. This starts at the max capacity.
+ let init = State::new(capacity);
+
+ Blocking {
+ state: AtomicUsize::new(init.into()),
+ tail: UnsafeCell::new(ptr),
+ stub: stub,
+ lock: AtomicUsize::new(0),
+ }
+ }
+
+ /// Atomically either acquire blocking capacity or queue the task to be
+ /// notified once capacity becomes available.
+ ///
+ /// The caller must ensure that `task` has not previously been queued to be
+ /// notified when capacity becomes available.
+ pub fn poll_blocking_capacity(&self, task: &Arc<Task>) -> Poll<(), ::BlockingError> {
+ // This requires atomically claiming blocking capacity and if none is
+ // available, queuing &task.
+
+ // The task cannot be queued at this point. The caller must ensure this.
+ debug_assert!(!BlockingState::from(task.blocking.load(Acquire)).is_queued());
+
+ // Don't bump the ref count unless necessary.
+ let mut strong: Option<*const Task> = None;
+
+ // Load the state
+ let mut curr: State = self.state.load(Acquire).into();
+
+ loop {
+ let mut next = curr;
+
+ if !next.claim_capacity(&self.stub) {
+ debug_assert!(curr.ptr().is_some());
+
+ // Unable to claim capacity, so we must queue `task` onto the
+ // channel.
+ //
+ // This guard also serves to ensure that queuing work that is
+ // only needed to run once only gets run once.
+ if strong.is_none() {
+ // First, transition the task to a "queued" state. This
+ // prevents double queuing.
+ //
+ // This is also the only thread that can set the queued flag
+ // at this point. And, the goal is for this to only be
+ // visible when the task node is polled from the channel.
+ // The memory ordering is established by MPSC queue
+ // operation.
+ //
+ // Note that, if the task doesn't get queued (because the
+ // CAS fails and capacity is now available) then this flag
+ // must be unset. Again, there is no race because until the
+ // task is queued, no other thread can see it.
+ let prev = BlockingState::toggle_queued(&task.blocking, Relaxed);
+ debug_assert!(!prev.is_queued());
+
+ // Bump the ref count
+ strong = Some(Arc::into_raw(task.clone()));
+
+ // Set the next pointer. This does not require an atomic
+ // operation as this node is not currently accessible to
+ // other threads via the queue.
+ task.next_blocking.store(ptr::null_mut(), Relaxed);
+ }
+
+ let ptr = strong.unwrap();
+
+ // Update the head to point to the new node. We need to see the
+ // previous node in order to update the next pointer as well as
+ // release `task` to any other threads calling `push`.
+ next.set_ptr(ptr);
+ }
+
+ debug_assert_ne!(curr.0, 0);
+ debug_assert_ne!(next.0, 0);
+
+ let actual = self
+ .state
+ .compare_and_swap(curr.into(), next.into(), AcqRel)
+ .into();
+
+ if curr == actual {
+ break;
+ }
+
+ curr = actual;
+ }
+
+ match curr.ptr() {
+ Some(prev) => {
+ let ptr = strong.unwrap();
+
+ // Finish pushing
+ unsafe {
+ (*prev).next_blocking.store(ptr as *mut _, Release);
+ }
+
+ // The node was queued to be notified once capacity is made
+ // available.
+ Ok(Async::NotReady)
+ }
+ None => {
+ debug_assert!(curr.remaining_capacity() > 0);
+
+ // If `strong` is set, gotta undo a bunch of work
+ if let Some(ptr) = strong {
+ let _ = unsafe { Arc::from_raw(ptr) };
+
+ // Unset the queued flag.
+ let prev = BlockingState::toggle_queued(&task.blocking, Relaxed);
+ debug_assert!(prev.is_queued());
+ }
+
+ // Capacity has been obtained
+ Ok(().into())
+ }
+ }
+ }
+
+ unsafe fn push_stub(&self) {
+ let task: *mut Task = &*self.stub as *const _ as *mut _;
+
+ // Set the next pointer. This does not require an atomic operation as
+ // this node is not accessible. The write will be flushed with the next
+ // operation
+ (*task).next_blocking.store(ptr::null_mut(), Relaxed);
+
+ // Update the head to point to the new node. We need to see the previous
+ // node in order to update the next pointer as well as release `task`
+ // to any other threads calling `push`.
+ let prev = self.state.swap(task as usize, AcqRel);
+
+ // The stub is only pushed when there are pending tasks. Because of
+ // this, the state must *always* be in pointer mode.
+ debug_assert!(State::from(prev).is_ptr());
+
+ let prev = prev as *const Task;
+
+ // We don't want the *existing* pointer to be a stub.
+ debug_assert_ne!(prev, task);
+
+ // Release `task` to the consume end.
+ (*prev).next_blocking.store(task, Release);
+ }
+
+ pub fn notify_task(&self, pool: &Arc<Pool>) {
+ let prev = self.lock.fetch_add(1, AcqRel);
+
+ if prev != 0 {
+ // Another thread has the lock and will be responsible for notifying
+ // pending tasks.
+ return;
+ }
+
+ let mut dec = 1;
+
+ loop {
+ let mut remaining_pops = dec;
+ while remaining_pops > 0 {
+ remaining_pops -= 1;
+
+ let task = match self.pop(remaining_pops) {
+ Some(t) => t,
+ None => break,
+ };
+
+ Task::notify_blocking(task, pool);
+ }
+
+ // Decrement the number of handled notifications
+ let actual = self.lock.fetch_sub(dec, AcqRel);
+
+ if actual == dec {
+ break;
+ }
+
+ // This can only be greater than expected as we are the only thread
+ // that is decrementing.
+ debug_assert!(actual > dec);
+ dec = actual - dec;
+ }
+ }
+
+ /// Pop a task
+ ///
+ /// `rem` represents the remaining number of times the caller will pop. If
+ /// there are no more tasks to pop, `rem` is used to set the remaining
+ /// capacity.
+ fn pop(&self, rem: usize) -> Option<Arc<Task>> {
+ 'outer: loop {
+ unsafe {
+ let mut tail = *self.tail.get();
+ let mut next = (*tail).next_blocking.load(Acquire);
+
+ let stub = &*self.stub as *const _ as *mut _;
+
+ if tail == stub {
+ if next.is_null() {
+ // This loop is not part of the standard intrusive mpsc
+ // channel algorithm. This is where we atomically pop
+ // the last task and add `rem` to the remaining capacity.
+ //
+ // This modification to the pop algorithm works because,
+ // at this point, we have not done any work (only done
+ // reading). We have a *pretty* good idea that there is
+ // no concurrent pusher.
+ //
+ // The capacity is then atomically added by doing an
+ // AcqRel CAS on `state`. The `state` cell is the
+ // linchpin of the algorithm.
+ //
+ // By successfully CASing `head` w/ AcqRel, we ensure
+ // that, if any thread was racing and entered a push, we
+ // see that and abort pop, retrying as it is
+ // "inconsistent".
+ let mut curr: State = self.state.load(Acquire).into();
+
+ loop {
+ if curr.has_task(&self.stub) {
+ // Inconsistent state, yield the thread and try
+ // again.
+ thread::yield_now();
+ continue 'outer;
+ }
+
+ let mut after = curr;
+
+ // +1 here because `rem` represents the number of
+ // pops that will come after the current one.
+ after.add_capacity(rem + 1, &self.stub);
+
+ let actual: State = self
+ .state
+ .compare_and_swap(curr.into(), after.into(), AcqRel)
+ .into();
+
+ if actual == curr {
+ // Successfully returned the remaining capacity
+ return None;
+ }
+
+ curr = actual;
+ }
+ }
+
+ *self.tail.get() = next;
+ tail = next;
+ next = (*next).next_blocking.load(Acquire);
+ }
+
+ if !next.is_null() {
+ *self.tail.get() = next;
+
+ // No ref_count inc is necessary here as this poll is paired
+ // with a `push` which "forgets" the handle.
+ return Some(Arc::from_raw(tail));
+ }
+
+ let state = self.state.load(Acquire);
+
+ // This must always be a pointer
+ debug_assert!(State::from(state).is_ptr());
+
+ if state != tail as usize {
+ // Try again
+ thread::yield_now();
+ continue 'outer;
+ }
+
+ self.push_stub();
+
+ next = (*tail).next_blocking.load(Acquire);
+
+ if !next.is_null() {
+ *self.tail.get() = next;
+
+ return Some(Arc::from_raw(tail));
+ }
+
+ thread::yield_now();
+ // Try again
+ }
+ }
+ }
+}
+
+// ====== impl State =====
+
+impl State {
+ /// Return a new `State` representing the remaining capacity at the maximum
+ /// value.
+ fn new(capacity: usize) -> State {
+ State((capacity << NUM_SHIFT) | NUM_FLAG)
+ }
+
+ fn remaining_capacity(&self) -> usize {
+ if !self.has_remaining_capacity() {
+ return 0;
+ }
+
+ self.0 >> 1
+ }
+
+ fn has_remaining_capacity(&self) -> bool {
+ self.0 & NUM_FLAG == NUM_FLAG
+ }
+
+ fn has_task(&self, stub: &Task) -> bool {
+ !(self.has_remaining_capacity() || self.is_stub(stub))
+ }
+
+ fn is_stub(&self, stub: &Task) -> bool {
+ self.0 == stub as *const _ as usize
+ }
+
+ /// Try to claim blocking capacity.
+ ///
+ /// # Return
+ ///
+ /// Returns `true` if the capacity was claimed, `false` otherwise. If
+ /// `false` is returned, it can be assumed that `State` represents the head
+ /// pointer in the mpsc channel.
+ fn claim_capacity(&mut self, stub: &Task) -> bool {
+ if !self.has_remaining_capacity() {
+ return false;
+ }
+
+ debug_assert!(self.0 != 1);
+
+ self.0 -= 1 << NUM_SHIFT;
+
+ if self.0 == NUM_FLAG {
+ // Set the state to the stub pointer.
+ self.0 = stub as *const _ as usize;
+ }
+
+ true
+ }
+
+ /// Add blocking capacity.
+ fn add_capacity(&mut self, capacity: usize, stub: &Task) -> bool {
+ debug_assert!(capacity > 0);
+
+ if self.is_stub(stub) {
+ self.0 = (capacity << NUM_SHIFT) | NUM_FLAG;
+ true
+ } else if self.has_remaining_capacity() {
+ self.0 += capacity << NUM_SHIFT;
+ true
+ } else {
+ false
+ }
+ }
+
+ fn is_ptr(&self) -> bool {
+ self.0 & NUM_FLAG == 0
+ }
+
+ fn ptr(&self) -> Option<*const Task> {
+ if self.is_ptr() {
+ Some(self.0 as *const Task)
+ } else {
+ None
+ }
+ }
+
+ fn set_ptr(&mut self, ptr: *const Task) {
+ let ptr = ptr as usize;
+ debug_assert!(ptr & NUM_FLAG == 0);
+ self.0 = ptr
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> State {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> usize {
+ src.0
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ let mut fmt = fmt.debug_struct("State");
+
+ if self.is_ptr() {
+ fmt.field("ptr", &self.0);
+ } else {
+ fmt.field("remaining", &self.remaining_capacity());
+ }
+
+ fmt.finish()
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/task/blocking_state.rs b/third_party/rust/tokio-threadpool/src/task/blocking_state.rs
new file mode 100644
index 0000000000..b41fc4868d
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/task/blocking_state.rs
@@ -0,0 +1,89 @@
+use task::CanBlock;
+
+use std::fmt;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+/// State tracking task level state to support `blocking`.
+///
+/// This tracks two separate flags.
+///
+/// a) If the task is queued in the pending blocking channel. This prevents
+/// double queuing (which would break the linked list).
+///
+/// b) If the task has been allocated capacity to block.
+#[derive(Eq, PartialEq)]
+pub(crate) struct BlockingState(usize);
+
+const QUEUED: usize = 0b01;
+const ALLOCATED: usize = 0b10;
+
+impl BlockingState {
+ /// Create a new, default, `BlockingState`.
+ pub fn new() -> BlockingState {
+ BlockingState(0)
+ }
+
+ /// Returns `true` if the state represents the associated task being queued
+ /// in the pending blocking capacity channel
+ pub fn is_queued(&self) -> bool {
+ self.0 & QUEUED == QUEUED
+ }
+
+ /// Toggle the queued flag
+ ///
+ /// Returns the state before the flag has been toggled.
+ pub fn toggle_queued(state: &AtomicUsize, ordering: Ordering) -> BlockingState {
+ state.fetch_xor(QUEUED, ordering).into()
+ }
+
+ /// Returns `true` if the state represents the associated task having been
+ /// allocated capacity to block.
+ pub fn is_allocated(&self) -> bool {
+ self.0 & ALLOCATED == ALLOCATED
+ }
+
+ /// Atomically consume the capacity allocation and return if the allocation
+ /// was present.
+ ///
+ /// If this returns `true`, then the task has the ability to block for the
+ /// duration of the `poll`.
+ pub fn consume_allocation(state: &AtomicUsize, ordering: Ordering) -> CanBlock {
+ let state: Self = state.fetch_and(!ALLOCATED, ordering).into();
+
+ if state.is_allocated() {
+ CanBlock::Allocated
+ } else if state.is_queued() {
+ CanBlock::NoCapacity
+ } else {
+ CanBlock::CanRequest
+ }
+ }
+
+ pub fn notify_blocking(state: &AtomicUsize, ordering: Ordering) {
+ let prev: Self = state.fetch_xor(ALLOCATED | QUEUED, ordering).into();
+
+ debug_assert!(prev.is_queued());
+ debug_assert!(!prev.is_allocated());
+ }
+}
+
+impl From<usize> for BlockingState {
+ fn from(src: usize) -> BlockingState {
+ BlockingState(src)
+ }
+}
+
+impl From<BlockingState> for usize {
+ fn from(src: BlockingState) -> usize {
+ src.0
+ }
+}
+
+impl fmt::Debug for BlockingState {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("BlockingState")
+ .field("is_queued", &self.is_queued())
+ .field("is_allocated", &self.is_allocated())
+ .finish()
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/task/mod.rs b/third_party/rust/tokio-threadpool/src/task/mod.rs
new file mode 100644
index 0000000000..9c5a448fcd
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/task/mod.rs
@@ -0,0 +1,308 @@
+mod blocking;
+mod blocking_state;
+mod state;
+
+pub(crate) use self::blocking::{Blocking, CanBlock};
+use self::blocking_state::BlockingState;
+use self::state::State;
+
+use notifier::Notifier;
+use pool::Pool;
+
+use futures::executor::{self, Spawn};
+use futures::{self, Async, Future};
+
+use std::cell::{Cell, UnsafeCell};
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
+use std::sync::atomic::{AtomicPtr, AtomicUsize};
+use std::sync::Arc;
+use std::{fmt, panic, ptr};
+
+/// Harness around a future.
+///
+/// This also behaves as a node in the inbound work queue and the blocking
+/// queue.
+pub(crate) struct Task {
+ /// Task lifecycle state
+ state: AtomicUsize,
+
+ /// Task blocking related state
+ blocking: AtomicUsize,
+
+ /// Next pointer in the queue of tasks pending blocking capacity.
+ next_blocking: AtomicPtr<Task>,
+
+ /// ID of the worker that polled this task first.
+ ///
+ /// This field can be a `Cell` because it's only accessed by the worker thread that is
+ /// executing the task.
+ ///
+ /// The worker ID is represented by a `u32` rather than `usize` in order to save some space
+ /// on 64-bit platforms.
+ pub reg_worker: Cell<Option<u32>>,
+
+ /// The key associated with this task in the `Slab` it was registered in.
+ ///
+ /// This field can be a `Cell` because it's only accessed by the worker thread that has
+ /// registered the task.
+ pub reg_index: Cell<usize>,
+
+ /// Store the future at the head of the struct
+ ///
+ /// The future is dropped immediately when it transitions to Complete
+ future: UnsafeCell<Option<Spawn<BoxFuture>>>,
+}
+
+#[derive(Debug)]
+pub(crate) enum Run {
+ Idle,
+ Schedule,
+ Complete,
+}
+
+type BoxFuture = Box<dyn Future<Item = (), Error = ()> + Send + 'static>;
+
+// ===== impl Task =====
+
+impl Task {
+ /// Create a new `Task` as a harness for `future`.
+ pub fn new(future: BoxFuture) -> Task {
+ // Wrap the future with an execution context.
+ let task_fut = executor::spawn(future);
+
+ Task {
+ state: AtomicUsize::new(State::new().into()),
+ blocking: AtomicUsize::new(BlockingState::new().into()),
+ next_blocking: AtomicPtr::new(ptr::null_mut()),
+ reg_worker: Cell::new(None),
+ reg_index: Cell::new(0),
+ future: UnsafeCell::new(Some(task_fut)),
+ }
+ }
+
+ /// Create a fake `Task` to be used as part of the intrusive mpsc channel
+ /// algorithm.
+ fn stub() -> Task {
+ let future = Box::new(futures::empty()) as BoxFuture;
+ let task_fut = executor::spawn(future);
+
+ Task {
+ state: AtomicUsize::new(State::stub().into()),
+ blocking: AtomicUsize::new(BlockingState::new().into()),
+ next_blocking: AtomicPtr::new(ptr::null_mut()),
+ reg_worker: Cell::new(None),
+ reg_index: Cell::new(0),
+ future: UnsafeCell::new(Some(task_fut)),
+ }
+ }
+
+ /// Execute the task returning `Run::Schedule` if the task needs to be
+ /// scheduled again.
+ pub fn run(&self, unpark: &Arc<Notifier>) -> Run {
+ use self::State::*;
+
+ // Transition task to running state. At this point, the task must be
+ // scheduled.
+ let actual: State = self
+ .state
+ .compare_and_swap(Scheduled.into(), Running.into(), AcqRel)
+ .into();
+
+ match actual {
+ Scheduled => {}
+ _ => panic!("unexpected task state; {:?}", actual),
+ }
+
+ trace!(
+ "Task::run; state={:?}",
+ State::from(self.state.load(Relaxed))
+ );
+
+ // The transition to `Running` done above ensures that a lock on the
+ // future has been obtained.
+ let fut = unsafe { &mut (*self.future.get()) };
+
+ // This block deals with the future panicking while being polled.
+ //
+ // If the future panics, then the drop handler must be called such that
+ // `thread::panicking() -> true`. To do this, the future is dropped from
+ // within the catch_unwind block.
+ let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ struct Guard<'a>(&'a mut Option<Spawn<BoxFuture>>, bool);
+
+ impl<'a> Drop for Guard<'a> {
+ fn drop(&mut self) {
+ // This drops the future
+ if self.1 {
+ let _ = self.0.take();
+ }
+ }
+ }
+
+ let mut g = Guard(fut, true);
+
+ let ret =
+ g.0.as_mut()
+ .unwrap()
+ .poll_future_notify(unpark, self as *const _ as usize);
+
+ g.1 = false;
+
+ ret
+ }));
+
+ match res {
+ Ok(Ok(Async::Ready(_))) | Ok(Err(_)) | Err(_) => {
+ trace!(" -> task complete");
+
+ // The future has completed. Drop it immediately to free
+ // resources and run drop handlers.
+ //
+ // The `Task` harness will stay around longer if it is contained
+ // by any of the various queues.
+ self.drop_future();
+
+ // Transition to the completed state
+ self.state.store(State::Complete.into(), Release);
+
+ if let Err(panic_err) = res {
+ if let Some(ref f) = unpark.pool.config.panic_handler {
+ f(panic_err);
+ }
+ }
+
+ Run::Complete
+ }
+ Ok(Ok(Async::NotReady)) => {
+ trace!(" -> not ready");
+
+ // Attempt to transition from Running -> Idle, if successful,
+ // then the task does not need to be scheduled again. If the CAS
+ // fails, then the task has been unparked concurrent to running,
+ // in which case it transitions immediately back to scheduled
+ // and we return `true`.
+ let prev: State = self
+ .state
+ .compare_and_swap(Running.into(), Idle.into(), AcqRel)
+ .into();
+
+ match prev {
+ Running => Run::Idle,
+ Notified => {
+ self.state.store(Scheduled.into(), Release);
+ Run::Schedule
+ }
+ _ => unreachable!(),
+ }
+ }
+ }
+ }
+
+ /// Aborts this task.
+ ///
+ /// This is called when the threadpool shuts down and the task has already beed polled but not
+ /// completed.
+ pub fn abort(&self) {
+ use self::State::*;
+
+ let mut state = self.state.load(Acquire).into();
+
+ loop {
+ match state {
+ Idle | Scheduled => {}
+ Running | Notified | Complete | Aborted => {
+ // It is assumed that no worker threads are running so the task must be either
+ // in the idle or scheduled state.
+ panic!("unexpected state while aborting task: {:?}", state);
+ }
+ }
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), Aborted.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ // The future has been aborted. Drop it immediately to free resources and run drop
+ // handlers.
+ self.drop_future();
+ break;
+ }
+
+ state = actual;
+ }
+ }
+
+ /// Notify the task
+ pub fn notify(me: Arc<Task>, pool: &Arc<Pool>) {
+ if me.schedule() {
+ let _ = pool.submit(me, pool);
+ }
+ }
+
+ /// Notify the task it has been allocated blocking capacity
+ pub fn notify_blocking(me: Arc<Task>, pool: &Arc<Pool>) {
+ BlockingState::notify_blocking(&me.blocking, AcqRel);
+ Task::notify(me, pool);
+ }
+
+ /// Transition the task state to scheduled.
+ ///
+ /// Returns `true` if the caller is permitted to schedule the task.
+ pub fn schedule(&self) -> bool {
+ use self::State::*;
+
+ loop {
+ // Scheduling can only be done from the `Idle` state.
+ let actual = self
+ .state
+ .compare_and_swap(Idle.into(), Scheduled.into(), AcqRel)
+ .into();
+
+ match actual {
+ Idle => return true,
+ Running => {
+ // The task is already running on another thread. Transition
+ // the state to `Notified`. If this CAS fails, then restart
+ // the logic again from `Idle`.
+ let actual = self
+ .state
+ .compare_and_swap(Running.into(), Notified.into(), AcqRel)
+ .into();
+
+ match actual {
+ Idle => continue,
+ _ => return false,
+ }
+ }
+ Complete | Aborted | Notified | Scheduled => return false,
+ }
+ }
+ }
+
+ /// Consumes any allocated capacity to block.
+ ///
+ /// Returns `true` if capacity was allocated, `false` otherwise.
+ pub fn consume_blocking_allocation(&self) -> CanBlock {
+ // This flag is the primary point of coordination. The queued flag
+ // happens "around" setting the blocking capacity.
+ BlockingState::consume_allocation(&self.blocking, AcqRel)
+ }
+
+ /// Drop the future
+ ///
+ /// This must only be called by the thread that successfully transitioned
+ /// the future state to `Running`.
+ fn drop_future(&self) {
+ let _ = unsafe { (*self.future.get()).take() };
+ }
+}
+
+impl fmt::Debug for Task {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Task")
+ .field("state", &self.state)
+ .field("future", &"Spawn<BoxFuture>")
+ .finish()
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/task/state.rs b/third_party/rust/tokio-threadpool/src/task/state.rs
new file mode 100644
index 0000000000..3e00f89bc5
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/task/state.rs
@@ -0,0 +1,57 @@
+#[repr(usize)]
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub(crate) enum State {
+ /// Task is currently idle
+ Idle = 0,
+
+ /// Task is currently running
+ Running = 1,
+
+ /// Task is currently running, but has been notified that it must run again.
+ Notified = 2,
+
+ /// Task has been scheduled
+ Scheduled = 3,
+
+ /// Task is complete
+ Complete = 4,
+
+ /// Task was aborted because the thread pool has been shut down
+ Aborted = 5,
+}
+
+// ===== impl State =====
+
+impl State {
+ /// Returns the initial task state.
+ ///
+ /// Tasks start in the scheduled state as they are immediately scheduled on
+ /// creation.
+ pub fn new() -> State {
+ State::Scheduled
+ }
+
+ pub fn stub() -> State {
+ State::Idle
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> Self {
+ use self::State::*;
+
+ debug_assert!(
+ src >= Idle as usize && src <= Aborted as usize,
+ "actual={}",
+ src
+ );
+
+ unsafe { ::std::mem::transmute(src) }
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> Self {
+ src as usize
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/thread_pool.rs b/third_party/rust/tokio-threadpool/src/thread_pool.rs
new file mode 100644
index 0000000000..30f58e96ad
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/thread_pool.rs
@@ -0,0 +1,217 @@
+use builder::Builder;
+use pool::Pool;
+use sender::Sender;
+use shutdown::{Shutdown, ShutdownTrigger};
+
+use futures::sync::oneshot;
+use futures::{Future, Poll};
+
+use std::sync::Arc;
+
+/// Work-stealing based thread pool for executing futures.
+///
+/// If a `ThreadPool` instance is dropped without explicitly being shutdown,
+/// `shutdown_now` is called implicitly, forcing all tasks that have not yet
+/// completed to be dropped.
+///
+/// Create `ThreadPool` instances using `Builder`.
+#[derive(Debug)]
+pub struct ThreadPool {
+ inner: Option<Inner>,
+}
+
+#[derive(Debug)]
+struct Inner {
+ sender: Sender,
+ trigger: Arc<ShutdownTrigger>,
+}
+
+impl ThreadPool {
+ /// Create a new `ThreadPool` with default values.
+ ///
+ /// Use [`Builder`] for creating a configured thread pool.
+ ///
+ /// [`Builder`]: struct.Builder.html
+ pub fn new() -> ThreadPool {
+ Builder::new().build()
+ }
+
+ pub(crate) fn new2(pool: Arc<Pool>, trigger: Arc<ShutdownTrigger>) -> ThreadPool {
+ ThreadPool {
+ inner: Some(Inner {
+ sender: Sender { pool },
+ trigger,
+ }),
+ }
+ }
+
+ /// Spawn a future onto the thread pool.
+ ///
+ /// This function takes ownership of the future and randomly assigns it to a
+ /// worker thread. The thread will then start executing the future.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::ThreadPool;
+ /// use futures::future::{Future, lazy};
+ ///
+ /// # pub fn main() {
+ /// // Create a thread pool with default configuration values
+ /// let thread_pool = ThreadPool::new();
+ ///
+ /// thread_pool.spawn(lazy(|| {
+ /// println!("called from a worker thread");
+ /// Ok(())
+ /// }));
+ ///
+ /// // Gracefully shutdown the threadpool
+ /// thread_pool.shutdown().wait().unwrap();
+ /// # }
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// This function panics if the spawn fails. Use [`Sender::spawn`] for a
+ /// version that returns a `Result` instead of panicking.
+ pub fn spawn<F>(&self, future: F)
+ where
+ F: Future<Item = (), Error = ()> + Send + 'static,
+ {
+ self.sender().spawn(future).unwrap();
+ }
+
+ /// Spawn a future on to the thread pool, return a future representing
+ /// the produced value.
+ ///
+ /// The SpawnHandle returned is a future that is a proxy for future itself.
+ /// When future completes on this thread pool then the SpawnHandle will itself
+ /// be resolved.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// # extern crate tokio_threadpool;
+ /// # extern crate futures;
+ /// # use tokio_threadpool::ThreadPool;
+ /// use futures::future::{Future, lazy};
+ ///
+ /// # pub fn main() {
+ /// // Create a thread pool with default configuration values
+ /// let thread_pool = ThreadPool::new();
+ ///
+ /// let handle = thread_pool.spawn_handle(lazy(|| Ok::<_, ()>(42)));
+ ///
+ /// let value = handle.wait().unwrap();
+ /// assert_eq!(value, 42);
+ ///
+ /// // Gracefully shutdown the threadpool
+ /// thread_pool.shutdown().wait().unwrap();
+ /// # }
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// This function panics if the spawn fails.
+ pub fn spawn_handle<F>(&self, future: F) -> SpawnHandle<F::Item, F::Error>
+ where
+ F: Future + Send + 'static,
+ F::Item: Send + 'static,
+ F::Error: Send + 'static,
+ {
+ SpawnHandle(oneshot::spawn(future, self.sender()))
+ }
+
+ /// Return a reference to the sender handle
+ ///
+ /// The handle is used to spawn futures onto the thread pool. It also
+ /// implements the `Executor` trait.
+ pub fn sender(&self) -> &Sender {
+ &self.inner.as_ref().unwrap().sender
+ }
+
+ /// Return a mutable reference to the sender handle
+ pub fn sender_mut(&mut self) -> &mut Sender {
+ &mut self.inner.as_mut().unwrap().sender
+ }
+
+ /// Shutdown the pool once it becomes idle.
+ ///
+ /// Idle is defined as the completion of all futures that have been spawned
+ /// onto the thread pool. There may still be outstanding handles when the
+ /// thread pool reaches an idle state.
+ ///
+ /// Once the idle state is reached, calling `spawn` on any outstanding
+ /// handle will result in an error. All worker threads are signaled and will
+ /// shutdown. The returned future completes once all worker threads have
+ /// completed the shutdown process.
+ pub fn shutdown_on_idle(mut self) -> Shutdown {
+ let inner = self.inner.take().unwrap();
+ inner.sender.pool.shutdown(false, false);
+ Shutdown::new(&inner.trigger)
+ }
+
+ /// Shutdown the pool
+ ///
+ /// This prevents the thread pool from accepting new tasks but will allow
+ /// any existing tasks to complete.
+ ///
+ /// Calling `spawn` on any outstanding handle will result in an error. All
+ /// worker threads are signaled and will shutdown. The returned future
+ /// completes once all worker threads have completed the shutdown process.
+ pub fn shutdown(mut self) -> Shutdown {
+ let inner = self.inner.take().unwrap();
+ inner.sender.pool.shutdown(true, false);
+ Shutdown::new(&inner.trigger)
+ }
+
+ /// Shutdown the pool immediately
+ ///
+ /// This will prevent the thread pool from accepting new tasks **and**
+ /// abort any tasks that are currently running on the thread pool.
+ ///
+ /// Calling `spawn` on any outstanding handle will result in an error. All
+ /// worker threads are signaled and will shutdown. The returned future
+ /// completes once all worker threads have completed the shutdown process.
+ pub fn shutdown_now(mut self) -> Shutdown {
+ let inner = self.inner.take().unwrap();
+ inner.sender.pool.shutdown(true, true);
+ Shutdown::new(&inner.trigger)
+ }
+}
+
+impl Drop for ThreadPool {
+ fn drop(&mut self) {
+ if let Some(inner) = self.inner.take() {
+ // Begin the shutdown process.
+ inner.sender.pool.shutdown(true, true);
+ let shutdown = Shutdown::new(&inner.trigger);
+
+ // Drop `inner` in order to drop its shutdown trigger.
+ drop(inner);
+
+ // Wait until all worker threads terminate and the threadpool's resources clean up.
+ let _ = shutdown.wait();
+ }
+ }
+}
+
+/// Handle returned from ThreadPool::spawn_handle.
+///
+/// This handle is a future representing the completion of a different future
+/// spawned on to the thread pool. Created through the ThreadPool::spawn_handle
+/// function this handle will resolve when the future provided resolves on the
+/// thread pool.
+#[derive(Debug)]
+pub struct SpawnHandle<T, E>(oneshot::SpawnHandle<T, E>);
+
+impl<T, E> Future for SpawnHandle<T, E> {
+ type Item = T;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<T, E> {
+ self.0.poll()
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/worker/entry.rs b/third_party/rust/tokio-threadpool/src/worker/entry.rs
new file mode 100644
index 0000000000..0dcf5108b8
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/worker/entry.rs
@@ -0,0 +1,330 @@
+use park::{BoxPark, BoxUnpark};
+use task::Task;
+use worker::state::{State, PUSHED_MASK};
+
+use std::cell::UnsafeCell;
+use std::fmt;
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+
+use crossbeam_deque::{Steal, Stealer, Worker};
+use crossbeam_queue::SegQueue;
+use crossbeam_utils::CachePadded;
+use slab::Slab;
+
+// TODO: None of the fields should be public
+//
+// It would also be helpful to split up the state across what fields /
+// operations are thread-safe vs. which ones require ownership of the worker.
+pub(crate) struct WorkerEntry {
+ // Worker state. This is mutated when notifying the worker.
+ //
+ // The `usize` value is deserialized to a `worker::State` instance. See
+ // comments on that type.
+ pub state: CachePadded<AtomicUsize>,
+
+ // Next entry in the parked Trieber stack
+ next_sleeper: UnsafeCell<usize>,
+
+ // Worker half of deque
+ pub worker: Worker<Arc<Task>>,
+
+ // Stealer half of deque
+ stealer: Stealer<Arc<Task>>,
+
+ // Thread parker
+ park: UnsafeCell<Option<BoxPark>>,
+
+ // Thread unparker
+ unpark: UnsafeCell<Option<BoxUnpark>>,
+
+ // Tasks that have been first polled by this worker, but not completed yet.
+ running_tasks: UnsafeCell<Slab<Arc<Task>>>,
+
+ // Tasks that have been first polled by this worker, but completed by another worker.
+ remotely_completed_tasks: SegQueue<Arc<Task>>,
+
+ // Set to `true` when `remotely_completed_tasks` has tasks that need to be removed from
+ // `running_tasks`.
+ needs_drain: AtomicBool,
+}
+
+impl WorkerEntry {
+ pub fn new(park: BoxPark, unpark: BoxUnpark) -> Self {
+ let w = Worker::new_fifo();
+ let s = w.stealer();
+
+ WorkerEntry {
+ state: CachePadded::new(AtomicUsize::new(State::default().into())),
+ next_sleeper: UnsafeCell::new(0),
+ worker: w,
+ stealer: s,
+ park: UnsafeCell::new(Some(park)),
+ unpark: UnsafeCell::new(Some(unpark)),
+ running_tasks: UnsafeCell::new(Slab::new()),
+ remotely_completed_tasks: SegQueue::new(),
+ needs_drain: AtomicBool::new(false),
+ }
+ }
+
+ /// Atomically unset the pushed flag.
+ ///
+ /// # Return
+ ///
+ /// The state *before* the push flag is unset.
+ ///
+ /// # Ordering
+ ///
+ /// The specified ordering is established on the entry's state variable.
+ pub fn fetch_unset_pushed(&self, ordering: Ordering) -> State {
+ self.state.fetch_and(!PUSHED_MASK, ordering).into()
+ }
+
+ /// Submit a task to this worker while currently on the same thread that is
+ /// running the worker.
+ #[inline]
+ pub fn submit_internal(&self, task: Arc<Task>) {
+ self.push_internal(task);
+ }
+
+ /// Notifies the worker and returns `false` if it needs to be spawned.
+ ///
+ /// # Ordering
+ ///
+ /// The `state` must have been obtained with an `Acquire` ordering.
+ #[inline]
+ pub fn notify(&self, mut state: State) -> bool {
+ use worker::Lifecycle::*;
+
+ loop {
+ let mut next = state;
+ next.notify();
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if state == actual {
+ break;
+ }
+
+ state = actual;
+ }
+
+ match state.lifecycle() {
+ Sleeping => {
+ // The worker is currently sleeping, the condition variable must
+ // be signaled
+ self.unpark();
+ true
+ }
+ Shutdown => false,
+ Running | Notified | Signaled => {
+ // In these states, the worker is active and will eventually see
+ // the task that was just submitted.
+ true
+ }
+ }
+ }
+
+ /// Signals to the worker that it should stop
+ ///
+ /// `state` is the last observed state for the worker. This allows skipping
+ /// the initial load from the state atomic.
+ ///
+ /// # Return
+ ///
+ /// Returns `Ok` when the worker was successfully signaled.
+ ///
+ /// Returns `Err` if the worker has already terminated.
+ pub fn signal_stop(&self, mut state: State) {
+ use worker::Lifecycle::*;
+
+ // Transition the worker state to signaled
+ loop {
+ let mut next = state;
+
+ match state.lifecycle() {
+ Shutdown => {
+ return;
+ }
+ Running | Sleeping => {}
+ Notified | Signaled => {
+ // These two states imply that the worker is active, thus it
+ // will eventually see the shutdown signal, so we don't need
+ // to do anything.
+ //
+ // The worker is forced to see the shutdown signal
+ // eventually as:
+ //
+ // a) No more work will arrive
+ // b) The shutdown signal is stored as the head of the
+ // sleep, stack which will prevent the worker from going to
+ // sleep again.
+ return;
+ }
+ }
+
+ next.set_lifecycle(Signaled);
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ break;
+ }
+
+ state = actual;
+ }
+
+ // Wakeup the worker
+ self.unpark();
+ }
+
+ /// Pop a task
+ ///
+ /// This **must** only be called by the thread that owns the worker entry.
+ /// This function is not `Sync`.
+ #[inline]
+ pub fn pop_task(&self) -> Option<Arc<Task>> {
+ self.worker.pop()
+ }
+
+ /// Steal tasks
+ ///
+ /// This is called by *other* workers to steal a task for processing. This
+ /// function is `Sync`.
+ ///
+ /// At the same time, this method steals some additional tasks and moves
+ /// them into `dest` in order to balance the work distribution among
+ /// workers.
+ pub fn steal_tasks(&self, dest: &Self) -> Steal<Arc<Task>> {
+ self.stealer.steal_batch_and_pop(&dest.worker)
+ }
+
+ /// Drain (and drop) all tasks that are queued for work.
+ ///
+ /// This is called when the pool is shutting down.
+ pub fn drain_tasks(&self) {
+ while self.worker.pop().is_some() {}
+ }
+
+ /// Parks the worker thread.
+ pub fn park(&self) {
+ if let Some(park) = unsafe { (*self.park.get()).as_mut() } {
+ park.park().unwrap();
+ }
+ }
+
+ /// Parks the worker thread for at most `duration`.
+ pub fn park_timeout(&self, duration: Duration) {
+ if let Some(park) = unsafe { (*self.park.get()).as_mut() } {
+ park.park_timeout(duration).unwrap();
+ }
+ }
+
+ /// Unparks the worker thread.
+ #[inline]
+ pub fn unpark(&self) {
+ if let Some(park) = unsafe { (*self.unpark.get()).as_ref() } {
+ park.unpark();
+ }
+ }
+
+ /// Registers a task in this worker.
+ ///
+ /// Called when the task is being polled for the first time.
+ #[inline]
+ pub fn register_task(&self, task: &Arc<Task>) {
+ let running_tasks = unsafe { &mut *self.running_tasks.get() };
+
+ let key = running_tasks.insert(task.clone());
+ task.reg_index.set(key);
+ }
+
+ /// Unregisters a task from this worker.
+ ///
+ /// Called when the task is completed and was previously registered in this worker.
+ #[inline]
+ pub fn unregister_task(&self, task: Arc<Task>) {
+ let running_tasks = unsafe { &mut *self.running_tasks.get() };
+ running_tasks.remove(task.reg_index.get());
+ self.drain_remotely_completed_tasks();
+ }
+
+ /// Unregisters a task from this worker.
+ ///
+ /// Called when the task is completed by another worker and was previously registered in this
+ /// worker.
+ #[inline]
+ pub fn remotely_complete_task(&self, task: Arc<Task>) {
+ self.remotely_completed_tasks.push(task);
+ self.needs_drain.store(true, Release);
+ }
+
+ /// Drops the remaining incomplete tasks and the parker associated with this worker.
+ ///
+ /// This function is called by the shutdown trigger.
+ pub fn shutdown(&self) {
+ self.drain_remotely_completed_tasks();
+
+ // Abort all incomplete tasks.
+ let running_tasks = unsafe { &mut *self.running_tasks.get() };
+ for (_, task) in running_tasks.iter() {
+ task.abort();
+ }
+ running_tasks.clear();
+
+ unsafe {
+ *self.park.get() = None;
+ *self.unpark.get() = None;
+ }
+ }
+
+ /// Drains the `remotely_completed_tasks` queue and removes tasks from `running_tasks`.
+ #[inline]
+ fn drain_remotely_completed_tasks(&self) {
+ if self.needs_drain.compare_and_swap(true, false, Acquire) {
+ let running_tasks = unsafe { &mut *self.running_tasks.get() };
+
+ while let Ok(task) = self.remotely_completed_tasks.pop() {
+ running_tasks.remove(task.reg_index.get());
+ }
+ }
+ }
+
+ #[inline]
+ pub fn push_internal(&self, task: Arc<Task>) {
+ self.worker.push(task);
+ }
+
+ #[inline]
+ pub fn next_sleeper(&self) -> usize {
+ unsafe { *self.next_sleeper.get() }
+ }
+
+ #[inline]
+ pub fn set_next_sleeper(&self, val: usize) {
+ unsafe {
+ *self.next_sleeper.get() = val;
+ }
+ }
+}
+
+impl fmt::Debug for WorkerEntry {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("WorkerEntry")
+ .field("state", &self.state.load(Relaxed))
+ .field("next_sleeper", &"UnsafeCell<usize>")
+ .field("worker", &self.worker)
+ .field("stealer", &self.stealer)
+ .field("park", &"UnsafeCell<BoxPark>")
+ .field("unpark", &"BoxUnpark")
+ .finish()
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/worker/mod.rs b/third_party/rust/tokio-threadpool/src/worker/mod.rs
new file mode 100644
index 0000000000..d380c5d561
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/worker/mod.rs
@@ -0,0 +1,797 @@
+mod entry;
+mod stack;
+mod state;
+
+pub(crate) use self::entry::WorkerEntry as Entry;
+pub(crate) use self::stack::Stack;
+pub(crate) use self::state::{Lifecycle, State};
+
+use notifier::Notifier;
+use pool::{self, BackupId, Pool};
+use sender::Sender;
+use shutdown::ShutdownTrigger;
+use task::{self, CanBlock, Task};
+
+use tokio_executor;
+
+use futures::{Async, Poll};
+
+use std::cell::Cell;
+use std::marker::PhantomData;
+use std::rc::Rc;
+use std::sync::atomic::Ordering::{AcqRel, Acquire};
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+
+/// Thread worker
+///
+/// This is passed to the [`around_worker`] callback set on [`Builder`]. This
+/// callback is only expected to call [`run`] on it.
+///
+/// [`Builder`]: struct.Builder.html
+/// [`around_worker`]: struct.Builder.html#method.around_worker
+/// [`run`]: struct.Worker.html#method.run
+#[derive(Debug)]
+pub struct Worker {
+ // Shared scheduler data
+ pub(crate) pool: Arc<Pool>,
+
+ // WorkerEntry index
+ pub(crate) id: WorkerId,
+
+ // Backup thread ID assigned to processing this worker.
+ backup_id: BackupId,
+
+ // Set to the task that is currently being polled by the worker. This is
+ // needed so that `blocking` blocks are able to interact with this task.
+ //
+ // This has to be a raw pointer to make it compile, but great care is taken
+ // when this is set.
+ current_task: CurrentTask,
+
+ // Set when the thread is in blocking mode.
+ is_blocking: Cell<bool>,
+
+ // Set when the worker should finalize on drop
+ should_finalize: Cell<bool>,
+
+ // Completes the shutdown process when the `ThreadPool` and all `Worker`s get dropped.
+ trigger: Arc<ShutdownTrigger>,
+
+ // Keep the value on the current thread.
+ _p: PhantomData<Rc<()>>,
+}
+
+/// Tracks the state related to the currently running task.
+#[derive(Debug)]
+struct CurrentTask {
+ /// This has to be a raw pointer to make it compile, but great care is taken
+ /// when this is set.
+ task: Cell<Option<*const Arc<Task>>>,
+
+ /// Tracks the blocking capacity allocation state.
+ can_block: Cell<CanBlock>,
+}
+
+/// Identifies a thread pool worker.
+///
+/// This identifier is unique scoped by the thread pool. It is possible that
+/// different thread pool instances share worker identifier values.
+#[derive(Debug, Clone, Hash, Eq, PartialEq)]
+pub struct WorkerId(pub(crate) usize);
+
+// Pointer to the current worker info
+thread_local!(static CURRENT_WORKER: Cell<*const Worker> = Cell::new(0 as *const _));
+
+impl Worker {
+ pub(crate) fn new(
+ id: WorkerId,
+ backup_id: BackupId,
+ pool: Arc<Pool>,
+ trigger: Arc<ShutdownTrigger>,
+ ) -> Worker {
+ Worker {
+ pool,
+ id,
+ backup_id,
+ current_task: CurrentTask::new(),
+ is_blocking: Cell::new(false),
+ should_finalize: Cell::new(false),
+ trigger,
+ _p: PhantomData,
+ }
+ }
+
+ pub(crate) fn is_blocking(&self) -> bool {
+ self.is_blocking.get()
+ }
+
+ /// Run the worker
+ ///
+ /// Returns `true` if the thread should keep running as a `backup` thread.
+ pub(crate) fn do_run(&self) -> bool {
+ // Create another worker... It's ok, this is just a new type around
+ // `Pool` that is expected to stay on the current thread.
+ CURRENT_WORKER.with(|c| {
+ c.set(self as *const _);
+
+ let pool = self.pool.clone();
+ let mut sender = Sender { pool };
+
+ // Enter an execution context
+ let mut enter = tokio_executor::enter().unwrap();
+
+ tokio_executor::with_default(&mut sender, &mut enter, |enter| {
+ if let Some(ref callback) = self.pool.config.around_worker {
+ callback.call(self, enter);
+ } else {
+ self.run();
+ }
+ });
+ });
+
+ // Can't be in blocking mode and finalization mode
+ debug_assert!(!self.is_blocking.get() || !self.should_finalize.get());
+
+ self.is_blocking.get()
+ }
+
+ pub(crate) fn with_current<F: FnOnce(Option<&Worker>) -> R, R>(f: F) -> R {
+ CURRENT_WORKER.with(move |c| {
+ let ptr = c.get();
+
+ if ptr.is_null() {
+ f(None)
+ } else {
+ f(Some(unsafe { &*ptr }))
+ }
+ })
+ }
+
+ /// Transition the current worker to a blocking worker
+ pub(crate) fn transition_to_blocking(&self) -> Poll<(), ::BlockingError> {
+ use self::CanBlock::*;
+
+ // If we get this far, then `current_task` has been set.
+ let task_ref = self.current_task.get_ref();
+
+ // First step is to acquire blocking capacity for the task.
+ match self.current_task.can_block() {
+ // Capacity to block has already been allocated to this task.
+ Allocated => {}
+
+ // The task has already requested capacity to block, but there is
+ // none yet available.
+ NoCapacity => return Ok(Async::NotReady),
+
+ // The task has yet to ask for capacity
+ CanRequest => {
+ // Atomically attempt to acquire blocking capacity, and if none
+ // is available, register the task to be notified once capacity
+ // becomes available.
+ match self.pool.poll_blocking_capacity(task_ref)? {
+ Async::Ready(()) => {
+ self.current_task.set_can_block(Allocated);
+ }
+ Async::NotReady => {
+ self.current_task.set_can_block(NoCapacity);
+ return Ok(Async::NotReady);
+ }
+ }
+ }
+ }
+
+ // The task has been allocated blocking capacity. At this point, this is
+ // when the current thread transitions from a worker to a backup thread.
+ // To do so requires handing over the worker to another backup thread.
+
+ if self.is_blocking.get() {
+ // The thread is already in blocking mode, so there is nothing else
+ // to do. Return `Ready` and allow the caller to block the thread.
+ return Ok(().into());
+ }
+
+ trace!("transition to blocking state");
+
+ // Transitioning to blocking requires handing over the worker state to
+ // another thread so that the work queue can continue to be processed.
+
+ self.pool.spawn_thread(self.id.clone(), &self.pool);
+
+ // Track that the thread has now fully entered the blocking state.
+ self.is_blocking.set(true);
+
+ Ok(().into())
+ }
+
+ /// Transition from blocking
+ pub(crate) fn transition_from_blocking(&self) {
+ // TODO: Attempt to take ownership of the worker again.
+ }
+
+ /// Returns a reference to the worker's identifier.
+ ///
+ /// This identifier is unique scoped by the thread pool. It is possible that
+ /// different thread pool instances share worker identifier values.
+ pub fn id(&self) -> &WorkerId {
+ &self.id
+ }
+
+ /// Run the worker
+ ///
+ /// This function blocks until the worker is shutting down.
+ pub fn run(&self) {
+ const MAX_SPINS: usize = 3;
+ const LIGHT_SLEEP_INTERVAL: usize = 32;
+
+ // Get the notifier.
+ let notify = Arc::new(Notifier {
+ pool: self.pool.clone(),
+ });
+
+ let mut first = true;
+ let mut spin_cnt = 0;
+ let mut tick = 0;
+
+ while self.check_run_state(first) {
+ first = false;
+
+ // Run the next available task
+ if self.try_run_task(&notify) {
+ if self.is_blocking.get() {
+ // Exit out of the run state
+ return;
+ }
+
+ // Poll the reactor and the global queue every now and then to
+ // ensure no task gets left behind.
+ if tick % LIGHT_SLEEP_INTERVAL == 0 {
+ self.sleep_light();
+ }
+
+ tick = tick.wrapping_add(1);
+ spin_cnt = 0;
+
+ // As long as there is work, keep looping.
+ continue;
+ }
+
+ spin_cnt += 1;
+
+ // Yield the thread several times before it actually goes to sleep.
+ if spin_cnt <= MAX_SPINS {
+ thread::yield_now();
+ continue;
+ }
+
+ tick = 0;
+ spin_cnt = 0;
+
+ // Starting to get sleeeeepy
+ if !self.sleep() {
+ return;
+ }
+
+ // If there still isn't any work to do, shutdown the worker?
+ }
+
+ // The pool is terminating. However, transitioning the pool state to
+ // terminated is the very first step of the finalization process. Other
+ // threads may not see this state and try to spawn a new thread. To
+ // ensure consistency, before the current thread shuts down, it must
+ // return the backup token to the stack.
+ //
+ // The returned result is ignored because `Err` represents the pool
+ // shutting down. We are currently aware of this fact.
+ let _ = self.pool.release_backup(self.backup_id);
+
+ self.should_finalize.set(true);
+ }
+
+ /// Try to run a task
+ ///
+ /// Returns `true` if work was found.
+ #[inline]
+ fn try_run_task(&self, notify: &Arc<Notifier>) -> bool {
+ if self.try_run_owned_task(notify) {
+ return true;
+ }
+
+ self.try_steal_task(notify)
+ }
+
+ /// Checks the worker's current state, updating it as needed.
+ ///
+ /// Returns `true` if the worker should run.
+ #[inline]
+ fn check_run_state(&self, first: bool) -> bool {
+ use self::Lifecycle::*;
+
+ debug_assert!(!self.is_blocking.get());
+
+ let mut state: State = self.entry().state.load(Acquire).into();
+
+ loop {
+ let pool_state: pool::State = self.pool.state.load(Acquire).into();
+
+ if pool_state.is_terminated() {
+ return false;
+ }
+
+ let mut next = state;
+
+ match state.lifecycle() {
+ Running => break,
+ Notified | Signaled => {
+ // transition back to running
+ next.set_lifecycle(Running);
+ }
+ Shutdown | Sleeping => {
+ // The worker should never be in these states when calling
+ // this function.
+ panic!("unexpected worker state; lifecycle={:?}", state.lifecycle());
+ }
+ }
+
+ let actual = self
+ .entry()
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ break;
+ }
+
+ state = actual;
+ }
+
+ // `first` is set to true the first time this function is called after
+ // the thread has started.
+ //
+ // This check is to handle the scenario where a worker gets signaled
+ // while it is already happily running. The `is_signaled` state is
+ // intended to wake up a worker that has been previously sleeping in
+ // effect increasing the number of active workers. If this is the first
+ // time `check_run_state` is called, then being in a signalled state is
+ // normal and the thread was started to handle it. However, if this is
+ // **not** the first time the fn was called, then the number of active
+ // workers has not been increased by the signal, so `signal_work` has to
+ // be called again to try to wake up another worker.
+ //
+ // For example, if the thread pool is configured to allow 4 workers.
+ // Worker 1 is processing tasks from its `deque`. Worker 2 receives its
+ // first task. Worker 2 will pick a random worker to signal. It does
+ // this by popping off the sleep stack, but there is no guarantee that
+ // workers on the sleep stack are actually sleeping. It is possible that
+ // Worker 1 gets signaled.
+ //
+ // Without this check, in the above case, no additional workers will get
+ // started, which results in the thread pool permanently being at 2
+ // workers even though it should reach 4.
+ if !first && state.is_signaled() {
+ trace!("Worker::check_run_state; delegate signal");
+ // This worker is not ready to be signaled, so delegate the signal
+ // to another worker.
+ self.pool.signal_work(&self.pool);
+ }
+
+ true
+ }
+
+ /// Runs the next task on this worker's queue.
+ ///
+ /// Returns `true` if work was found.
+ fn try_run_owned_task(&self, notify: &Arc<Notifier>) -> bool {
+ // Poll the internal queue for a task to run
+ match self.entry().pop_task() {
+ Some(task) => {
+ self.run_task(task, notify);
+ true
+ }
+ None => false,
+ }
+ }
+
+ /// Tries to steal a task from another worker.
+ ///
+ /// Returns `true` if work was found
+ fn try_steal_task(&self, notify: &Arc<Notifier>) -> bool {
+ use crossbeam_deque::Steal;
+
+ debug_assert!(!self.is_blocking.get());
+
+ let len = self.pool.workers.len();
+ let mut idx = self.pool.rand_usize() % len;
+ let mut found_work = false;
+ let start = idx;
+
+ loop {
+ if idx < len {
+ match self.pool.workers[idx].steal_tasks(self.entry()) {
+ Steal::Success(task) => {
+ trace!("stole task from another worker");
+
+ self.run_task(task, notify);
+
+ trace!(
+ "try_steal_task -- signal_work; self={}; from={}",
+ self.id.0,
+ idx
+ );
+
+ // Signal other workers that work is available
+ //
+ // TODO: Should this be called here or before
+ // `run_task`?
+ self.pool.signal_work(&self.pool);
+
+ return true;
+ }
+ Steal::Empty => {}
+ Steal::Retry => found_work = true,
+ }
+
+ idx += 1;
+ } else {
+ idx = 0;
+ }
+
+ if idx == start {
+ break;
+ }
+ }
+
+ found_work
+ }
+
+ fn run_task(&self, task: Arc<Task>, notify: &Arc<Notifier>) {
+ use task::Run::*;
+
+ // If this is the first time this task is being polled, register it so that we can keep
+ // track of tasks that are in progress.
+ if task.reg_worker.get().is_none() {
+ task.reg_worker.set(Some(self.id.0 as u32));
+ self.entry().register_task(&task);
+ }
+
+ let run = self.run_task2(&task, notify);
+
+ // TODO: Try to claim back the worker state in case the backup thread
+ // did not start up fast enough. This is a performance optimization.
+
+ match run {
+ Idle => {}
+ Schedule => {
+ if self.is_blocking.get() {
+ // The future has been notified while it was running.
+ // However, the future also entered a blocking section,
+ // which released the worker state from this thread.
+ //
+ // This means that scheduling the future must be done from
+ // a point of view external to the worker set.
+ //
+ // We have to call `submit_external` instead of `submit`
+ // here because `self` is still set as the current worker.
+ self.pool.submit_external(task, &self.pool);
+ } else {
+ self.entry().push_internal(task);
+ }
+ }
+ Complete => {
+ let mut state: pool::State = self.pool.state.load(Acquire).into();
+
+ loop {
+ let mut next = state;
+ next.dec_num_futures();
+
+ let actual = self
+ .pool
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ trace!("task complete; state={:?}", next);
+
+ if state.num_futures() == 1 {
+ // If the thread pool has been flagged as shutdown,
+ // start terminating workers. This involves waking
+ // up any sleeping worker so that they can notice
+ // the shutdown state.
+ if next.is_terminated() {
+ self.pool.terminate_sleeping_workers();
+ }
+ }
+
+ // Find which worker polled this task first.
+ let worker = task.reg_worker.get().unwrap() as usize;
+
+ // Unregister the task from the worker it was registered in.
+ if !self.is_blocking.get() && worker == self.id.0 {
+ self.entry().unregister_task(task);
+ } else {
+ self.pool.workers[worker].remotely_complete_task(task);
+ }
+
+ // The worker's run loop will detect the shutdown state
+ // next iteration.
+ return;
+ }
+
+ state = actual;
+ }
+ }
+ }
+ }
+
+ /// Actually run the task. This is where `Worker::current_task` is set.
+ ///
+ /// Great care is needed to ensure that `current_task` is unset in this
+ /// function.
+ fn run_task2(&self, task: &Arc<Task>, notify: &Arc<Notifier>) -> task::Run {
+ struct Guard<'a> {
+ worker: &'a Worker,
+ }
+
+ impl<'a> Drop for Guard<'a> {
+ fn drop(&mut self) {
+ // A task is allocated at run when it was explicitly notified
+ // that the task has capacity to block. When this happens, that
+ // capacity is automatically allocated to the notified task.
+ // This capacity is "use it or lose it", so if the thread is not
+ // transitioned to blocking in this call, then another task has
+ // to be notified.
+ //
+ // If the task has consumed its blocking allocation but hasn't
+ // used it, it must be given to some other task instead.
+ if !self.worker.is_blocking.get() {
+ let can_block = self.worker.current_task.can_block();
+ if can_block == CanBlock::Allocated {
+ self.worker.pool.notify_blocking_task(&self.worker.pool);
+ }
+ }
+
+ self.worker.current_task.clear();
+ }
+ }
+
+ // Set `current_task`
+ self.current_task.set(task, CanBlock::CanRequest);
+
+ // Create the guard, this ensures that `current_task` is unset when the
+ // function returns, even if the return is caused by a panic.
+ let _g = Guard { worker: self };
+
+ task.run(notify)
+ }
+
+ /// Put the worker to sleep
+ ///
+ /// Returns `true` if woken up due to new work arriving.
+ fn sleep(&self) -> bool {
+ use self::Lifecycle::*;
+
+ // Putting a worker to sleep is a multipart operation. This is, in part,
+ // due to the fact that a worker can be notified without it being popped
+ // from the sleep stack. Extra care is needed to deal with this.
+
+ trace!("Worker::sleep; worker={:?}", self.id);
+
+ let mut state: State = self.entry().state.load(Acquire).into();
+
+ // The first part of the sleep process is to transition the worker state
+ // to "pushed". Now, it may be that the worker is already pushed on the
+ // sleeper stack, in which case, we don't push again.
+
+ loop {
+ let mut next = state;
+
+ match state.lifecycle() {
+ Running => {
+ // Try setting the pushed state
+ next.set_pushed();
+
+ // Transition the worker state to sleeping
+ next.set_lifecycle(Sleeping);
+ }
+ Notified | Signaled => {
+ // No need to sleep, transition back to running and move on.
+ next.set_lifecycle(Running);
+ }
+ Shutdown | Sleeping => {
+ // The worker cannot transition to sleep when already in a
+ // sleeping state.
+ panic!("unexpected worker state; actual={:?}", state.lifecycle());
+ }
+ }
+
+ let actual = self
+ .entry()
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ if state.is_notified() {
+ // The previous state was notified, so we don't need to
+ // sleep.
+ return true;
+ }
+
+ if !state.is_pushed() {
+ debug_assert!(next.is_pushed());
+
+ trace!(" sleeping -- push to stack; idx={}", self.id.0);
+
+ // We obtained permission to push the worker into the
+ // sleeper queue.
+ if let Err(_) = self.pool.push_sleeper(self.id.0) {
+ trace!(" sleeping -- push to stack failed; idx={}", self.id.0);
+ // The push failed due to the pool being terminated.
+ //
+ // This is true because the "work" being woken up for is
+ // shutting down.
+ return true;
+ }
+ }
+
+ break;
+ }
+
+ state = actual;
+ }
+
+ trace!(" -> starting to sleep; idx={}", self.id.0);
+
+ // Do a quick check to see if there are any notifications in the
+ // reactor or new tasks in the global queue. Since this call will
+ // clear the wakeup token, we need to check the state again and
+ // only after that go to sleep.
+ self.sleep_light();
+
+ // The state has been transitioned to sleeping, we can now wait by
+ // calling the parker. This is done in a loop as condvars can wakeup
+ // spuriously.
+ loop {
+ // Reload the state
+ state = self.entry().state.load(Acquire).into();
+
+ // If the worker has been notified, transition back to running.
+ match state.lifecycle() {
+ Sleeping => {
+ // Still sleeping. Park again.
+ }
+ Notified | Signaled => {
+ // Transition back to running
+ loop {
+ let mut next = state;
+ next.set_lifecycle(Running);
+
+ let actual = self
+ .entry()
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ return true;
+ }
+
+ state = actual;
+ }
+ }
+ Shutdown | Running => {
+ // To get here, the block above transitioned the state to
+ // `Sleeping`. No other thread can concurrently
+ // transition to `Shutdown` or `Running`.
+ unreachable!();
+ }
+ }
+
+ self.entry().park();
+
+ trace!(" -> wakeup; idx={}", self.id.0);
+ }
+ }
+
+ /// This doesn't actually put the thread to sleep. It calls
+ /// `park.park_timeout` with a duration of 0. This allows the park
+ /// implementation to perform any work that might be done on an interval.
+ ///
+ /// Returns `true` if this worker has tasks in its queue.
+ fn sleep_light(&self) {
+ self.entry().park_timeout(Duration::from_millis(0));
+
+ use crossbeam_deque::Steal;
+ loop {
+ match self.pool.queue.steal_batch(&self.entry().worker) {
+ Steal::Success(()) => {
+ self.pool.signal_work(&self.pool);
+ break;
+ }
+ Steal::Empty => break,
+ Steal::Retry => {}
+ }
+ }
+ }
+
+ fn entry(&self) -> &Entry {
+ debug_assert!(!self.is_blocking.get());
+ &self.pool.workers[self.id.0]
+ }
+}
+
+impl Drop for Worker {
+ fn drop(&mut self) {
+ trace!("shutting down thread; idx={}", self.id.0);
+
+ if self.should_finalize.get() {
+ // Drain the work queue
+ self.entry().drain_tasks();
+ }
+ }
+}
+
+// ===== impl CurrentTask =====
+
+impl CurrentTask {
+ /// Returns a default `CurrentTask` representing no task.
+ fn new() -> CurrentTask {
+ CurrentTask {
+ task: Cell::new(None),
+ can_block: Cell::new(CanBlock::CanRequest),
+ }
+ }
+
+ /// Returns a reference to the task.
+ fn get_ref(&self) -> &Arc<Task> {
+ unsafe { &*self.task.get().unwrap() }
+ }
+
+ fn can_block(&self) -> CanBlock {
+ use self::CanBlock::*;
+
+ match self.can_block.get() {
+ Allocated => Allocated,
+ CanRequest | NoCapacity => {
+ let can_block = self.get_ref().consume_blocking_allocation();
+ self.can_block.set(can_block);
+ can_block
+ }
+ }
+ }
+
+ fn set_can_block(&self, can_block: CanBlock) {
+ self.can_block.set(can_block);
+ }
+
+ fn set(&self, task: &Arc<Task>, can_block: CanBlock) {
+ self.task.set(Some(task as *const _));
+ self.can_block.set(can_block);
+ }
+
+ /// Reset the `CurrentTask` to null state.
+ fn clear(&self) {
+ self.task.set(None);
+ self.can_block.set(CanBlock::CanRequest);
+ }
+}
+
+// ===== impl WorkerId =====
+
+impl WorkerId {
+ /// Returns a `WorkerId` representing the worker entry at index `idx`.
+ pub(crate) fn new(idx: usize) -> WorkerId {
+ WorkerId(idx)
+ }
+
+ /// Returns this identifier represented as an integer.
+ ///
+ /// Worker identifiers in a single thread pool are guaranteed to correspond to integers in the
+ /// range `0..pool_size`.
+ pub fn to_usize(&self) -> usize {
+ self.0
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/worker/stack.rs b/third_party/rust/tokio-threadpool/src/worker/stack.rs
new file mode 100644
index 0000000000..d02c277fed
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/worker/stack.rs
@@ -0,0 +1,260 @@
+use config::MAX_WORKERS;
+use worker;
+
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed};
+use std::{fmt, usize};
+
+/// Lock-free stack of sleeping workers.
+///
+/// This is implemented as a Treiber stack and references to nodes are
+/// `usize` values, indexing the entry in the `[worker::Entry]` array stored by
+/// `Pool`. Each `Entry` instance maintains a `pushed` bit in its state. This
+/// bit tracks if the entry is already pushed onto the stack or not. A single
+/// entry can only be stored on the stack a single time.
+///
+/// By using indexes instead of pointers, that allows a much greater amount of
+/// data to be used for the ABA guard (see correctness section of wikipedia
+/// page).
+///
+/// Treiber stack: https://en.wikipedia.org/wiki/Treiber_Stack
+#[derive(Debug)]
+pub(crate) struct Stack {
+ state: AtomicUsize,
+}
+
+/// State related to the stack of sleeping workers.
+///
+/// - Parked head 16 bits
+/// - Sequence remaining
+///
+/// The parked head value has a couple of special values:
+///
+/// - EMPTY: No sleepers
+/// - TERMINATED: Don't spawn more threads
+#[derive(Eq, PartialEq, Clone, Copy)]
+pub struct State(usize);
+
+/// Extracts the head of the worker stack from the scheduler state
+///
+/// The 16 relates to the value of MAX_WORKERS
+const STACK_MASK: usize = ((1 << 16) - 1);
+
+/// Used to mark the stack as empty
+pub(crate) const EMPTY: usize = MAX_WORKERS;
+
+/// Used to mark the stack as terminated
+pub(crate) const TERMINATED: usize = EMPTY + 1;
+
+/// How many bits the Treiber ABA guard is offset by
+const ABA_GUARD_SHIFT: usize = 16;
+
+#[cfg(target_pointer_width = "64")]
+const ABA_GUARD_MASK: usize = (1 << (64 - ABA_GUARD_SHIFT)) - 1;
+
+#[cfg(target_pointer_width = "32")]
+const ABA_GUARD_MASK: usize = (1 << (32 - ABA_GUARD_SHIFT)) - 1;
+
+// ===== impl Stack =====
+
+impl Stack {
+ /// Create a new `Stack` representing the empty state.
+ pub fn new() -> Stack {
+ let state = AtomicUsize::new(State::new().into());
+ Stack { state }
+ }
+
+ /// Push a worker onto the stack
+ ///
+ /// # Return
+ ///
+ /// Returns `Ok` on success.
+ ///
+ /// Returns `Err` if the pool has transitioned to the `TERMINATED` state.
+ /// When terminated, pushing new entries is no longer permitted.
+ pub fn push(&self, entries: &[worker::Entry], idx: usize) -> Result<(), ()> {
+ let mut state: State = self.state.load(Acquire).into();
+
+ debug_assert!(worker::State::from(entries[idx].state.load(Relaxed)).is_pushed());
+
+ loop {
+ let mut next = state;
+
+ let head = state.head();
+
+ if head == TERMINATED {
+ // The pool is terminated, cannot push the sleeper.
+ return Err(());
+ }
+
+ entries[idx].set_next_sleeper(head);
+ next.set_head(idx);
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if state == actual {
+ return Ok(());
+ }
+
+ state = actual;
+ }
+ }
+
+ /// Pop a worker off the stack.
+ ///
+ /// If `terminate` is set and the stack is empty when this function is
+ /// called, the state of the stack is transitioned to "terminated". At this
+ /// point, no further workers can be pushed onto the stack.
+ ///
+ /// # Return
+ ///
+ /// Returns the index of the popped worker and the worker's observed state.
+ ///
+ /// `None` if the stack is empty.
+ pub fn pop(
+ &self,
+ entries: &[worker::Entry],
+ max_lifecycle: worker::Lifecycle,
+ terminate: bool,
+ ) -> Option<(usize, worker::State)> {
+ // Figure out the empty value
+ let terminal = match terminate {
+ true => TERMINATED,
+ false => EMPTY,
+ };
+
+ // If terminating, the max lifecycle *must* be `Signaled`, which is the
+ // highest lifecycle. By passing the greatest possible lifecycle value,
+ // no entries are skipped by this function.
+ //
+ // TODO: It would be better to terminate in a separate function that
+ // atomically takes all values and transitions to a terminated state.
+ debug_assert!(!terminate || max_lifecycle == worker::Lifecycle::Signaled);
+
+ let mut state: State = self.state.load(Acquire).into();
+
+ loop {
+ let head = state.head();
+
+ if head == EMPTY {
+ let mut next = state;
+ next.set_head(terminal);
+
+ if next == state {
+ debug_assert!(terminal == EMPTY);
+ return None;
+ }
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual != state {
+ state = actual;
+ continue;
+ }
+
+ return None;
+ } else if head == TERMINATED {
+ return None;
+ }
+
+ debug_assert!(head < MAX_WORKERS);
+
+ let mut next = state;
+
+ let next_head = entries[head].next_sleeper();
+
+ // TERMINATED can never be set as the "next pointer" on a worker.
+ debug_assert!(next_head != TERMINATED);
+
+ if next_head == EMPTY {
+ next.set_head(terminal);
+ } else {
+ next.set_head(next_head);
+ }
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ // Release ordering is needed to ensure that unsetting the
+ // `pushed` flag happens after popping the sleeper from the
+ // stack.
+ //
+ // Acquire ordering is required to acquire any memory associated
+ // with transitioning the worker's lifecycle.
+ let state = entries[head].fetch_unset_pushed(AcqRel);
+
+ if state.lifecycle() >= max_lifecycle {
+ // If the worker has already been notified, then it is
+ // warming up to do more work. In this case, try to pop
+ // another thread that might be in a relaxed state.
+ continue;
+ }
+
+ return Some((head, state));
+ }
+
+ state = actual;
+ }
+ }
+}
+
+// ===== impl State =====
+
+impl State {
+ #[inline]
+ fn new() -> State {
+ State(EMPTY)
+ }
+
+ #[inline]
+ fn head(&self) -> usize {
+ self.0 & STACK_MASK
+ }
+
+ #[inline]
+ fn set_head(&mut self, val: usize) {
+ // The ABA guard protects against the ABA problem w/ Treiber stacks
+ let aba_guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK;
+
+ self.0 = (aba_guard << ABA_GUARD_SHIFT) | val;
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> Self {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> Self {
+ src.0
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ let head = self.head();
+
+ let mut fmt = fmt.debug_struct("stack::State");
+
+ if head < MAX_WORKERS {
+ fmt.field("head", &head);
+ } else if head == EMPTY {
+ fmt.field("head", &"EMPTY");
+ } else if head == TERMINATED {
+ fmt.field("head", &"TERMINATED");
+ }
+
+ fmt.finish()
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/worker/state.rs b/third_party/rust/tokio-threadpool/src/worker/state.rs
new file mode 100644
index 0000000000..c388f6c99e
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/worker/state.rs
@@ -0,0 +1,153 @@
+use std::fmt;
+
+/// Tracks worker state
+#[derive(Clone, Copy, Eq, PartialEq)]
+pub(crate) struct State(usize);
+
+/// Set when the worker is pushed onto the scheduler's stack of sleeping
+/// threads.
+pub(crate) const PUSHED_MASK: usize = 0b001;
+
+/// Manages the worker lifecycle part of the state
+const LIFECYCLE_MASK: usize = 0b1110;
+const LIFECYCLE_SHIFT: usize = 1;
+
+#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone, Copy)]
+#[repr(usize)]
+pub(crate) enum Lifecycle {
+ /// The worker does not currently have an associated thread.
+ Shutdown = 0 << LIFECYCLE_SHIFT,
+
+ /// The worker is doing work
+ Running = 1 << LIFECYCLE_SHIFT,
+
+ /// The worker is currently asleep in the condvar
+ Sleeping = 2 << LIFECYCLE_SHIFT,
+
+ /// The worker has been notified it should process more work.
+ Notified = 3 << LIFECYCLE_SHIFT,
+
+ /// A stronger form of notification. In this case, the worker is expected to
+ /// wakeup and try to acquire more work... if it enters this state while
+ /// already busy with other work, it is expected to signal another worker.
+ Signaled = 4 << LIFECYCLE_SHIFT,
+}
+
+impl State {
+ /// Returns true if the worker entry is pushed in the sleeper stack
+ pub fn is_pushed(&self) -> bool {
+ self.0 & PUSHED_MASK == PUSHED_MASK
+ }
+
+ pub fn set_pushed(&mut self) {
+ self.0 |= PUSHED_MASK
+ }
+
+ pub fn is_notified(&self) -> bool {
+ use self::Lifecycle::*;
+
+ match self.lifecycle() {
+ Notified | Signaled => true,
+ _ => false,
+ }
+ }
+
+ pub fn lifecycle(&self) -> Lifecycle {
+ Lifecycle::from(self.0 & LIFECYCLE_MASK)
+ }
+
+ pub fn set_lifecycle(&mut self, val: Lifecycle) {
+ self.0 = (self.0 & !LIFECYCLE_MASK) | (val as usize)
+ }
+
+ pub fn is_signaled(&self) -> bool {
+ self.lifecycle() == Lifecycle::Signaled
+ }
+
+ pub fn notify(&mut self) {
+ use self::Lifecycle::Signaled;
+
+ if self.lifecycle() != Signaled {
+ self.set_lifecycle(Signaled)
+ }
+ }
+}
+
+impl Default for State {
+ fn default() -> State {
+ // All workers will start pushed in the sleeping stack
+ State(PUSHED_MASK)
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> Self {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> Self {
+ src.0
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("worker::State")
+ .field("lifecycle", &self.lifecycle())
+ .field("is_pushed", &self.is_pushed())
+ .finish()
+ }
+}
+
+// ===== impl Lifecycle =====
+
+impl From<usize> for Lifecycle {
+ fn from(src: usize) -> Lifecycle {
+ use self::Lifecycle::*;
+
+ debug_assert!(
+ src == Shutdown as usize
+ || src == Running as usize
+ || src == Sleeping as usize
+ || src == Notified as usize
+ || src == Signaled as usize
+ );
+
+ unsafe { ::std::mem::transmute(src) }
+ }
+}
+
+impl From<Lifecycle> for usize {
+ fn from(src: Lifecycle) -> usize {
+ let v = src as usize;
+ debug_assert!(v & LIFECYCLE_MASK == v);
+ v
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::Lifecycle::*;
+ use super::*;
+
+ #[test]
+ fn lifecycle_encode() {
+ let lifecycles = &[Shutdown, Running, Sleeping, Notified, Signaled];
+
+ for &lifecycle in lifecycles {
+ let mut v: usize = lifecycle.into();
+ v &= LIFECYCLE_MASK;
+
+ assert_eq!(lifecycle, Lifecycle::from(v));
+ }
+ }
+
+ #[test]
+ fn lifecycle_ord() {
+ assert!(Running >= Shutdown);
+ assert!(Signaled >= Notified);
+ assert!(Signaled >= Sleeping);
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/tests/blocking.rs b/third_party/rust/tokio-threadpool/tests/blocking.rs
new file mode 100644
index 0000000000..74520e5299
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/tests/blocking.rs
@@ -0,0 +1,421 @@
+extern crate tokio_executor;
+extern crate tokio_threadpool;
+
+extern crate env_logger;
+extern crate futures;
+extern crate rand;
+
+use tokio_threadpool::*;
+
+use futures::future::{lazy, poll_fn};
+use futures::*;
+use rand::*;
+
+use std::sync::atomic::Ordering::*;
+use std::sync::atomic::*;
+use std::sync::*;
+use std::thread;
+use std::time::Duration;
+
+#[test]
+fn basic() {
+ let _ = ::env_logger::try_init();
+
+ let pool = Builder::new().pool_size(1).max_blocking(1).build();
+
+ let (tx1, rx1) = mpsc::channel();
+ let (tx2, rx2) = mpsc::channel();
+
+ pool.spawn(lazy(move || {
+ let res = blocking(|| {
+ let v = rx1.recv().unwrap();
+ tx2.send(v).unwrap();
+ })
+ .unwrap();
+
+ assert!(res.is_ready());
+ Ok(().into())
+ }));
+
+ pool.spawn(lazy(move || {
+ tx1.send(()).unwrap();
+ Ok(().into())
+ }));
+
+ rx2.recv().unwrap();
+}
+
+#[test]
+fn other_executors_can_run_inside_blocking() {
+ let _ = ::env_logger::try_init();
+
+ let pool = Builder::new().pool_size(1).max_blocking(1).build();
+
+ let (tx, rx) = mpsc::channel();
+
+ pool.spawn(lazy(move || {
+ let res = blocking(|| {
+ let _e = tokio_executor::enter().expect("nested blocking enter");
+ tx.send(()).unwrap();
+ })
+ .unwrap();
+
+ assert!(res.is_ready());
+ Ok(().into())
+ }));
+
+ rx.recv().unwrap();
+}
+
+#[test]
+fn notify_task_on_capacity() {
+ const BLOCKING: usize = 10;
+
+ let pool = Builder::new().pool_size(1).max_blocking(1).build();
+
+ let rem = Arc::new(AtomicUsize::new(BLOCKING));
+ let (tx, rx) = mpsc::channel();
+
+ for _ in 0..BLOCKING {
+ let rem = rem.clone();
+ let tx = tx.clone();
+
+ pool.spawn(lazy(move || {
+ poll_fn(move || {
+ blocking(|| {
+ thread::sleep(Duration::from_millis(100));
+ let prev = rem.fetch_sub(1, Relaxed);
+
+ if prev == 1 {
+ tx.send(()).unwrap();
+ }
+ })
+ .map_err(|e| panic!("blocking err {:?}", e))
+ })
+ }));
+ }
+
+ rx.recv().unwrap();
+
+ assert_eq!(0, rem.load(Relaxed));
+}
+
+#[test]
+fn capacity_is_use_it_or_lose_it() {
+ use futures::sync::oneshot;
+ use futures::task::Task;
+ use futures::Async::*;
+ use futures::*;
+
+ // TODO: Run w/ bigger pool size
+
+ let pool = Builder::new().pool_size(1).max_blocking(1).build();
+
+ let (tx1, rx1) = mpsc::channel();
+ let (tx2, rx2) = oneshot::channel();
+ let (tx3, rx3) = mpsc::channel();
+ let (tx4, rx4) = mpsc::channel();
+
+ // First, fill the blocking capacity
+ pool.spawn(lazy(move || {
+ poll_fn(move || {
+ blocking(|| {
+ rx1.recv().unwrap();
+ })
+ .map_err(|_| panic!())
+ })
+ }));
+
+ pool.spawn(lazy(move || {
+ rx2.map_err(|_| panic!()).and_then(|task: Task| {
+ poll_fn(move || {
+ blocking(|| {
+ // Notify the other task
+ task.notify();
+
+ // Block until woken
+ rx3.recv().unwrap();
+ })
+ .map_err(|_| panic!())
+ })
+ })
+ }));
+
+ // Spawn a future that will try to block, get notified, then not actually
+ // use the blocking
+ let mut i = 0;
+ let mut tx2 = Some(tx2);
+
+ pool.spawn(lazy(move || {
+ poll_fn(move || {
+ match i {
+ 0 => {
+ i = 1;
+
+ let res = blocking(|| unreachable!()).map_err(|_| panic!());
+
+ assert!(res.unwrap().is_not_ready());
+
+ // Unblock the first blocker
+ tx1.send(()).unwrap();
+
+ return Ok(NotReady);
+ }
+ 1 => {
+ i = 2;
+
+ // Skip blocking, and notify the second task that it should
+ // start blocking
+ let me = task::current();
+ tx2.take().unwrap().send(me).unwrap();
+
+ return Ok(NotReady);
+ }
+ 2 => {
+ let res = blocking(|| unreachable!()).map_err(|_| panic!());
+
+ assert!(res.unwrap().is_not_ready());
+
+ // Unblock the first blocker
+ tx3.send(()).unwrap();
+ tx4.send(()).unwrap();
+ Ok(().into())
+ }
+ _ => unreachable!(),
+ }
+ })
+ }));
+
+ rx4.recv().unwrap();
+}
+
+#[test]
+fn blocking_thread_does_not_take_over_shutdown_worker_thread() {
+ let pool = Builder::new().pool_size(2).max_blocking(1).build();
+
+ let (enter_tx, enter_rx) = mpsc::channel();
+ let (exit_tx, exit_rx) = mpsc::channel();
+ let (try_tx, try_rx) = mpsc::channel();
+
+ let exited = Arc::new(AtomicBool::new(false));
+
+ {
+ let exited = exited.clone();
+
+ pool.spawn(lazy(move || {
+ poll_fn(move || {
+ blocking(|| {
+ enter_tx.send(()).unwrap();
+ exit_rx.recv().unwrap();
+ exited.store(true, Relaxed);
+ })
+ .map_err(|_| panic!())
+ })
+ }));
+ }
+
+ // Wait for the task to block
+ let _ = enter_rx.recv().unwrap();
+
+ // Spawn another task that attempts to block
+ pool.spawn(lazy(move || {
+ poll_fn(move || {
+ let res = blocking(|| {}).unwrap();
+
+ assert_eq!(res.is_ready(), exited.load(Relaxed));
+
+ try_tx.send(res.is_ready()).unwrap();
+
+ Ok(res)
+ })
+ }));
+
+ // Wait for the second task to try to block (and not be ready).
+ let res = try_rx.recv().unwrap();
+ assert!(!res);
+
+ // Unblock the first task
+ exit_tx.send(()).unwrap();
+
+ // Wait for the second task to successfully block.
+ let res = try_rx.recv().unwrap();
+ assert!(res);
+
+ drop(pool);
+}
+
+#[test]
+fn blocking_one_time_gets_capacity_for_multiple_blocks() {
+ const ITER: usize = 1;
+ const BLOCKING: usize = 2;
+
+ for _ in 0..ITER {
+ let pool = Builder::new().pool_size(4).max_blocking(1).build();
+
+ let rem = Arc::new(AtomicUsize::new(BLOCKING));
+ let (tx, rx) = mpsc::channel();
+
+ for _ in 0..BLOCKING {
+ let rem = rem.clone();
+ let tx = tx.clone();
+
+ pool.spawn(lazy(move || {
+ poll_fn(move || {
+ // First block
+ let res = blocking(|| {
+ thread::sleep(Duration::from_millis(100));
+ })
+ .map_err(|e| panic!("blocking err {:?}", e));
+
+ try_ready!(res);
+
+ let res = blocking(|| {
+ thread::sleep(Duration::from_millis(100));
+ let prev = rem.fetch_sub(1, Relaxed);
+
+ if prev == 1 {
+ tx.send(()).unwrap();
+ }
+ });
+
+ assert!(res.unwrap().is_ready());
+
+ Ok(().into())
+ })
+ }));
+ }
+
+ rx.recv().unwrap();
+
+ assert_eq!(0, rem.load(Relaxed));
+ }
+}
+
+#[test]
+fn shutdown() {
+ const ITER: usize = 1_000;
+ const BLOCKING: usize = 10;
+
+ for _ in 0..ITER {
+ let num_inc = Arc::new(AtomicUsize::new(0));
+ let num_dec = Arc::new(AtomicUsize::new(0));
+ let (tx, rx) = mpsc::channel();
+
+ let pool = {
+ let num_inc = num_inc.clone();
+ let num_dec = num_dec.clone();
+
+ Builder::new()
+ .pool_size(1)
+ .max_blocking(BLOCKING)
+ .after_start(move || {
+ num_inc.fetch_add(1, Relaxed);
+ })
+ .before_stop(move || {
+ num_dec.fetch_add(1, Relaxed);
+ })
+ .build()
+ };
+
+ let barrier = Arc::new(Barrier::new(BLOCKING));
+
+ for _ in 0..BLOCKING {
+ let barrier = barrier.clone();
+ let tx = tx.clone();
+
+ pool.spawn(lazy(move || {
+ let res = blocking(|| {
+ barrier.wait();
+ Ok::<_, ()>(())
+ })
+ .unwrap();
+
+ tx.send(()).unwrap();
+
+ assert!(res.is_ready());
+ Ok(().into())
+ }));
+ }
+
+ for _ in 0..BLOCKING {
+ rx.recv().unwrap();
+ }
+
+ // Shutdown
+ drop(pool);
+
+ assert_eq!(11, num_inc.load(Relaxed));
+ assert_eq!(11, num_dec.load(Relaxed));
+ }
+}
+
+#[derive(Debug, Copy, Clone)]
+enum Sleep {
+ Skip,
+ Yield,
+ Rand,
+ Fixed(Duration),
+}
+
+#[test]
+fn hammer() {
+ use self::Sleep::*;
+
+ const ITER: usize = 5;
+
+ let combos = [
+ (2, 4, 1_000, Skip),
+ (2, 4, 1_000, Yield),
+ (2, 4, 100, Rand),
+ (2, 4, 100, Fixed(Duration::from_millis(3))),
+ (2, 4, 100, Fixed(Duration::from_millis(12))),
+ ];
+
+ for &(size, max_blocking, n, sleep) in &combos {
+ for _ in 0..ITER {
+ let pool = Builder::new()
+ .pool_size(size)
+ .max_blocking(max_blocking)
+ .build();
+
+ let cnt_task = Arc::new(AtomicUsize::new(0));
+ let cnt_block = Arc::new(AtomicUsize::new(0));
+
+ for _ in 0..n {
+ let cnt_task = cnt_task.clone();
+ let cnt_block = cnt_block.clone();
+
+ pool.spawn(lazy(move || {
+ cnt_task.fetch_add(1, Relaxed);
+
+ poll_fn(move || {
+ blocking(|| {
+ match sleep {
+ Skip => {}
+ Yield => {
+ thread::yield_now();
+ }
+ Rand => {
+ let ms = thread_rng().gen_range(3, 12);
+ thread::sleep(Duration::from_millis(ms));
+ }
+ Fixed(dur) => {
+ thread::sleep(dur);
+ }
+ }
+
+ cnt_block.fetch_add(1, Relaxed);
+ })
+ .map_err(|_| panic!())
+ })
+ }));
+ }
+
+ // Wait for the work to complete
+ pool.shutdown_on_idle().wait().unwrap();
+
+ assert_eq!(n, cnt_task.load(Relaxed));
+ assert_eq!(n, cnt_block.load(Relaxed));
+ }
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/tests/hammer.rs b/third_party/rust/tokio-threadpool/tests/hammer.rs
new file mode 100644
index 0000000000..d9ee41ca6a
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/tests/hammer.rs
@@ -0,0 +1,101 @@
+extern crate futures;
+extern crate tokio_threadpool;
+
+use tokio_threadpool::*;
+
+use futures::{Future, Poll, Sink, Stream};
+
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::*;
+use std::sync::Arc;
+
+#[test]
+fn hammer() {
+ use futures::future;
+ use futures::sync::{mpsc, oneshot};
+
+ const N: usize = 1000;
+ const ITER: usize = 20;
+
+ struct Counted<T> {
+ cnt: Arc<AtomicUsize>,
+ inner: T,
+ }
+
+ impl<T: Future> Future for Counted<T> {
+ type Item = T::Item;
+ type Error = T::Error;
+
+ fn poll(&mut self) -> Poll<T::Item, T::Error> {
+ self.inner.poll()
+ }
+ }
+
+ impl<T> Drop for Counted<T> {
+ fn drop(&mut self) {
+ self.cnt.fetch_add(1, Relaxed);
+ }
+ }
+
+ for _ in 0..ITER {
+ let pool = Builder::new()
+ // .pool_size(30)
+ .build();
+
+ let cnt = Arc::new(AtomicUsize::new(0));
+
+ let (listen_tx, listen_rx) = mpsc::unbounded::<oneshot::Sender<oneshot::Sender<()>>>();
+ let mut listen_tx = listen_tx.wait();
+
+ pool.spawn({
+ let c1 = cnt.clone();
+ let c2 = cnt.clone();
+ let pool = pool.sender().clone();
+ let task = listen_rx
+ .map_err(|e| panic!("accept error = {:?}", e))
+ .for_each(move |tx| {
+ let task = future::lazy(|| {
+ let (tx2, rx2) = oneshot::channel();
+
+ tx.send(tx2).unwrap();
+ rx2
+ })
+ .map_err(|e| panic!("e={:?}", e))
+ .and_then(|_| Ok(()));
+
+ pool.spawn(Counted {
+ inner: task,
+ cnt: c1.clone(),
+ })
+ .unwrap();
+
+ Ok(())
+ });
+
+ Counted {
+ inner: task,
+ cnt: c2,
+ }
+ });
+
+ for _ in 0..N {
+ let cnt = cnt.clone();
+ let (tx, rx) = oneshot::channel();
+ listen_tx.send(tx).unwrap();
+
+ pool.spawn({
+ let task = rx.map_err(|e| panic!("rx err={:?}", e)).and_then(|tx| {
+ tx.send(()).unwrap();
+ Ok(())
+ });
+
+ Counted { inner: task, cnt }
+ });
+ }
+
+ drop(listen_tx);
+
+ pool.shutdown_on_idle().wait().unwrap();
+ assert_eq!(N * 2 + 1, cnt.load(Relaxed));
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/tests/threadpool.rs b/third_party/rust/tokio-threadpool/tests/threadpool.rs
new file mode 100644
index 0000000000..bd6736f9aa
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/tests/threadpool.rs
@@ -0,0 +1,555 @@
+extern crate env_logger;
+extern crate futures;
+extern crate tokio_executor;
+extern crate tokio_threadpool;
+
+use tokio_executor::park::{Park, Unpark};
+use tokio_threadpool::park::{DefaultPark, DefaultUnpark};
+use tokio_threadpool::*;
+
+use futures::future::lazy;
+use futures::{Async, Future, Poll, Sink, Stream};
+
+use std::cell::Cell;
+use std::sync::atomic::Ordering::Relaxed;
+use std::sync::atomic::*;
+use std::sync::{mpsc, Arc};
+use std::time::Duration;
+
+thread_local!(static FOO: Cell<u32> = Cell::new(0));
+
+fn ignore_results<F: Future + Send + 'static>(
+ f: F,
+) -> Box<dyn Future<Item = (), Error = ()> + Send> {
+ Box::new(f.map(|_| ()).map_err(|_| ()))
+}
+
+#[test]
+fn natural_shutdown_simple_futures() {
+ let _ = ::env_logger::try_init();
+
+ for _ in 0..1_000 {
+ let num_inc = Arc::new(AtomicUsize::new(0));
+ let num_dec = Arc::new(AtomicUsize::new(0));
+
+ FOO.with(|f| {
+ f.set(1);
+
+ let pool = {
+ let num_inc = num_inc.clone();
+ let num_dec = num_dec.clone();
+
+ Builder::new()
+ .around_worker(move |w, _| {
+ num_inc.fetch_add(1, Relaxed);
+ w.run();
+ num_dec.fetch_add(1, Relaxed);
+ })
+ .build()
+ };
+
+ let tx = pool.sender().clone();
+
+ let a = {
+ let (t, rx) = mpsc::channel();
+ tx.spawn(lazy(move || {
+ // Makes sure this runs on a worker thread
+ FOO.with(|f| assert_eq!(f.get(), 0));
+
+ t.send("one").unwrap();
+ Ok(())
+ }))
+ .unwrap();
+ rx
+ };
+
+ let b = {
+ let (t, rx) = mpsc::channel();
+ tx.spawn(lazy(move || {
+ // Makes sure this runs on a worker thread
+ FOO.with(|f| assert_eq!(f.get(), 0));
+
+ t.send("two").unwrap();
+ Ok(())
+ }))
+ .unwrap();
+ rx
+ };
+
+ drop(tx);
+
+ assert_eq!("one", a.recv().unwrap());
+ assert_eq!("two", b.recv().unwrap());
+
+ // Wait for the pool to shutdown
+ pool.shutdown().wait().unwrap();
+
+ // Assert that at least one thread started
+ let num_inc = num_inc.load(Relaxed);
+ assert!(num_inc > 0);
+
+ // Assert that all threads shutdown
+ let num_dec = num_dec.load(Relaxed);
+ assert_eq!(num_inc, num_dec);
+ });
+ }
+}
+
+#[test]
+fn force_shutdown_drops_futures() {
+ let _ = ::env_logger::try_init();
+
+ for _ in 0..1_000 {
+ let num_inc = Arc::new(AtomicUsize::new(0));
+ let num_dec = Arc::new(AtomicUsize::new(0));
+ let num_drop = Arc::new(AtomicUsize::new(0));
+
+ struct Never(Arc<AtomicUsize>);
+
+ impl Future for Never {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ Ok(Async::NotReady)
+ }
+ }
+
+ impl Drop for Never {
+ fn drop(&mut self) {
+ self.0.fetch_add(1, Relaxed);
+ }
+ }
+
+ let a = num_inc.clone();
+ let b = num_dec.clone();
+
+ let pool = Builder::new()
+ .around_worker(move |w, _| {
+ a.fetch_add(1, Relaxed);
+ w.run();
+ b.fetch_add(1, Relaxed);
+ })
+ .build();
+ let mut tx = pool.sender().clone();
+
+ tx.spawn(Never(num_drop.clone())).unwrap();
+
+ // Wait for the pool to shutdown
+ pool.shutdown_now().wait().unwrap();
+
+ // Assert that only a single thread was spawned.
+ let a = num_inc.load(Relaxed);
+ assert!(a >= 1);
+
+ // Assert that all threads shutdown
+ let b = num_dec.load(Relaxed);
+ assert_eq!(a, b);
+
+ // Assert that the future was dropped
+ let c = num_drop.load(Relaxed);
+ assert_eq!(c, 1);
+ }
+}
+
+#[test]
+fn drop_threadpool_drops_futures() {
+ let _ = ::env_logger::try_init();
+
+ for _ in 0..1_000 {
+ let num_inc = Arc::new(AtomicUsize::new(0));
+ let num_dec = Arc::new(AtomicUsize::new(0));
+ let num_drop = Arc::new(AtomicUsize::new(0));
+
+ struct Never(Arc<AtomicUsize>);
+
+ impl Future for Never {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ Ok(Async::NotReady)
+ }
+ }
+
+ impl Drop for Never {
+ fn drop(&mut self) {
+ self.0.fetch_add(1, Relaxed);
+ }
+ }
+
+ let a = num_inc.clone();
+ let b = num_dec.clone();
+
+ let pool = Builder::new()
+ .max_blocking(2)
+ .pool_size(20)
+ .around_worker(move |w, _| {
+ a.fetch_add(1, Relaxed);
+ w.run();
+ b.fetch_add(1, Relaxed);
+ })
+ .build();
+ let mut tx = pool.sender().clone();
+
+ tx.spawn(Never(num_drop.clone())).unwrap();
+
+ // Wait for the pool to shutdown
+ drop(pool);
+
+ // Assert that only a single thread was spawned.
+ let a = num_inc.load(Relaxed);
+ assert!(a >= 1);
+
+ // Assert that all threads shutdown
+ let b = num_dec.load(Relaxed);
+ assert_eq!(a, b);
+
+ // Assert that the future was dropped
+ let c = num_drop.load(Relaxed);
+ assert_eq!(c, 1);
+ }
+}
+
+#[test]
+fn many_oneshot_futures() {
+ const NUM: usize = 10_000;
+
+ let _ = ::env_logger::try_init();
+
+ for _ in 0..50 {
+ let pool = ThreadPool::new();
+ let mut tx = pool.sender().clone();
+ let cnt = Arc::new(AtomicUsize::new(0));
+
+ for _ in 0..NUM {
+ let cnt = cnt.clone();
+ tx.spawn(lazy(move || {
+ cnt.fetch_add(1, Relaxed);
+ Ok(())
+ }))
+ .unwrap();
+ }
+
+ // Wait for the pool to shutdown
+ pool.shutdown().wait().unwrap();
+
+ let num = cnt.load(Relaxed);
+ assert_eq!(num, NUM);
+ }
+}
+
+#[test]
+fn many_multishot_futures() {
+ use futures::sync::mpsc;
+
+ const CHAIN: usize = 200;
+ const CYCLES: usize = 5;
+ const TRACKS: usize = 50;
+
+ let _ = ::env_logger::try_init();
+
+ for _ in 0..50 {
+ let pool = ThreadPool::new();
+ let mut pool_tx = pool.sender().clone();
+
+ let mut start_txs = Vec::with_capacity(TRACKS);
+ let mut final_rxs = Vec::with_capacity(TRACKS);
+
+ for _ in 0..TRACKS {
+ let (start_tx, mut chain_rx) = mpsc::channel(10);
+
+ for _ in 0..CHAIN {
+ let (next_tx, next_rx) = mpsc::channel(10);
+
+ let rx = chain_rx.map_err(|e| panic!("{:?}", e));
+
+ // Forward all the messages
+ pool_tx
+ .spawn(
+ next_tx
+ .send_all(rx)
+ .map(|_| ())
+ .map_err(|e| panic!("{:?}", e)),
+ )
+ .unwrap();
+
+ chain_rx = next_rx;
+ }
+
+ // This final task cycles if needed
+ let (final_tx, final_rx) = mpsc::channel(10);
+ let cycle_tx = start_tx.clone();
+ let mut rem = CYCLES;
+
+ let task = chain_rx.take(CYCLES as u64).for_each(move |msg| {
+ rem -= 1;
+ let send = if rem == 0 {
+ final_tx.clone().send(msg)
+ } else {
+ cycle_tx.clone().send(msg)
+ };
+
+ send.then(|res| {
+ res.unwrap();
+ Ok(())
+ })
+ });
+ pool_tx.spawn(ignore_results(task)).unwrap();
+
+ start_txs.push(start_tx);
+ final_rxs.push(final_rx);
+ }
+
+ for start_tx in start_txs {
+ start_tx.send("ping").wait().unwrap();
+ }
+
+ for final_rx in final_rxs {
+ final_rx.wait().next().unwrap().unwrap();
+ }
+
+ // Shutdown the pool
+ pool.shutdown().wait().unwrap();
+ }
+}
+
+#[test]
+fn global_executor_is_configured() {
+ let pool = ThreadPool::new();
+ let tx = pool.sender().clone();
+
+ let (signal_tx, signal_rx) = mpsc::channel();
+
+ tx.spawn(lazy(move || {
+ tokio_executor::spawn(lazy(move || {
+ signal_tx.send(()).unwrap();
+ Ok(())
+ }));
+
+ Ok(())
+ }))
+ .unwrap();
+
+ signal_rx.recv().unwrap();
+
+ pool.shutdown().wait().unwrap();
+}
+
+#[test]
+fn new_threadpool_is_idle() {
+ let pool = ThreadPool::new();
+ pool.shutdown_on_idle().wait().unwrap();
+}
+
+#[test]
+fn busy_threadpool_is_not_idle() {
+ use futures::sync::oneshot;
+
+ // let pool = ThreadPool::new();
+ let pool = Builder::new().pool_size(4).max_blocking(2).build();
+ let tx = pool.sender().clone();
+
+ let (term_tx, term_rx) = oneshot::channel();
+
+ tx.spawn(term_rx.then(|_| Ok(()))).unwrap();
+
+ let mut idle = pool.shutdown_on_idle();
+
+ struct IdleFut<'a>(&'a mut Shutdown);
+
+ impl<'a> Future for IdleFut<'a> {
+ type Item = ();
+ type Error = ();
+ fn poll(&mut self) -> Poll<(), ()> {
+ assert!(self.0.poll().unwrap().is_not_ready());
+ Ok(Async::Ready(()))
+ }
+ }
+
+ IdleFut(&mut idle).wait().unwrap();
+
+ term_tx.send(()).unwrap();
+
+ idle.wait().unwrap();
+}
+
+#[test]
+fn panic_in_task() {
+ let pool = ThreadPool::new();
+ let tx = pool.sender().clone();
+
+ struct Boom;
+
+ impl Future for Boom {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ panic!();
+ }
+ }
+
+ impl Drop for Boom {
+ fn drop(&mut self) {
+ assert!(::std::thread::panicking());
+ }
+ }
+
+ tx.spawn(Boom).unwrap();
+
+ pool.shutdown_on_idle().wait().unwrap();
+}
+
+#[test]
+fn count_panics() {
+ let counter = Arc::new(AtomicUsize::new(0));
+ let counter_ = counter.clone();
+ let pool = tokio_threadpool::Builder::new()
+ .panic_handler(move |_err| {
+ // We caught a panic.
+ counter_.fetch_add(1, Relaxed);
+ })
+ .build();
+ // Spawn a future that will panic.
+ pool.spawn(lazy(|| -> Result<(), ()> { panic!() }));
+ pool.shutdown_on_idle().wait().unwrap();
+ let counter = counter.load(Relaxed);
+ assert_eq!(counter, 1);
+}
+
+#[test]
+fn multi_threadpool() {
+ use futures::sync::oneshot;
+
+ let pool1 = ThreadPool::new();
+ let pool2 = ThreadPool::new();
+
+ let (tx, rx) = oneshot::channel();
+ let (done_tx, done_rx) = mpsc::channel();
+
+ pool2.spawn({
+ rx.and_then(move |_| {
+ done_tx.send(()).unwrap();
+ Ok(())
+ })
+ .map_err(|e| panic!("err={:?}", e))
+ });
+
+ pool1.spawn(lazy(move || {
+ tx.send(()).unwrap();
+ Ok(())
+ }));
+
+ done_rx.recv().unwrap();
+}
+
+#[test]
+fn eagerly_drops_futures() {
+ use futures::future::{empty, lazy, Future};
+ use futures::task;
+ use std::sync::mpsc;
+
+ struct NotifyOnDrop(mpsc::Sender<()>);
+
+ impl Drop for NotifyOnDrop {
+ fn drop(&mut self) {
+ self.0.send(()).unwrap();
+ }
+ }
+
+ struct MyPark {
+ inner: DefaultPark,
+ #[allow(dead_code)]
+ park_tx: mpsc::SyncSender<()>,
+ unpark_tx: mpsc::SyncSender<()>,
+ }
+
+ impl Park for MyPark {
+ type Unpark = MyUnpark;
+ type Error = <DefaultPark as Park>::Error;
+
+ fn unpark(&self) -> Self::Unpark {
+ MyUnpark {
+ inner: self.inner.unpark(),
+ unpark_tx: self.unpark_tx.clone(),
+ }
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.inner.park()
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.inner.park_timeout(duration)
+ }
+ }
+
+ struct MyUnpark {
+ inner: DefaultUnpark,
+ #[allow(dead_code)]
+ unpark_tx: mpsc::SyncSender<()>,
+ }
+
+ impl Unpark for MyUnpark {
+ fn unpark(&self) {
+ self.inner.unpark()
+ }
+ }
+
+ let (task_tx, task_rx) = mpsc::channel();
+ let (drop_tx, drop_rx) = mpsc::channel();
+ let (park_tx, park_rx) = mpsc::sync_channel(0);
+ let (unpark_tx, unpark_rx) = mpsc::sync_channel(0);
+
+ // Get the signal that the handler dropped.
+ let notify_on_drop = NotifyOnDrop(drop_tx);
+
+ let pool = tokio_threadpool::Builder::new()
+ .custom_park(move |_| MyPark {
+ inner: DefaultPark::new(),
+ park_tx: park_tx.clone(),
+ unpark_tx: unpark_tx.clone(),
+ })
+ .build();
+
+ pool.spawn(lazy(move || {
+ // Get a handle to the current task.
+ let task = task::current();
+
+ // Send it to the main thread to hold on to.
+ task_tx.send(task).unwrap();
+
+ // This future will never resolve, it is only used to hold on to thee
+ // `notify_on_drop` handle.
+ empty::<(), ()>().then(move |_| {
+ // This code path should never be reached.
+ if true {
+ panic!()
+ }
+
+ // Explicitly drop `notify_on_drop` here, this is mostly to ensure
+ // that the `notify_on_drop` handle gets moved into the task. It
+ // will actually get dropped when the runtime is dropped.
+ drop(notify_on_drop);
+
+ Ok(())
+ })
+ }));
+
+ // Wait until we get the task handle.
+ let task = task_rx.recv().unwrap();
+
+ // Drop the pool, this should result in futures being forcefully dropped.
+ drop(pool);
+
+ // Make sure `MyPark` and `MyUnpark` were dropped during shutdown.
+ assert_eq!(park_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected));
+ assert_eq!(unpark_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected));
+
+ // If the future is forcefully dropped, then we will get a signal here.
+ drop_rx.recv().unwrap();
+
+ // Ensure `task` lives until after the test completes.
+ drop(task);
+}