diff options
Diffstat (limited to 'third_party/rust/parking_lot')
21 files changed, 5454 insertions, 0 deletions
diff --git a/third_party/rust/parking_lot/.cargo-checksum.json b/third_party/rust/parking_lot/.cargo-checksum.json new file mode 100644 index 0000000000..b819e7d4d2 --- /dev/null +++ b/third_party/rust/parking_lot/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"CHANGELOG.md":"f9559e40e966a870d64367a75e41b233ee1bc1746f6f242501b9462e69b4db12","Cargo.toml":"a7cce7f9fbc2cb0a5c388384073f9b2e820d6b47f314ef0dcccb5613741b8202","LICENSE-APACHE":"a60eea817514531668d7e00765731449fe14d059d3249e0bc93b36de45f759f2","LICENSE-MIT":"c9a75f18b9ab2927829a208fc6aa2cf4e63b8420887ba29cdb265d6619ae82d5","README.md":"93e21ceae3fe1b6cf9f6c2ebcbf09211a08b446f849048900d8d7efe52751614","bors.toml":"938b40da0516dd1e18f6ab893d43a3c6a011836124696157568380f3ce784958","src/condvar.rs":"bace2ee6e97eab088833a22a511029fb188e5f6bc9289bfcd5d41e3915de921d","src/deadlock.rs":"7d3ebb5b4f63658435df277bb983e352e4bc651a92c4fd48ae68bf103e452d0d","src/elision.rs":"9aceb0b27fd3cdaf4ef76bda63435a96ec2fdef24be098b9e4edbc39db000765","src/fair_mutex.rs":"d0a032e8207919da04b85f1422dfb14aa2af7aad78843c708d2fe3e0478e401a","src/lib.rs":"d83b9e6c70d2c6167d55dc2c4cf2b02abf4b11bfa092949750bc0cdafd38a45c","src/mutex.rs":"9fff878238ef798bfe2f48b410ca3074914826005a12df4f9b96619b2d3e6409","src/once.rs":"a1c38a5d87077e3d112d57e065ee126a24ab19f04fba9cb1f2cb43bc82caf33c","src/raw_fair_mutex.rs":"316f954d9673ac5b8d6bf4c19f2444800f63daf801c224d986e2d6dac810643c","src/raw_mutex.rs":"a24262800d61b8486ef0cfb1b72ead748c544ca2a551ec48346036ebb4fc385b","src/raw_rwlock.rs":"d0d93b096e68da3213da49d51849baf4ec241552eaa9b791f38314eb36c4e1a9","src/remutex.rs":"7a0de55161cd57497bb52d3aecca69a89eff2e71cdb2d762df53579e0607b489","src/rwlock.rs":"68584dffeb4368839774c0ee987481affc0547c38977a17970514f072a582d61","src/util.rs":"26325483bcd23ab8ceb16bf47541152c5eb0bc394f247795c14698fa7b586692","tests/issue_203.rs":"5fbdf6ec63f391d86457df949678c203a1e81e8aa32d4e10037fa76e768702c0"},"package":"6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb"}
\ No newline at end of file diff --git a/third_party/rust/parking_lot/CHANGELOG.md b/third_party/rust/parking_lot/CHANGELOG.md new file mode 100644 index 0000000000..a4734634ef --- /dev/null +++ b/third_party/rust/parking_lot/CHANGELOG.md @@ -0,0 +1,144 @@ +## parking_lot 0.11.1, lock_api 0.4.2 (2020-11-18) + +- Fix bounds on Send and Sync impls for lock guards. (#262) +- Fix incorrect memory ordering in `RwLock`. (#260) + +## lock_api 0.4.1 (2020-07-06) + +- Add `data_ptr` method to lock types to allow unsafely accessing the inner data + without a guard. (#247) + +## parking_lot 0.11.0, parking_lot_core 0.8.0, lock_api 0.4.0 (2020-06-23) + +- Add `is_locked` method to mutex types. (#235) +- Make `RawReentrantMutex` public. (#233) +- Allow lock guard to be sent to another thread with the `send_guard` feature. (#240) +- Use `Instant` type from the `instant` crate on wasm32-unknown-unknown. (#231) +- Remove deprecated and unsound `MappedRwLockWriteGuard::downgrade`. (#244) +- Most methods on the `Raw*` traits have been made unsafe since they assume + the current thread holds the lock. (#243) + +## parking_lot_core 0.7.2 (2020-04-21) + +- Add support for `wasm32-unknown-unknown` under the "nightly" feature. (#226) + +## parking_lot 0.10.2 (2020-04-10) + +- Update minimum version of `lock_api`. + +## parking_lot 0.10.1, parking_lot_core 0.7.1, lock_api 0.3.4 (2020-04-10) + +- Add methods to construct `Mutex`, `RwLock`, etc in a `const` context. (#217) +- Add `FairMutex` which always uses fair unlocking. (#204) +- Fixed panic with deadlock detection on macOS. (#203) +- Fixed incorrect synchronization in `create_hashtable`. (#210) +- Use `llvm_asm!` instead of the deprecated `asm!`. (#223) + +## lock_api 0.3.3 (2020-01-04) + +- Deprecate unsound `MappedRwLockWriteGuard::downgrade` (#198) + +## parking_lot 0.10.0, parking_lot_core 0.7.0, lock_api 0.3.2 (2019-11-25) + +- Upgrade smallvec dependency to 1.0 in parking_lot_core. +- Replace all usage of `mem::uninitialized` with `mem::MaybeUninit`. +- The minimum required Rust version is bumped to 1.36. Because of the above two changes. +- Make methods on `WaitTimeoutResult` and `OnceState` take `self` by value instead of reference. + +## parking_lot_core 0.6.2 (2019-07-22) + +- Fixed compile error on Windows with old cfg_if version. (#164) + +## parking_lot_core 0.6.1 (2019-07-17) + +- Fixed Android build. (#163) + +## parking_lot 0.9.0, parking_lot_core 0.6.0, lock_api 0.3.1 (2019-07-14) + +- Re-export lock_api (0.3.1) from parking_lot (#150) +- Removed (non-dev) dependency on rand crate for fairness mechanism, by + including a simple xorshift PRNG in core (#144) +- Android now uses the futex-based ThreadParker. (#140) +- Fixed CloudABI ThreadParker. (#140) +- Fix race condition in lock_api::ReentrantMutex (da16c2c7) + +## lock_api 0.3.0 (2019-07-03, _yanked_) + +- Use NonZeroUsize in GetThreadId::nonzero_thread_id (#148) +- Debug assert lock_count in ReentrantMutex (#148) +- Tag as `unsafe` and document some internal methods (#148) +- This release was _yanked_ due to a regression in ReentrantMutex (da16c2c7) + +## parking_lot 0.8.1 (2019-07-03, _yanked_) + +- Re-export lock_api (0.3.0) from parking_lot (#150) +- This release was _yanked_ from crates.io due to unexpected breakage (#156) + +## parking_lot 0.8.0, parking_lot_core 0.5.0, lock_api 0.2.0 (2019-05-04) + +- Fix race conditions in deadlock detection. +- Support for more platforms by adding ThreadParker implementations for + Wasm, Redox, SGX and CloudABI. +- Drop support for older Rust. parking_lot now requires 1.31 and is a + Rust 2018 edition crate (#122). +- Disable the owning_ref feature by default. +- Fix was_last_thread value in the timeout callback of park() (#129). +- Support single byte Mutex/Once on stable Rust when compiler is at least + version 1.34. +- Make Condvar::new and Once::new const fns on stable Rust and remove + ONCE_INIT (#134). +- Add optional Serde support (#135). + +## parking_lot 0.7.1 (2019-01-01) + +- Fixed potential deadlock when upgrading a RwLock. +- Fixed overflow panic on very long timeouts (#111). + +## parking_lot 0.7.0, parking_lot_core 0.4.0 (2018-11-26) + +- Return if or how many threads were notified from `Condvar::notify_*` + +## parking_lot 0.6.3 (2018-07-18) + +- Export `RawMutex`, `RawRwLock` and `RawThreadId`. + +## parking_lot 0.6.2 (2018-06-18) + +- Enable `lock_api/nightly` feature from `parking_lot/nightly` (#79) + +## parking_lot 0.6.1 (2018-06-08) + +Added missing typedefs for mapped lock guards: + +- `MappedMutexGuard` +- `MappedReentrantMutexGuard` +- `MappedRwLockReadGuard` +- `MappedRwLockWriteGuard` + +## parking_lot 0.6.0 (2018-06-08) + +This release moves most of the code for type-safe `Mutex` and `RwLock` types +into a separate crate called `lock_api`. This new crate is compatible with +`no_std` and provides `Mutex` and `RwLock` type-safe wrapper types from a raw +mutex type which implements the `RawMutex` or `RawRwLock` trait. The API +provided by the wrapper types can be extended by implementing more traits on +the raw mutex type which provide more functionality (e.g. `RawMutexTimed`). See +the crate documentation for more details. + +There are also several major changes: + +- The minimum required Rust version is bumped to 1.26. +- All methods on `MutexGuard` (and other guard types) are no longer inherent + methods and must be called as `MutexGuard::method(self)`. This avoids + conflicts with methods from the inner type. +- `MutexGuard` (and other guard types) add the `unlocked` method which + temporarily unlocks a mutex, runs the given closure, and then re-locks the + mutex. +- `MutexGuard` (and other guard types) add the `bump` method which gives a + chance for other threads to acquire the mutex by temporarily unlocking it and + re-locking it. However this is optimized for the common case where there are + no threads waiting on the lock, in which case no unlocking is performed. +- `MutexGuard` (and other guard types) add the `map` method which returns a + `MappedMutexGuard` which holds only a subset of the original locked type. The + `MappedMutexGuard` type is identical to `MutexGuard` except that it does not + support the `unlocked` and `bump` methods, and can't be used with `CondVar`. diff --git a/third_party/rust/parking_lot/Cargo.toml b/third_party/rust/parking_lot/Cargo.toml new file mode 100644 index 0000000000..c78888f7b6 --- /dev/null +++ b/third_party/rust/parking_lot/Cargo.toml @@ -0,0 +1,46 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies +# +# If you believe there's an error in this file please file an +# issue against the rust-lang/cargo repository. If you're +# editing this file be aware that the upstream Cargo.toml +# will likely look very different (and much more reasonable) + +[package] +edition = "2018" +name = "parking_lot" +version = "0.11.1" +authors = ["Amanieu d'Antras <amanieu@gmail.com>"] +description = "More compact and efficient implementations of the standard synchronization primitives." +readme = "README.md" +keywords = ["mutex", "condvar", "rwlock", "once", "thread"] +categories = ["concurrency"] +license = "Apache-2.0/MIT" +repository = "https://github.com/Amanieu/parking_lot" +[dependencies.instant] +version = "0.1.4" + +[dependencies.lock_api] +version = "0.4.0" + +[dependencies.parking_lot_core] +version = "0.8.0" +[dev-dependencies.bincode] +version = "1.3.0" + +[dev-dependencies.rand] +version = "0.7.3" + +[features] +deadlock_detection = ["parking_lot_core/deadlock_detection"] +default = [] +nightly = ["parking_lot_core/nightly", "lock_api/nightly"] +owning_ref = ["lock_api/owning_ref"] +send_guard = [] +serde = ["lock_api/serde"] +stdweb = ["instant/stdweb"] +wasm-bindgen = ["instant/wasm-bindgen"] diff --git a/third_party/rust/parking_lot/LICENSE-APACHE b/third_party/rust/parking_lot/LICENSE-APACHE new file mode 100644 index 0000000000..16fe87b06e --- /dev/null +++ b/third_party/rust/parking_lot/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/third_party/rust/parking_lot/LICENSE-MIT b/third_party/rust/parking_lot/LICENSE-MIT new file mode 100644 index 0000000000..40b8817a47 --- /dev/null +++ b/third_party/rust/parking_lot/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) 2016 The Rust Project Developers + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/third_party/rust/parking_lot/README.md b/third_party/rust/parking_lot/README.md new file mode 100644 index 0000000000..d3e5b0b38e --- /dev/null +++ b/third_party/rust/parking_lot/README.md @@ -0,0 +1,151 @@ +parking_lot +============ + +![Rust](https://github.com/Amanieu/parking_lot/workflows/Rust/badge.svg) +[![Crates.io](https://img.shields.io/crates/v/parking_lot.svg)](https://crates.io/crates/parking_lot) + +[Documentation (synchronization primitives)](https://docs.rs/parking_lot/) + +[Documentation (core parking lot API)](https://docs.rs/parking_lot_core/) + +[Documentation (type-safe lock API)](https://docs.rs/lock_api/) + +This library provides implementations of `Mutex`, `RwLock`, `Condvar` and +`Once` that are smaller, faster and more flexible than those in the Rust +standard library, as well as a `ReentrantMutex` type which supports recursive +locking. It also exposes a low-level API for creating your own efficient +synchronization primitives. + +When tested on x86_64 Linux, `parking_lot::Mutex` was found to be 1.5x +faster than `std::sync::Mutex` when uncontended, and up to 5x faster when +contended from multiple threads. The numbers for `RwLock` vary depending on +the number of reader and writer threads, but are almost always faster than +the standard library `RwLock`, and even up to 50x faster in some cases. + +## Features + +The primitives provided by this library have several advantages over those +in the Rust standard library: + +1. `Mutex` and `Once` only require 1 byte of storage space, while `Condvar` + and `RwLock` only require 1 word of storage space. On the other hand the + standard library primitives require a dynamically allocated `Box` to hold + OS-specific synchronization primitives. The small size of `Mutex` in + particular encourages the use of fine-grained locks to increase + parallelism. +2. Since they consist of just a single atomic variable, have constant + initializers and don't need destructors, these primitives can be used as + `static` global variables. The standard library primitives require + dynamic initialization and thus need to be lazily initialized with + `lazy_static!`. +3. Uncontended lock acquisition and release is done through fast inline + paths which only require a single atomic operation. +4. Microcontention (a contended lock with a short critical section) is + efficiently handled by spinning a few times while trying to acquire a + lock. +5. The locks are adaptive and will suspend a thread after a few failed spin + attempts. This makes the locks suitable for both long and short critical + sections. +6. `Condvar`, `RwLock` and `Once` work on Windows XP, unlike the standard + library versions of those types. +7. `RwLock` takes advantage of hardware lock elision on processors that + support it, which can lead to huge performance wins with many readers. +8. `RwLock` uses a task-fair locking policy, which avoids reader and writer + starvation, whereas the standard library version makes no guarantees. +9. `Condvar` is guaranteed not to produce spurious wakeups. A thread will + only be woken up if it timed out or it was woken up by a notification. +10. `Condvar::notify_all` will only wake up a single thread and requeue the + rest to wait on the associated `Mutex`. This avoids a thundering herd + problem where all threads try to acquire the lock at the same time. +11. `RwLock` supports atomically downgrading a write lock into a read lock. +12. `Mutex` and `RwLock` allow raw unlocking without a RAII guard object. +13. `Mutex<()>` and `RwLock<()>` allow raw locking without a RAII guard + object. +14. `Mutex` and `RwLock` support [eventual fairness](https://trac.webkit.org/changeset/203350) + which allows them to be fair on average without sacrificing performance. +15. A `ReentrantMutex` type which supports recursive locking. +16. An *experimental* deadlock detector that works for `Mutex`, + `RwLock` and `ReentrantMutex`. This feature is disabled by default and + can be enabled via the `deadlock_detection` feature. +17. `RwLock` supports atomically upgrading an "upgradable" read lock into a + write lock. +18. Optional support for [serde](https://docs.serde.rs/serde/). Enable via the + feature `serde`. **NOTE!** this support is for `Mutex`, `ReentrantMutex`, + and `RwLock` only; `Condvar` and `Once` are not currently supported. +19. Lock guards can be sent to other threads when the `send_guard` feature is + enabled. + +## The parking lot + +To keep these primitives small, all thread queuing and suspending +functionality is offloaded to the *parking lot*. The idea behind this is +based on the Webkit [`WTF::ParkingLot`](https://webkit.org/blog/6161/locking-in-webkit/) +class, which essentially consists of a hash table mapping of lock addresses +to queues of parked (sleeping) threads. The Webkit parking lot was itself +inspired by Linux [futexes](http://man7.org/linux/man-pages/man2/futex.2.html), +but it is more powerful since it allows invoking callbacks while holding a queue +lock. + +## Nightly vs stable + +There are a few restrictions when using this library on stable Rust: + +- You will have to use the `const_*` functions (e.g. `const_mutex(val)`) to + statically initialize the locking primitives. Using e.g. `Mutex::new(val)` + does not work on stable Rust yet. +- `RwLock` will not be able to take advantage of hardware lock elision for + readers, which improves performance when there are multiple readers. +- The `wasm32-unknown-unknown` target is only supported on nightly and requires + `-C target-feature=+atomics` in `RUSTFLAGS`. + +To enable nightly-only functionality, you need to enable the `nightly` feature +in Cargo (see below). + +## Usage + +Add this to your `Cargo.toml`: + +```toml +[dependencies] +parking_lot = "0.11" +``` + +To enable nightly-only features, add this to your `Cargo.toml` instead: + +```toml +[dependencies] +parking_lot = { version = "0.11", features = ["nightly"] } +``` + +The experimental deadlock detector can be enabled with the +`deadlock_detection` Cargo feature. + +To allow sending `MutexGuard`s and `RwLock*Guard`s to other threads, enable the +`send_guard` option. + +Note that the `deadlock_detection` and `send_guard` features are incompatible +and cannot be used together. + +The core parking lot API is provided by the `parking_lot_core` crate. It is +separate from the synchronization primitives in the `parking_lot` crate so that +changes to the core API do not cause breaking changes for users of `parking_lot`. + +## Minimum Rust version + +The current minimum required Rust version is 1.36. Any change to this is +considered a breaking change and will require a major version bump. + +## License + +Licensed under either of + + * Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) + +at your option. + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any +additional terms or conditions. diff --git a/third_party/rust/parking_lot/bors.toml b/third_party/rust/parking_lot/bors.toml new file mode 100644 index 0000000000..44056dbf4e --- /dev/null +++ b/third_party/rust/parking_lot/bors.toml @@ -0,0 +1,4 @@ +status = [ + "build_tier_one", + "build_other_platforms", +] diff --git a/third_party/rust/parking_lot/src/condvar.rs b/third_party/rust/parking_lot/src/condvar.rs new file mode 100644 index 0000000000..534b8aff8b --- /dev/null +++ b/third_party/rust/parking_lot/src/condvar.rs @@ -0,0 +1,1057 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or +// http://opensource.org/licenses/MIT>, at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use crate::mutex::MutexGuard; +use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL}; +use crate::{deadlock, util}; +use core::{ + fmt, ptr, + sync::atomic::{AtomicPtr, Ordering}, +}; +use instant::Instant; +use lock_api::RawMutex as RawMutex_; +use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN}; +use std::time::Duration; + +/// A type indicating whether a timed wait on a condition variable returned +/// due to a time out or not. +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub struct WaitTimeoutResult(bool); + +impl WaitTimeoutResult { + /// Returns whether the wait was known to have timed out. + #[inline] + pub fn timed_out(self) -> bool { + self.0 + } +} + +/// A Condition Variable +/// +/// Condition variables represent the ability to block a thread such that it +/// consumes no CPU time while waiting for an event to occur. Condition +/// variables are typically associated with a boolean predicate (a condition) +/// and a mutex. The predicate is always verified inside of the mutex before +/// determining that thread must block. +/// +/// Note that this module places one additional restriction over the system +/// condition variables: each condvar can be used with only one mutex at a +/// time. Any attempt to use multiple mutexes on the same condition variable +/// simultaneously will result in a runtime panic. However it is possible to +/// switch to a different mutex if there are no threads currently waiting on +/// the condition variable. +/// +/// # Differences from the standard library `Condvar` +/// +/// - No spurious wakeups: A wait will only return a non-timeout result if it +/// was woken up by `notify_one` or `notify_all`. +/// - `Condvar::notify_all` will only wake up a single thread, the rest are +/// requeued to wait for the `Mutex` to be unlocked by the thread that was +/// woken up. +/// - Only requires 1 word of space, whereas the standard library boxes the +/// `Condvar` due to platform limitations. +/// - Can be statically constructed (requires the `const_fn` nightly feature). +/// - Does not require any drop glue when dropped. +/// - Inline fast path for the uncontended case. +/// +/// # Examples +/// +/// ``` +/// use parking_lot::{Mutex, Condvar}; +/// use std::sync::Arc; +/// use std::thread; +/// +/// let pair = Arc::new((Mutex::new(false), Condvar::new())); +/// let pair2 = pair.clone(); +/// +/// // Inside of our lock, spawn a new thread, and then wait for it to start +/// thread::spawn(move|| { +/// let &(ref lock, ref cvar) = &*pair2; +/// let mut started = lock.lock(); +/// *started = true; +/// cvar.notify_one(); +/// }); +/// +/// // wait for the thread to start up +/// let &(ref lock, ref cvar) = &*pair; +/// let mut started = lock.lock(); +/// if !*started { +/// cvar.wait(&mut started); +/// } +/// // Note that we used an if instead of a while loop above. This is only +/// // possible because parking_lot's Condvar will never spuriously wake up. +/// // This means that wait() will only return after notify_one or notify_all is +/// // called. +/// ``` +pub struct Condvar { + state: AtomicPtr<RawMutex>, +} + +impl Condvar { + /// Creates a new condition variable which is ready to be waited on and + /// notified. + #[inline] + pub const fn new() -> Condvar { + Condvar { + state: AtomicPtr::new(ptr::null_mut()), + } + } + + /// Wakes up one blocked thread on this condvar. + /// + /// Returns whether a thread was woken up. + /// + /// If there is a blocked thread on this condition variable, then it will + /// be woken up from its call to `wait` or `wait_timeout`. Calls to + /// `notify_one` are not buffered in any way. + /// + /// To wake up all threads, see `notify_all()`. + /// + /// # Examples + /// + /// ``` + /// use parking_lot::Condvar; + /// + /// let condvar = Condvar::new(); + /// + /// // do something with condvar, share it with other threads + /// + /// if !condvar.notify_one() { + /// println!("Nobody was listening for this."); + /// } + /// ``` + #[inline] + pub fn notify_one(&self) -> bool { + // Nothing to do if there are no waiting threads + let state = self.state.load(Ordering::Relaxed); + if state.is_null() { + return false; + } + + self.notify_one_slow(state) + } + + #[cold] + fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool { + unsafe { + // Unpark one thread and requeue the rest onto the mutex + let from = self as *const _ as usize; + let to = mutex as usize; + let validate = || { + // Make sure that our atomic state still points to the same + // mutex. If not then it means that all threads on the current + // mutex were woken up and a new waiting thread switched to a + // different mutex. In that case we can get away with doing + // nothing. + if self.state.load(Ordering::Relaxed) != mutex { + return RequeueOp::Abort; + } + + // Unpark one thread if the mutex is unlocked, otherwise just + // requeue everything to the mutex. This is safe to do here + // since unlocking the mutex when the parked bit is set requires + // locking the queue. There is the possibility of a race if the + // mutex gets locked after we check, but that doesn't matter in + // this case. + if (*mutex).mark_parked_if_locked() { + RequeueOp::RequeueOne + } else { + RequeueOp::UnparkOne + } + }; + let callback = |_op, result: UnparkResult| { + // Clear our state if there are no more waiting threads + if !result.have_more_threads { + self.state.store(ptr::null_mut(), Ordering::Relaxed); + } + TOKEN_NORMAL + }; + let res = parking_lot_core::unpark_requeue(from, to, validate, callback); + + res.unparked_threads + res.requeued_threads != 0 + } + } + + /// Wakes up all blocked threads on this condvar. + /// + /// Returns the number of threads woken up. + /// + /// This method will ensure that any current waiters on the condition + /// variable are awoken. Calls to `notify_all()` are not buffered in any + /// way. + /// + /// To wake up only one thread, see `notify_one()`. + #[inline] + pub fn notify_all(&self) -> usize { + // Nothing to do if there are no waiting threads + let state = self.state.load(Ordering::Relaxed); + if state.is_null() { + return 0; + } + + self.notify_all_slow(state) + } + + #[cold] + fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize { + unsafe { + // Unpark one thread and requeue the rest onto the mutex + let from = self as *const _ as usize; + let to = mutex as usize; + let validate = || { + // Make sure that our atomic state still points to the same + // mutex. If not then it means that all threads on the current + // mutex were woken up and a new waiting thread switched to a + // different mutex. In that case we can get away with doing + // nothing. + if self.state.load(Ordering::Relaxed) != mutex { + return RequeueOp::Abort; + } + + // Clear our state since we are going to unpark or requeue all + // threads. + self.state.store(ptr::null_mut(), Ordering::Relaxed); + + // Unpark one thread if the mutex is unlocked, otherwise just + // requeue everything to the mutex. This is safe to do here + // since unlocking the mutex when the parked bit is set requires + // locking the queue. There is the possibility of a race if the + // mutex gets locked after we check, but that doesn't matter in + // this case. + if (*mutex).mark_parked_if_locked() { + RequeueOp::RequeueAll + } else { + RequeueOp::UnparkOneRequeueRest + } + }; + let callback = |op, result: UnparkResult| { + // If we requeued threads to the mutex, mark it as having + // parked threads. The RequeueAll case is already handled above. + if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 { + (*mutex).mark_parked(); + } + TOKEN_NORMAL + }; + let res = parking_lot_core::unpark_requeue(from, to, validate, callback); + + res.unparked_threads + res.requeued_threads + } + } + + /// Blocks the current thread until this condition variable receives a + /// notification. + /// + /// This function will atomically unlock the mutex specified (represented by + /// `mutex_guard`) and block the current thread. This means that any calls + /// to `notify_*()` which happen logically after the mutex is unlocked are + /// candidates to wake this thread up. When this function call returns, the + /// lock specified will have been re-acquired. + /// + /// # Panics + /// + /// This function will panic if another thread is waiting on the `Condvar` + /// with a different `Mutex` object. + #[inline] + pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) { + self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None); + } + + /// Waits on this condition variable for a notification, timing out after + /// the specified time instant. + /// + /// The semantics of this function are equivalent to `wait()` except that + /// the thread will be blocked roughly until `timeout` is reached. This + /// method should not be used for precise timing due to anomalies such as + /// preemption or platform differences that may not cause the maximum + /// amount of time waited to be precisely `timeout`. + /// + /// Note that the best effort is made to ensure that the time waited is + /// measured with a monotonic clock, and not affected by the changes made to + /// the system time. + /// + /// The returned `WaitTimeoutResult` value indicates if the timeout is + /// known to have elapsed. + /// + /// Like `wait`, the lock specified will be re-acquired when this function + /// returns, regardless of whether the timeout elapsed or not. + /// + /// # Panics + /// + /// This function will panic if another thread is waiting on the `Condvar` + /// with a different `Mutex` object. + #[inline] + pub fn wait_until<T: ?Sized>( + &self, + mutex_guard: &mut MutexGuard<'_, T>, + timeout: Instant, + ) -> WaitTimeoutResult { + self.wait_until_internal( + unsafe { MutexGuard::mutex(mutex_guard).raw() }, + Some(timeout), + ) + } + + // This is a non-generic function to reduce the monomorphization cost of + // using `wait_until`. + fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult { + unsafe { + let result; + let mut bad_mutex = false; + let mut requeued = false; + { + let addr = self as *const _ as usize; + let lock_addr = mutex as *const _ as *mut _; + let validate = || { + // Ensure we don't use two different mutexes with the same + // Condvar at the same time. This is done while locked to + // avoid races with notify_one + let state = self.state.load(Ordering::Relaxed); + if state.is_null() { + self.state.store(lock_addr, Ordering::Relaxed); + } else if state != lock_addr { + bad_mutex = true; + return false; + } + true + }; + let before_sleep = || { + // Unlock the mutex before sleeping... + mutex.unlock(); + }; + let timed_out = |k, was_last_thread| { + // If we were requeued to a mutex, then we did not time out. + // We'll just park ourselves on the mutex again when we try + // to lock it later. + requeued = k != addr; + + // If we were the last thread on the queue then we need to + // clear our state. This is normally done by the + // notify_{one,all} functions when not timing out. + if !requeued && was_last_thread { + self.state.store(ptr::null_mut(), Ordering::Relaxed); + } + }; + result = parking_lot_core::park( + addr, + validate, + before_sleep, + timed_out, + DEFAULT_PARK_TOKEN, + timeout, + ); + } + + // Panic if we tried to use multiple mutexes with a Condvar. Note + // that at this point the MutexGuard is still locked. It will be + // unlocked by the unwinding logic. + if bad_mutex { + panic!("attempted to use a condition variable with more than one mutex"); + } + + // ... and re-lock it once we are done sleeping + if result == ParkResult::Unparked(TOKEN_HANDOFF) { + deadlock::acquire_resource(mutex as *const _ as usize); + } else { + mutex.lock(); + } + + WaitTimeoutResult(!(result.is_unparked() || requeued)) + } + } + + /// Waits on this condition variable for a notification, timing out after a + /// specified duration. + /// + /// The semantics of this function are equivalent to `wait()` except that + /// the thread will be blocked for roughly no longer than `timeout`. This + /// method should not be used for precise timing due to anomalies such as + /// preemption or platform differences that may not cause the maximum + /// amount of time waited to be precisely `timeout`. + /// + /// Note that the best effort is made to ensure that the time waited is + /// measured with a monotonic clock, and not affected by the changes made to + /// the system time. + /// + /// The returned `WaitTimeoutResult` value indicates if the timeout is + /// known to have elapsed. + /// + /// Like `wait`, the lock specified will be re-acquired when this function + /// returns, regardless of whether the timeout elapsed or not. + /// + /// # Panics + /// + /// Panics if the given `timeout` is so large that it can't be added to the current time. + /// This panic is not possible if the crate is built with the `nightly` feature, then a too + /// large `timeout` becomes equivalent to just calling `wait`. + #[inline] + pub fn wait_for<T: ?Sized>( + &self, + mutex_guard: &mut MutexGuard<'_, T>, + timeout: Duration, + ) -> WaitTimeoutResult { + let deadline = util::to_deadline(timeout); + self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline) + } +} + +impl Default for Condvar { + #[inline] + fn default() -> Condvar { + Condvar::new() + } +} + +impl fmt::Debug for Condvar { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Condvar { .. }") + } +} + +#[cfg(test)] +mod tests { + use crate::{Condvar, Mutex, MutexGuard}; + use instant::Instant; + use std::sync::mpsc::channel; + use std::sync::Arc; + use std::thread; + use std::time::Duration; + + #[test] + fn smoke() { + let c = Condvar::new(); + c.notify_one(); + c.notify_all(); + } + + #[test] + fn notify_one() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + + let mut g = m.lock(); + let _t = thread::spawn(move || { + let _g = m2.lock(); + c2.notify_one(); + }); + c.wait(&mut g); + } + + #[test] + fn notify_all() { + const N: usize = 10; + + let data = Arc::new((Mutex::new(0), Condvar::new())); + let (tx, rx) = channel(); + for _ in 0..N { + let data = data.clone(); + let tx = tx.clone(); + thread::spawn(move || { + let &(ref lock, ref cond) = &*data; + let mut cnt = lock.lock(); + *cnt += 1; + if *cnt == N { + tx.send(()).unwrap(); + } + while *cnt != 0 { + cond.wait(&mut cnt); + } + tx.send(()).unwrap(); + }); + } + drop(tx); + + let &(ref lock, ref cond) = &*data; + rx.recv().unwrap(); + let mut cnt = lock.lock(); + *cnt = 0; + cond.notify_all(); + drop(cnt); + + for _ in 0..N { + rx.recv().unwrap(); + } + } + + #[test] + fn notify_one_return_true() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + + let mut g = m.lock(); + let _t = thread::spawn(move || { + let _g = m2.lock(); + assert!(c2.notify_one()); + }); + c.wait(&mut g); + } + + #[test] + fn notify_one_return_false() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + let _t = thread::spawn(move || { + let _g = m.lock(); + assert!(!c.notify_one()); + }); + } + + #[test] + fn notify_all_return() { + const N: usize = 10; + + let data = Arc::new((Mutex::new(0), Condvar::new())); + let (tx, rx) = channel(); + for _ in 0..N { + let data = data.clone(); + let tx = tx.clone(); + thread::spawn(move || { + let &(ref lock, ref cond) = &*data; + let mut cnt = lock.lock(); + *cnt += 1; + if *cnt == N { + tx.send(()).unwrap(); + } + while *cnt != 0 { + cond.wait(&mut cnt); + } + tx.send(()).unwrap(); + }); + } + drop(tx); + + let &(ref lock, ref cond) = &*data; + rx.recv().unwrap(); + let mut cnt = lock.lock(); + *cnt = 0; + assert_eq!(cond.notify_all(), N); + drop(cnt); + + for _ in 0..N { + rx.recv().unwrap(); + } + + assert_eq!(cond.notify_all(), 0); + } + + #[test] + fn wait_for() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + + let mut g = m.lock(); + let no_timeout = c.wait_for(&mut g, Duration::from_millis(1)); + assert!(no_timeout.timed_out()); + + let _t = thread::spawn(move || { + let _g = m2.lock(); + c2.notify_one(); + }); + // Non-nightly panics on too large timeouts. Nightly treats it as indefinite wait. + let very_long_timeout = if cfg!(feature = "nightly") { + Duration::from_secs(u64::max_value()) + } else { + Duration::from_millis(u32::max_value() as u64) + }; + + let timeout_res = c.wait_for(&mut g, very_long_timeout); + assert!(!timeout_res.timed_out()); + + drop(g); + } + + #[test] + fn wait_until() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + + let mut g = m.lock(); + let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1)); + assert!(no_timeout.timed_out()); + let _t = thread::spawn(move || { + let _g = m2.lock(); + c2.notify_one(); + }); + let timeout_res = c.wait_until( + &mut g, + Instant::now() + Duration::from_millis(u32::max_value() as u64), + ); + assert!(!timeout_res.timed_out()); + drop(g); + } + + #[test] + #[should_panic] + fn two_mutexes() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let m3 = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + + // Make sure we don't leave the child thread dangling + struct PanicGuard<'a>(&'a Condvar); + impl<'a> Drop for PanicGuard<'a> { + fn drop(&mut self) { + self.0.notify_one(); + } + } + + let (tx, rx) = channel(); + let g = m.lock(); + let _t = thread::spawn(move || { + let mut g = m2.lock(); + tx.send(()).unwrap(); + c2.wait(&mut g); + }); + drop(g); + rx.recv().unwrap(); + let _g = m.lock(); + let _guard = PanicGuard(&*c); + c.wait(&mut m3.lock()); + } + + #[test] + fn two_mutexes_disjoint() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let m3 = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + + let mut g = m.lock(); + let _t = thread::spawn(move || { + let _g = m2.lock(); + c2.notify_one(); + }); + c.wait(&mut g); + drop(g); + + let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1)); + } + + #[test] + fn test_debug_condvar() { + let c = Condvar::new(); + assert_eq!(format!("{:?}", c), "Condvar { .. }"); + } + + #[test] + fn test_condvar_requeue() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + let t = thread::spawn(move || { + let mut g = m2.lock(); + c2.wait(&mut g); + }); + + let mut g = m.lock(); + while !c.notify_one() { + // Wait for the thread to get into wait() + MutexGuard::bump(&mut g); + // Yield, so the other thread gets a chance to do something. + // (At least Miri needs this, because it doesn't preempt threads.) + thread::yield_now(); + } + // The thread should have been requeued to the mutex, which we wake up now. + drop(g); + t.join().unwrap(); + } + + #[test] + fn test_issue_129() { + let locks = Arc::new((Mutex::new(()), Condvar::new())); + + let (tx, rx) = channel(); + for _ in 0..4 { + let locks = locks.clone(); + let tx = tx.clone(); + thread::spawn(move || { + let mut guard = locks.0.lock(); + locks.1.wait(&mut guard); + locks.1.wait_for(&mut guard, Duration::from_millis(1)); + locks.1.notify_one(); + tx.send(()).unwrap(); + }); + } + + thread::sleep(Duration::from_millis(100)); + locks.1.notify_one(); + + for _ in 0..4 { + assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(())); + } + } +} + +/// This module contains an integration test that is heavily inspired from WebKit's own integration +/// tests for it's own Condvar. +#[cfg(test)] +mod webkit_queue_test { + use crate::{Condvar, Mutex, MutexGuard}; + use std::{collections::VecDeque, sync::Arc, thread, time::Duration}; + + #[derive(Clone, Copy)] + enum Timeout { + Bounded(Duration), + Forever, + } + + #[derive(Clone, Copy)] + enum NotifyStyle { + One, + All, + } + + struct Queue { + items: VecDeque<usize>, + should_continue: bool, + } + + impl Queue { + fn new() -> Self { + Self { + items: VecDeque::new(), + should_continue: true, + } + } + } + + fn wait<T: ?Sized>( + condition: &Condvar, + lock: &mut MutexGuard<'_, T>, + predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool, + timeout: &Timeout, + ) { + while !predicate(lock) { + match timeout { + Timeout::Forever => condition.wait(lock), + Timeout::Bounded(bound) => { + condition.wait_for(lock, *bound); + } + } + } + } + + fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) { + match style { + NotifyStyle::One => { + condition.notify_one(); + } + NotifyStyle::All => { + if should_notify { + condition.notify_all(); + } + } + } + } + + fn run_queue_test( + num_producers: usize, + num_consumers: usize, + max_queue_size: usize, + messages_per_producer: usize, + notify_style: NotifyStyle, + timeout: Timeout, + delay: Duration, + ) { + let input_queue = Arc::new(Mutex::new(Queue::new())); + let empty_condition = Arc::new(Condvar::new()); + let full_condition = Arc::new(Condvar::new()); + + let output_vec = Arc::new(Mutex::new(vec![])); + + let consumers = (0..num_consumers) + .map(|_| { + consumer_thread( + input_queue.clone(), + empty_condition.clone(), + full_condition.clone(), + timeout, + notify_style, + output_vec.clone(), + max_queue_size, + ) + }) + .collect::<Vec<_>>(); + let producers = (0..num_producers) + .map(|_| { + producer_thread( + messages_per_producer, + input_queue.clone(), + empty_condition.clone(), + full_condition.clone(), + timeout, + notify_style, + max_queue_size, + ) + }) + .collect::<Vec<_>>(); + + thread::sleep(delay); + + for producer in producers.into_iter() { + producer.join().expect("Producer thread panicked"); + } + + { + let mut input_queue = input_queue.lock(); + input_queue.should_continue = false; + } + empty_condition.notify_all(); + + for consumer in consumers.into_iter() { + consumer.join().expect("Consumer thread panicked"); + } + + let mut output_vec = output_vec.lock(); + assert_eq!(output_vec.len(), num_producers * messages_per_producer); + output_vec.sort(); + for msg_idx in 0..messages_per_producer { + for producer_idx in 0..num_producers { + assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]); + } + } + } + + fn consumer_thread( + input_queue: Arc<Mutex<Queue>>, + empty_condition: Arc<Condvar>, + full_condition: Arc<Condvar>, + timeout: Timeout, + notify_style: NotifyStyle, + output_queue: Arc<Mutex<Vec<usize>>>, + max_queue_size: usize, + ) -> thread::JoinHandle<()> { + thread::spawn(move || loop { + let (should_notify, result) = { + let mut queue = input_queue.lock(); + wait( + &*empty_condition, + &mut queue, + |state| -> bool { !state.items.is_empty() || !state.should_continue }, + &timeout, + ); + if queue.items.is_empty() && !queue.should_continue { + return; + } + let should_notify = queue.items.len() == max_queue_size; + let result = queue.items.pop_front(); + std::mem::drop(queue); + (should_notify, result) + }; + notify(notify_style, &*full_condition, should_notify); + + if let Some(result) = result { + output_queue.lock().push(result); + } + }) + } + + fn producer_thread( + num_messages: usize, + queue: Arc<Mutex<Queue>>, + empty_condition: Arc<Condvar>, + full_condition: Arc<Condvar>, + timeout: Timeout, + notify_style: NotifyStyle, + max_queue_size: usize, + ) -> thread::JoinHandle<()> { + thread::spawn(move || { + for message in 0..num_messages { + let should_notify = { + let mut queue = queue.lock(); + wait( + &*full_condition, + &mut queue, + |state| state.items.len() < max_queue_size, + &timeout, + ); + let should_notify = queue.items.is_empty(); + queue.items.push_back(message); + std::mem::drop(queue); + should_notify + }; + notify(notify_style, &*empty_condition, should_notify); + } + }) + } + + macro_rules! run_queue_tests { + ( $( $name:ident( + num_producers: $num_producers:expr, + num_consumers: $num_consumers:expr, + max_queue_size: $max_queue_size:expr, + messages_per_producer: $messages_per_producer:expr, + notification_style: $notification_style:expr, + timeout: $timeout:expr, + delay_seconds: $delay_seconds:expr); + )* ) => { + $(#[test] + fn $name() { + let delay = Duration::from_secs($delay_seconds); + run_queue_test( + $num_producers, + $num_consumers, + $max_queue_size, + $messages_per_producer, + $notification_style, + $timeout, + delay, + ); + })* + }; + } + + run_queue_tests! { + sanity_check_queue( + num_producers: 1, + num_consumers: 1, + max_queue_size: 1, + messages_per_producer: 100_000, + notification_style: NotifyStyle::All, + timeout: Timeout::Bounded(Duration::from_secs(1)), + delay_seconds: 0 + ); + sanity_check_queue_timeout( + num_producers: 1, + num_consumers: 1, + max_queue_size: 1, + messages_per_producer: 100_000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + new_test_without_timeout_5( + num_producers: 1, + num_consumers: 5, + max_queue_size: 1, + messages_per_producer: 100_000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + one_producer_one_consumer_one_slot( + num_producers: 1, + num_consumers: 1, + max_queue_size: 1, + messages_per_producer: 100_000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + one_producer_one_consumer_one_slot_timeout( + num_producers: 1, + num_consumers: 1, + max_queue_size: 1, + messages_per_producer: 100_000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 1 + ); + one_producer_one_consumer_hundred_slots( + num_producers: 1, + num_consumers: 1, + max_queue_size: 100, + messages_per_producer: 1_000_000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + ten_producers_one_consumer_one_slot( + num_producers: 10, + num_consumers: 1, + max_queue_size: 1, + messages_per_producer: 10000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + ten_producers_one_consumer_hundred_slots_notify_all( + num_producers: 10, + num_consumers: 1, + max_queue_size: 100, + messages_per_producer: 10000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + ten_producers_one_consumer_hundred_slots_notify_one( + num_producers: 10, + num_consumers: 1, + max_queue_size: 100, + messages_per_producer: 10000, + notification_style: NotifyStyle::One, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + one_producer_ten_consumers_one_slot( + num_producers: 1, + num_consumers: 10, + max_queue_size: 1, + messages_per_producer: 10000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + one_producer_ten_consumers_hundred_slots_notify_all( + num_producers: 1, + num_consumers: 10, + max_queue_size: 100, + messages_per_producer: 100_000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + one_producer_ten_consumers_hundred_slots_notify_one( + num_producers: 1, + num_consumers: 10, + max_queue_size: 100, + messages_per_producer: 100_000, + notification_style: NotifyStyle::One, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + ten_producers_ten_consumers_one_slot( + num_producers: 10, + num_consumers: 10, + max_queue_size: 1, + messages_per_producer: 50000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + ten_producers_ten_consumers_hundred_slots_notify_all( + num_producers: 10, + num_consumers: 10, + max_queue_size: 100, + messages_per_producer: 50000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + ten_producers_ten_consumers_hundred_slots_notify_one( + num_producers: 10, + num_consumers: 10, + max_queue_size: 100, + messages_per_producer: 50000, + notification_style: NotifyStyle::One, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + } +} diff --git a/third_party/rust/parking_lot/src/deadlock.rs b/third_party/rust/parking_lot/src/deadlock.rs new file mode 100644 index 0000000000..0fab7228c9 --- /dev/null +++ b/third_party/rust/parking_lot/src/deadlock.rs @@ -0,0 +1,232 @@ +//! \[Experimental\] Deadlock detection +//! +//! This feature is optional and can be enabled via the `deadlock_detection` feature flag. +//! +//! # Example +//! +//! ``` +//! #[cfg(feature = "deadlock_detection")] +//! { // only for #[cfg] +//! use std::thread; +//! use std::time::Duration; +//! use parking_lot::deadlock; +//! +//! // Create a background thread which checks for deadlocks every 10s +//! thread::spawn(move || { +//! loop { +//! thread::sleep(Duration::from_secs(10)); +//! let deadlocks = deadlock::check_deadlock(); +//! if deadlocks.is_empty() { +//! continue; +//! } +//! +//! println!("{} deadlocks detected", deadlocks.len()); +//! for (i, threads) in deadlocks.iter().enumerate() { +//! println!("Deadlock #{}", i); +//! for t in threads { +//! println!("Thread Id {:#?}", t.thread_id()); +//! println!("{:#?}", t.backtrace()); +//! } +//! } +//! } +//! }); +//! } // only for #[cfg] +//! ``` + +#[cfg(feature = "deadlock_detection")] +pub use parking_lot_core::deadlock::check_deadlock; +pub(crate) use parking_lot_core::deadlock::{acquire_resource, release_resource}; + +#[cfg(test)] +#[cfg(feature = "deadlock_detection")] +mod tests { + use crate::{Mutex, ReentrantMutex, RwLock}; + use std::sync::{Arc, Barrier}; + use std::thread::{self, sleep}; + use std::time::Duration; + + // We need to serialize these tests since deadlock detection uses global state + static DEADLOCK_DETECTION_LOCK: Mutex<()> = crate::const_mutex(()); + + fn check_deadlock() -> bool { + use parking_lot_core::deadlock::check_deadlock; + !check_deadlock().is_empty() + } + + #[test] + fn test_mutex_deadlock() { + let _guard = DEADLOCK_DETECTION_LOCK.lock(); + + let m1: Arc<Mutex<()>> = Default::default(); + let m2: Arc<Mutex<()>> = Default::default(); + let m3: Arc<Mutex<()>> = Default::default(); + let b = Arc::new(Barrier::new(4)); + + let m1_ = m1.clone(); + let m2_ = m2.clone(); + let m3_ = m3.clone(); + let b1 = b.clone(); + let b2 = b.clone(); + let b3 = b.clone(); + + assert!(!check_deadlock()); + + let _t1 = thread::spawn(move || { + let _g = m1.lock(); + b1.wait(); + let _ = m2_.lock(); + }); + + let _t2 = thread::spawn(move || { + let _g = m2.lock(); + b2.wait(); + let _ = m3_.lock(); + }); + + let _t3 = thread::spawn(move || { + let _g = m3.lock(); + b3.wait(); + let _ = m1_.lock(); + }); + + assert!(!check_deadlock()); + + b.wait(); + sleep(Duration::from_millis(50)); + assert!(check_deadlock()); + + assert!(!check_deadlock()); + } + + #[test] + fn test_mutex_deadlock_reentrant() { + let _guard = DEADLOCK_DETECTION_LOCK.lock(); + + let m1: Arc<Mutex<()>> = Default::default(); + + assert!(!check_deadlock()); + + let _t1 = thread::spawn(move || { + let _g = m1.lock(); + let _ = m1.lock(); + }); + + sleep(Duration::from_millis(50)); + assert!(check_deadlock()); + + assert!(!check_deadlock()); + } + + #[test] + fn test_remutex_deadlock() { + let _guard = DEADLOCK_DETECTION_LOCK.lock(); + + let m1: Arc<ReentrantMutex<()>> = Default::default(); + let m2: Arc<ReentrantMutex<()>> = Default::default(); + let m3: Arc<ReentrantMutex<()>> = Default::default(); + let b = Arc::new(Barrier::new(4)); + + let m1_ = m1.clone(); + let m2_ = m2.clone(); + let m3_ = m3.clone(); + let b1 = b.clone(); + let b2 = b.clone(); + let b3 = b.clone(); + + assert!(!check_deadlock()); + + let _t1 = thread::spawn(move || { + let _g = m1.lock(); + let _g = m1.lock(); + b1.wait(); + let _ = m2_.lock(); + }); + + let _t2 = thread::spawn(move || { + let _g = m2.lock(); + let _g = m2.lock(); + b2.wait(); + let _ = m3_.lock(); + }); + + let _t3 = thread::spawn(move || { + let _g = m3.lock(); + let _g = m3.lock(); + b3.wait(); + let _ = m1_.lock(); + }); + + assert!(!check_deadlock()); + + b.wait(); + sleep(Duration::from_millis(50)); + assert!(check_deadlock()); + + assert!(!check_deadlock()); + } + + #[test] + fn test_rwlock_deadlock() { + let _guard = DEADLOCK_DETECTION_LOCK.lock(); + + let m1: Arc<RwLock<()>> = Default::default(); + let m2: Arc<RwLock<()>> = Default::default(); + let m3: Arc<RwLock<()>> = Default::default(); + let b = Arc::new(Barrier::new(4)); + + let m1_ = m1.clone(); + let m2_ = m2.clone(); + let m3_ = m3.clone(); + let b1 = b.clone(); + let b2 = b.clone(); + let b3 = b.clone(); + + assert!(!check_deadlock()); + + let _t1 = thread::spawn(move || { + let _g = m1.read(); + b1.wait(); + let _g = m2_.write(); + }); + + let _t2 = thread::spawn(move || { + let _g = m2.read(); + b2.wait(); + let _g = m3_.write(); + }); + + let _t3 = thread::spawn(move || { + let _g = m3.read(); + b3.wait(); + let _ = m1_.write(); + }); + + assert!(!check_deadlock()); + + b.wait(); + sleep(Duration::from_millis(50)); + assert!(check_deadlock()); + + assert!(!check_deadlock()); + } + + #[cfg(rwlock_deadlock_detection_not_supported)] + #[test] + fn test_rwlock_deadlock_reentrant() { + let _guard = DEADLOCK_DETECTION_LOCK.lock(); + + let m1: Arc<RwLock<()>> = Default::default(); + + assert!(!check_deadlock()); + + let _t1 = thread::spawn(move || { + let _g = m1.read(); + let _ = m1.write(); + }); + + sleep(Duration::from_millis(50)); + assert!(check_deadlock()); + + assert!(!check_deadlock()); + } +} diff --git a/third_party/rust/parking_lot/src/elision.rs b/third_party/rust/parking_lot/src/elision.rs new file mode 100644 index 0000000000..68cfa63c3e --- /dev/null +++ b/third_party/rust/parking_lot/src/elision.rs @@ -0,0 +1,116 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or +// http://opensource.org/licenses/MIT>, at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use std::sync::atomic::AtomicUsize; + +// Extension trait to add lock elision primitives to atomic types +pub trait AtomicElisionExt { + type IntType; + + // Perform a compare_exchange and start a transaction + fn elision_compare_exchange_acquire( + &self, + current: Self::IntType, + new: Self::IntType, + ) -> Result<Self::IntType, Self::IntType>; + + // Perform a fetch_sub and end a transaction + fn elision_fetch_sub_release(&self, val: Self::IntType) -> Self::IntType; +} + +// Indicates whether the target architecture supports lock elision +#[inline] +pub fn have_elision() -> bool { + cfg!(all( + feature = "nightly", + any(target_arch = "x86", target_arch = "x86_64"), + )) +} + +// This implementation is never actually called because it is guarded by +// have_elision(). +#[cfg(not(all(feature = "nightly", any(target_arch = "x86", target_arch = "x86_64"))))] +impl AtomicElisionExt for AtomicUsize { + type IntType = usize; + + #[inline] + fn elision_compare_exchange_acquire(&self, _: usize, _: usize) -> Result<usize, usize> { + unreachable!(); + } + + #[inline] + fn elision_fetch_sub_release(&self, _: usize) -> usize { + unreachable!(); + } +} + +#[cfg(all(feature = "nightly", any(target_arch = "x86", target_arch = "x86_64")))] +impl AtomicElisionExt for AtomicUsize { + type IntType = usize; + + #[cfg(target_pointer_width = "32")] + #[inline] + fn elision_compare_exchange_acquire(&self, current: usize, new: usize) -> Result<usize, usize> { + unsafe { + let prev: usize; + llvm_asm!("xacquire; lock; cmpxchgl $2, $1" + : "={eax}" (prev), "+*m" (self) + : "r" (new), "{eax}" (current) + : "memory" + : "volatile"); + if prev == current { + Ok(prev) + } else { + Err(prev) + } + } + } + #[cfg(target_pointer_width = "64")] + #[inline] + fn elision_compare_exchange_acquire(&self, current: usize, new: usize) -> Result<usize, usize> { + unsafe { + let prev: usize; + llvm_asm!("xacquire; lock; cmpxchgq $2, $1" + : "={rax}" (prev), "+*m" (self) + : "r" (new), "{rax}" (current) + : "memory" + : "volatile"); + if prev == current { + Ok(prev) + } else { + Err(prev) + } + } + } + + #[cfg(target_pointer_width = "32")] + #[inline] + fn elision_fetch_sub_release(&self, val: usize) -> usize { + unsafe { + let prev: usize; + llvm_asm!("xrelease; lock; xaddl $2, $1" + : "=r" (prev), "+*m" (self) + : "0" (val.wrapping_neg()) + : "memory" + : "volatile"); + prev + } + } + #[cfg(target_pointer_width = "64")] + #[inline] + fn elision_fetch_sub_release(&self, val: usize) -> usize { + unsafe { + let prev: usize; + llvm_asm!("xrelease; lock; xaddq $2, $1" + : "=r" (prev), "+*m" (self) + : "0" (val.wrapping_neg()) + : "memory" + : "volatile"); + prev + } + } +} diff --git a/third_party/rust/parking_lot/src/fair_mutex.rs b/third_party/rust/parking_lot/src/fair_mutex.rs new file mode 100644 index 0000000000..449c53b051 --- /dev/null +++ b/third_party/rust/parking_lot/src/fair_mutex.rs @@ -0,0 +1,278 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or +// http://opensource.org/licenses/MIT>, at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use crate::raw_fair_mutex::RawFairMutex; +use lock_api; + +/// A mutual exclusive primitive that is always fair, useful for protecting shared data +/// +/// This mutex will block threads waiting for the lock to become available. The +/// mutex can also be statically initialized or created via a `new` +/// constructor. Each mutex has a type parameter which represents the data that +/// it is protecting. The data can only be accessed through the RAII guards +/// returned from `lock` and `try_lock`, which guarantees that the data is only +/// ever accessed when the mutex is locked. +/// +/// The regular mutex provided by `parking_lot` uses eventual locking fairness +/// (after some time it will default to the fair algorithm), but eventual +/// fairness does not provide the same garantees a always fair method would. +/// Fair mutexes are generally slower, but sometimes needed. This wrapper was +/// created to avoid using a unfair protocol when it's forbidden by mistake. +/// +/// In a fair mutex the lock is provided to whichever thread asked first, +/// they form a queue and always follow the first-in first-out order. This +/// means some thread in the queue won't be able to steal the lock and use it fast +/// to increase throughput, at the cost of latency. Since the response time will grow +/// for some threads that are waiting for the lock and losing to faster but later ones, +/// but it may make sending more responses possible. +/// +/// A fair mutex may not be interesting if threads have different priorities (this is known as +/// priority inversion). +/// +/// # Differences from the standard library `Mutex` +/// +/// - No poisoning, the lock is released normally on panic. +/// - Only requires 1 byte of space, whereas the standard library boxes the +/// `FairMutex` due to platform limitations. +/// - Can be statically constructed (requires the `const_fn` nightly feature). +/// - Does not require any drop glue when dropped. +/// - Inline fast path for the uncontended case. +/// - Efficient handling of micro-contention using adaptive spinning. +/// - Allows raw locking & unlocking without a guard. +/// +/// # Examples +/// +/// ``` +/// use parking_lot::FairMutex; +/// use std::sync::{Arc, mpsc::channel}; +/// use std::thread; +/// +/// const N: usize = 10; +/// +/// // Spawn a few threads to increment a shared variable (non-atomically), and +/// // let the main thread know once all increments are done. +/// // +/// // Here we're using an Arc to share memory among threads, and the data inside +/// // the Arc is protected with a mutex. +/// let data = Arc::new(FairMutex::new(0)); +/// +/// let (tx, rx) = channel(); +/// for _ in 0..10 { +/// let (data, tx) = (Arc::clone(&data), tx.clone()); +/// thread::spawn(move || { +/// // The shared state can only be accessed once the lock is held. +/// // Our non-atomic increment is safe because we're the only thread +/// // which can access the shared state when the lock is held. +/// let mut data = data.lock(); +/// *data += 1; +/// if *data == N { +/// tx.send(()).unwrap(); +/// } +/// // the lock is unlocked here when `data` goes out of scope. +/// }); +/// } +/// +/// rx.recv().unwrap(); +/// ``` +pub type FairMutex<T> = lock_api::Mutex<RawFairMutex, T>; + +/// Creates a new fair mutex in an unlocked state ready for use. +/// +/// This allows creating a fair mutex in a constant context on stable Rust. +pub const fn const_fair_mutex<T>(val: T) -> FairMutex<T> { + FairMutex::const_new(<RawFairMutex as lock_api::RawMutex>::INIT, val) +} + +/// An RAII implementation of a "scoped lock" of a mutex. When this structure is +/// dropped (falls out of scope), the lock will be unlocked. +/// +/// The data protected by the mutex can be accessed through this guard via its +/// `Deref` and `DerefMut` implementations. +pub type FairMutexGuard<'a, T> = lock_api::MutexGuard<'a, RawFairMutex, T>; + +/// An RAII mutex guard returned by `FairMutexGuard::map`, which can point to a +/// subfield of the protected data. +/// +/// The main difference between `MappedFairMutexGuard` and `FairMutexGuard` is that the +/// former doesn't support temporarily unlocking and re-locking, since that +/// could introduce soundness issues if the locked object is modified by another +/// thread. +pub type MappedFairMutexGuard<'a, T> = lock_api::MappedMutexGuard<'a, RawFairMutex, T>; + +#[cfg(test)] +mod tests { + use crate::FairMutex; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::mpsc::channel; + use std::sync::Arc; + use std::thread; + + #[cfg(feature = "serde")] + use bincode::{deserialize, serialize}; + + #[derive(Eq, PartialEq, Debug)] + struct NonCopy(i32); + + #[test] + fn smoke() { + let m = FairMutex::new(()); + drop(m.lock()); + drop(m.lock()); + } + + #[test] + fn lots_and_lots() { + const J: u32 = 1000; + const K: u32 = 3; + + let m = Arc::new(FairMutex::new(0)); + + fn inc(m: &FairMutex<u32>) { + for _ in 0..J { + *m.lock() += 1; + } + } + + let (tx, rx) = channel(); + for _ in 0..K { + let tx2 = tx.clone(); + let m2 = m.clone(); + thread::spawn(move || { + inc(&m2); + tx2.send(()).unwrap(); + }); + let tx2 = tx.clone(); + let m2 = m.clone(); + thread::spawn(move || { + inc(&m2); + tx2.send(()).unwrap(); + }); + } + + drop(tx); + for _ in 0..2 * K { + rx.recv().unwrap(); + } + assert_eq!(*m.lock(), J * K * 2); + } + + #[test] + fn try_lock() { + let m = FairMutex::new(()); + *m.try_lock().unwrap() = (); + } + + #[test] + fn test_into_inner() { + let m = FairMutex::new(NonCopy(10)); + assert_eq!(m.into_inner(), NonCopy(10)); + } + + #[test] + fn test_into_inner_drop() { + struct Foo(Arc<AtomicUsize>); + impl Drop for Foo { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let num_drops = Arc::new(AtomicUsize::new(0)); + let m = FairMutex::new(Foo(num_drops.clone())); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + { + let _inner = m.into_inner(); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + } + assert_eq!(num_drops.load(Ordering::SeqCst), 1); + } + + #[test] + fn test_get_mut() { + let mut m = FairMutex::new(NonCopy(10)); + *m.get_mut() = NonCopy(20); + assert_eq!(m.into_inner(), NonCopy(20)); + } + + #[test] + fn test_mutex_arc_nested() { + // Tests nested mutexes and access + // to underlying data. + let arc = Arc::new(FairMutex::new(1)); + let arc2 = Arc::new(FairMutex::new(arc)); + let (tx, rx) = channel(); + let _t = thread::spawn(move || { + let lock = arc2.lock(); + let lock2 = lock.lock(); + assert_eq!(*lock2, 1); + tx.send(()).unwrap(); + }); + rx.recv().unwrap(); + } + + #[test] + fn test_mutex_arc_access_in_unwind() { + let arc = Arc::new(FairMutex::new(1)); + let arc2 = arc.clone(); + let _ = thread::spawn(move || { + struct Unwinder { + i: Arc<FairMutex<i32>>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + *self.i.lock() += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }) + .join(); + let lock = arc.lock(); + assert_eq!(*lock, 2); + } + + #[test] + fn test_mutex_unsized() { + let mutex: &FairMutex<[i32]> = &FairMutex::new([1, 2, 3]); + { + let b = &mut *mutex.lock(); + b[0] = 4; + b[2] = 5; + } + let comp: &[i32] = &[4, 2, 5]; + assert_eq!(&*mutex.lock(), comp); + } + + #[test] + fn test_mutexguard_sync() { + fn sync<T: Sync>(_: T) {} + + let mutex = FairMutex::new(()); + sync(mutex.lock()); + } + + #[test] + fn test_mutex_debug() { + let mutex = FairMutex::new(vec![0u8, 10]); + + assert_eq!(format!("{:?}", mutex), "Mutex { data: [0, 10] }"); + let _lock = mutex.lock(); + assert_eq!(format!("{:?}", mutex), "Mutex { data: <locked> }"); + } + + #[cfg(feature = "serde")] + #[test] + fn test_serde() { + let contents: Vec<u8> = vec![0, 1, 2]; + let mutex = FairMutex::new(contents.clone()); + + let serialized = serialize(&mutex).unwrap(); + let deserialized: FairMutex<Vec<u8>> = deserialize(&serialized).unwrap(); + + assert_eq!(*(mutex.lock()), *(deserialized.lock())); + assert_eq!(contents, *(deserialized.lock())); + } +} diff --git a/third_party/rust/parking_lot/src/lib.rs b/third_party/rust/parking_lot/src/lib.rs new file mode 100644 index 0000000000..7ff2c79d26 --- /dev/null +++ b/third_party/rust/parking_lot/src/lib.rs @@ -0,0 +1,57 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or +// http://opensource.org/licenses/MIT>, at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +//! This library provides implementations of `Mutex`, `RwLock`, `Condvar` and +//! `Once` that are smaller, faster and more flexible than those in the Rust +//! standard library. It also provides a `ReentrantMutex` type. + +#![warn(missing_docs)] +#![warn(rust_2018_idioms)] +#![cfg_attr(feature = "nightly", feature(llvm_asm))] + +mod condvar; +mod elision; +mod fair_mutex; +mod mutex; +mod once; +mod raw_fair_mutex; +mod raw_mutex; +mod raw_rwlock; +mod remutex; +mod rwlock; +mod util; + +#[cfg(feature = "deadlock_detection")] +pub mod deadlock; +#[cfg(not(feature = "deadlock_detection"))] +mod deadlock; + +// If deadlock detection is enabled, we cannot allow lock guards to be sent to +// other threads. +#[cfg(all(feature = "send_guard", feature = "deadlock_detection"))] +compile_error!("the `send_guard` and `deadlock_detection` features cannot be used together"); +#[cfg(feature = "send_guard")] +type GuardMarker = lock_api::GuardSend; +#[cfg(not(feature = "send_guard"))] +type GuardMarker = lock_api::GuardNoSend; + +pub use self::condvar::{Condvar, WaitTimeoutResult}; +pub use self::fair_mutex::{const_fair_mutex, FairMutex, FairMutexGuard, MappedFairMutexGuard}; +pub use self::mutex::{const_mutex, MappedMutexGuard, Mutex, MutexGuard}; +pub use self::once::{Once, OnceState}; +pub use self::raw_fair_mutex::RawFairMutex; +pub use self::raw_mutex::RawMutex; +pub use self::raw_rwlock::RawRwLock; +pub use self::remutex::{ + const_reentrant_mutex, MappedReentrantMutexGuard, RawThreadId, ReentrantMutex, + ReentrantMutexGuard, +}; +pub use self::rwlock::{ + const_rwlock, MappedRwLockReadGuard, MappedRwLockWriteGuard, RwLock, RwLockReadGuard, + RwLockUpgradableReadGuard, RwLockWriteGuard, +}; +pub use ::lock_api; diff --git a/third_party/rust/parking_lot/src/mutex.rs b/third_party/rust/parking_lot/src/mutex.rs new file mode 100644 index 0000000000..9f63cb9434 --- /dev/null +++ b/third_party/rust/parking_lot/src/mutex.rs @@ -0,0 +1,312 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or +// http://opensource.org/licenses/MIT>, at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use crate::raw_mutex::RawMutex; +use lock_api; + +/// A mutual exclusion primitive useful for protecting shared data +/// +/// This mutex will block threads waiting for the lock to become available. The +/// mutex can also be statically initialized or created via a `new` +/// constructor. Each mutex has a type parameter which represents the data that +/// it is protecting. The data can only be accessed through the RAII guards +/// returned from `lock` and `try_lock`, which guarantees that the data is only +/// ever accessed when the mutex is locked. +/// +/// # Fairness +/// +/// A typical unfair lock can often end up in a situation where a single thread +/// quickly acquires and releases the same mutex in succession, which can starve +/// other threads waiting to acquire the mutex. While this improves throughput +/// because it doesn't force a context switch when a thread tries to re-acquire +/// a mutex it has just released, this can starve other threads. +/// +/// This mutex uses [eventual fairness](https://trac.webkit.org/changeset/203350) +/// to ensure that the lock will be fair on average without sacrificing +/// throughput. This is done by forcing a fair unlock on average every 0.5ms, +/// which will force the lock to go to the next thread waiting for the mutex. +/// +/// Additionally, any critical section longer than 1ms will always use a fair +/// unlock, which has a negligible impact on throughput considering the length +/// of the critical section. +/// +/// You can also force a fair unlock by calling `MutexGuard::unlock_fair` when +/// unlocking a mutex instead of simply dropping the `MutexGuard`. +/// +/// # Differences from the standard library `Mutex` +/// +/// - No poisoning, the lock is released normally on panic. +/// - Only requires 1 byte of space, whereas the standard library boxes the +/// `Mutex` due to platform limitations. +/// - Can be statically constructed (requires the `const_fn` nightly feature). +/// - Does not require any drop glue when dropped. +/// - Inline fast path for the uncontended case. +/// - Efficient handling of micro-contention using adaptive spinning. +/// - Allows raw locking & unlocking without a guard. +/// - Supports eventual fairness so that the mutex is fair on average. +/// - Optionally allows making the mutex fair by calling `MutexGuard::unlock_fair`. +/// +/// # Examples +/// +/// ``` +/// use parking_lot::Mutex; +/// use std::sync::{Arc, mpsc::channel}; +/// use std::thread; +/// +/// const N: usize = 10; +/// +/// // Spawn a few threads to increment a shared variable (non-atomically), and +/// // let the main thread know once all increments are done. +/// // +/// // Here we're using an Arc to share memory among threads, and the data inside +/// // the Arc is protected with a mutex. +/// let data = Arc::new(Mutex::new(0)); +/// +/// let (tx, rx) = channel(); +/// for _ in 0..10 { +/// let (data, tx) = (Arc::clone(&data), tx.clone()); +/// thread::spawn(move || { +/// // The shared state can only be accessed once the lock is held. +/// // Our non-atomic increment is safe because we're the only thread +/// // which can access the shared state when the lock is held. +/// let mut data = data.lock(); +/// *data += 1; +/// if *data == N { +/// tx.send(()).unwrap(); +/// } +/// // the lock is unlocked here when `data` goes out of scope. +/// }); +/// } +/// +/// rx.recv().unwrap(); +/// ``` +pub type Mutex<T> = lock_api::Mutex<RawMutex, T>; + +/// Creates a new mutex in an unlocked state ready for use. +/// +/// This allows creating a mutex in a constant context on stable Rust. +pub const fn const_mutex<T>(val: T) -> Mutex<T> { + Mutex::const_new(<RawMutex as lock_api::RawMutex>::INIT, val) +} + +/// An RAII implementation of a "scoped lock" of a mutex. When this structure is +/// dropped (falls out of scope), the lock will be unlocked. +/// +/// The data protected by the mutex can be accessed through this guard via its +/// `Deref` and `DerefMut` implementations. +pub type MutexGuard<'a, T> = lock_api::MutexGuard<'a, RawMutex, T>; + +/// An RAII mutex guard returned by `MutexGuard::map`, which can point to a +/// subfield of the protected data. +/// +/// The main difference between `MappedMutexGuard` and `MutexGuard` is that the +/// former doesn't support temporarily unlocking and re-locking, since that +/// could introduce soundness issues if the locked object is modified by another +/// thread. +pub type MappedMutexGuard<'a, T> = lock_api::MappedMutexGuard<'a, RawMutex, T>; + +#[cfg(test)] +mod tests { + use crate::{Condvar, Mutex}; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::mpsc::channel; + use std::sync::Arc; + use std::thread; + + #[cfg(feature = "serde")] + use bincode::{deserialize, serialize}; + + struct Packet<T>(Arc<(Mutex<T>, Condvar)>); + + #[derive(Eq, PartialEq, Debug)] + struct NonCopy(i32); + + unsafe impl<T: Send> Send for Packet<T> {} + unsafe impl<T> Sync for Packet<T> {} + + #[test] + fn smoke() { + let m = Mutex::new(()); + drop(m.lock()); + drop(m.lock()); + } + + #[test] + fn lots_and_lots() { + const J: u32 = 1000; + const K: u32 = 3; + + let m = Arc::new(Mutex::new(0)); + + fn inc(m: &Mutex<u32>) { + for _ in 0..J { + *m.lock() += 1; + } + } + + let (tx, rx) = channel(); + for _ in 0..K { + let tx2 = tx.clone(); + let m2 = m.clone(); + thread::spawn(move || { + inc(&m2); + tx2.send(()).unwrap(); + }); + let tx2 = tx.clone(); + let m2 = m.clone(); + thread::spawn(move || { + inc(&m2); + tx2.send(()).unwrap(); + }); + } + + drop(tx); + for _ in 0..2 * K { + rx.recv().unwrap(); + } + assert_eq!(*m.lock(), J * K * 2); + } + + #[test] + fn try_lock() { + let m = Mutex::new(()); + *m.try_lock().unwrap() = (); + } + + #[test] + fn test_into_inner() { + let m = Mutex::new(NonCopy(10)); + assert_eq!(m.into_inner(), NonCopy(10)); + } + + #[test] + fn test_into_inner_drop() { + struct Foo(Arc<AtomicUsize>); + impl Drop for Foo { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let num_drops = Arc::new(AtomicUsize::new(0)); + let m = Mutex::new(Foo(num_drops.clone())); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + { + let _inner = m.into_inner(); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + } + assert_eq!(num_drops.load(Ordering::SeqCst), 1); + } + + #[test] + fn test_get_mut() { + let mut m = Mutex::new(NonCopy(10)); + *m.get_mut() = NonCopy(20); + assert_eq!(m.into_inner(), NonCopy(20)); + } + + #[test] + fn test_mutex_arc_condvar() { + let packet = Packet(Arc::new((Mutex::new(false), Condvar::new()))); + let packet2 = Packet(packet.0.clone()); + let (tx, rx) = channel(); + let _t = thread::spawn(move || { + // wait until parent gets in + rx.recv().unwrap(); + let &(ref lock, ref cvar) = &*packet2.0; + let mut lock = lock.lock(); + *lock = true; + cvar.notify_one(); + }); + + let &(ref lock, ref cvar) = &*packet.0; + let mut lock = lock.lock(); + tx.send(()).unwrap(); + assert!(!*lock); + while !*lock { + cvar.wait(&mut lock); + } + } + + #[test] + fn test_mutex_arc_nested() { + // Tests nested mutexes and access + // to underlying data. + let arc = Arc::new(Mutex::new(1)); + let arc2 = Arc::new(Mutex::new(arc)); + let (tx, rx) = channel(); + let _t = thread::spawn(move || { + let lock = arc2.lock(); + let lock2 = lock.lock(); + assert_eq!(*lock2, 1); + tx.send(()).unwrap(); + }); + rx.recv().unwrap(); + } + + #[test] + fn test_mutex_arc_access_in_unwind() { + let arc = Arc::new(Mutex::new(1)); + let arc2 = arc.clone(); + let _ = thread::spawn(move || { + struct Unwinder { + i: Arc<Mutex<i32>>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + *self.i.lock() += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }) + .join(); + let lock = arc.lock(); + assert_eq!(*lock, 2); + } + + #[test] + fn test_mutex_unsized() { + let mutex: &Mutex<[i32]> = &Mutex::new([1, 2, 3]); + { + let b = &mut *mutex.lock(); + b[0] = 4; + b[2] = 5; + } + let comp: &[i32] = &[4, 2, 5]; + assert_eq!(&*mutex.lock(), comp); + } + + #[test] + fn test_mutexguard_sync() { + fn sync<T: Sync>(_: T) {} + + let mutex = Mutex::new(()); + sync(mutex.lock()); + } + + #[test] + fn test_mutex_debug() { + let mutex = Mutex::new(vec![0u8, 10]); + + assert_eq!(format!("{:?}", mutex), "Mutex { data: [0, 10] }"); + let _lock = mutex.lock(); + assert_eq!(format!("{:?}", mutex), "Mutex { data: <locked> }"); + } + + #[cfg(feature = "serde")] + #[test] + fn test_serde() { + let contents: Vec<u8> = vec![0, 1, 2]; + let mutex = Mutex::new(contents.clone()); + + let serialized = serialize(&mutex).unwrap(); + let deserialized: Mutex<Vec<u8>> = deserialize(&serialized).unwrap(); + + assert_eq!(*(mutex.lock()), *(deserialized.lock())); + assert_eq!(contents, *(deserialized.lock())); + } +} diff --git a/third_party/rust/parking_lot/src/once.rs b/third_party/rust/parking_lot/src/once.rs new file mode 100644 index 0000000000..f458c9c04b --- /dev/null +++ b/third_party/rust/parking_lot/src/once.rs @@ -0,0 +1,458 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or +// http://opensource.org/licenses/MIT>, at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use crate::util::UncheckedOptionExt; +use core::{ + fmt, mem, + sync::atomic::{fence, AtomicU8, Ordering}, +}; +use parking_lot_core::{self, SpinWait, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN}; + +const DONE_BIT: u8 = 1; +const POISON_BIT: u8 = 2; +const LOCKED_BIT: u8 = 4; +const PARKED_BIT: u8 = 8; + +/// Current state of a `Once`. +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub enum OnceState { + /// A closure has not been executed yet + New, + + /// A closure was executed but panicked. + Poisoned, + + /// A thread is currently executing a closure. + InProgress, + + /// A closure has completed successfully. + Done, +} + +impl OnceState { + /// Returns whether the associated `Once` has been poisoned. + /// + /// Once an initialization routine for a `Once` has panicked it will forever + /// indicate to future forced initialization routines that it is poisoned. + #[inline] + pub fn poisoned(self) -> bool { + match self { + OnceState::Poisoned => true, + _ => false, + } + } + + /// Returns whether the associated `Once` has successfully executed a + /// closure. + #[inline] + pub fn done(self) -> bool { + match self { + OnceState::Done => true, + _ => false, + } + } +} + +/// A synchronization primitive which can be used to run a one-time +/// initialization. Useful for one-time initialization for globals, FFI or +/// related functionality. +/// +/// # Differences from the standard library `Once` +/// +/// - Only requires 1 byte of space, instead of 1 word. +/// - Not required to be `'static`. +/// - Relaxed memory barriers in the fast path, which can significantly improve +/// performance on some architectures. +/// - Efficient handling of micro-contention using adaptive spinning. +/// +/// # Examples +/// +/// ``` +/// use parking_lot::Once; +/// +/// static START: Once = Once::new(); +/// +/// START.call_once(|| { +/// // run initialization here +/// }); +/// ``` +pub struct Once(AtomicU8); + +impl Once { + /// Creates a new `Once` value. + #[inline] + pub const fn new() -> Once { + Once(AtomicU8::new(0)) + } + + /// Returns the current state of this `Once`. + #[inline] + pub fn state(&self) -> OnceState { + let state = self.0.load(Ordering::Acquire); + if state & DONE_BIT != 0 { + OnceState::Done + } else if state & LOCKED_BIT != 0 { + OnceState::InProgress + } else if state & POISON_BIT != 0 { + OnceState::Poisoned + } else { + OnceState::New + } + } + + /// Performs an initialization routine once and only once. The given closure + /// will be executed if this is the first time `call_once` has been called, + /// and otherwise the routine will *not* be invoked. + /// + /// This method will block the calling thread if another initialization + /// routine is currently running. + /// + /// When this function returns, it is guaranteed that some initialization + /// has run and completed (it may not be the closure specified). It is also + /// guaranteed that any memory writes performed by the executed closure can + /// be reliably observed by other threads at this point (there is a + /// happens-before relation between the closure and code executing after the + /// return). + /// + /// # Examples + /// + /// ``` + /// use parking_lot::Once; + /// + /// static mut VAL: usize = 0; + /// static INIT: Once = Once::new(); + /// + /// // Accessing a `static mut` is unsafe much of the time, but if we do so + /// // in a synchronized fashion (e.g. write once or read all) then we're + /// // good to go! + /// // + /// // This function will only call `expensive_computation` once, and will + /// // otherwise always return the value returned from the first invocation. + /// fn get_cached_val() -> usize { + /// unsafe { + /// INIT.call_once(|| { + /// VAL = expensive_computation(); + /// }); + /// VAL + /// } + /// } + /// + /// fn expensive_computation() -> usize { + /// // ... + /// # 2 + /// } + /// ``` + /// + /// # Panics + /// + /// The closure `f` will only be executed once if this is called + /// concurrently amongst many threads. If that closure panics, however, then + /// it will *poison* this `Once` instance, causing all future invocations of + /// `call_once` to also panic. + #[inline] + pub fn call_once<F>(&self, f: F) + where + F: FnOnce(), + { + if self.0.load(Ordering::Acquire) == DONE_BIT { + return; + } + + let mut f = Some(f); + self.call_once_slow(false, &mut |_| unsafe { f.take().unchecked_unwrap()() }); + } + + /// Performs the same function as `call_once` except ignores poisoning. + /// + /// If this `Once` has been poisoned (some initialization panicked) then + /// this function will continue to attempt to call initialization functions + /// until one of them doesn't panic. + /// + /// The closure `f` is yielded a structure which can be used to query the + /// state of this `Once` (whether initialization has previously panicked or + /// not). + #[inline] + pub fn call_once_force<F>(&self, f: F) + where + F: FnOnce(OnceState), + { + if self.0.load(Ordering::Acquire) == DONE_BIT { + return; + } + + let mut f = Some(f); + self.call_once_slow(true, &mut |state| unsafe { + f.take().unchecked_unwrap()(state) + }); + } + + // This is a non-generic function to reduce the monomorphization cost of + // using `call_once` (this isn't exactly a trivial or small implementation). + // + // Additionally, this is tagged with `#[cold]` as it should indeed be cold + // and it helps let LLVM know that calls to this function should be off the + // fast path. Essentially, this should help generate more straight line code + // in LLVM. + // + // Finally, this takes an `FnMut` instead of a `FnOnce` because there's + // currently no way to take an `FnOnce` and call it via virtual dispatch + // without some allocation overhead. + #[cold] + fn call_once_slow(&self, ignore_poison: bool, f: &mut dyn FnMut(OnceState)) { + let mut spinwait = SpinWait::new(); + let mut state = self.0.load(Ordering::Relaxed); + loop { + // If another thread called the closure, we're done + if state & DONE_BIT != 0 { + // An acquire fence is needed here since we didn't load the + // state with Ordering::Acquire. + fence(Ordering::Acquire); + return; + } + + // If the state has been poisoned and we aren't forcing, then panic + if state & POISON_BIT != 0 && !ignore_poison { + // Need the fence here as well for the same reason + fence(Ordering::Acquire); + panic!("Once instance has previously been poisoned"); + } + + // Grab the lock if it isn't locked, even if there is a queue on it. + // We also clear the poison bit since we are going to try running + // the closure again. + if state & LOCKED_BIT == 0 { + match self.0.compare_exchange_weak( + state, + (state | LOCKED_BIT) & !POISON_BIT, + Ordering::Acquire, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(x) => state = x, + } + continue; + } + + // If there is no queue, try spinning a few times + if state & PARKED_BIT == 0 && spinwait.spin() { + state = self.0.load(Ordering::Relaxed); + continue; + } + + // Set the parked bit + if state & PARKED_BIT == 0 { + if let Err(x) = self.0.compare_exchange_weak( + state, + state | PARKED_BIT, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + state = x; + continue; + } + } + + // Park our thread until we are woken up by the thread that owns the + // lock. + unsafe { + let addr = self as *const _ as usize; + let validate = || self.0.load(Ordering::Relaxed) == LOCKED_BIT | PARKED_BIT; + let before_sleep = || {}; + let timed_out = |_, _| unreachable!(); + parking_lot_core::park( + addr, + validate, + before_sleep, + timed_out, + DEFAULT_PARK_TOKEN, + None, + ); + } + + // Loop back and check if the done bit was set + spinwait.reset(); + state = self.0.load(Ordering::Relaxed); + } + + struct PanicGuard<'a>(&'a Once); + impl<'a> Drop for PanicGuard<'a> { + fn drop(&mut self) { + // Mark the state as poisoned, unlock it and unpark all threads. + let once = self.0; + let state = once.0.swap(POISON_BIT, Ordering::Release); + if state & PARKED_BIT != 0 { + unsafe { + let addr = once as *const _ as usize; + parking_lot_core::unpark_all(addr, DEFAULT_UNPARK_TOKEN); + } + } + } + } + + // At this point we have the lock, so run the closure. Make sure we + // properly clean up if the closure panicks. + let guard = PanicGuard(self); + let once_state = if state & POISON_BIT != 0 { + OnceState::Poisoned + } else { + OnceState::New + }; + f(once_state); + mem::forget(guard); + + // Now unlock the state, set the done bit and unpark all threads + let state = self.0.swap(DONE_BIT, Ordering::Release); + if state & PARKED_BIT != 0 { + unsafe { + let addr = self as *const _ as usize; + parking_lot_core::unpark_all(addr, DEFAULT_UNPARK_TOKEN); + } + } + } +} + +impl Default for Once { + #[inline] + fn default() -> Once { + Once::new() + } +} + +impl fmt::Debug for Once { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Once") + .field("state", &self.state()) + .finish() + } +} + +#[cfg(test)] +mod tests { + use crate::Once; + use std::panic; + use std::sync::mpsc::channel; + use std::thread; + + #[test] + fn smoke_once() { + static O: Once = Once::new(); + let mut a = 0; + O.call_once(|| a += 1); + assert_eq!(a, 1); + O.call_once(|| a += 1); + assert_eq!(a, 1); + } + + #[test] + fn stampede_once() { + static O: Once = Once::new(); + static mut RUN: bool = false; + + let (tx, rx) = channel(); + for _ in 0..10 { + let tx = tx.clone(); + thread::spawn(move || { + for _ in 0..4 { + thread::yield_now() + } + unsafe { + O.call_once(|| { + assert!(!RUN); + RUN = true; + }); + assert!(RUN); + } + tx.send(()).unwrap(); + }); + } + + unsafe { + O.call_once(|| { + assert!(!RUN); + RUN = true; + }); + assert!(RUN); + } + + for _ in 0..10 { + rx.recv().unwrap(); + } + } + + #[test] + fn poison_bad() { + static O: Once = Once::new(); + + // poison the once + let t = panic::catch_unwind(|| { + O.call_once(|| panic!()); + }); + assert!(t.is_err()); + + // poisoning propagates + let t = panic::catch_unwind(|| { + O.call_once(|| {}); + }); + assert!(t.is_err()); + + // we can subvert poisoning, however + let mut called = false; + O.call_once_force(|p| { + called = true; + assert!(p.poisoned()) + }); + assert!(called); + + // once any success happens, we stop propagating the poison + O.call_once(|| {}); + } + + #[test] + fn wait_for_force_to_finish() { + static O: Once = Once::new(); + + // poison the once + let t = panic::catch_unwind(|| { + O.call_once(|| panic!()); + }); + assert!(t.is_err()); + + // make sure someone's waiting inside the once via a force + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + let t1 = thread::spawn(move || { + O.call_once_force(|p| { + assert!(p.poisoned()); + tx1.send(()).unwrap(); + rx2.recv().unwrap(); + }); + }); + + rx1.recv().unwrap(); + + // put another waiter on the once + let t2 = thread::spawn(|| { + let mut called = false; + O.call_once(|| { + called = true; + }); + assert!(!called); + }); + + tx2.send(()).unwrap(); + + assert!(t1.join().is_ok()); + assert!(t2.join().is_ok()); + } + + #[test] + fn test_once_debug() { + static O: Once = Once::new(); + + assert_eq!(format!("{:?}", O), "Once { state: New }"); + } +} diff --git a/third_party/rust/parking_lot/src/raw_fair_mutex.rs b/third_party/rust/parking_lot/src/raw_fair_mutex.rs new file mode 100644 index 0000000000..0da6828e0e --- /dev/null +++ b/third_party/rust/parking_lot/src/raw_fair_mutex.rs @@ -0,0 +1,65 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or +// http://opensource.org/licenses/MIT>, at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use crate::raw_mutex::RawMutex; +use lock_api::RawMutexFair; + +/// Raw fair mutex type backed by the parking lot. +pub struct RawFairMutex(RawMutex); + +unsafe impl lock_api::RawMutex for RawFairMutex { + const INIT: Self = RawFairMutex(<RawMutex as lock_api::RawMutex>::INIT); + + type GuardMarker = <RawMutex as lock_api::RawMutex>::GuardMarker; + + #[inline] + fn lock(&self) { + self.0.lock() + } + + #[inline] + fn try_lock(&self) -> bool { + self.0.try_lock() + } + + #[inline] + unsafe fn unlock(&self) { + self.unlock_fair() + } + + #[inline] + fn is_locked(&self) -> bool { + self.0.is_locked() + } +} + +unsafe impl lock_api::RawMutexFair for RawFairMutex { + #[inline] + unsafe fn unlock_fair(&self) { + self.0.unlock_fair() + } + + #[inline] + unsafe fn bump(&self) { + self.0.bump() + } +} + +unsafe impl lock_api::RawMutexTimed for RawFairMutex { + type Duration = <RawMutex as lock_api::RawMutexTimed>::Duration; + type Instant = <RawMutex as lock_api::RawMutexTimed>::Instant; + + #[inline] + fn try_lock_until(&self, timeout: Self::Instant) -> bool { + self.0.try_lock_until(timeout) + } + + #[inline] + fn try_lock_for(&self, timeout: Self::Duration) -> bool { + self.0.try_lock_for(timeout) + } +} diff --git a/third_party/rust/parking_lot/src/raw_mutex.rs b/third_party/rust/parking_lot/src/raw_mutex.rs new file mode 100644 index 0000000000..06667d32db --- /dev/null +++ b/third_party/rust/parking_lot/src/raw_mutex.rs @@ -0,0 +1,331 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or +// http://opensource.org/licenses/MIT>, at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use crate::{deadlock, util}; +use core::{ + sync::atomic::{AtomicU8, Ordering}, + time::Duration, +}; +use instant::Instant; +use lock_api::RawMutex as RawMutex_; +use parking_lot_core::{self, ParkResult, SpinWait, UnparkResult, UnparkToken, DEFAULT_PARK_TOKEN}; + +// UnparkToken used to indicate that that the target thread should attempt to +// lock the mutex again as soon as it is unparked. +pub(crate) const TOKEN_NORMAL: UnparkToken = UnparkToken(0); + +// UnparkToken used to indicate that the mutex is being handed off to the target +// thread directly without unlocking it. +pub(crate) const TOKEN_HANDOFF: UnparkToken = UnparkToken(1); + +/// This bit is set in the `state` of a `RawMutex` when that mutex is locked by some thread. +const LOCKED_BIT: u8 = 0b01; +/// This bit is set in the `state` of a `RawMutex` just before parking a thread. A thread is being +/// parked if it wants to lock the mutex, but it is currently being held by some other thread. +const PARKED_BIT: u8 = 0b10; + +/// Raw mutex type backed by the parking lot. +pub struct RawMutex { + /// This atomic integer holds the current state of the mutex instance. Only the two lowest bits + /// are used. See `LOCKED_BIT` and `PARKED_BIT` for the bitmask for these bits. + /// + /// # State table: + /// + /// PARKED_BIT | LOCKED_BIT | Description + /// 0 | 0 | The mutex is not locked, nor is anyone waiting for it. + /// -----------+------------+------------------------------------------------------------------ + /// 0 | 1 | The mutex is locked by exactly one thread. No other thread is + /// | | waiting for it. + /// -----------+------------+------------------------------------------------------------------ + /// 1 | 0 | The mutex is not locked. One or more thread is parked or about to + /// | | park. At least one of the parked threads are just about to be + /// | | unparked, or a thread heading for parking might abort the park. + /// -----------+------------+------------------------------------------------------------------ + /// 1 | 1 | The mutex is locked by exactly one thread. One or more thread is + /// | | parked or about to park, waiting for the lock to become available. + /// | | In this state, PARKED_BIT is only ever cleared when a bucket lock + /// | | is held (i.e. in a parking_lot_core callback). This ensures that + /// | | we never end up in a situation where there are parked threads but + /// | | PARKED_BIT is not set (which would result in those threads + /// | | potentially never getting woken up). + state: AtomicU8, +} + +unsafe impl lock_api::RawMutex for RawMutex { + const INIT: RawMutex = RawMutex { + state: AtomicU8::new(0), + }; + + type GuardMarker = crate::GuardMarker; + + #[inline] + fn lock(&self) { + if self + .state + .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed) + .is_err() + { + self.lock_slow(None); + } + unsafe { deadlock::acquire_resource(self as *const _ as usize) }; + } + + #[inline] + fn try_lock(&self) -> bool { + let mut state = self.state.load(Ordering::Relaxed); + loop { + if state & LOCKED_BIT != 0 { + return false; + } + match self.state.compare_exchange_weak( + state, + state | LOCKED_BIT, + Ordering::Acquire, + Ordering::Relaxed, + ) { + Ok(_) => { + unsafe { deadlock::acquire_resource(self as *const _ as usize) }; + return true; + } + Err(x) => state = x, + } + } + } + + #[inline] + unsafe fn unlock(&self) { + deadlock::release_resource(self as *const _ as usize); + if self + .state + .compare_exchange(LOCKED_BIT, 0, Ordering::Release, Ordering::Relaxed) + .is_ok() + { + return; + } + self.unlock_slow(false); + } + + #[inline] + fn is_locked(&self) -> bool { + let state = self.state.load(Ordering::Relaxed); + state & LOCKED_BIT != 0 + } +} + +unsafe impl lock_api::RawMutexFair for RawMutex { + #[inline] + unsafe fn unlock_fair(&self) { + deadlock::release_resource(self as *const _ as usize); + if self + .state + .compare_exchange(LOCKED_BIT, 0, Ordering::Release, Ordering::Relaxed) + .is_ok() + { + return; + } + self.unlock_slow(true); + } + + #[inline] + unsafe fn bump(&self) { + if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 { + self.bump_slow(); + } + } +} + +unsafe impl lock_api::RawMutexTimed for RawMutex { + type Duration = Duration; + type Instant = Instant; + + #[inline] + fn try_lock_until(&self, timeout: Instant) -> bool { + let result = if self + .state + .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + true + } else { + self.lock_slow(Some(timeout)) + }; + if result { + unsafe { deadlock::acquire_resource(self as *const _ as usize) }; + } + result + } + + #[inline] + fn try_lock_for(&self, timeout: Duration) -> bool { + let result = if self + .state + .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + true + } else { + self.lock_slow(util::to_deadline(timeout)) + }; + if result { + unsafe { deadlock::acquire_resource(self as *const _ as usize) }; + } + result + } +} + +impl RawMutex { + // Used by Condvar when requeuing threads to us, must be called while + // holding the queue lock. + #[inline] + pub(crate) fn mark_parked_if_locked(&self) -> bool { + let mut state = self.state.load(Ordering::Relaxed); + loop { + if state & LOCKED_BIT == 0 { + return false; + } + match self.state.compare_exchange_weak( + state, + state | PARKED_BIT, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => return true, + Err(x) => state = x, + } + } + } + + // Used by Condvar when requeuing threads to us, must be called while + // holding the queue lock. + #[inline] + pub(crate) fn mark_parked(&self) { + self.state.fetch_or(PARKED_BIT, Ordering::Relaxed); + } + + #[cold] + fn lock_slow(&self, timeout: Option<Instant>) -> bool { + let mut spinwait = SpinWait::new(); + let mut state = self.state.load(Ordering::Relaxed); + loop { + // Grab the lock if it isn't locked, even if there is a queue on it + if state & LOCKED_BIT == 0 { + match self.state.compare_exchange_weak( + state, + state | LOCKED_BIT, + Ordering::Acquire, + Ordering::Relaxed, + ) { + Ok(_) => return true, + Err(x) => state = x, + } + continue; + } + + // If there is no queue, try spinning a few times + if state & PARKED_BIT == 0 && spinwait.spin() { + state = self.state.load(Ordering::Relaxed); + continue; + } + + // Set the parked bit + if state & PARKED_BIT == 0 { + if let Err(x) = self.state.compare_exchange_weak( + state, + state | PARKED_BIT, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + state = x; + continue; + } + } + + // Park our thread until we are woken up by an unlock + let addr = self as *const _ as usize; + let validate = || self.state.load(Ordering::Relaxed) == LOCKED_BIT | PARKED_BIT; + let before_sleep = || {}; + let timed_out = |_, was_last_thread| { + // Clear the parked bit if we were the last parked thread + if was_last_thread { + self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); + } + }; + // SAFETY: + // * `addr` is an address we control. + // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`. + // * `before_sleep` does not call `park`, nor does it panic. + match unsafe { + parking_lot_core::park( + addr, + validate, + before_sleep, + timed_out, + DEFAULT_PARK_TOKEN, + timeout, + ) + } { + // The thread that unparked us passed the lock on to us + // directly without unlocking it. + ParkResult::Unparked(TOKEN_HANDOFF) => return true, + + // We were unparked normally, try acquiring the lock again + ParkResult::Unparked(_) => (), + + // The validation function failed, try locking again + ParkResult::Invalid => (), + + // Timeout expired + ParkResult::TimedOut => return false, + } + + // Loop back and try locking again + spinwait.reset(); + state = self.state.load(Ordering::Relaxed); + } + } + + #[cold] + fn unlock_slow(&self, force_fair: bool) { + // Unpark one thread and leave the parked bit set if there might + // still be parked threads on this address. + let addr = self as *const _ as usize; + let callback = |result: UnparkResult| { + // If we are using a fair unlock then we should keep the + // mutex locked and hand it off to the unparked thread. + if result.unparked_threads != 0 && (force_fair || result.be_fair) { + // Clear the parked bit if there are no more parked + // threads. + if !result.have_more_threads { + self.state.store(LOCKED_BIT, Ordering::Relaxed); + } + return TOKEN_HANDOFF; + } + + // Clear the locked bit, and the parked bit as well if there + // are no more parked threads. + if result.have_more_threads { + self.state.store(PARKED_BIT, Ordering::Release); + } else { + self.state.store(0, Ordering::Release); + } + TOKEN_NORMAL + }; + // SAFETY: + // * `addr` is an address we control. + // * `callback` does not panic or call into any function of `parking_lot`. + unsafe { + parking_lot_core::unpark_one(addr, callback); + } + } + + #[cold] + fn bump_slow(&self) { + unsafe { deadlock::release_resource(self as *const _ as usize) }; + self.unlock_slow(true); + self.lock(); + } +} diff --git a/third_party/rust/parking_lot/src/raw_rwlock.rs b/third_party/rust/parking_lot/src/raw_rwlock.rs new file mode 100644 index 0000000000..75a9812867 --- /dev/null +++ b/third_party/rust/parking_lot/src/raw_rwlock.rs @@ -0,0 +1,1144 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or +// http://opensource.org/licenses/MIT>, at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use crate::elision::{have_elision, AtomicElisionExt}; +use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL}; +use crate::util; +use core::{ + cell::Cell, + sync::atomic::{AtomicUsize, Ordering}, +}; +use instant::Instant; +use lock_api::{RawRwLock as RawRwLock_, RawRwLockUpgrade}; +use parking_lot_core::{ + self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken, +}; +use std::time::Duration; + +// This reader-writer lock implementation is based on Boost's upgrade_mutex: +// https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432 +// +// This implementation uses 2 wait queues, one at key [addr] and one at key +// [addr + 1]. The primary queue is used for all new waiting threads, and the +// secondary queue is used by the thread which has acquired WRITER_BIT but is +// waiting for the remaining readers to exit the lock. +// +// This implementation is fair between readers and writers since it uses the +// order in which threads first started queuing to alternate between read phases +// and write phases. In particular is it not vulnerable to write starvation +// since readers will block if there is a pending writer. + +// There is at least one thread in the main queue. +const PARKED_BIT: usize = 0b0001; +// There is a parked thread holding WRITER_BIT. WRITER_BIT must be set. +const WRITER_PARKED_BIT: usize = 0b0010; +// A reader is holding an upgradable lock. The reader count must be non-zero and +// WRITER_BIT must not be set. +const UPGRADABLE_BIT: usize = 0b0100; +// If the reader count is zero: a writer is currently holding an exclusive lock. +// Otherwise: a writer is waiting for the remaining readers to exit the lock. +const WRITER_BIT: usize = 0b1000; +// Mask of bits used to count readers. +const READERS_MASK: usize = !0b1111; +// Base unit for counting readers. +const ONE_READER: usize = 0b10000; + +// Token indicating what type of lock a queued thread is trying to acquire +const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER); +const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT); +const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT); + +/// Raw reader-writer lock type backed by the parking lot. +pub struct RawRwLock { + state: AtomicUsize, +} + +unsafe impl lock_api::RawRwLock for RawRwLock { + const INIT: RawRwLock = RawRwLock { + state: AtomicUsize::new(0), + }; + + type GuardMarker = crate::GuardMarker; + + #[inline] + fn lock_exclusive(&self) { + if self + .state + .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) + .is_err() + { + let result = self.lock_exclusive_slow(None); + debug_assert!(result); + } + self.deadlock_acquire(); + } + + #[inline] + fn try_lock_exclusive(&self) -> bool { + if self + .state + .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + self.deadlock_acquire(); + true + } else { + false + } + } + + #[inline] + unsafe fn unlock_exclusive(&self) { + self.deadlock_release(); + if self + .state + .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed) + .is_ok() + { + return; + } + self.unlock_exclusive_slow(false); + } + + #[inline] + fn lock_shared(&self) { + if !self.try_lock_shared_fast(false) { + let result = self.lock_shared_slow(false, None); + debug_assert!(result); + } + self.deadlock_acquire(); + } + + #[inline] + fn try_lock_shared(&self) -> bool { + let result = if self.try_lock_shared_fast(false) { + true + } else { + self.try_lock_shared_slow(false) + }; + if result { + self.deadlock_acquire(); + } + result + } + + #[inline] + unsafe fn unlock_shared(&self) { + self.deadlock_release(); + let state = if have_elision() { + self.state.elision_fetch_sub_release(ONE_READER) + } else { + self.state.fetch_sub(ONE_READER, Ordering::Release) + }; + if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) { + self.unlock_shared_slow(); + } + } + + #[inline] + fn is_locked(&self) -> bool { + let state = self.state.load(Ordering::Relaxed); + state & (WRITER_BIT | READERS_MASK) != 0 + } +} + +unsafe impl lock_api::RawRwLockFair for RawRwLock { + #[inline] + unsafe fn unlock_shared_fair(&self) { + // Shared unlocking is always fair in this implementation. + self.unlock_shared(); + } + + #[inline] + unsafe fn unlock_exclusive_fair(&self) { + self.deadlock_release(); + if self + .state + .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed) + .is_ok() + { + return; + } + self.unlock_exclusive_slow(true); + } + + #[inline] + unsafe fn bump_shared(&self) { + if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT) + == ONE_READER | WRITER_BIT + { + self.bump_shared_slow(); + } + } + + #[inline] + unsafe fn bump_exclusive(&self) { + if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 { + self.bump_exclusive_slow(); + } + } +} + +unsafe impl lock_api::RawRwLockDowngrade for RawRwLock { + #[inline] + unsafe fn downgrade(&self) { + let state = self + .state + .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release); + + // Wake up parked shared and upgradable threads if there are any + if state & PARKED_BIT != 0 { + self.downgrade_slow(); + } + } +} + +unsafe impl lock_api::RawRwLockTimed for RawRwLock { + type Duration = Duration; + type Instant = Instant; + + #[inline] + fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool { + let result = if self.try_lock_shared_fast(false) { + true + } else { + self.lock_shared_slow(false, util::to_deadline(timeout)) + }; + if result { + self.deadlock_acquire(); + } + result + } + + #[inline] + fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool { + let result = if self.try_lock_shared_fast(false) { + true + } else { + self.lock_shared_slow(false, Some(timeout)) + }; + if result { + self.deadlock_acquire(); + } + result + } + + #[inline] + fn try_lock_exclusive_for(&self, timeout: Duration) -> bool { + let result = if self + .state + .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + true + } else { + self.lock_exclusive_slow(util::to_deadline(timeout)) + }; + if result { + self.deadlock_acquire(); + } + result + } + + #[inline] + fn try_lock_exclusive_until(&self, timeout: Instant) -> bool { + let result = if self + .state + .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + true + } else { + self.lock_exclusive_slow(Some(timeout)) + }; + if result { + self.deadlock_acquire(); + } + result + } +} + +unsafe impl lock_api::RawRwLockRecursive for RawRwLock { + #[inline] + fn lock_shared_recursive(&self) { + if !self.try_lock_shared_fast(true) { + let result = self.lock_shared_slow(true, None); + debug_assert!(result); + } + self.deadlock_acquire(); + } + + #[inline] + fn try_lock_shared_recursive(&self) -> bool { + let result = if self.try_lock_shared_fast(true) { + true + } else { + self.try_lock_shared_slow(true) + }; + if result { + self.deadlock_acquire(); + } + result + } +} + +unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock { + #[inline] + fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool { + let result = if self.try_lock_shared_fast(true) { + true + } else { + self.lock_shared_slow(true, util::to_deadline(timeout)) + }; + if result { + self.deadlock_acquire(); + } + result + } + + #[inline] + fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool { + let result = if self.try_lock_shared_fast(true) { + true + } else { + self.lock_shared_slow(true, Some(timeout)) + }; + if result { + self.deadlock_acquire(); + } + result + } +} + +unsafe impl lock_api::RawRwLockUpgrade for RawRwLock { + #[inline] + fn lock_upgradable(&self) { + if !self.try_lock_upgradable_fast() { + let result = self.lock_upgradable_slow(None); + debug_assert!(result); + } + self.deadlock_acquire(); + } + + #[inline] + fn try_lock_upgradable(&self) -> bool { + let result = if self.try_lock_upgradable_fast() { + true + } else { + self.try_lock_upgradable_slow() + }; + if result { + self.deadlock_acquire(); + } + result + } + + #[inline] + unsafe fn unlock_upgradable(&self) { + self.deadlock_release(); + let state = self.state.load(Ordering::Relaxed); + if state & PARKED_BIT == 0 { + if self + .state + .compare_exchange_weak( + state, + state - (ONE_READER | UPGRADABLE_BIT), + Ordering::Release, + Ordering::Relaxed, + ) + .is_ok() + { + return; + } + } + self.unlock_upgradable_slow(false); + } + + #[inline] + unsafe fn upgrade(&self) { + let state = self.state.fetch_sub( + (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, + Ordering::Relaxed, + ); + if state & READERS_MASK != ONE_READER { + let result = self.upgrade_slow(None); + debug_assert!(result); + } + } + + #[inline] + unsafe fn try_upgrade(&self) -> bool { + if self + .state + .compare_exchange_weak( + ONE_READER | UPGRADABLE_BIT, + WRITER_BIT, + Ordering::Relaxed, + Ordering::Relaxed, + ) + .is_ok() + { + true + } else { + self.try_upgrade_slow() + } + } +} + +unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock { + #[inline] + unsafe fn unlock_upgradable_fair(&self) { + self.deadlock_release(); + let state = self.state.load(Ordering::Relaxed); + if state & PARKED_BIT == 0 { + if self + .state + .compare_exchange_weak( + state, + state - (ONE_READER | UPGRADABLE_BIT), + Ordering::Release, + Ordering::Relaxed, + ) + .is_ok() + { + return; + } + } + self.unlock_upgradable_slow(false); + } + + #[inline] + unsafe fn bump_upgradable(&self) { + if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT { + self.bump_upgradable_slow(); + } + } +} + +unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock { + #[inline] + unsafe fn downgrade_upgradable(&self) { + let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed); + + // Wake up parked upgradable threads if there are any + if state & PARKED_BIT != 0 { + self.downgrade_slow(); + } + } + + #[inline] + unsafe fn downgrade_to_upgradable(&self) { + let state = self.state.fetch_add( + (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, + Ordering::Release, + ); + + // Wake up parked shared threads if there are any + if state & PARKED_BIT != 0 { + self.downgrade_to_upgradable_slow(); + } + } +} + +unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock { + #[inline] + fn try_lock_upgradable_until(&self, timeout: Instant) -> bool { + let result = if self.try_lock_upgradable_fast() { + true + } else { + self.lock_upgradable_slow(Some(timeout)) + }; + if result { + self.deadlock_acquire(); + } + result + } + + #[inline] + fn try_lock_upgradable_for(&self, timeout: Duration) -> bool { + let result = if self.try_lock_upgradable_fast() { + true + } else { + self.lock_upgradable_slow(util::to_deadline(timeout)) + }; + if result { + self.deadlock_acquire(); + } + result + } + + #[inline] + unsafe fn try_upgrade_until(&self, timeout: Instant) -> bool { + let state = self.state.fetch_sub( + (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, + Ordering::Relaxed, + ); + if state & READERS_MASK == ONE_READER { + true + } else { + self.upgrade_slow(Some(timeout)) + } + } + + #[inline] + unsafe fn try_upgrade_for(&self, timeout: Duration) -> bool { + let state = self.state.fetch_sub( + (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, + Ordering::Relaxed, + ); + if state & READERS_MASK == ONE_READER { + true + } else { + self.upgrade_slow(util::to_deadline(timeout)) + } + } +} + +impl RawRwLock { + #[inline(always)] + fn try_lock_shared_fast(&self, recursive: bool) -> bool { + let state = self.state.load(Ordering::Relaxed); + + // We can't allow grabbing a shared lock if there is a writer, even if + // the writer is still waiting for the remaining readers to exit. + if state & WRITER_BIT != 0 { + // To allow recursive locks, we make an exception and allow readers + // to skip ahead of a pending writer to avoid deadlocking, at the + // cost of breaking the fairness guarantees. + if !recursive || state & READERS_MASK == 0 { + return false; + } + } + + // Use hardware lock elision to avoid cache conflicts when multiple + // readers try to acquire the lock. We only do this if the lock is + // completely empty since elision handles conflicts poorly. + if have_elision() && state == 0 { + self.state + .elision_compare_exchange_acquire(0, ONE_READER) + .is_ok() + } else if let Some(new_state) = state.checked_add(ONE_READER) { + self.state + .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + } else { + false + } + } + + #[cold] + fn try_lock_shared_slow(&self, recursive: bool) -> bool { + let mut state = self.state.load(Ordering::Relaxed); + loop { + // This mirrors the condition in try_lock_shared_fast + if state & WRITER_BIT != 0 { + if !recursive || state & READERS_MASK == 0 { + return false; + } + } + if have_elision() && state == 0 { + match self.state.elision_compare_exchange_acquire(0, ONE_READER) { + Ok(_) => return true, + Err(x) => state = x, + } + } else { + match self.state.compare_exchange_weak( + state, + state + .checked_add(ONE_READER) + .expect("RwLock reader count overflow"), + Ordering::Acquire, + Ordering::Relaxed, + ) { + Ok(_) => return true, + Err(x) => state = x, + } + } + } + } + + #[inline(always)] + fn try_lock_upgradable_fast(&self) -> bool { + let state = self.state.load(Ordering::Relaxed); + + // We can't grab an upgradable lock if there is already a writer or + // upgradable reader. + if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { + return false; + } + + if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) { + self.state + .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + } else { + false + } + } + + #[cold] + fn try_lock_upgradable_slow(&self) -> bool { + let mut state = self.state.load(Ordering::Relaxed); + loop { + // This mirrors the condition in try_lock_upgradable_fast + if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { + return false; + } + + match self.state.compare_exchange_weak( + state, + state + .checked_add(ONE_READER | UPGRADABLE_BIT) + .expect("RwLock reader count overflow"), + Ordering::Acquire, + Ordering::Relaxed, + ) { + Ok(_) => return true, + Err(x) => state = x, + } + } + } + + #[cold] + fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool { + let try_lock = |state: &mut usize| { + loop { + if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { + return false; + } + + // Grab WRITER_BIT if it isn't set, even if there are parked threads. + match self.state.compare_exchange_weak( + *state, + *state | WRITER_BIT, + Ordering::Acquire, + Ordering::Relaxed, + ) { + Ok(_) => return true, + Err(x) => *state = x, + } + } + }; + + // Step 1: grab exclusive ownership of WRITER_BIT + let timed_out = !self.lock_common( + timeout, + TOKEN_EXCLUSIVE, + try_lock, + WRITER_BIT | UPGRADABLE_BIT, + ); + if timed_out { + return false; + } + + // Step 2: wait for all remaining readers to exit the lock. + self.wait_for_readers(timeout, 0) + } + + #[cold] + fn unlock_exclusive_slow(&self, force_fair: bool) { + // There are threads to unpark. Try to unpark as many as we can. + let callback = |mut new_state, result: UnparkResult| { + // If we are using a fair unlock then we should keep the + // rwlock locked and hand it off to the unparked threads. + if result.unparked_threads != 0 && (force_fair || result.be_fair) { + if result.have_more_threads { + new_state |= PARKED_BIT; + } + self.state.store(new_state, Ordering::Release); + TOKEN_HANDOFF + } else { + // Clear the parked bit if there are no more parked threads. + if result.have_more_threads { + self.state.store(PARKED_BIT, Ordering::Release); + } else { + self.state.store(0, Ordering::Release); + } + TOKEN_NORMAL + } + }; + // SAFETY: `callback` does not panic or call into any function of `parking_lot`. + unsafe { + self.wake_parked_threads(0, callback); + } + } + + #[cold] + fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool { + let try_lock = |state: &mut usize| { + let mut spinwait_shared = SpinWait::new(); + loop { + // Use hardware lock elision to avoid cache conflicts when multiple + // readers try to acquire the lock. We only do this if the lock is + // completely empty since elision handles conflicts poorly. + if have_elision() && *state == 0 { + match self.state.elision_compare_exchange_acquire(0, ONE_READER) { + Ok(_) => return true, + Err(x) => *state = x, + } + } + + // This is the same condition as try_lock_shared_fast + if *state & WRITER_BIT != 0 { + if !recursive || *state & READERS_MASK == 0 { + return false; + } + } + + if self + .state + .compare_exchange_weak( + *state, + state + .checked_add(ONE_READER) + .expect("RwLock reader count overflow"), + Ordering::Acquire, + Ordering::Relaxed, + ) + .is_ok() + { + return true; + } + + // If there is high contention on the reader count then we want + // to leave some time between attempts to acquire the lock to + // let other threads make progress. + spinwait_shared.spin_no_yield(); + *state = self.state.load(Ordering::Relaxed); + } + }; + self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT) + } + + #[cold] + fn unlock_shared_slow(&self) { + // At this point WRITER_PARKED_BIT is set and READER_MASK is empty. We + // just need to wake up a potentially sleeping pending writer. + // Using the 2nd key at addr + 1 + let addr = self as *const _ as usize + 1; + let callback = |_result: UnparkResult| { + // Clear the WRITER_PARKED_BIT here since there can only be one + // parked writer thread. + self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed); + TOKEN_NORMAL + }; + // SAFETY: + // * `addr` is an address we control. + // * `callback` does not panic or call into any function of `parking_lot`. + unsafe { + parking_lot_core::unpark_one(addr, callback); + } + } + + #[cold] + fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool { + let try_lock = |state: &mut usize| { + let mut spinwait_shared = SpinWait::new(); + loop { + if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { + return false; + } + + if self + .state + .compare_exchange_weak( + *state, + state + .checked_add(ONE_READER | UPGRADABLE_BIT) + .expect("RwLock reader count overflow"), + Ordering::Acquire, + Ordering::Relaxed, + ) + .is_ok() + { + return true; + } + + // If there is high contention on the reader count then we want + // to leave some time between attempts to acquire the lock to + // let other threads make progress. + spinwait_shared.spin_no_yield(); + *state = self.state.load(Ordering::Relaxed); + } + }; + self.lock_common( + timeout, + TOKEN_UPGRADABLE, + try_lock, + WRITER_BIT | UPGRADABLE_BIT, + ) + } + + #[cold] + fn unlock_upgradable_slow(&self, force_fair: bool) { + // Just release the lock if there are no parked threads. + let mut state = self.state.load(Ordering::Relaxed); + while state & PARKED_BIT == 0 { + match self.state.compare_exchange_weak( + state, + state - (ONE_READER | UPGRADABLE_BIT), + Ordering::Release, + Ordering::Relaxed, + ) { + Ok(_) => return, + Err(x) => state = x, + } + } + + // There are threads to unpark. Try to unpark as many as we can. + let callback = |new_state, result: UnparkResult| { + // If we are using a fair unlock then we should keep the + // rwlock locked and hand it off to the unparked threads. + let mut state = self.state.load(Ordering::Relaxed); + if force_fair || result.be_fair { + // Fall back to normal unpark on overflow. Panicking is + // not allowed in parking_lot callbacks. + while let Some(mut new_state) = + (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state) + { + if result.have_more_threads { + new_state |= PARKED_BIT; + } else { + new_state &= !PARKED_BIT; + } + match self.state.compare_exchange_weak( + state, + new_state, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => return TOKEN_HANDOFF, + Err(x) => state = x, + } + } + } + + // Otherwise just release the upgradable lock and update PARKED_BIT. + loop { + let mut new_state = state - (ONE_READER | UPGRADABLE_BIT); + if result.have_more_threads { + new_state |= PARKED_BIT; + } else { + new_state &= !PARKED_BIT; + } + match self.state.compare_exchange_weak( + state, + new_state, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => return TOKEN_NORMAL, + Err(x) => state = x, + } + } + }; + // SAFETY: `callback` does not panic or call into any function of `parking_lot`. + unsafe { + self.wake_parked_threads(0, callback); + } + } + + #[cold] + fn try_upgrade_slow(&self) -> bool { + let mut state = self.state.load(Ordering::Relaxed); + loop { + if state & READERS_MASK != ONE_READER { + return false; + } + match self.state.compare_exchange_weak( + state, + state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => return true, + Err(x) => state = x, + } + } + } + + #[cold] + fn upgrade_slow(&self, timeout: Option<Instant>) -> bool { + self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT) + } + + #[cold] + fn downgrade_slow(&self) { + // We only reach this point if PARKED_BIT is set. + let callback = |_, result: UnparkResult| { + // Clear the parked bit if there no more parked threads + if !result.have_more_threads { + self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); + } + TOKEN_NORMAL + }; + // SAFETY: `callback` does not panic or call into any function of `parking_lot`. + unsafe { + self.wake_parked_threads(ONE_READER, callback); + } + } + + #[cold] + fn downgrade_to_upgradable_slow(&self) { + // We only reach this point if PARKED_BIT is set. + let callback = |_, result: UnparkResult| { + // Clear the parked bit if there no more parked threads + if !result.have_more_threads { + self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); + } + TOKEN_NORMAL + }; + // SAFETY: `callback` does not panic or call into any function of `parking_lot`. + unsafe { + self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback); + } + } + + #[cold] + unsafe fn bump_shared_slow(&self) { + self.unlock_shared(); + self.lock_shared(); + } + + #[cold] + fn bump_exclusive_slow(&self) { + self.deadlock_release(); + self.unlock_exclusive_slow(true); + self.lock_exclusive(); + } + + #[cold] + fn bump_upgradable_slow(&self) { + self.deadlock_release(); + self.unlock_upgradable_slow(true); + self.lock_upgradable(); + } + + /// Common code for waking up parked threads after releasing WRITER_BIT or + /// UPGRADABLE_BIT. + /// + /// # Safety + /// + /// `callback` must uphold the requirements of the `callback` parameter to + /// `parking_lot_core::unpark_filter`. Meaning no panics or calls into any function in + /// `parking_lot`. + #[inline] + unsafe fn wake_parked_threads( + &self, + new_state: usize, + callback: impl FnOnce(usize, UnparkResult) -> UnparkToken, + ) { + // We must wake up at least one upgrader or writer if there is one, + // otherwise they may end up parked indefinitely since unlock_shared + // does not call wake_parked_threads. + let new_state = Cell::new(new_state); + let addr = self as *const _ as usize; + let filter = |ParkToken(token)| { + let s = new_state.get(); + + // If we are waking up a writer, don't wake anything else. + if s & WRITER_BIT != 0 { + return FilterOp::Stop; + } + + // Otherwise wake *all* readers and one upgrader/writer. + if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 { + // Skip writers and upgradable readers if we already have + // a writer/upgradable reader. + FilterOp::Skip + } else { + new_state.set(s + token); + FilterOp::Unpark + } + }; + let callback = |result| callback(new_state.get(), result); + // SAFETY: + // * `addr` is an address we control. + // * `filter` does not panic or call into any function of `parking_lot`. + // * `callback` safety responsibility is on caller + parking_lot_core::unpark_filter(addr, filter, callback); + } + + // Common code for waiting for readers to exit the lock after acquiring + // WRITER_BIT. + #[inline] + fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool { + // At this point WRITER_BIT is already set, we just need to wait for the + // remaining readers to exit the lock. + let mut spinwait = SpinWait::new(); + let mut state = self.state.load(Ordering::Acquire); + while state & READERS_MASK != 0 { + // Spin a few times to wait for readers to exit + if spinwait.spin() { + state = self.state.load(Ordering::Acquire); + continue; + } + + // Set the parked bit + if state & WRITER_PARKED_BIT == 0 { + if let Err(x) = self.state.compare_exchange_weak( + state, + state | WRITER_PARKED_BIT, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + state = x; + continue; + } + } + + // Park our thread until we are woken up by an unlock + // Using the 2nd key at addr + 1 + let addr = self as *const _ as usize + 1; + let validate = || { + let state = self.state.load(Ordering::Relaxed); + state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0 + }; + let before_sleep = || {}; + let timed_out = |_, _| {}; + // SAFETY: + // * `addr` is an address we control. + // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`. + // * `before_sleep` does not call `park`, nor does it panic. + let park_result = unsafe { + parking_lot_core::park( + addr, + validate, + before_sleep, + timed_out, + TOKEN_EXCLUSIVE, + timeout, + ) + }; + match park_result { + // We still need to re-check the state if we are unparked + // since a previous writer timing-out could have allowed + // another reader to sneak in before we parked. + ParkResult::Unparked(_) | ParkResult::Invalid => { + state = self.state.load(Ordering::Acquire); + continue; + } + + // Timeout expired + ParkResult::TimedOut => { + // We need to release WRITER_BIT and revert back to + // our previous value. We also wake up any threads that + // might be waiting on WRITER_BIT. + let state = self.state.fetch_add( + prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT), + Ordering::Relaxed, + ); + if state & PARKED_BIT != 0 { + let callback = |_, result: UnparkResult| { + // Clear the parked bit if there no more parked threads + if !result.have_more_threads { + self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); + } + TOKEN_NORMAL + }; + // SAFETY: `callback` does not panic or call any function of `parking_lot`. + unsafe { + self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback); + } + } + return false; + } + } + } + true + } + + /// Common code for acquiring a lock + #[inline] + fn lock_common( + &self, + timeout: Option<Instant>, + token: ParkToken, + mut try_lock: impl FnMut(&mut usize) -> bool, + validate_flags: usize, + ) -> bool { + let mut spinwait = SpinWait::new(); + let mut state = self.state.load(Ordering::Relaxed); + loop { + // Attempt to grab the lock + if try_lock(&mut state) { + return true; + } + + // If there are no parked threads, try spinning a few times. + if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() { + state = self.state.load(Ordering::Relaxed); + continue; + } + + // Set the parked bit + if state & PARKED_BIT == 0 { + if let Err(x) = self.state.compare_exchange_weak( + state, + state | PARKED_BIT, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + state = x; + continue; + } + } + + // Park our thread until we are woken up by an unlock + let addr = self as *const _ as usize; + let validate = || { + let state = self.state.load(Ordering::Relaxed); + state & PARKED_BIT != 0 && (state & validate_flags != 0) + }; + let before_sleep = || {}; + let timed_out = |_, was_last_thread| { + // Clear the parked bit if we were the last parked thread + if was_last_thread { + self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); + } + }; + + // SAFETY: + // * `addr` is an address we control. + // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`. + // * `before_sleep` does not call `park`, nor does it panic. + let park_result = unsafe { + parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout) + }; + match park_result { + // The thread that unparked us passed the lock on to us + // directly without unlocking it. + ParkResult::Unparked(TOKEN_HANDOFF) => return true, + + // We were unparked normally, try acquiring the lock again + ParkResult::Unparked(_) => (), + + // The validation function failed, try locking again + ParkResult::Invalid => (), + + // Timeout expired + ParkResult::TimedOut => return false, + } + + // Loop back and try locking again + spinwait.reset(); + state = self.state.load(Ordering::Relaxed); + } + } + + #[inline] + fn deadlock_acquire(&self) { + unsafe { deadlock::acquire_resource(self as *const _ as usize) }; + unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) }; + } + + #[inline] + fn deadlock_release(&self) { + unsafe { deadlock::release_resource(self as *const _ as usize) }; + unsafe { deadlock::release_resource(self as *const _ as usize + 1) }; + } +} diff --git a/third_party/rust/parking_lot/src/remutex.rs b/third_party/rust/parking_lot/src/remutex.rs new file mode 100644 index 0000000000..1037923018 --- /dev/null +++ b/third_party/rust/parking_lot/src/remutex.rs @@ -0,0 +1,149 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or +// http://opensource.org/licenses/MIT>, at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use crate::raw_mutex::RawMutex; +use core::num::NonZeroUsize; +use lock_api::{self, GetThreadId}; + +/// Implementation of the `GetThreadId` trait for `lock_api::ReentrantMutex`. +pub struct RawThreadId; + +unsafe impl GetThreadId for RawThreadId { + const INIT: RawThreadId = RawThreadId; + + fn nonzero_thread_id(&self) -> NonZeroUsize { + // The address of a thread-local variable is guaranteed to be unique to the + // current thread, and is also guaranteed to be non-zero. The variable has to have a + // non-zero size to guarantee it has a unique address for each thread. + thread_local!(static KEY: u8 = 0); + KEY.with(|x| { + NonZeroUsize::new(x as *const _ as usize) + .expect("thread-local variable address is null") + }) + } +} + +/// A mutex which can be recursively locked by a single thread. +/// +/// This type is identical to `Mutex` except for the following points: +/// +/// - Locking multiple times from the same thread will work correctly instead of +/// deadlocking. +/// - `ReentrantMutexGuard` does not give mutable references to the locked data. +/// Use a `RefCell` if you need this. +/// +/// See [`Mutex`](type.Mutex.html) for more details about the underlying mutex +/// primitive. +pub type ReentrantMutex<T> = lock_api::ReentrantMutex<RawMutex, RawThreadId, T>; + +/// Creates a new reentrant mutex in an unlocked state ready for use. +/// +/// This allows creating a reentrant mutex in a constant context on stable Rust. +pub const fn const_reentrant_mutex<T>(val: T) -> ReentrantMutex<T> { + ReentrantMutex::const_new( + <RawMutex as lock_api::RawMutex>::INIT, + <RawThreadId as lock_api::GetThreadId>::INIT, + val, + ) +} + +/// An RAII implementation of a "scoped lock" of a reentrant mutex. When this structure +/// is dropped (falls out of scope), the lock will be unlocked. +/// +/// The data protected by the mutex can be accessed through this guard via its +/// `Deref` implementation. +pub type ReentrantMutexGuard<'a, T> = lock_api::ReentrantMutexGuard<'a, RawMutex, RawThreadId, T>; + +/// An RAII mutex guard returned by `ReentrantMutexGuard::map`, which can point to a +/// subfield of the protected data. +/// +/// The main difference between `MappedReentrantMutexGuard` and `ReentrantMutexGuard` is that the +/// former doesn't support temporarily unlocking and re-locking, since that +/// could introduce soundness issues if the locked object is modified by another +/// thread. +pub type MappedReentrantMutexGuard<'a, T> = + lock_api::MappedReentrantMutexGuard<'a, RawMutex, RawThreadId, T>; + +#[cfg(test)] +mod tests { + use crate::ReentrantMutex; + use std::cell::RefCell; + use std::sync::Arc; + use std::thread; + + #[cfg(feature = "serde")] + use bincode::{deserialize, serialize}; + + #[test] + fn smoke() { + let m = ReentrantMutex::new(2); + { + let a = m.lock(); + { + let b = m.lock(); + { + let c = m.lock(); + assert_eq!(*c, 2); + } + assert_eq!(*b, 2); + } + assert_eq!(*a, 2); + } + } + + #[test] + fn is_mutex() { + let m = Arc::new(ReentrantMutex::new(RefCell::new(0))); + let m2 = m.clone(); + let lock = m.lock(); + let child = thread::spawn(move || { + let lock = m2.lock(); + assert_eq!(*lock.borrow(), 4950); + }); + for i in 0..100 { + let lock = m.lock(); + *lock.borrow_mut() += i; + } + drop(lock); + child.join().unwrap(); + } + + #[test] + fn trylock_works() { + let m = Arc::new(ReentrantMutex::new(())); + let m2 = m.clone(); + let _lock = m.try_lock(); + let _lock2 = m.try_lock(); + thread::spawn(move || { + let lock = m2.try_lock(); + assert!(lock.is_none()); + }) + .join() + .unwrap(); + let _lock3 = m.try_lock(); + } + + #[test] + fn test_reentrant_mutex_debug() { + let mutex = ReentrantMutex::new(vec![0u8, 10]); + + assert_eq!(format!("{:?}", mutex), "ReentrantMutex { data: [0, 10] }"); + } + + #[cfg(feature = "serde")] + #[test] + fn test_serde() { + let contents: Vec<u8> = vec![0, 1, 2]; + let mutex = ReentrantMutex::new(contents.clone()); + + let serialized = serialize(&mutex).unwrap(); + let deserialized: ReentrantMutex<Vec<u8>> = deserialize(&serialized).unwrap(); + + assert_eq!(*(mutex.lock()), *(deserialized.lock())); + assert_eq!(contents, *(deserialized.lock())); + } +} diff --git a/third_party/rust/parking_lot/src/rwlock.rs b/third_party/rust/parking_lot/src/rwlock.rs new file mode 100644 index 0000000000..70e1b1a7c6 --- /dev/null +++ b/third_party/rust/parking_lot/src/rwlock.rs @@ -0,0 +1,618 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or +// http://opensource.org/licenses/MIT>, at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use crate::raw_rwlock::RawRwLock; +use lock_api; + +/// A reader-writer lock +/// +/// This type of lock allows a number of readers or at most one writer at any +/// point in time. The write portion of this lock typically allows modification +/// of the underlying data (exclusive access) and the read portion of this lock +/// typically allows for read-only access (shared access). +/// +/// This lock uses a task-fair locking policy which avoids both reader and +/// writer starvation. This means that readers trying to acquire the lock will +/// block even if the lock is unlocked when there are writers waiting to acquire +/// the lock. Because of this, attempts to recursively acquire a read lock +/// within a single thread may result in a deadlock. +/// +/// The type parameter `T` represents the data that this lock protects. It is +/// required that `T` satisfies `Send` to be shared across threads and `Sync` to +/// allow concurrent access through readers. The RAII guards returned from the +/// locking methods implement `Deref` (and `DerefMut` for the `write` methods) +/// to allow access to the contained of the lock. +/// +/// # Fairness +/// +/// A typical unfair lock can often end up in a situation where a single thread +/// quickly acquires and releases the same lock in succession, which can starve +/// other threads waiting to acquire the rwlock. While this improves throughput +/// because it doesn't force a context switch when a thread tries to re-acquire +/// a rwlock it has just released, this can starve other threads. +/// +/// This rwlock uses [eventual fairness](https://trac.webkit.org/changeset/203350) +/// to ensure that the lock will be fair on average without sacrificing +/// throughput. This is done by forcing a fair unlock on average every 0.5ms, +/// which will force the lock to go to the next thread waiting for the rwlock. +/// +/// Additionally, any critical section longer than 1ms will always use a fair +/// unlock, which has a negligible impact on throughput considering the length +/// of the critical section. +/// +/// You can also force a fair unlock by calling `RwLockReadGuard::unlock_fair` +/// or `RwLockWriteGuard::unlock_fair` when unlocking a mutex instead of simply +/// dropping the guard. +/// +/// # Differences from the standard library `RwLock` +/// +/// - Supports atomically downgrading a write lock into a read lock. +/// - Task-fair locking policy instead of an unspecified platform default. +/// - No poisoning, the lock is released normally on panic. +/// - Only requires 1 word of space, whereas the standard library boxes the +/// `RwLock` due to platform limitations. +/// - Can be statically constructed (requires the `const_fn` nightly feature). +/// - Does not require any drop glue when dropped. +/// - Inline fast path for the uncontended case. +/// - Efficient handling of micro-contention using adaptive spinning. +/// - Allows raw locking & unlocking without a guard. +/// - Supports eventual fairness so that the rwlock is fair on average. +/// - Optionally allows making the rwlock fair by calling +/// `RwLockReadGuard::unlock_fair` and `RwLockWriteGuard::unlock_fair`. +/// +/// # Examples +/// +/// ``` +/// use parking_lot::RwLock; +/// +/// let lock = RwLock::new(5); +/// +/// // many reader locks can be held at once +/// { +/// let r1 = lock.read(); +/// let r2 = lock.read(); +/// assert_eq!(*r1, 5); +/// assert_eq!(*r2, 5); +/// } // read locks are dropped at this point +/// +/// // only one write lock may be held, however +/// { +/// let mut w = lock.write(); +/// *w += 1; +/// assert_eq!(*w, 6); +/// } // write lock is dropped here +/// ``` +pub type RwLock<T> = lock_api::RwLock<RawRwLock, T>; + +/// Creates a new instance of an `RwLock<T>` which is unlocked. +/// +/// This allows creating a `RwLock<T>` in a constant context on stable Rust. +pub const fn const_rwlock<T>(val: T) -> RwLock<T> { + RwLock::const_new(<RawRwLock as lock_api::RawRwLock>::INIT, val) +} + +/// RAII structure used to release the shared read access of a lock when +/// dropped. +pub type RwLockReadGuard<'a, T> = lock_api::RwLockReadGuard<'a, RawRwLock, T>; + +/// RAII structure used to release the exclusive write access of a lock when +/// dropped. +pub type RwLockWriteGuard<'a, T> = lock_api::RwLockWriteGuard<'a, RawRwLock, T>; + +/// An RAII read lock guard returned by `RwLockReadGuard::map`, which can point to a +/// subfield of the protected data. +/// +/// The main difference between `MappedRwLockReadGuard` and `RwLockReadGuard` is that the +/// former doesn't support temporarily unlocking and re-locking, since that +/// could introduce soundness issues if the locked object is modified by another +/// thread. +pub type MappedRwLockReadGuard<'a, T> = lock_api::MappedRwLockReadGuard<'a, RawRwLock, T>; + +/// An RAII write lock guard returned by `RwLockWriteGuard::map`, which can point to a +/// subfield of the protected data. +/// +/// The main difference between `MappedRwLockWriteGuard` and `RwLockWriteGuard` is that the +/// former doesn't support temporarily unlocking and re-locking, since that +/// could introduce soundness issues if the locked object is modified by another +/// thread. +pub type MappedRwLockWriteGuard<'a, T> = lock_api::MappedRwLockWriteGuard<'a, RawRwLock, T>; + +/// RAII structure used to release the upgradable read access of a lock when +/// dropped. +pub type RwLockUpgradableReadGuard<'a, T> = lock_api::RwLockUpgradableReadGuard<'a, RawRwLock, T>; + +#[cfg(test)] +mod tests { + use crate::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard}; + use rand::Rng; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::mpsc::channel; + use std::sync::Arc; + use std::thread; + use std::time::Duration; + + #[cfg(feature = "serde")] + use bincode::{deserialize, serialize}; + + #[derive(Eq, PartialEq, Debug)] + struct NonCopy(i32); + + #[test] + fn smoke() { + let l = RwLock::new(()); + drop(l.read()); + drop(l.write()); + drop(l.upgradable_read()); + drop((l.read(), l.read())); + drop((l.read(), l.upgradable_read())); + drop(l.write()); + } + + #[test] + fn frob() { + const N: u32 = 10; + const M: u32 = 1000; + + let r = Arc::new(RwLock::new(())); + + let (tx, rx) = channel::<()>(); + for _ in 0..N { + let tx = tx.clone(); + let r = r.clone(); + thread::spawn(move || { + let mut rng = rand::thread_rng(); + for _ in 0..M { + if rng.gen_bool(1.0 / N as f64) { + drop(r.write()); + } else { + drop(r.read()); + } + } + drop(tx); + }); + } + drop(tx); + let _ = rx.recv(); + } + + #[test] + fn test_rw_arc_no_poison_wr() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.write(); + panic!(); + }) + .join(); + let lock = arc.read(); + assert_eq!(*lock, 1); + } + + #[test] + fn test_rw_arc_no_poison_ww() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.write(); + panic!(); + }) + .join(); + let lock = arc.write(); + assert_eq!(*lock, 1); + } + + #[test] + fn test_rw_arc_no_poison_rr() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.read(); + panic!(); + }) + .join(); + let lock = arc.read(); + assert_eq!(*lock, 1); + } + + #[test] + fn test_rw_arc_no_poison_rw() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.read(); + panic!() + }) + .join(); + let lock = arc.write(); + assert_eq!(*lock, 1); + } + + #[test] + fn test_ruw_arc() { + let arc = Arc::new(RwLock::new(0)); + let arc2 = arc.clone(); + let (tx, rx) = channel(); + + thread::spawn(move || { + for _ in 0..10 { + let mut lock = arc2.write(); + let tmp = *lock; + *lock = -1; + thread::yield_now(); + *lock = tmp + 1; + } + tx.send(()).unwrap(); + }); + + let mut children = Vec::new(); + + // Upgradable readers try to catch the writer in the act and also + // try to touch the value + for _ in 0..5 { + let arc3 = arc.clone(); + children.push(thread::spawn(move || { + let lock = arc3.upgradable_read(); + let tmp = *lock; + assert!(tmp >= 0); + thread::yield_now(); + let mut lock = RwLockUpgradableReadGuard::upgrade(lock); + assert_eq!(tmp, *lock); + *lock = -1; + thread::yield_now(); + *lock = tmp + 1; + })); + } + + // Readers try to catch the writers in the act + for _ in 0..5 { + let arc4 = arc.clone(); + children.push(thread::spawn(move || { + let lock = arc4.read(); + assert!(*lock >= 0); + })); + } + + // Wait for children to pass their asserts + for r in children { + assert!(r.join().is_ok()); + } + + // Wait for writer to finish + rx.recv().unwrap(); + let lock = arc.read(); + assert_eq!(*lock, 15); + } + + #[test] + fn test_rw_arc() { + let arc = Arc::new(RwLock::new(0)); + let arc2 = arc.clone(); + let (tx, rx) = channel(); + + thread::spawn(move || { + let mut lock = arc2.write(); + for _ in 0..10 { + let tmp = *lock; + *lock = -1; + thread::yield_now(); + *lock = tmp + 1; + } + tx.send(()).unwrap(); + }); + + // Readers try to catch the writer in the act + let mut children = Vec::new(); + for _ in 0..5 { + let arc3 = arc.clone(); + children.push(thread::spawn(move || { + let lock = arc3.read(); + assert!(*lock >= 0); + })); + } + + // Wait for children to pass their asserts + for r in children { + assert!(r.join().is_ok()); + } + + // Wait for writer to finish + rx.recv().unwrap(); + let lock = arc.read(); + assert_eq!(*lock, 10); + } + + #[test] + fn test_rw_arc_access_in_unwind() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _ = thread::spawn(move || { + struct Unwinder { + i: Arc<RwLock<isize>>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + let mut lock = self.i.write(); + *lock += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }) + .join(); + let lock = arc.read(); + assert_eq!(*lock, 2); + } + + #[test] + fn test_rwlock_unsized() { + let rw: &RwLock<[i32]> = &RwLock::new([1, 2, 3]); + { + let b = &mut *rw.write(); + b[0] = 4; + b[2] = 5; + } + let comp: &[i32] = &[4, 2, 5]; + assert_eq!(&*rw.read(), comp); + } + + #[test] + fn test_rwlock_try_read() { + let lock = RwLock::new(0isize); + { + let read_guard = lock.read(); + + let read_result = lock.try_read(); + assert!( + read_result.is_some(), + "try_read should succeed while read_guard is in scope" + ); + + drop(read_guard); + } + { + let upgrade_guard = lock.upgradable_read(); + + let read_result = lock.try_read(); + assert!( + read_result.is_some(), + "try_read should succeed while upgrade_guard is in scope" + ); + + drop(upgrade_guard); + } + { + let write_guard = lock.write(); + + let read_result = lock.try_read(); + assert!( + read_result.is_none(), + "try_read should fail while write_guard is in scope" + ); + + drop(write_guard); + } + } + + #[test] + fn test_rwlock_try_write() { + let lock = RwLock::new(0isize); + { + let read_guard = lock.read(); + + let write_result = lock.try_write(); + assert!( + write_result.is_none(), + "try_write should fail while read_guard is in scope" + ); + + drop(read_guard); + } + { + let upgrade_guard = lock.upgradable_read(); + + let write_result = lock.try_write(); + assert!( + write_result.is_none(), + "try_write should fail while upgrade_guard is in scope" + ); + + drop(upgrade_guard); + } + { + let write_guard = lock.write(); + + let write_result = lock.try_write(); + assert!( + write_result.is_none(), + "try_write should fail while write_guard is in scope" + ); + + drop(write_guard); + } + } + + #[test] + fn test_rwlock_try_upgrade() { + let lock = RwLock::new(0isize); + { + let read_guard = lock.read(); + + let upgrade_result = lock.try_upgradable_read(); + assert!( + upgrade_result.is_some(), + "try_upgradable_read should succeed while read_guard is in scope" + ); + + drop(read_guard); + } + { + let upgrade_guard = lock.upgradable_read(); + + let upgrade_result = lock.try_upgradable_read(); + assert!( + upgrade_result.is_none(), + "try_upgradable_read should fail while upgrade_guard is in scope" + ); + + drop(upgrade_guard); + } + { + let write_guard = lock.write(); + + let upgrade_result = lock.try_upgradable_read(); + assert!( + upgrade_result.is_none(), + "try_upgradable should fail while write_guard is in scope" + ); + + drop(write_guard); + } + } + + #[test] + fn test_into_inner() { + let m = RwLock::new(NonCopy(10)); + assert_eq!(m.into_inner(), NonCopy(10)); + } + + #[test] + fn test_into_inner_drop() { + struct Foo(Arc<AtomicUsize>); + impl Drop for Foo { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let num_drops = Arc::new(AtomicUsize::new(0)); + let m = RwLock::new(Foo(num_drops.clone())); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + { + let _inner = m.into_inner(); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + } + assert_eq!(num_drops.load(Ordering::SeqCst), 1); + } + + #[test] + fn test_get_mut() { + let mut m = RwLock::new(NonCopy(10)); + *m.get_mut() = NonCopy(20); + assert_eq!(m.into_inner(), NonCopy(20)); + } + + #[test] + fn test_rwlockguard_sync() { + fn sync<T: Sync>(_: T) {} + + let rwlock = RwLock::new(()); + sync(rwlock.read()); + sync(rwlock.write()); + } + + #[test] + fn test_rwlock_downgrade() { + let x = Arc::new(RwLock::new(0)); + let mut handles = Vec::new(); + for _ in 0..8 { + let x = x.clone(); + handles.push(thread::spawn(move || { + for _ in 0..100 { + let mut writer = x.write(); + *writer += 1; + let cur_val = *writer; + let reader = RwLockWriteGuard::downgrade(writer); + assert_eq!(cur_val, *reader); + } + })); + } + for handle in handles { + handle.join().unwrap() + } + assert_eq!(*x.read(), 800); + } + + #[test] + fn test_rwlock_recursive() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let lock1 = arc.read(); + let t = thread::spawn(move || { + let _lock = arc2.write(); + }); + + if cfg!(not(all(target_env = "sgx", target_vendor = "fortanix"))) { + thread::sleep(Duration::from_millis(100)); + } else { + // FIXME: https://github.com/fortanix/rust-sgx/issues/31 + for _ in 0..100 { + thread::yield_now(); + } + } + + // A normal read would block here since there is a pending writer + let lock2 = arc.read_recursive(); + + // Unblock the thread and join it. + drop(lock1); + drop(lock2); + t.join().unwrap(); + } + + #[test] + fn test_rwlock_debug() { + let x = RwLock::new(vec![0u8, 10]); + + assert_eq!(format!("{:?}", x), "RwLock { data: [0, 10] }"); + let _lock = x.write(); + assert_eq!(format!("{:?}", x), "RwLock { data: <locked> }"); + } + + #[test] + fn test_clone() { + let rwlock = RwLock::new(Arc::new(1)); + let a = rwlock.read_recursive(); + let b = a.clone(); + assert_eq!(Arc::strong_count(&b), 2); + } + + #[cfg(feature = "serde")] + #[test] + fn test_serde() { + let contents: Vec<u8> = vec![0, 1, 2]; + let mutex = RwLock::new(contents.clone()); + + let serialized = serialize(&mutex).unwrap(); + let deserialized: RwLock<Vec<u8>> = deserialize(&serialized).unwrap(); + + assert_eq!(*(mutex.read()), *(deserialized.read())); + assert_eq!(contents, *(deserialized.read())); + } + + #[test] + fn test_issue_203() { + struct Bar(RwLock<()>); + + impl Drop for Bar { + fn drop(&mut self) { + let _n = self.0.write(); + } + } + + thread_local! { + static B: Bar = Bar(RwLock::new(())); + } + + thread::spawn(|| { + B.with(|_| ()); + + let a = RwLock::new(()); + let _a = a.read(); + }) + .join() + .unwrap(); + } +} diff --git a/third_party/rust/parking_lot/src/util.rs b/third_party/rust/parking_lot/src/util.rs new file mode 100644 index 0000000000..19cc2c2129 --- /dev/null +++ b/third_party/rust/parking_lot/src/util.rs @@ -0,0 +1,39 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or +// http://opensource.org/licenses/MIT>, at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use instant::Instant; +use std::time::Duration; + +// Option::unchecked_unwrap +pub trait UncheckedOptionExt<T> { + unsafe fn unchecked_unwrap(self) -> T; +} + +impl<T> UncheckedOptionExt<T> for Option<T> { + #[inline] + unsafe fn unchecked_unwrap(self) -> T { + match self { + Some(x) => x, + None => unreachable(), + } + } +} + +// hint::unreachable_unchecked() in release mode +#[inline] +unsafe fn unreachable() -> ! { + if cfg!(debug_assertions) { + unreachable!(); + } else { + core::hint::unreachable_unchecked() + } +} + +#[inline] +pub fn to_deadline(timeout: Duration) -> Option<Instant> { + Instant::now().checked_add(timeout) +} diff --git a/third_party/rust/parking_lot/tests/issue_203.rs b/third_party/rust/parking_lot/tests/issue_203.rs new file mode 100644 index 0000000000..a77a95f8ae --- /dev/null +++ b/third_party/rust/parking_lot/tests/issue_203.rs @@ -0,0 +1,26 @@ +use parking_lot::RwLock; +use std::thread; + +struct Bar(RwLock<()>); + +impl Drop for Bar { + fn drop(&mut self) { + let _n = self.0.write(); + } +} + +thread_local! { + static B: Bar = Bar(RwLock::new(())); +} + +#[test] +fn main() { + thread::spawn(|| { + B.with(|_| ()); + + let a = RwLock::new(()); + let _a = a.read(); + }) + .join() + .unwrap(); +} |