diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/crossbeam-utils | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/crossbeam-utils')
30 files changed, 5944 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-utils/.cargo-checksum.json b/third_party/rust/crossbeam-utils/.cargo-checksum.json new file mode 100644 index 0000000000..77d0aa8f43 --- /dev/null +++ b/third_party/rust/crossbeam-utils/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"CHANGELOG.md":"fbcdd2c242af3f8eab76ca3dff71f4c9b1d569db6039ab2f7e331417122d121d","Cargo.toml":"0916d9452f9f79784ac417256b661caa0c95f1b0d3107ad3af2991026707fa61","LICENSE-APACHE":"a60eea817514531668d7e00765731449fe14d059d3249e0bc93b36de45f759f2","LICENSE-MIT":"5734ed989dfca1f625b40281ee9f4530f91b2411ec01cb748223e7eb87e201ab","README.md":"2a19af38a52dd965c2d66bb39f90a85b430b51ee9ccb29e9e1978ee7091e5087","benches/atomic_cell.rs":"c927eb3cd1e5ecc4b91adbc3bde98af15ffab4086190792ba64d5cde0e24df3d","build-common.rs":"502cb7494549bed6fa10ac7bea36e880eeb60290dc69b679ac5c92b376469562","build.rs":"ec1d47ec36b3670a6e67955a104851ee7125616888e78bd03b93304e12cd1c50","no_atomic.rs":"3314524d2afa0360c947455a6e6566fb54ebf909c99479ca3b7435741fd3293e","src/atomic/atomic_cell.rs":"0fc99463e633144c5d59d39c35b5477da1f1b90f5448cadc37454b7f4b97707e","src/atomic/consume.rs":"7a7736fcd64f6473dfea7653559ffc5e1a2a234df43835f8aa8734862145ac15","src/atomic/mod.rs":"94193895fa03cece415e8d7be700b73a9a8a7015774ca821253438607f9b0736","src/atomic/seq_lock.rs":"27182e6b87a9db73c5f6831759f8625f9fcdec3c2828204c444aef04f427735a","src/atomic/seq_lock_wide.rs":"9888dd03116bb89ca36d4ab8d5a0b5032107a2983a7eb8024454263b09080088","src/backoff.rs":"8fd5e3dcccc05860680e49c8498de8096bee9140bcfee8723d97117106a020d0","src/cache_padded.rs":"8bb8925e2df44224ffa29f31a2f9c08d88d8bd3df6c1ce47003598225055fdb5","src/lib.rs":"6f1bcf157abe06ad8458a53e865bf8efab9fad4a9424790147cee8fefb3795d8","src/sync/mod.rs":"eca73c04f821859b8434d2b93db87d160dc6a3f65498ca201cd40d732ca4c134","src/sync/once_lock.rs":"c03dc9c05a817e087dccf8b682f7307501542805533551da3c2bab442bc40743","src/sync/parker.rs":"91f3a7d4ee8d9e06b6558d180e8a0df08ff5c6cef612b4ce4790f9f75cb34f84","src/sync/sharded_lock.rs":"6391b3b99b194b8e0888446c2dec340e4fb095753bcf0c1a80bc654f9c8be0e3","src/sync/wait_group.rs":"3e339aab014f50e214fea535c841755113ea058153378ed54e50a4acb403c937","src/thread.rs":"21cf9b3e965529e5c0a6ff8fc1ec846bfe0006c41deb238a149be8d07384e955","tests/atomic_cell.rs":"bf8bc869c922a1cbf929c3b741bae0cae98f2157f572b5a4eb2873d20a407c22","tests/cache_padded.rs":"1bfaff8354c8184e1ee1f902881ca9400b60effb273b0d3f752801a483d2b66d","tests/parker.rs":"6def4721287d9d70b1cfd63ebb34e1c83fbb3376edbad2bc8aac6ef69dd99d20","tests/sharded_lock.rs":"314adeb8a651a28935f7a49c9a261b8fa1fd82bf6a16c865a5aced6216d7e40b","tests/thread.rs":"9a7d7d3028c552fd834c68598b04a1cc252a816bc20ab62cec060d6cd09cab10","tests/wait_group.rs":"02661c2a820a5abe8b0c8fe15a6650aead707b57cdda0610d1b09a2680ed6969"},"package":"4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f"}
\ No newline at end of file diff --git a/third_party/rust/crossbeam-utils/CHANGELOG.md b/third_party/rust/crossbeam-utils/CHANGELOG.md new file mode 100644 index 0000000000..8e0fe35b97 --- /dev/null +++ b/third_party/rust/crossbeam-utils/CHANGELOG.md @@ -0,0 +1,206 @@ +# Version 0.8.14 + +- Fix build script bug introduced in 0.8.13. (#932) + +# Version 0.8.13 + +**Note:** This release has been yanked due to regression fixed in 0.8.14. + +- Improve support for custom targets. (#922) + +# Version 0.8.12 + +- Removes the dependency on the `once_cell` crate to restore the MSRV. (#913) +- Work around [rust-lang#98302](https://github.com/rust-lang/rust/issues/98302), which causes compile error on windows-gnu when LTO is enabled. (#913) + +# Version 0.8.11 + +- Bump the minimum supported Rust version to 1.38. (#877) + +# Version 0.8.10 + +- Fix unsoundness of `AtomicCell` on types containing niches. (#834) + This fix contains breaking changes, but they are allowed because this is a soundness bug fix. See #834 for more. + +# Version 0.8.9 + +- Replace lazy_static with once_cell. (#817) + +# Version 0.8.8 + +- Fix a bug when unstable `loom` support is enabled. (#787) + +# Version 0.8.7 + +- Add `AtomicCell<{i*,u*}>::{fetch_max,fetch_min}`. (#785) +- Add `AtomicCell<{i*,u*,bool}>::fetch_nand`. (#785) +- Fix unsoundness of `AtomicCell<{i,u}64>` arithmetics on 32-bit targets that support `Atomic{I,U}64` (#781) + +# Version 0.8.6 + +**Note:** This release has been yanked. See [GHSA-qc84-gqf4-9926](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-qc84-gqf4-9926) for details. + +- Re-add `AtomicCell<{i,u}64>::{fetch_add,fetch_sub,fetch_and,fetch_or,fetch_xor}` that were accidentally removed in 0.8.0 on targets that do not support `Atomic{I,U}64`. (#767) +- Re-add `AtomicCell<{i,u}128>::{fetch_add,fetch_sub,fetch_and,fetch_or,fetch_xor}` that were accidentally removed in 0.8.0. (#767) + +# Version 0.8.5 + +**Note:** This release has been yanked. See [GHSA-qc84-gqf4-9926](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-qc84-gqf4-9926) for details. + +- Add `AtomicCell::fetch_update`. (#704) +- Support targets that do not have atomic CAS on stable Rust. (#698) + +# Version 0.8.4 + +**Note:** This release has been yanked. See [GHSA-qc84-gqf4-9926](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-qc84-gqf4-9926) for details. + +- Bump `loom` dependency to version 0.5. (#686) + +# Version 0.8.3 + +**Note:** This release has been yanked. See [GHSA-qc84-gqf4-9926](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-qc84-gqf4-9926) for details. + +- Make `loom` dependency optional. (#666) + +# Version 0.8.2 + +**Note:** This release has been yanked. See [GHSA-qc84-gqf4-9926](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-qc84-gqf4-9926) for details. + +- Deprecate `AtomicCell::compare_and_swap`. Use `AtomicCell::compare_exchange` instead. (#619) +- Add `Parker::park_deadline`. (#563) +- Improve implementation of `CachePadded`. (#636) +- Add unstable support for `loom`. (#487) + +# Version 0.8.1 + +**Note:** This release has been yanked. See [GHSA-qc84-gqf4-9926](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-qc84-gqf4-9926) for details. + +- Make `AtomicCell::is_lock_free` always const fn. (#600) +- Fix a bug in `seq_lock_wide`. (#596) +- Remove `const_fn` dependency. (#600) +- `crossbeam-utils` no longer fails to compile if unable to determine rustc version. Instead, it now displays a warning. (#604) + +# Version 0.8.0 + +**Note:** This release has been yanked. See [GHSA-qc84-gqf4-9926](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-qc84-gqf4-9926) for details. + +- Bump the minimum supported Rust version to 1.36. +- Remove deprecated `AtomicCell::get_mut()` and `Backoff::is_complete()` methods. +- Remove `alloc` feature. +- Make `CachePadded::new()` const function. +- Make `AtomicCell::is_lock_free()` const function at 1.46+. +- Implement `From<T>` for `AtomicCell<T>`. + +# Version 0.7.2 + +- Fix bug in release (yanking 0.7.1) + +# Version 0.7.1 + +- Bump `autocfg` dependency to version 1.0. (#460) +- Make `AtomicCell` lockfree for u8, u16, u32, u64 sized values at 1.34+. (#454) + +# Version 0.7.0 + +- Bump the minimum required version to 1.28. +- Fix breakage with nightly feature due to rust-lang/rust#65214. +- Apply `#[repr(transparent)]` to `AtomicCell`. +- Make `AtomicCell::new()` const function at 1.31+. + +# Version 0.6.6 + +- Add `UnwindSafe` and `RefUnwindSafe` impls for `AtomicCell`. +- Add `AtomicCell::as_ptr()`. +- Add `AtomicCell::take()`. +- Fix a bug in `AtomicCell::compare_exchange()` and `AtomicCell::compare_and_swap()`. +- Various documentation improvements. + +# Version 0.6.5 + +- Rename `Backoff::is_complete()` to `Backoff::is_completed()`. + +# Version 0.6.4 + +- Add `WaitGroup`, `ShardedLock`, and `Backoff`. +- Add `fetch_*` methods for `AtomicCell<i128>` and `AtomicCell<u128>`. +- Expand documentation. + +# Version 0.6.3 + +- Add `AtomicCell`. +- Improve documentation. + +# Version 0.6.2 + +- Add `Parker`. +- Improve documentation. + +# Version 0.6.1 + +- Fix a soundness bug in `Scope::spawn()`. +- Remove the `T: 'scope` bound on `ScopedJoinHandle`. + +# Version 0.6.0 + +- Move `AtomicConsume` to `atomic` module. +- `scope()` returns a `Result` of thread joins. +- Remove `spawn_unchecked`. +- Fix a soundness bug due to incorrect lifetimes. +- Improve documentation. +- Support nested scoped spawns. +- Implement `Copy`, `Hash`, `PartialEq`, and `Eq` for `CachePadded`. +- Add `CachePadded::into_inner()`. + +# Version 0.5.0 + +- Reorganize sub-modules and rename functions. + +# Version 0.4.1 + +- Fix a documentation link. + +# Version 0.4.0 + +- `CachePadded` supports types bigger than 64 bytes. +- Fix a bug in scoped threads where unitialized memory was being dropped. +- Minimum required Rust version is now 1.25. + +# Version 0.3.2 + +- Mark `load_consume` with `#[inline]`. + +# Version 0.3.1 + +- `load_consume` on ARM and AArch64. + +# Version 0.3.0 + +- Add `join` for scoped thread API. +- Add `load_consume` for atomic load-consume memory ordering. +- Remove `AtomicOption`. + +# Version 0.2.2 + +- Support Rust 1.12.1. +- Call `T::clone` when cloning a `CachePadded<T>`. + +# Version 0.2.1 + +- Add `use_std` feature. + +# Version 0.2.0 + +- Add `nightly` feature. +- Use `repr(align(64))` on `CachePadded` with the `nightly` feature. +- Implement `Drop` for `CachePadded<T>`. +- Implement `Clone` for `CachePadded<T>`. +- Implement `From<T>` for `CachePadded<T>`. +- Implement better `Debug` for `CachePadded<T>`. +- Write more tests. +- Add this changelog. +- Change cache line length to 64 bytes. +- Remove `ZerosValid`. + +# Version 0.1.0 + +- Old implementation of `CachePadded` from `crossbeam` version 0.3.0 diff --git a/third_party/rust/crossbeam-utils/Cargo.toml b/third_party/rust/crossbeam-utils/Cargo.toml new file mode 100644 index 0000000000..6fe3f9f955 --- /dev/null +++ b/third_party/rust/crossbeam-utils/Cargo.toml @@ -0,0 +1,51 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. +# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +edition = "2018" +rust-version = "1.38" +name = "crossbeam-utils" +version = "0.8.14" +description = "Utilities for concurrent programming" +homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-utils" +readme = "README.md" +keywords = [ + "scoped", + "thread", + "atomic", + "cache", +] +categories = [ + "algorithms", + "concurrency", + "data-structures", + "no-std", +] +license = "MIT OR Apache-2.0" +repository = "https://github.com/crossbeam-rs/crossbeam" + +[dependencies.cfg-if] +version = "1" + +[dev-dependencies.rand] +version = "0.8" + +[dev-dependencies.rustversion] +version = "1" + +[features] +default = ["std"] +nightly = [] +std = [] + +[target."cfg(crossbeam_loom)".dependencies.loom] +version = "0.5" +optional = true diff --git a/third_party/rust/crossbeam-utils/LICENSE-APACHE b/third_party/rust/crossbeam-utils/LICENSE-APACHE new file mode 100644 index 0000000000..16fe87b06e --- /dev/null +++ b/third_party/rust/crossbeam-utils/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/third_party/rust/crossbeam-utils/LICENSE-MIT b/third_party/rust/crossbeam-utils/LICENSE-MIT new file mode 100644 index 0000000000..068d491fd5 --- /dev/null +++ b/third_party/rust/crossbeam-utils/LICENSE-MIT @@ -0,0 +1,27 @@ +The MIT License (MIT) + +Copyright (c) 2019 The Crossbeam Project Developers + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/third_party/rust/crossbeam-utils/README.md b/third_party/rust/crossbeam-utils/README.md new file mode 100644 index 0000000000..c06ea601a8 --- /dev/null +++ b/third_party/rust/crossbeam-utils/README.md @@ -0,0 +1,73 @@ +# Crossbeam Utils + +[![Build Status](https://github.com/crossbeam-rs/crossbeam/workflows/CI/badge.svg)]( +https://github.com/crossbeam-rs/crossbeam/actions) +[![License](https://img.shields.io/badge/license-MIT_OR_Apache--2.0-blue.svg)]( +https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-utils#license) +[![Cargo](https://img.shields.io/crates/v/crossbeam-utils.svg)]( +https://crates.io/crates/crossbeam-utils) +[![Documentation](https://docs.rs/crossbeam-utils/badge.svg)]( +https://docs.rs/crossbeam-utils) +[![Rust 1.38+](https://img.shields.io/badge/rust-1.38+-lightgray.svg)]( +https://www.rust-lang.org) +[![chat](https://img.shields.io/discord/569610676205781012.svg?logo=discord)](https://discord.com/invite/JXYwgWZ) + +This crate provides miscellaneous tools for concurrent programming: + +#### Atomics + +* [`AtomicCell`], a thread-safe mutable memory location.<sup>(no_std)</sup> +* [`AtomicConsume`], for reading from primitive atomic types with "consume" ordering.<sup>(no_std)</sup> + +#### Thread synchronization + +* [`Parker`], a thread parking primitive. +* [`ShardedLock`], a sharded reader-writer lock with fast concurrent reads. +* [`WaitGroup`], for synchronizing the beginning or end of some computation. + +#### Utilities + +* [`Backoff`], for exponential backoff in spin loops.<sup>(no_std)</sup> +* [`CachePadded`], for padding and aligning a value to the length of a cache line.<sup>(no_std)</sup> +* [`scope`], for spawning threads that borrow local variables from the stack. + +*Features marked with <sup>(no_std)</sup> can be used in `no_std` environments.*<br/> + +[`AtomicCell`]: https://docs.rs/crossbeam-utils/*/crossbeam_utils/atomic/struct.AtomicCell.html +[`AtomicConsume`]: https://docs.rs/crossbeam-utils/*/crossbeam_utils/atomic/trait.AtomicConsume.html +[`Parker`]: https://docs.rs/crossbeam-utils/*/crossbeam_utils/sync/struct.Parker.html +[`ShardedLock`]: https://docs.rs/crossbeam-utils/*/crossbeam_utils/sync/struct.ShardedLock.html +[`WaitGroup`]: https://docs.rs/crossbeam-utils/*/crossbeam_utils/sync/struct.WaitGroup.html +[`Backoff`]: https://docs.rs/crossbeam-utils/*/crossbeam_utils/struct.Backoff.html +[`CachePadded`]: https://docs.rs/crossbeam-utils/*/crossbeam_utils/struct.CachePadded.html +[`scope`]: https://docs.rs/crossbeam-utils/*/crossbeam_utils/thread/fn.scope.html + +## Usage + +Add this to your `Cargo.toml`: + +```toml +[dependencies] +crossbeam-utils = "0.8" +``` + +## Compatibility + +Crossbeam Utils supports stable Rust releases going back at least six months, +and every time the minimum supported Rust version is increased, a new minor +version is released. Currently, the minimum supported Rust version is 1.38. + +## License + +Licensed under either of + + * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) + +at your option. + +#### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall be +dual licensed as above, without any additional terms or conditions. diff --git a/third_party/rust/crossbeam-utils/benches/atomic_cell.rs b/third_party/rust/crossbeam-utils/benches/atomic_cell.rs new file mode 100644 index 0000000000..844f7c02b6 --- /dev/null +++ b/third_party/rust/crossbeam-utils/benches/atomic_cell.rs @@ -0,0 +1,156 @@ +#![feature(test)] + +extern crate test; + +use std::sync::Barrier; + +use crossbeam_utils::atomic::AtomicCell; +use crossbeam_utils::thread; + +#[bench] +fn load_u8(b: &mut test::Bencher) { + let a = AtomicCell::new(0u8); + let mut sum = 0; + b.iter(|| sum += a.load()); + test::black_box(sum); +} + +#[bench] +fn store_u8(b: &mut test::Bencher) { + let a = AtomicCell::new(0u8); + b.iter(|| a.store(1)); +} + +#[bench] +fn fetch_add_u8(b: &mut test::Bencher) { + let a = AtomicCell::new(0u8); + b.iter(|| a.fetch_add(1)); +} + +#[bench] +fn compare_exchange_u8(b: &mut test::Bencher) { + let a = AtomicCell::new(0u8); + let mut i = 0; + b.iter(|| { + let _ = a.compare_exchange(i, i.wrapping_add(1)); + i = i.wrapping_add(1); + }); +} + +#[bench] +fn concurrent_load_u8(b: &mut test::Bencher) { + const THREADS: usize = 2; + const STEPS: usize = 1_000_000; + + let start = Barrier::new(THREADS + 1); + let end = Barrier::new(THREADS + 1); + let exit = AtomicCell::new(false); + + let a = AtomicCell::new(0u8); + + thread::scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| loop { + start.wait(); + + let mut sum = 0; + for _ in 0..STEPS { + sum += a.load(); + } + test::black_box(sum); + + end.wait(); + if exit.load() { + break; + } + }); + } + + start.wait(); + end.wait(); + + b.iter(|| { + start.wait(); + end.wait(); + }); + + start.wait(); + exit.store(true); + end.wait(); + }) + .unwrap(); +} + +#[bench] +fn load_usize(b: &mut test::Bencher) { + let a = AtomicCell::new(0usize); + let mut sum = 0; + b.iter(|| sum += a.load()); + test::black_box(sum); +} + +#[bench] +fn store_usize(b: &mut test::Bencher) { + let a = AtomicCell::new(0usize); + b.iter(|| a.store(1)); +} + +#[bench] +fn fetch_add_usize(b: &mut test::Bencher) { + let a = AtomicCell::new(0usize); + b.iter(|| a.fetch_add(1)); +} + +#[bench] +fn compare_exchange_usize(b: &mut test::Bencher) { + let a = AtomicCell::new(0usize); + let mut i = 0; + b.iter(|| { + let _ = a.compare_exchange(i, i.wrapping_add(1)); + i = i.wrapping_add(1); + }); +} + +#[bench] +fn concurrent_load_usize(b: &mut test::Bencher) { + const THREADS: usize = 2; + const STEPS: usize = 1_000_000; + + let start = Barrier::new(THREADS + 1); + let end = Barrier::new(THREADS + 1); + let exit = AtomicCell::new(false); + + let a = AtomicCell::new(0usize); + + thread::scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| loop { + start.wait(); + + let mut sum = 0; + for _ in 0..STEPS { + sum += a.load(); + } + test::black_box(sum); + + end.wait(); + if exit.load() { + break; + } + }); + } + + start.wait(); + end.wait(); + + b.iter(|| { + start.wait(); + end.wait(); + }); + + start.wait(); + exit.store(true); + end.wait(); + }) + .unwrap(); +} diff --git a/third_party/rust/crossbeam-utils/build-common.rs b/third_party/rust/crossbeam-utils/build-common.rs new file mode 100644 index 0000000000..e91bb4d471 --- /dev/null +++ b/third_party/rust/crossbeam-utils/build-common.rs @@ -0,0 +1,13 @@ +// The target triplets have the form of 'arch-vendor-system'. +// +// When building for Linux (e.g. the 'system' part is +// 'linux-something'), replace the vendor with 'unknown' +// so that mapping to rust standard targets happens correctly. +fn convert_custom_linux_target(target: String) -> String { + let mut parts: Vec<&str> = target.split('-').collect(); + let system = parts.get(2); + if system == Some(&"linux") { + parts[1] = "unknown"; + }; + parts.join("-") +} diff --git a/third_party/rust/crossbeam-utils/build.rs b/third_party/rust/crossbeam-utils/build.rs new file mode 100644 index 0000000000..617162fb52 --- /dev/null +++ b/third_party/rust/crossbeam-utils/build.rs @@ -0,0 +1,61 @@ +// The rustc-cfg listed below are considered public API, but it is *unstable* +// and outside of the normal semver guarantees: +// +// - `crossbeam_no_atomic_cas` +// Assume the target does *not* support atomic CAS operations. +// This is usually detected automatically by the build script, but you may +// need to enable it manually when building for custom targets or using +// non-cargo build systems that don't run the build script. +// +// - `crossbeam_no_atomic` +// Assume the target does *not* support any atomic operations. +// This is usually detected automatically by the build script, but you may +// need to enable it manually when building for custom targets or using +// non-cargo build systems that don't run the build script. +// +// - `crossbeam_no_atomic_64` +// Assume the target does *not* support AtomicU64/AtomicI64. +// This is usually detected automatically by the build script, but you may +// need to enable it manually when building for custom targets or using +// non-cargo build systems that don't run the build script. +// +// With the exceptions mentioned above, the rustc-cfg emitted by the build +// script are *not* public API. + +#![warn(rust_2018_idioms)] + +use std::env; + +include!("no_atomic.rs"); +include!("build-common.rs"); + +fn main() { + let target = match env::var("TARGET") { + Ok(target) => convert_custom_linux_target(target), + Err(e) => { + println!( + "cargo:warning={}: unable to get TARGET environment variable: {}", + env!("CARGO_PKG_NAME"), + e + ); + return; + } + }; + + // Note that this is `no_`*, not `has_*`. This allows treating as the latest + // stable rustc is used when the build script doesn't run. This is useful + // for non-cargo build systems that don't run the build script. + if NO_ATOMIC_CAS.contains(&&*target) { + println!("cargo:rustc-cfg=crossbeam_no_atomic_cas"); + } + if NO_ATOMIC.contains(&&*target) { + println!("cargo:rustc-cfg=crossbeam_no_atomic"); + println!("cargo:rustc-cfg=crossbeam_no_atomic_64"); + } else if NO_ATOMIC_64.contains(&&*target) { + println!("cargo:rustc-cfg=crossbeam_no_atomic_64"); + } else { + // Otherwise, assuming `"max-atomic-width" == 64` or `"max-atomic-width" == 128`. + } + + println!("cargo:rerun-if-changed=no_atomic.rs"); +} diff --git a/third_party/rust/crossbeam-utils/no_atomic.rs b/third_party/rust/crossbeam-utils/no_atomic.rs new file mode 100644 index 0000000000..8ce0d311fb --- /dev/null +++ b/third_party/rust/crossbeam-utils/no_atomic.rs @@ -0,0 +1,82 @@ +// This file is @generated by no_atomic.sh. +// It is not intended for manual editing. + +const NO_ATOMIC_CAS: &[&str] = &[ + "armv4t-none-eabi", + "armv5te-none-eabi", + "avr-unknown-gnu-atmega328", + "bpfeb-unknown-none", + "bpfel-unknown-none", + "msp430-none-elf", + "riscv32i-unknown-none-elf", + "riscv32im-unknown-none-elf", + "riscv32imc-unknown-none-elf", + "thumbv4t-none-eabi", + "thumbv5te-none-eabi", + "thumbv6m-none-eabi", +]; + +#[allow(dead_code)] // Only crossbeam-utils uses this. +const NO_ATOMIC_64: &[&str] = &[ + "arm-linux-androideabi", + "armebv7r-none-eabi", + "armebv7r-none-eabihf", + "armv4t-none-eabi", + "armv4t-unknown-linux-gnueabi", + "armv5te-none-eabi", + "armv5te-unknown-linux-gnueabi", + "armv5te-unknown-linux-musleabi", + "armv5te-unknown-linux-uclibceabi", + "armv6k-nintendo-3ds", + "armv7r-none-eabi", + "armv7r-none-eabihf", + "avr-unknown-gnu-atmega328", + "hexagon-unknown-linux-musl", + "m68k-unknown-linux-gnu", + "mips-unknown-linux-gnu", + "mips-unknown-linux-musl", + "mips-unknown-linux-uclibc", + "mipsel-sony-psp", + "mipsel-sony-psx", + "mipsel-unknown-linux-gnu", + "mipsel-unknown-linux-musl", + "mipsel-unknown-linux-uclibc", + "mipsel-unknown-none", + "mipsisa32r6-unknown-linux-gnu", + "mipsisa32r6el-unknown-linux-gnu", + "msp430-none-elf", + "powerpc-unknown-freebsd", + "powerpc-unknown-linux-gnu", + "powerpc-unknown-linux-gnuspe", + "powerpc-unknown-linux-musl", + "powerpc-unknown-netbsd", + "powerpc-unknown-openbsd", + "powerpc-wrs-vxworks", + "powerpc-wrs-vxworks-spe", + "riscv32gc-unknown-linux-gnu", + "riscv32gc-unknown-linux-musl", + "riscv32i-unknown-none-elf", + "riscv32im-unknown-none-elf", + "riscv32imac-unknown-none-elf", + "riscv32imac-unknown-xous-elf", + "riscv32imc-unknown-none-elf", + "thumbv4t-none-eabi", + "thumbv5te-none-eabi", + "thumbv6m-none-eabi", + "thumbv7em-none-eabi", + "thumbv7em-none-eabihf", + "thumbv7m-none-eabi", + "thumbv8m.base-none-eabi", + "thumbv8m.main-none-eabi", + "thumbv8m.main-none-eabihf", +]; + +#[allow(dead_code)] // Only crossbeam-utils uses this. +const NO_ATOMIC: &[&str] = &[ + "avr-unknown-gnu-atmega328", + "mipsel-sony-psx", + "msp430-none-elf", + "riscv32i-unknown-none-elf", + "riscv32im-unknown-none-elf", + "riscv32imc-unknown-none-elf", +]; diff --git a/third_party/rust/crossbeam-utils/src/atomic/atomic_cell.rs b/third_party/rust/crossbeam-utils/src/atomic/atomic_cell.rs new file mode 100644 index 0000000000..7941c5c87c --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/atomic/atomic_cell.rs @@ -0,0 +1,1121 @@ +// Necessary for implementing atomic methods for `AtomicUnit` +#![allow(clippy::unit_arg)] + +use crate::primitive::sync::atomic::{self, AtomicBool}; +use core::cell::UnsafeCell; +use core::cmp; +use core::fmt; +use core::mem::{self, ManuallyDrop, MaybeUninit}; +use core::sync::atomic::Ordering; + +use core::ptr; + +#[cfg(feature = "std")] +use std::panic::{RefUnwindSafe, UnwindSafe}; + +use super::seq_lock::SeqLock; + +/// A thread-safe mutable memory location. +/// +/// This type is equivalent to [`Cell`], except it can also be shared among multiple threads. +/// +/// Operations on `AtomicCell`s use atomic instructions whenever possible, and synchronize using +/// global locks otherwise. You can call [`AtomicCell::<T>::is_lock_free()`] to check whether +/// atomic instructions or locks will be used. +/// +/// Atomic loads use the [`Acquire`] ordering and atomic stores use the [`Release`] ordering. +/// +/// [`Cell`]: std::cell::Cell +/// [`AtomicCell::<T>::is_lock_free()`]: AtomicCell::is_lock_free +/// [`Acquire`]: std::sync::atomic::Ordering::Acquire +/// [`Release`]: std::sync::atomic::Ordering::Release +#[repr(transparent)] +pub struct AtomicCell<T> { + /// The inner value. + /// + /// If this value can be transmuted into a primitive atomic type, it will be treated as such. + /// Otherwise, all potentially concurrent operations on this data will be protected by a global + /// lock. + /// + /// Using MaybeUninit to prevent code outside the cell from observing partially initialized state: + /// <https://github.com/crossbeam-rs/crossbeam/issues/833> + /// + /// Note: + /// - we'll never store uninitialized `T` due to our API only using initialized `T`. + /// - this `MaybeUninit` does *not* fix <https://github.com/crossbeam-rs/crossbeam/issues/315>. + value: UnsafeCell<MaybeUninit<T>>, +} + +unsafe impl<T: Send> Send for AtomicCell<T> {} +unsafe impl<T: Send> Sync for AtomicCell<T> {} + +#[cfg(feature = "std")] +impl<T> UnwindSafe for AtomicCell<T> {} +#[cfg(feature = "std")] +impl<T> RefUnwindSafe for AtomicCell<T> {} + +impl<T> AtomicCell<T> { + /// Creates a new atomic cell initialized with `val`. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(7); + /// ``` + pub const fn new(val: T) -> AtomicCell<T> { + AtomicCell { + value: UnsafeCell::new(MaybeUninit::new(val)), + } + } + + /// Consumes the atomic and returns the contained value. + /// + /// This is safe because passing `self` by value guarantees that no other threads are + /// concurrently accessing the atomic data. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(7); + /// let v = a.into_inner(); + /// + /// assert_eq!(v, 7); + /// ``` + pub fn into_inner(self) -> T { + let this = ManuallyDrop::new(self); + // SAFETY: + // - passing `self` by value guarantees that no other threads are concurrently + // accessing the atomic data + // - the raw pointer passed in is valid because we got it from an owned value. + // - `ManuallyDrop` prevents double dropping `T` + unsafe { this.as_ptr().read() } + } + + /// Returns `true` if operations on values of this type are lock-free. + /// + /// If the compiler or the platform doesn't support the necessary atomic instructions, + /// `AtomicCell<T>` will use global locks for every potentially concurrent atomic operation. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// // This type is internally represented as `AtomicUsize` so we can just use atomic + /// // operations provided by it. + /// assert_eq!(AtomicCell::<usize>::is_lock_free(), true); + /// + /// // A wrapper struct around `isize`. + /// struct Foo { + /// bar: isize, + /// } + /// // `AtomicCell<Foo>` will be internally represented as `AtomicIsize`. + /// assert_eq!(AtomicCell::<Foo>::is_lock_free(), true); + /// + /// // Operations on zero-sized types are always lock-free. + /// assert_eq!(AtomicCell::<()>::is_lock_free(), true); + /// + /// // Very large types cannot be represented as any of the standard atomic types, so atomic + /// // operations on them will have to use global locks for synchronization. + /// assert_eq!(AtomicCell::<[u8; 1000]>::is_lock_free(), false); + /// ``` + pub const fn is_lock_free() -> bool { + atomic_is_lock_free::<T>() + } + + /// Stores `val` into the atomic cell. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(7); + /// + /// assert_eq!(a.load(), 7); + /// a.store(8); + /// assert_eq!(a.load(), 8); + /// ``` + pub fn store(&self, val: T) { + if mem::needs_drop::<T>() { + drop(self.swap(val)); + } else { + unsafe { + atomic_store(self.as_ptr(), val); + } + } + } + + /// Stores `val` into the atomic cell and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(7); + /// + /// assert_eq!(a.load(), 7); + /// assert_eq!(a.swap(8), 7); + /// assert_eq!(a.load(), 8); + /// ``` + pub fn swap(&self, val: T) -> T { + unsafe { atomic_swap(self.as_ptr(), val) } + } + + /// Returns a raw pointer to the underlying data in this atomic cell. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(5); + /// + /// let ptr = a.as_ptr(); + /// ``` + #[inline] + pub fn as_ptr(&self) -> *mut T { + self.value.get().cast::<T>() + } +} + +impl<T: Default> AtomicCell<T> { + /// Takes the value of the atomic cell, leaving `Default::default()` in its place. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(5); + /// let five = a.take(); + /// + /// assert_eq!(five, 5); + /// assert_eq!(a.into_inner(), 0); + /// ``` + pub fn take(&self) -> T { + self.swap(Default::default()) + } +} + +impl<T: Copy> AtomicCell<T> { + /// Loads a value from the atomic cell. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(7); + /// + /// assert_eq!(a.load(), 7); + /// ``` + pub fn load(&self) -> T { + unsafe { atomic_load(self.as_ptr()) } + } +} + +impl<T: Copy + Eq> AtomicCell<T> { + /// If the current value equals `current`, stores `new` into the atomic cell. + /// + /// The return value is always the previous value. If it is equal to `current`, then the value + /// was updated. + /// + /// # Examples + /// + /// ``` + /// # #![allow(deprecated)] + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(1); + /// + /// assert_eq!(a.compare_and_swap(2, 3), 1); + /// assert_eq!(a.load(), 1); + /// + /// assert_eq!(a.compare_and_swap(1, 2), 1); + /// assert_eq!(a.load(), 2); + /// ``` + // TODO: remove in the next major version. + #[deprecated(note = "Use `compare_exchange` instead")] + pub fn compare_and_swap(&self, current: T, new: T) -> T { + match self.compare_exchange(current, new) { + Ok(v) => v, + Err(v) => v, + } + } + + /// If the current value equals `current`, stores `new` into the atomic cell. + /// + /// The return value is a result indicating whether the new value was written and containing + /// the previous value. On success this value is guaranteed to be equal to `current`. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(1); + /// + /// assert_eq!(a.compare_exchange(2, 3), Err(1)); + /// assert_eq!(a.load(), 1); + /// + /// assert_eq!(a.compare_exchange(1, 2), Ok(1)); + /// assert_eq!(a.load(), 2); + /// ``` + pub fn compare_exchange(&self, current: T, new: T) -> Result<T, T> { + unsafe { atomic_compare_exchange_weak(self.as_ptr(), current, new) } + } + + /// Fetches the value, and applies a function to it that returns an optional + /// new value. Returns a `Result` of `Ok(previous_value)` if the function returned `Some(_)`, else + /// `Err(previous_value)`. + /// + /// Note: This may call the function multiple times if the value has been changed from other threads in + /// the meantime, as long as the function returns `Some(_)`, but the function will have been applied + /// only once to the stored value. + /// + /// # Examples + /// + /// ```rust + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(7); + /// assert_eq!(a.fetch_update(|_| None), Err(7)); + /// assert_eq!(a.fetch_update(|a| Some(a + 1)), Ok(7)); + /// assert_eq!(a.fetch_update(|a| Some(a + 1)), Ok(8)); + /// assert_eq!(a.load(), 9); + /// ``` + #[inline] + pub fn fetch_update<F>(&self, mut f: F) -> Result<T, T> + where + F: FnMut(T) -> Option<T>, + { + let mut prev = self.load(); + while let Some(next) = f(prev) { + match self.compare_exchange(prev, next) { + x @ Ok(_) => return x, + Err(next_prev) => prev = next_prev, + } + } + Err(prev) + } +} + +// `MaybeUninit` prevents `T` from being dropped, so we need to implement `Drop` +// for `AtomicCell` to avoid leaks of non-`Copy` types. +impl<T> Drop for AtomicCell<T> { + fn drop(&mut self) { + if mem::needs_drop::<T>() { + // SAFETY: + // - the mutable reference guarantees that no other threads are concurrently accessing the atomic data + // - the raw pointer passed in is valid because we got it from a reference + // - `MaybeUninit` prevents double dropping `T` + unsafe { + self.as_ptr().drop_in_place(); + } + } + } +} + +macro_rules! impl_arithmetic { + ($t:ty, fallback, $example:tt) => { + impl AtomicCell<$t> { + /// Increments the current value by `val` and returns the previous value. + /// + /// The addition wraps on overflow. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_add(3), 7); + /// assert_eq!(a.load(), 10); + /// ``` + #[inline] + pub fn fetch_add(&self, val: $t) -> $t { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = value.wrapping_add(val); + old + } + + /// Decrements the current value by `val` and returns the previous value. + /// + /// The subtraction wraps on overflow. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_sub(3), 7); + /// assert_eq!(a.load(), 4); + /// ``` + #[inline] + pub fn fetch_sub(&self, val: $t) -> $t { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = value.wrapping_sub(val); + old + } + + /// Applies bitwise "and" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_and(3), 7); + /// assert_eq!(a.load(), 3); + /// ``` + #[inline] + pub fn fetch_and(&self, val: $t) -> $t { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value &= val; + old + } + + /// Applies bitwise "nand" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_nand(3), 7); + /// assert_eq!(a.load(), !(7 & 3)); + /// ``` + #[inline] + pub fn fetch_nand(&self, val: $t) -> $t { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = !(old & val); + old + } + + /// Applies bitwise "or" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_or(16), 7); + /// assert_eq!(a.load(), 23); + /// ``` + #[inline] + pub fn fetch_or(&self, val: $t) -> $t { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value |= val; + old + } + + /// Applies bitwise "xor" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_xor(2), 7); + /// assert_eq!(a.load(), 5); + /// ``` + #[inline] + pub fn fetch_xor(&self, val: $t) -> $t { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value ^= val; + old + } + + /// Compares and sets the maximum of the current value and `val`, + /// and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_max(2), 7); + /// assert_eq!(a.load(), 7); + /// ``` + #[inline] + pub fn fetch_max(&self, val: $t) -> $t { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = cmp::max(old, val); + old + } + + /// Compares and sets the minimum of the current value and `val`, + /// and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_min(2), 7); + /// assert_eq!(a.load(), 2); + /// ``` + #[inline] + pub fn fetch_min(&self, val: $t) -> $t { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = cmp::min(old, val); + old + } + } + }; + ($t:ty, $atomic:ty, $example:tt) => { + impl AtomicCell<$t> { + /// Increments the current value by `val` and returns the previous value. + /// + /// The addition wraps on overflow. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_add(3), 7); + /// assert_eq!(a.load(), 10); + /// ``` + #[inline] + pub fn fetch_add(&self, val: $t) -> $t { + if can_transmute::<$t, $atomic>() { + let a = unsafe { &*(self.as_ptr() as *const $atomic) }; + a.fetch_add(val, Ordering::AcqRel) + } else { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = value.wrapping_add(val); + old + } + } + + /// Decrements the current value by `val` and returns the previous value. + /// + /// The subtraction wraps on overflow. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_sub(3), 7); + /// assert_eq!(a.load(), 4); + /// ``` + #[inline] + pub fn fetch_sub(&self, val: $t) -> $t { + if can_transmute::<$t, $atomic>() { + let a = unsafe { &*(self.as_ptr() as *const $atomic) }; + a.fetch_sub(val, Ordering::AcqRel) + } else { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = value.wrapping_sub(val); + old + } + } + + /// Applies bitwise "and" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_and(3), 7); + /// assert_eq!(a.load(), 3); + /// ``` + #[inline] + pub fn fetch_and(&self, val: $t) -> $t { + if can_transmute::<$t, $atomic>() { + let a = unsafe { &*(self.as_ptr() as *const $atomic) }; + a.fetch_and(val, Ordering::AcqRel) + } else { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value &= val; + old + } + } + + /// Applies bitwise "nand" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_nand(3), 7); + /// assert_eq!(a.load(), !(7 & 3)); + /// ``` + #[inline] + pub fn fetch_nand(&self, val: $t) -> $t { + if can_transmute::<$t, $atomic>() { + let a = unsafe { &*(self.as_ptr() as *const $atomic) }; + a.fetch_nand(val, Ordering::AcqRel) + } else { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = !(old & val); + old + } + } + + /// Applies bitwise "or" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_or(16), 7); + /// assert_eq!(a.load(), 23); + /// ``` + #[inline] + pub fn fetch_or(&self, val: $t) -> $t { + if can_transmute::<$t, $atomic>() { + let a = unsafe { &*(self.as_ptr() as *const $atomic) }; + a.fetch_or(val, Ordering::AcqRel) + } else { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value |= val; + old + } + } + + /// Applies bitwise "xor" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_xor(2), 7); + /// assert_eq!(a.load(), 5); + /// ``` + #[inline] + pub fn fetch_xor(&self, val: $t) -> $t { + if can_transmute::<$t, $atomic>() { + let a = unsafe { &*(self.as_ptr() as *const $atomic) }; + a.fetch_xor(val, Ordering::AcqRel) + } else { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value ^= val; + old + } + } + + /// Compares and sets the maximum of the current value and `val`, + /// and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_max(9), 7); + /// assert_eq!(a.load(), 9); + /// ``` + #[inline] + pub fn fetch_max(&self, val: $t) -> $t { + if can_transmute::<$t, $atomic>() { + // TODO: Atomic*::fetch_max requires Rust 1.45. + self.fetch_update(|old| Some(cmp::max(old, val))).unwrap() + } else { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = cmp::max(old, val); + old + } + } + + /// Compares and sets the minimum of the current value and `val`, + /// and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + #[doc = $example] + /// + /// assert_eq!(a.fetch_min(2), 7); + /// assert_eq!(a.load(), 2); + /// ``` + #[inline] + pub fn fetch_min(&self, val: $t) -> $t { + if can_transmute::<$t, $atomic>() { + // TODO: Atomic*::fetch_min requires Rust 1.45. + self.fetch_update(|old| Some(cmp::min(old, val))).unwrap() + } else { + let _guard = lock(self.as_ptr() as usize).write(); + let value = unsafe { &mut *(self.as_ptr()) }; + let old = *value; + *value = cmp::min(old, val); + old + } + } + } + }; +} + +impl_arithmetic!(u8, atomic::AtomicU8, "let a = AtomicCell::new(7u8);"); +impl_arithmetic!(i8, atomic::AtomicI8, "let a = AtomicCell::new(7i8);"); +impl_arithmetic!(u16, atomic::AtomicU16, "let a = AtomicCell::new(7u16);"); +impl_arithmetic!(i16, atomic::AtomicI16, "let a = AtomicCell::new(7i16);"); +impl_arithmetic!(u32, atomic::AtomicU32, "let a = AtomicCell::new(7u32);"); +impl_arithmetic!(i32, atomic::AtomicI32, "let a = AtomicCell::new(7i32);"); +#[cfg(not(crossbeam_no_atomic_64))] +impl_arithmetic!(u64, atomic::AtomicU64, "let a = AtomicCell::new(7u64);"); +#[cfg(not(crossbeam_no_atomic_64))] +impl_arithmetic!(i64, atomic::AtomicI64, "let a = AtomicCell::new(7i64);"); +#[cfg(crossbeam_no_atomic_64)] +impl_arithmetic!(u64, fallback, "let a = AtomicCell::new(7u64);"); +#[cfg(crossbeam_no_atomic_64)] +impl_arithmetic!(i64, fallback, "let a = AtomicCell::new(7i64);"); +// TODO: AtomicU128 is unstable +// impl_arithmetic!(u128, atomic::AtomicU128, "let a = AtomicCell::new(7u128);"); +// impl_arithmetic!(i128, atomic::AtomicI128, "let a = AtomicCell::new(7i128);"); +impl_arithmetic!(u128, fallback, "let a = AtomicCell::new(7u128);"); +impl_arithmetic!(i128, fallback, "let a = AtomicCell::new(7i128);"); + +impl_arithmetic!( + usize, + atomic::AtomicUsize, + "let a = AtomicCell::new(7usize);" +); +impl_arithmetic!( + isize, + atomic::AtomicIsize, + "let a = AtomicCell::new(7isize);" +); + +impl AtomicCell<bool> { + /// Applies logical "and" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(true); + /// + /// assert_eq!(a.fetch_and(true), true); + /// assert_eq!(a.load(), true); + /// + /// assert_eq!(a.fetch_and(false), true); + /// assert_eq!(a.load(), false); + /// ``` + #[inline] + pub fn fetch_and(&self, val: bool) -> bool { + let a = unsafe { &*(self.as_ptr() as *const AtomicBool) }; + a.fetch_and(val, Ordering::AcqRel) + } + + /// Applies logical "nand" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(true); + /// + /// assert_eq!(a.fetch_nand(false), true); + /// assert_eq!(a.load(), true); + /// + /// assert_eq!(a.fetch_nand(true), true); + /// assert_eq!(a.load(), false); + /// + /// assert_eq!(a.fetch_nand(false), false); + /// assert_eq!(a.load(), true); + /// ``` + #[inline] + pub fn fetch_nand(&self, val: bool) -> bool { + let a = unsafe { &*(self.as_ptr() as *const AtomicBool) }; + a.fetch_nand(val, Ordering::AcqRel) + } + + /// Applies logical "or" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(false); + /// + /// assert_eq!(a.fetch_or(false), false); + /// assert_eq!(a.load(), false); + /// + /// assert_eq!(a.fetch_or(true), false); + /// assert_eq!(a.load(), true); + /// ``` + #[inline] + pub fn fetch_or(&self, val: bool) -> bool { + let a = unsafe { &*(self.as_ptr() as *const AtomicBool) }; + a.fetch_or(val, Ordering::AcqRel) + } + + /// Applies logical "xor" to the current value and returns the previous value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::atomic::AtomicCell; + /// + /// let a = AtomicCell::new(true); + /// + /// assert_eq!(a.fetch_xor(false), true); + /// assert_eq!(a.load(), true); + /// + /// assert_eq!(a.fetch_xor(true), true); + /// assert_eq!(a.load(), false); + /// ``` + #[inline] + pub fn fetch_xor(&self, val: bool) -> bool { + let a = unsafe { &*(self.as_ptr() as *const AtomicBool) }; + a.fetch_xor(val, Ordering::AcqRel) + } +} + +impl<T: Default> Default for AtomicCell<T> { + fn default() -> AtomicCell<T> { + AtomicCell::new(T::default()) + } +} + +impl<T> From<T> for AtomicCell<T> { + #[inline] + fn from(val: T) -> AtomicCell<T> { + AtomicCell::new(val) + } +} + +impl<T: Copy + fmt::Debug> fmt::Debug for AtomicCell<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AtomicCell") + .field("value", &self.load()) + .finish() + } +} + +/// Returns `true` if values of type `A` can be transmuted into values of type `B`. +const fn can_transmute<A, B>() -> bool { + // Sizes must be equal, but alignment of `A` must be greater or equal than that of `B`. + (mem::size_of::<A>() == mem::size_of::<B>()) & (mem::align_of::<A>() >= mem::align_of::<B>()) +} + +/// Returns a reference to the global lock associated with the `AtomicCell` at address `addr`. +/// +/// This function is used to protect atomic data which doesn't fit into any of the primitive atomic +/// types in `std::sync::atomic`. Operations on such atomics must therefore use a global lock. +/// +/// However, there is not only one global lock but an array of many locks, and one of them is +/// picked based on the given address. Having many locks reduces contention and improves +/// scalability. +#[inline] +#[must_use] +fn lock(addr: usize) -> &'static SeqLock { + // The number of locks is a prime number because we want to make sure `addr % LEN` gets + // dispersed across all locks. + // + // Note that addresses are always aligned to some power of 2, depending on type `T` in + // `AtomicCell<T>`. If `LEN` was an even number, then `addr % LEN` would be an even number, + // too, which means only half of the locks would get utilized! + // + // It is also possible for addresses to accidentally get aligned to a number that is not a + // power of 2. Consider this example: + // + // ``` + // #[repr(C)] + // struct Foo { + // a: AtomicCell<u8>, + // b: u8, + // c: u8, + // } + // ``` + // + // Now, if we have a slice of type `&[Foo]`, it is possible that field `a` in all items gets + // stored at addresses that are multiples of 3. It'd be too bad if `LEN` was divisible by 3. + // In order to protect from such cases, we simply choose a large prime number for `LEN`. + const LEN: usize = 97; + #[allow(clippy::declare_interior_mutable_const)] + const L: SeqLock = SeqLock::new(); + static LOCKS: [SeqLock; LEN] = [L; LEN]; + + // If the modulus is a constant number, the compiler will use crazy math to transform this into + // a sequence of cheap arithmetic operations rather than using the slow modulo instruction. + &LOCKS[addr % LEN] +} + +/// An atomic `()`. +/// +/// All operations are noops. +struct AtomicUnit; + +impl AtomicUnit { + #[inline] + fn load(&self, _order: Ordering) {} + + #[inline] + fn store(&self, _val: (), _order: Ordering) {} + + #[inline] + fn swap(&self, _val: (), _order: Ordering) {} + + #[inline] + fn compare_exchange_weak( + &self, + _current: (), + _new: (), + _success: Ordering, + _failure: Ordering, + ) -> Result<(), ()> { + Ok(()) + } +} + +macro_rules! atomic { + // If values of type `$t` can be transmuted into values of the primitive atomic type `$atomic`, + // declares variable `$a` of type `$atomic` and executes `$atomic_op`, breaking out of the loop. + (@check, $t:ty, $atomic:ty, $a:ident, $atomic_op:expr) => { + if can_transmute::<$t, $atomic>() { + let $a: &$atomic; + break $atomic_op; + } + }; + + // If values of type `$t` can be transmuted into values of a primitive atomic type, declares + // variable `$a` of that type and executes `$atomic_op`. Otherwise, just executes + // `$fallback_op`. + ($t:ty, $a:ident, $atomic_op:expr, $fallback_op:expr) => { + loop { + atomic!(@check, $t, AtomicUnit, $a, $atomic_op); + + atomic!(@check, $t, atomic::AtomicU8, $a, $atomic_op); + atomic!(@check, $t, atomic::AtomicU16, $a, $atomic_op); + atomic!(@check, $t, atomic::AtomicU32, $a, $atomic_op); + #[cfg(not(crossbeam_no_atomic_64))] + atomic!(@check, $t, atomic::AtomicU64, $a, $atomic_op); + // TODO: AtomicU128 is unstable + // atomic!(@check, $t, atomic::AtomicU128, $a, $atomic_op); + + break $fallback_op; + } + }; +} + +/// Returns `true` if operations on `AtomicCell<T>` are lock-free. +const fn atomic_is_lock_free<T>() -> bool { + // HACK(taiki-e): This is equivalent to `atomic! { T, _a, true, false }`, but can be used in const fn even in our MSRV (Rust 1.38). + let is_lock_free = can_transmute::<T, AtomicUnit>() + | can_transmute::<T, atomic::AtomicU8>() + | can_transmute::<T, atomic::AtomicU16>() + | can_transmute::<T, atomic::AtomicU32>(); + #[cfg(not(crossbeam_no_atomic_64))] + let is_lock_free = is_lock_free | can_transmute::<T, atomic::AtomicU64>(); + // TODO: AtomicU128 is unstable + // let is_lock_free = is_lock_free | can_transmute::<T, atomic::AtomicU128>(); + is_lock_free +} + +/// Atomically reads data from `src`. +/// +/// This operation uses the `Acquire` ordering. If possible, an atomic instructions is used, and a +/// global lock otherwise. +unsafe fn atomic_load<T>(src: *mut T) -> T +where + T: Copy, +{ + atomic! { + T, a, + { + a = &*(src as *const _ as *const _); + mem::transmute_copy(&a.load(Ordering::Acquire)) + }, + { + let lock = lock(src as usize); + + // Try doing an optimistic read first. + if let Some(stamp) = lock.optimistic_read() { + // We need a volatile read here because other threads might concurrently modify the + // value. In theory, data races are *always* UB, even if we use volatile reads and + // discard the data when a data race is detected. The proper solution would be to + // do atomic reads and atomic writes, but we can't atomically read and write all + // kinds of data since `AtomicU8` is not available on stable Rust yet. + // Load as `MaybeUninit` because we may load a value that is not valid as `T`. + let val = ptr::read_volatile(src.cast::<MaybeUninit<T>>()); + + if lock.validate_read(stamp) { + return val.assume_init(); + } + } + + // Grab a regular write lock so that writers don't starve this load. + let guard = lock.write(); + let val = ptr::read(src); + // The value hasn't been changed. Drop the guard without incrementing the stamp. + guard.abort(); + val + } + } +} + +/// Atomically writes `val` to `dst`. +/// +/// This operation uses the `Release` ordering. If possible, an atomic instructions is used, and a +/// global lock otherwise. +unsafe fn atomic_store<T>(dst: *mut T, val: T) { + atomic! { + T, a, + { + a = &*(dst as *const _ as *const _); + a.store(mem::transmute_copy(&val), Ordering::Release); + mem::forget(val); + }, + { + let _guard = lock(dst as usize).write(); + ptr::write(dst, val); + } + } +} + +/// Atomically swaps data at `dst` with `val`. +/// +/// This operation uses the `AcqRel` ordering. If possible, an atomic instructions is used, and a +/// global lock otherwise. +unsafe fn atomic_swap<T>(dst: *mut T, val: T) -> T { + atomic! { + T, a, + { + a = &*(dst as *const _ as *const _); + let res = mem::transmute_copy(&a.swap(mem::transmute_copy(&val), Ordering::AcqRel)); + mem::forget(val); + res + }, + { + let _guard = lock(dst as usize).write(); + ptr::replace(dst, val) + } + } +} + +/// Atomically compares data at `dst` to `current` and, if equal byte-for-byte, exchanges data at +/// `dst` with `new`. +/// +/// Returns the old value on success, or the current value at `dst` on failure. +/// +/// This operation uses the `AcqRel` ordering. If possible, an atomic instructions is used, and a +/// global lock otherwise. +#[allow(clippy::let_unit_value)] +unsafe fn atomic_compare_exchange_weak<T>(dst: *mut T, mut current: T, new: T) -> Result<T, T> +where + T: Copy + Eq, +{ + atomic! { + T, a, + { + a = &*(dst as *const _ as *const _); + let mut current_raw = mem::transmute_copy(¤t); + let new_raw = mem::transmute_copy(&new); + + loop { + match a.compare_exchange_weak( + current_raw, + new_raw, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break Ok(current), + Err(previous_raw) => { + let previous = mem::transmute_copy(&previous_raw); + + if !T::eq(&previous, ¤t) { + break Err(previous); + } + + // The compare-exchange operation has failed and didn't store `new`. The + // failure is either spurious, or `previous` was semantically equal to + // `current` but not byte-equal. Let's retry with `previous` as the new + // `current`. + current = previous; + current_raw = previous_raw; + } + } + } + }, + { + let guard = lock(dst as usize).write(); + + if T::eq(&*dst, ¤t) { + Ok(ptr::replace(dst, new)) + } else { + let val = ptr::read(dst); + // The value hasn't been changed. Drop the guard without incrementing the stamp. + guard.abort(); + Err(val) + } + } + } +} diff --git a/third_party/rust/crossbeam-utils/src/atomic/consume.rs b/third_party/rust/crossbeam-utils/src/atomic/consume.rs new file mode 100644 index 0000000000..277b370a55 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/atomic/consume.rs @@ -0,0 +1,92 @@ +#[cfg(any(target_arch = "arm", target_arch = "aarch64"))] +use crate::primitive::sync::atomic::compiler_fence; +#[cfg(not(crossbeam_no_atomic))] +use core::sync::atomic::Ordering; + +/// Trait which allows reading from primitive atomic types with "consume" ordering. +pub trait AtomicConsume { + /// Type returned by `load_consume`. + type Val; + + /// Loads a value from the atomic using a "consume" memory ordering. + /// + /// This is similar to the "acquire" ordering, except that an ordering is + /// only guaranteed with operations that "depend on" the result of the load. + /// However consume loads are usually much faster than acquire loads on + /// architectures with a weak memory model since they don't require memory + /// fence instructions. + /// + /// The exact definition of "depend on" is a bit vague, but it works as you + /// would expect in practice since a lot of software, especially the Linux + /// kernel, rely on this behavior. + /// + /// This is currently only implemented on ARM and AArch64, where a fence + /// can be avoided. On other architectures this will fall back to a simple + /// `load(Ordering::Acquire)`. + fn load_consume(&self) -> Self::Val; +} + +#[cfg(not(crossbeam_no_atomic))] +#[cfg(any(target_arch = "arm", target_arch = "aarch64"))] +macro_rules! impl_consume { + () => { + #[inline] + fn load_consume(&self) -> Self::Val { + let result = self.load(Ordering::Relaxed); + compiler_fence(Ordering::Acquire); + result + } + }; +} + +#[cfg(not(crossbeam_no_atomic))] +#[cfg(not(any(target_arch = "arm", target_arch = "aarch64")))] +macro_rules! impl_consume { + () => { + #[inline] + fn load_consume(&self) -> Self::Val { + self.load(Ordering::Acquire) + } + }; +} + +macro_rules! impl_atomic { + ($atomic:ident, $val:ty) => { + #[cfg(not(crossbeam_no_atomic))] + impl AtomicConsume for core::sync::atomic::$atomic { + type Val = $val; + impl_consume!(); + } + #[cfg(crossbeam_loom)] + impl AtomicConsume for loom::sync::atomic::$atomic { + type Val = $val; + impl_consume!(); + } + }; +} + +impl_atomic!(AtomicBool, bool); +impl_atomic!(AtomicUsize, usize); +impl_atomic!(AtomicIsize, isize); +impl_atomic!(AtomicU8, u8); +impl_atomic!(AtomicI8, i8); +impl_atomic!(AtomicU16, u16); +impl_atomic!(AtomicI16, i16); +impl_atomic!(AtomicU32, u32); +impl_atomic!(AtomicI32, i32); +#[cfg(not(crossbeam_no_atomic_64))] +impl_atomic!(AtomicU64, u64); +#[cfg(not(crossbeam_no_atomic_64))] +impl_atomic!(AtomicI64, i64); + +#[cfg(not(crossbeam_no_atomic))] +impl<T> AtomicConsume for core::sync::atomic::AtomicPtr<T> { + type Val = *mut T; + impl_consume!(); +} + +#[cfg(crossbeam_loom)] +impl<T> AtomicConsume for loom::sync::atomic::AtomicPtr<T> { + type Val = *mut T; + impl_consume!(); +} diff --git a/third_party/rust/crossbeam-utils/src/atomic/mod.rs b/third_party/rust/crossbeam-utils/src/atomic/mod.rs new file mode 100644 index 0000000000..38967859f9 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/atomic/mod.rs @@ -0,0 +1,37 @@ +//! Atomic types. +//! +//! * [`AtomicCell`], a thread-safe mutable memory location. +//! * [`AtomicConsume`], for reading from primitive atomic types with "consume" ordering. + +#[cfg(not(crossbeam_no_atomic_cas))] +#[cfg(not(crossbeam_loom))] +cfg_if::cfg_if! { + // Use "wide" sequence lock if the pointer width <= 32 for preventing its counter against wrap + // around. + // + // We are ignoring too wide architectures (pointer width >= 256), since such a system will not + // appear in a conceivable future. + // + // In narrow architectures (pointer width <= 16), the counter is still <= 32-bit and may be + // vulnerable to wrap around. But it's mostly okay, since in such a primitive hardware, the + // counter will not be increased that fast. + if #[cfg(any(target_pointer_width = "64", target_pointer_width = "128"))] { + mod seq_lock; + } else { + #[path = "seq_lock_wide.rs"] + mod seq_lock; + } +} + +#[cfg(not(crossbeam_no_atomic_cas))] +// We cannot provide AtomicCell under cfg(crossbeam_loom) because loom's atomic +// types have a different in-memory representation than the underlying type. +// TODO: The latest loom supports fences, so fallback using seqlock may be available. +#[cfg(not(crossbeam_loom))] +mod atomic_cell; +mod consume; + +#[cfg(not(crossbeam_no_atomic_cas))] +#[cfg(not(crossbeam_loom))] +pub use self::atomic_cell::AtomicCell; +pub use self::consume::AtomicConsume; diff --git a/third_party/rust/crossbeam-utils/src/atomic/seq_lock.rs b/third_party/rust/crossbeam-utils/src/atomic/seq_lock.rs new file mode 100644 index 0000000000..ff8defd26d --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/atomic/seq_lock.rs @@ -0,0 +1,112 @@ +use core::mem; +use core::sync::atomic::{self, AtomicUsize, Ordering}; + +use crate::Backoff; + +/// A simple stamped lock. +pub(crate) struct SeqLock { + /// The current state of the lock. + /// + /// All bits except the least significant one hold the current stamp. When locked, the state + /// equals 1 and doesn't contain a valid stamp. + state: AtomicUsize, +} + +impl SeqLock { + pub(crate) const fn new() -> Self { + Self { + state: AtomicUsize::new(0), + } + } + + /// If not locked, returns the current stamp. + /// + /// This method should be called before optimistic reads. + #[inline] + pub(crate) fn optimistic_read(&self) -> Option<usize> { + let state = self.state.load(Ordering::Acquire); + if state == 1 { + None + } else { + Some(state) + } + } + + /// Returns `true` if the current stamp is equal to `stamp`. + /// + /// This method should be called after optimistic reads to check whether they are valid. The + /// argument `stamp` should correspond to the one returned by method `optimistic_read`. + #[inline] + pub(crate) fn validate_read(&self, stamp: usize) -> bool { + atomic::fence(Ordering::Acquire); + self.state.load(Ordering::Relaxed) == stamp + } + + /// Grabs the lock for writing. + #[inline] + pub(crate) fn write(&'static self) -> SeqLockWriteGuard { + let backoff = Backoff::new(); + loop { + let previous = self.state.swap(1, Ordering::Acquire); + + if previous != 1 { + atomic::fence(Ordering::Release); + + return SeqLockWriteGuard { + lock: self, + state: previous, + }; + } + + backoff.snooze(); + } + } +} + +/// An RAII guard that releases the lock and increments the stamp when dropped. +pub(crate) struct SeqLockWriteGuard { + /// The parent lock. + lock: &'static SeqLock, + + /// The stamp before locking. + state: usize, +} + +impl SeqLockWriteGuard { + /// Releases the lock without incrementing the stamp. + #[inline] + pub(crate) fn abort(self) { + self.lock.state.store(self.state, Ordering::Release); + + // We specifically don't want to call drop(), since that's + // what increments the stamp. + mem::forget(self); + } +} + +impl Drop for SeqLockWriteGuard { + #[inline] + fn drop(&mut self) { + // Release the lock and increment the stamp. + self.lock + .state + .store(self.state.wrapping_add(2), Ordering::Release); + } +} + +#[cfg(test)] +mod tests { + use super::SeqLock; + + #[test] + fn test_abort() { + static LK: SeqLock = SeqLock::new(); + let before = LK.optimistic_read().unwrap(); + { + let guard = LK.write(); + guard.abort(); + } + let after = LK.optimistic_read().unwrap(); + assert_eq!(before, after, "aborted write does not update the stamp"); + } +} diff --git a/third_party/rust/crossbeam-utils/src/atomic/seq_lock_wide.rs b/third_party/rust/crossbeam-utils/src/atomic/seq_lock_wide.rs new file mode 100644 index 0000000000..ef5d94a454 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/atomic/seq_lock_wide.rs @@ -0,0 +1,155 @@ +use core::mem; +use core::sync::atomic::{self, AtomicUsize, Ordering}; + +use crate::Backoff; + +/// A simple stamped lock. +/// +/// The state is represented as two `AtomicUsize`: `state_hi` for high bits and `state_lo` for low +/// bits. +pub(crate) struct SeqLock { + /// The high bits of the current state of the lock. + state_hi: AtomicUsize, + + /// The low bits of the current state of the lock. + /// + /// All bits except the least significant one hold the current stamp. When locked, the state_lo + /// equals 1 and doesn't contain a valid stamp. + state_lo: AtomicUsize, +} + +impl SeqLock { + pub(crate) const fn new() -> Self { + Self { + state_hi: AtomicUsize::new(0), + state_lo: AtomicUsize::new(0), + } + } + + /// If not locked, returns the current stamp. + /// + /// This method should be called before optimistic reads. + #[inline] + pub(crate) fn optimistic_read(&self) -> Option<(usize, usize)> { + // The acquire loads from `state_hi` and `state_lo` synchronize with the release stores in + // `SeqLockWriteGuard::drop`. + // + // As a consequence, we can make sure that (1) all writes within the era of `state_hi - 1` + // happens before now; and therefore, (2) if `state_lo` is even, all writes within the + // critical section of (`state_hi`, `state_lo`) happens before now. + let state_hi = self.state_hi.load(Ordering::Acquire); + let state_lo = self.state_lo.load(Ordering::Acquire); + if state_lo == 1 { + None + } else { + Some((state_hi, state_lo)) + } + } + + /// Returns `true` if the current stamp is equal to `stamp`. + /// + /// This method should be called after optimistic reads to check whether they are valid. The + /// argument `stamp` should correspond to the one returned by method `optimistic_read`. + #[inline] + pub(crate) fn validate_read(&self, stamp: (usize, usize)) -> bool { + // Thanks to the fence, if we're noticing any modification to the data at the critical + // section of `(a, b)`, then the critical section's write of 1 to state_lo should be + // visible. + atomic::fence(Ordering::Acquire); + + // So if `state_lo` coincides with `stamp.1`, then either (1) we're noticing no modification + // to the data after the critical section of `(stamp.0, stamp.1)`, or (2) `state_lo` wrapped + // around. + // + // If (2) is the case, the acquire ordering ensures we see the new value of `state_hi`. + let state_lo = self.state_lo.load(Ordering::Acquire); + + // If (2) is the case and `state_hi` coincides with `stamp.0`, then `state_hi` also wrapped + // around, which we give up to correctly validate the read. + let state_hi = self.state_hi.load(Ordering::Relaxed); + + // Except for the case that both `state_hi` and `state_lo` wrapped around, the following + // condition implies that we're noticing no modification to the data after the critical + // section of `(stamp.0, stamp.1)`. + (state_hi, state_lo) == stamp + } + + /// Grabs the lock for writing. + #[inline] + pub(crate) fn write(&'static self) -> SeqLockWriteGuard { + let backoff = Backoff::new(); + loop { + let previous = self.state_lo.swap(1, Ordering::Acquire); + + if previous != 1 { + // To synchronize with the acquire fence in `validate_read` via any modification to + // the data at the critical section of `(state_hi, previous)`. + atomic::fence(Ordering::Release); + + return SeqLockWriteGuard { + lock: self, + state_lo: previous, + }; + } + + backoff.snooze(); + } + } +} + +/// An RAII guard that releases the lock and increments the stamp when dropped. +pub(crate) struct SeqLockWriteGuard { + /// The parent lock. + lock: &'static SeqLock, + + /// The stamp before locking. + state_lo: usize, +} + +impl SeqLockWriteGuard { + /// Releases the lock without incrementing the stamp. + #[inline] + pub(crate) fn abort(self) { + self.lock.state_lo.store(self.state_lo, Ordering::Release); + mem::forget(self); + } +} + +impl Drop for SeqLockWriteGuard { + #[inline] + fn drop(&mut self) { + let state_lo = self.state_lo.wrapping_add(2); + + // Increase the high bits if the low bits wrap around. + // + // Release ordering for synchronizing with `optimistic_read`. + if state_lo == 0 { + let state_hi = self.lock.state_hi.load(Ordering::Relaxed); + self.lock + .state_hi + .store(state_hi.wrapping_add(1), Ordering::Release); + } + + // Release the lock and increment the stamp. + // + // Release ordering for synchronizing with `optimistic_read`. + self.lock.state_lo.store(state_lo, Ordering::Release); + } +} + +#[cfg(test)] +mod tests { + use super::SeqLock; + + #[test] + fn test_abort() { + static LK: SeqLock = SeqLock::new(); + let before = LK.optimistic_read().unwrap(); + { + let guard = LK.write(); + guard.abort(); + } + let after = LK.optimistic_read().unwrap(); + assert_eq!(before, after, "aborted write does not update the stamp"); + } +} diff --git a/third_party/rust/crossbeam-utils/src/backoff.rs b/third_party/rust/crossbeam-utils/src/backoff.rs new file mode 100644 index 0000000000..9e256aaf2e --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/backoff.rs @@ -0,0 +1,296 @@ +use crate::primitive::sync::atomic; +use core::cell::Cell; +use core::fmt; + +const SPIN_LIMIT: u32 = 6; +const YIELD_LIMIT: u32 = 10; + +/// Performs exponential backoff in spin loops. +/// +/// Backing off in spin loops reduces contention and improves overall performance. +/// +/// This primitive can execute *YIELD* and *PAUSE* instructions, yield the current thread to the OS +/// scheduler, and tell when is a good time to block the thread using a different synchronization +/// mechanism. Each step of the back off procedure takes roughly twice as long as the previous +/// step. +/// +/// # Examples +/// +/// Backing off in a lock-free loop: +/// +/// ``` +/// use crossbeam_utils::Backoff; +/// use std::sync::atomic::AtomicUsize; +/// use std::sync::atomic::Ordering::SeqCst; +/// +/// fn fetch_mul(a: &AtomicUsize, b: usize) -> usize { +/// let backoff = Backoff::new(); +/// loop { +/// let val = a.load(SeqCst); +/// if a.compare_exchange(val, val.wrapping_mul(b), SeqCst, SeqCst).is_ok() { +/// return val; +/// } +/// backoff.spin(); +/// } +/// } +/// ``` +/// +/// Waiting for an [`AtomicBool`] to become `true`: +/// +/// ``` +/// use crossbeam_utils::Backoff; +/// use std::sync::atomic::AtomicBool; +/// use std::sync::atomic::Ordering::SeqCst; +/// +/// fn spin_wait(ready: &AtomicBool) { +/// let backoff = Backoff::new(); +/// while !ready.load(SeqCst) { +/// backoff.snooze(); +/// } +/// } +/// ``` +/// +/// Waiting for an [`AtomicBool`] to become `true` and parking the thread after a long wait. +/// Note that whoever sets the atomic variable to `true` must notify the parked thread by calling +/// [`unpark()`]: +/// +/// ``` +/// use crossbeam_utils::Backoff; +/// use std::sync::atomic::AtomicBool; +/// use std::sync::atomic::Ordering::SeqCst; +/// use std::thread; +/// +/// fn blocking_wait(ready: &AtomicBool) { +/// let backoff = Backoff::new(); +/// while !ready.load(SeqCst) { +/// if backoff.is_completed() { +/// thread::park(); +/// } else { +/// backoff.snooze(); +/// } +/// } +/// } +/// ``` +/// +/// [`is_completed`]: Backoff::is_completed +/// [`std::thread::park()`]: std::thread::park +/// [`Condvar`]: std::sync::Condvar +/// [`AtomicBool`]: std::sync::atomic::AtomicBool +/// [`unpark()`]: std::thread::Thread::unpark +pub struct Backoff { + step: Cell<u32>, +} + +impl Backoff { + /// Creates a new `Backoff`. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::Backoff; + /// + /// let backoff = Backoff::new(); + /// ``` + #[inline] + pub fn new() -> Self { + Backoff { step: Cell::new(0) } + } + + /// Resets the `Backoff`. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::Backoff; + /// + /// let backoff = Backoff::new(); + /// backoff.reset(); + /// ``` + #[inline] + pub fn reset(&self) { + self.step.set(0); + } + + /// Backs off in a lock-free loop. + /// + /// This method should be used when we need to retry an operation because another thread made + /// progress. + /// + /// The processor may yield using the *YIELD* or *PAUSE* instruction. + /// + /// # Examples + /// + /// Backing off in a lock-free loop: + /// + /// ``` + /// use crossbeam_utils::Backoff; + /// use std::sync::atomic::AtomicUsize; + /// use std::sync::atomic::Ordering::SeqCst; + /// + /// fn fetch_mul(a: &AtomicUsize, b: usize) -> usize { + /// let backoff = Backoff::new(); + /// loop { + /// let val = a.load(SeqCst); + /// if a.compare_exchange(val, val.wrapping_mul(b), SeqCst, SeqCst).is_ok() { + /// return val; + /// } + /// backoff.spin(); + /// } + /// } + /// + /// let a = AtomicUsize::new(7); + /// assert_eq!(fetch_mul(&a, 8), 7); + /// assert_eq!(a.load(SeqCst), 56); + /// ``` + #[inline] + pub fn spin(&self) { + for _ in 0..1 << self.step.get().min(SPIN_LIMIT) { + // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+, + // use [`core::hint::spin_loop`] instead. + #[allow(deprecated)] + atomic::spin_loop_hint(); + } + + if self.step.get() <= SPIN_LIMIT { + self.step.set(self.step.get() + 1); + } + } + + /// Backs off in a blocking loop. + /// + /// This method should be used when we need to wait for another thread to make progress. + /// + /// The processor may yield using the *YIELD* or *PAUSE* instruction and the current thread + /// may yield by giving up a timeslice to the OS scheduler. + /// + /// In `#[no_std]` environments, this method is equivalent to [`spin`]. + /// + /// If possible, use [`is_completed`] to check when it is advised to stop using backoff and + /// block the current thread using a different synchronization mechanism instead. + /// + /// [`spin`]: Backoff::spin + /// [`is_completed`]: Backoff::is_completed + /// + /// # Examples + /// + /// Waiting for an [`AtomicBool`] to become `true`: + /// + /// ``` + /// use crossbeam_utils::Backoff; + /// use std::sync::Arc; + /// use std::sync::atomic::AtomicBool; + /// use std::sync::atomic::Ordering::SeqCst; + /// use std::thread; + /// use std::time::Duration; + /// + /// fn spin_wait(ready: &AtomicBool) { + /// let backoff = Backoff::new(); + /// while !ready.load(SeqCst) { + /// backoff.snooze(); + /// } + /// } + /// + /// let ready = Arc::new(AtomicBool::new(false)); + /// let ready2 = ready.clone(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(100)); + /// ready2.store(true, SeqCst); + /// }); + /// + /// assert_eq!(ready.load(SeqCst), false); + /// spin_wait(&ready); + /// assert_eq!(ready.load(SeqCst), true); + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 + /// ``` + /// + /// [`AtomicBool`]: std::sync::atomic::AtomicBool + #[inline] + pub fn snooze(&self) { + if self.step.get() <= SPIN_LIMIT { + for _ in 0..1 << self.step.get() { + // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+, + // use [`core::hint::spin_loop`] instead. + #[allow(deprecated)] + atomic::spin_loop_hint(); + } + } else { + #[cfg(not(feature = "std"))] + for _ in 0..1 << self.step.get() { + // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+, + // use [`core::hint::spin_loop`] instead. + #[allow(deprecated)] + atomic::spin_loop_hint(); + } + + #[cfg(feature = "std")] + ::std::thread::yield_now(); + } + + if self.step.get() <= YIELD_LIMIT { + self.step.set(self.step.get() + 1); + } + } + + /// Returns `true` if exponential backoff has completed and blocking the thread is advised. + /// + /// # Examples + /// + /// Waiting for an [`AtomicBool`] to become `true` and parking the thread after a long wait: + /// + /// ``` + /// use crossbeam_utils::Backoff; + /// use std::sync::Arc; + /// use std::sync::atomic::AtomicBool; + /// use std::sync::atomic::Ordering::SeqCst; + /// use std::thread; + /// use std::time::Duration; + /// + /// fn blocking_wait(ready: &AtomicBool) { + /// let backoff = Backoff::new(); + /// while !ready.load(SeqCst) { + /// if backoff.is_completed() { + /// thread::park(); + /// } else { + /// backoff.snooze(); + /// } + /// } + /// } + /// + /// let ready = Arc::new(AtomicBool::new(false)); + /// let ready2 = ready.clone(); + /// let waiter = thread::current(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(100)); + /// ready2.store(true, SeqCst); + /// waiter.unpark(); + /// }); + /// + /// assert_eq!(ready.load(SeqCst), false); + /// blocking_wait(&ready); + /// assert_eq!(ready.load(SeqCst), true); + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 + /// ``` + /// + /// [`AtomicBool`]: std::sync::atomic::AtomicBool + #[inline] + pub fn is_completed(&self) -> bool { + self.step.get() > YIELD_LIMIT + } +} + +impl fmt::Debug for Backoff { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Backoff") + .field("step", &self.step) + .field("is_completed", &self.is_completed()) + .finish() + } +} + +impl Default for Backoff { + fn default() -> Backoff { + Backoff::new() + } +} diff --git a/third_party/rust/crossbeam-utils/src/cache_padded.rs b/third_party/rust/crossbeam-utils/src/cache_padded.rs new file mode 100644 index 0000000000..b5d5d33c96 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/cache_padded.rs @@ -0,0 +1,191 @@ +use core::fmt; +use core::ops::{Deref, DerefMut}; + +/// Pads and aligns a value to the length of a cache line. +/// +/// In concurrent programming, sometimes it is desirable to make sure commonly accessed pieces of +/// data are not placed into the same cache line. Updating an atomic value invalidates the whole +/// cache line it belongs to, which makes the next access to the same cache line slower for other +/// CPU cores. Use `CachePadded` to ensure updating one piece of data doesn't invalidate other +/// cached data. +/// +/// # Size and alignment +/// +/// Cache lines are assumed to be N bytes long, depending on the architecture: +/// +/// * On x86-64, aarch64, and powerpc64, N = 128. +/// * On arm, mips, mips64, and riscv64, N = 32. +/// * On s390x, N = 256. +/// * On all others, N = 64. +/// +/// Note that N is just a reasonable guess and is not guaranteed to match the actual cache line +/// length of the machine the program is running on. On modern Intel architectures, spatial +/// prefetcher is pulling pairs of 64-byte cache lines at a time, so we pessimistically assume that +/// cache lines are 128 bytes long. +/// +/// The size of `CachePadded<T>` is the smallest multiple of N bytes large enough to accommodate +/// a value of type `T`. +/// +/// The alignment of `CachePadded<T>` is the maximum of N bytes and the alignment of `T`. +/// +/// # Examples +/// +/// Alignment and padding: +/// +/// ``` +/// use crossbeam_utils::CachePadded; +/// +/// let array = [CachePadded::new(1i8), CachePadded::new(2i8)]; +/// let addr1 = &*array[0] as *const i8 as usize; +/// let addr2 = &*array[1] as *const i8 as usize; +/// +/// assert!(addr2 - addr1 >= 32); +/// assert_eq!(addr1 % 32, 0); +/// assert_eq!(addr2 % 32, 0); +/// ``` +/// +/// When building a concurrent queue with a head and a tail index, it is wise to place them in +/// different cache lines so that concurrent threads pushing and popping elements don't invalidate +/// each other's cache lines: +/// +/// ``` +/// use crossbeam_utils::CachePadded; +/// use std::sync::atomic::AtomicUsize; +/// +/// struct Queue<T> { +/// head: CachePadded<AtomicUsize>, +/// tail: CachePadded<AtomicUsize>, +/// buffer: *mut T, +/// } +/// ``` +#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)] +// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache +// lines at a time, so we have to align to 128 bytes rather than 64. +// +// Sources: +// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf +// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107 +// +// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size. +// +// Sources: +// - https://www.mono-project.com/news/2016/09/12/arm64-icache/ +// +// powerpc64 has 128-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9 +#[cfg_attr( + any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "powerpc64", + ), + repr(align(128)) +)] +// arm, mips, mips64, and riscv64 have 32-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7 +#[cfg_attr( + any( + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv64", + ), + repr(align(32)) +)] +// s390x has 256-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7 +#[cfg_attr(target_arch = "s390x", repr(align(256)))] +// x86 and wasm have 64-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7 +// +// All others are assumed to have 64-byte cache line size. +#[cfg_attr( + not(any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "powerpc64", + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv64", + target_arch = "s390x", + )), + repr(align(64)) +)] +pub struct CachePadded<T> { + value: T, +} + +unsafe impl<T: Send> Send for CachePadded<T> {} +unsafe impl<T: Sync> Sync for CachePadded<T> {} + +impl<T> CachePadded<T> { + /// Pads and aligns a value to the length of a cache line. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::CachePadded; + /// + /// let padded_value = CachePadded::new(1); + /// ``` + pub const fn new(t: T) -> CachePadded<T> { + CachePadded::<T> { value: t } + } + + /// Returns the inner value. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::CachePadded; + /// + /// let padded_value = CachePadded::new(7); + /// let value = padded_value.into_inner(); + /// assert_eq!(value, 7); + /// ``` + pub fn into_inner(self) -> T { + self.value + } +} + +impl<T> Deref for CachePadded<T> { + type Target = T; + + fn deref(&self) -> &T { + &self.value + } +} + +impl<T> DerefMut for CachePadded<T> { + fn deref_mut(&mut self) -> &mut T { + &mut self.value + } +} + +impl<T: fmt::Debug> fmt::Debug for CachePadded<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CachePadded") + .field("value", &self.value) + .finish() + } +} + +impl<T> From<T> for CachePadded<T> { + fn from(t: T) -> Self { + CachePadded::new(t) + } +} diff --git a/third_party/rust/crossbeam-utils/src/lib.rs b/third_party/rust/crossbeam-utils/src/lib.rs new file mode 100644 index 0000000000..191c5a17d1 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/lib.rs @@ -0,0 +1,104 @@ +//! Miscellaneous tools for concurrent programming. +//! +//! ## Atomics +//! +//! * [`AtomicCell`], a thread-safe mutable memory location. +//! * [`AtomicConsume`], for reading from primitive atomic types with "consume" ordering. +//! +//! ## Thread synchronization +//! +//! * [`Parker`], a thread parking primitive. +//! * [`ShardedLock`], a sharded reader-writer lock with fast concurrent reads. +//! * [`WaitGroup`], for synchronizing the beginning or end of some computation. +//! +//! ## Utilities +//! +//! * [`Backoff`], for exponential backoff in spin loops. +//! * [`CachePadded`], for padding and aligning a value to the length of a cache line. +//! * [`scope`], for spawning threads that borrow local variables from the stack. +//! +//! [`AtomicCell`]: atomic::AtomicCell +//! [`AtomicConsume`]: atomic::AtomicConsume +//! [`Parker`]: sync::Parker +//! [`ShardedLock`]: sync::ShardedLock +//! [`WaitGroup`]: sync::WaitGroup +//! [`scope`]: thread::scope + +#![doc(test( + no_crate_inject, + attr( + deny(warnings, rust_2018_idioms), + allow(dead_code, unused_assignments, unused_variables) + ) +))] +#![warn( + missing_docs, + missing_debug_implementations, + rust_2018_idioms, + unreachable_pub +)] +#![cfg_attr(not(feature = "std"), no_std)] + +#[cfg(crossbeam_loom)] +#[allow(unused_imports)] +mod primitive { + pub(crate) mod sync { + pub(crate) mod atomic { + pub(crate) use loom::sync::atomic::spin_loop_hint; + pub(crate) use loom::sync::atomic::{ + AtomicBool, AtomicI16, AtomicI32, AtomicI64, AtomicI8, AtomicIsize, AtomicU16, + AtomicU32, AtomicU64, AtomicU8, AtomicUsize, + }; + + // FIXME: loom does not support compiler_fence at the moment. + // https://github.com/tokio-rs/loom/issues/117 + // we use fence as a stand-in for compiler_fence for the time being. + // this may miss some races since fence is stronger than compiler_fence, + // but it's the best we can do for the time being. + pub(crate) use loom::sync::atomic::fence as compiler_fence; + } + pub(crate) use loom::sync::{Arc, Condvar, Mutex}; + } +} +#[cfg(not(crossbeam_loom))] +#[allow(unused_imports)] +mod primitive { + pub(crate) mod sync { + pub(crate) mod atomic { + pub(crate) use core::sync::atomic::compiler_fence; + // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+, + // use [`core::hint::spin_loop`] instead. + #[allow(deprecated)] + pub(crate) use core::sync::atomic::spin_loop_hint; + #[cfg(not(crossbeam_no_atomic))] + pub(crate) use core::sync::atomic::{ + AtomicBool, AtomicI16, AtomicI32, AtomicI8, AtomicIsize, AtomicU16, AtomicU32, + AtomicU8, AtomicUsize, + }; + #[cfg(not(crossbeam_no_atomic_64))] + pub(crate) use core::sync::atomic::{AtomicI64, AtomicU64}; + } + + #[cfg(feature = "std")] + pub(crate) use std::sync::{Arc, Condvar, Mutex}; + } +} + +pub mod atomic; + +mod cache_padded; +pub use crate::cache_padded::CachePadded; + +mod backoff; +pub use crate::backoff::Backoff; + +use cfg_if::cfg_if; + +cfg_if! { + if #[cfg(feature = "std")] { + pub mod sync; + + #[cfg(not(crossbeam_loom))] + pub mod thread; + } +} diff --git a/third_party/rust/crossbeam-utils/src/sync/mod.rs b/third_party/rust/crossbeam-utils/src/sync/mod.rs new file mode 100644 index 0000000000..f9eec71fb3 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/sync/mod.rs @@ -0,0 +1,17 @@ +//! Thread synchronization primitives. +//! +//! * [`Parker`], a thread parking primitive. +//! * [`ShardedLock`], a sharded reader-writer lock with fast concurrent reads. +//! * [`WaitGroup`], for synchronizing the beginning or end of some computation. + +#[cfg(not(crossbeam_loom))] +mod once_lock; +mod parker; +#[cfg(not(crossbeam_loom))] +mod sharded_lock; +mod wait_group; + +pub use self::parker::{Parker, Unparker}; +#[cfg(not(crossbeam_loom))] +pub use self::sharded_lock::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard}; +pub use self::wait_group::WaitGroup; diff --git a/third_party/rust/crossbeam-utils/src/sync/once_lock.rs b/third_party/rust/crossbeam-utils/src/sync/once_lock.rs new file mode 100644 index 0000000000..c1fefc96cc --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/sync/once_lock.rs @@ -0,0 +1,103 @@ +// Based on unstable std::sync::OnceLock. +// +// Source: https://github.com/rust-lang/rust/blob/8e9c93df464b7ada3fc7a1c8ccddd9dcb24ee0a0/library/std/src/sync/once_lock.rs + +use core::cell::UnsafeCell; +use core::mem::MaybeUninit; +use core::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Once; + +pub(crate) struct OnceLock<T> { + once: Once, + // Once::is_completed requires Rust 1.43, so use this to track of whether they have been initialized. + is_initialized: AtomicBool, + value: UnsafeCell<MaybeUninit<T>>, + // Unlike std::sync::OnceLock, we don't need PhantomData here because + // we don't use #[may_dangle]. +} + +unsafe impl<T: Sync + Send> Sync for OnceLock<T> {} +unsafe impl<T: Send> Send for OnceLock<T> {} + +impl<T> OnceLock<T> { + /// Creates a new empty cell. + #[must_use] + pub(crate) const fn new() -> Self { + Self { + once: Once::new(), + is_initialized: AtomicBool::new(false), + value: UnsafeCell::new(MaybeUninit::uninit()), + } + } + + /// Gets the contents of the cell, initializing it with `f` if the cell + /// was empty. + /// + /// Many threads may call `get_or_init` concurrently with different + /// initializing functions, but it is guaranteed that only one function + /// will be executed. + /// + /// # Panics + /// + /// If `f` panics, the panic is propagated to the caller, and the cell + /// remains uninitialized. + /// + /// It is an error to reentrantly initialize the cell from `f`. The + /// exact outcome is unspecified. Current implementation deadlocks, but + /// this may be changed to a panic in the future. + pub(crate) fn get_or_init<F>(&self, f: F) -> &T + where + F: FnOnce() -> T, + { + // Fast path check + if self.is_initialized() { + // SAFETY: The inner value has been initialized + return unsafe { self.get_unchecked() }; + } + self.initialize(f); + + debug_assert!(self.is_initialized()); + + // SAFETY: The inner value has been initialized + unsafe { self.get_unchecked() } + } + + #[inline] + fn is_initialized(&self) -> bool { + self.is_initialized.load(Ordering::Acquire) + } + + #[cold] + fn initialize<F>(&self, f: F) + where + F: FnOnce() -> T, + { + let slot = self.value.get().cast::<T>(); + let is_initialized = &self.is_initialized; + + self.once.call_once(|| { + let value = f(); + unsafe { + slot.write(value); + } + is_initialized.store(true, Ordering::Release); + }); + } + + /// # Safety + /// + /// The value must be initialized + unsafe fn get_unchecked(&self) -> &T { + debug_assert!(self.is_initialized()); + &*self.value.get().cast::<T>() + } +} + +impl<T> Drop for OnceLock<T> { + fn drop(&mut self) { + if self.is_initialized() { + // SAFETY: The inner value has been initialized + unsafe { self.value.get().cast::<T>().drop_in_place() }; + } + } +} diff --git a/third_party/rust/crossbeam-utils/src/sync/parker.rs b/third_party/rust/crossbeam-utils/src/sync/parker.rs new file mode 100644 index 0000000000..e791c44852 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/sync/parker.rs @@ -0,0 +1,413 @@ +use crate::primitive::sync::atomic::AtomicUsize; +use crate::primitive::sync::{Arc, Condvar, Mutex}; +use core::sync::atomic::Ordering::SeqCst; +use std::fmt; +use std::marker::PhantomData; +use std::time::{Duration, Instant}; + +/// A thread parking primitive. +/// +/// Conceptually, each `Parker` has an associated token which is initially not present: +/// +/// * The [`park`] method blocks the current thread unless or until the token is available, at +/// which point it automatically consumes the token. +/// +/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for +/// a specified maximum time. +/// +/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the +/// token is initially absent, [`unpark`] followed by [`park`] will result in the second call +/// returning immediately. +/// +/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using +/// [`park`] and [`unpark`]. +/// +/// # Examples +/// +/// ``` +/// use std::thread; +/// use std::time::Duration; +/// use crossbeam_utils::sync::Parker; +/// +/// let p = Parker::new(); +/// let u = p.unparker().clone(); +/// +/// // Make the token available. +/// u.unpark(); +/// // Wakes up immediately and consumes the token. +/// p.park(); +/// +/// thread::spawn(move || { +/// thread::sleep(Duration::from_millis(500)); +/// u.unpark(); +/// }); +/// +/// // Wakes up when `u.unpark()` provides the token. +/// p.park(); +/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 +/// ``` +/// +/// [`park`]: Parker::park +/// [`park_timeout`]: Parker::park_timeout +/// [`park_deadline`]: Parker::park_deadline +/// [`unpark`]: Unparker::unpark +pub struct Parker { + unparker: Unparker, + _marker: PhantomData<*const ()>, +} + +unsafe impl Send for Parker {} + +impl Default for Parker { + fn default() -> Self { + Self { + unparker: Unparker { + inner: Arc::new(Inner { + state: AtomicUsize::new(EMPTY), + lock: Mutex::new(()), + cvar: Condvar::new(), + }), + }, + _marker: PhantomData, + } + } +} + +impl Parker { + /// Creates a new `Parker`. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// ``` + /// + pub fn new() -> Parker { + Self::default() + } + + /// Blocks the current thread until the token is made available. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// // Make the token available. + /// u.unpark(); + /// + /// // Wakes up immediately and consumes the token. + /// p.park(); + /// ``` + pub fn park(&self) { + self.unparker.inner.park(None); + } + + /// Blocks the current thread until the token is made available, but only for a limited time. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// + /// // Waits for the token to become available, but will not wait longer than 500 ms. + /// p.park_timeout(Duration::from_millis(500)); + /// ``` + pub fn park_timeout(&self, timeout: Duration) { + self.park_deadline(Instant::now() + timeout) + } + + /// Blocks the current thread until the token is made available, or until a certain deadline. + /// + /// # Examples + /// + /// ``` + /// use std::time::{Duration, Instant}; + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let deadline = Instant::now() + Duration::from_millis(500); + /// + /// // Waits for the token to become available, but will not wait longer than 500 ms. + /// p.park_deadline(deadline); + /// ``` + pub fn park_deadline(&self, deadline: Instant) { + self.unparker.inner.park(Some(deadline)) + } + + /// Returns a reference to an associated [`Unparker`]. + /// + /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// // Make the token available. + /// u.unpark(); + /// // Wakes up immediately and consumes the token. + /// p.park(); + /// ``` + /// + /// [`park`]: Parker::park + /// [`park_timeout`]: Parker::park_timeout + pub fn unparker(&self) -> &Unparker { + &self.unparker + } + + /// Converts a `Parker` into a raw pointer. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let raw = Parker::into_raw(p); + /// # let _ = unsafe { Parker::from_raw(raw) }; + /// ``` + pub fn into_raw(this: Parker) -> *const () { + Unparker::into_raw(this.unparker) + } + + /// Converts a raw pointer into a `Parker`. + /// + /// # Safety + /// + /// This method is safe to use only with pointers returned by [`Parker::into_raw`]. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let raw = Parker::into_raw(p); + /// let p = unsafe { Parker::from_raw(raw) }; + /// ``` + pub unsafe fn from_raw(ptr: *const ()) -> Parker { + Parker { + unparker: Unparker::from_raw(ptr), + _marker: PhantomData, + } + } +} + +impl fmt::Debug for Parker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Parker { .. }") + } +} + +/// Unparks a thread parked by the associated [`Parker`]. +pub struct Unparker { + inner: Arc<Inner>, +} + +unsafe impl Send for Unparker {} +unsafe impl Sync for Unparker {} + +impl Unparker { + /// Atomically makes the token available if it is not already. + /// + /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is + /// any. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::Duration; + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(500)); + /// u.unpark(); + /// }); + /// + /// // Wakes up when `u.unpark()` provides the token. + /// p.park(); + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 + /// ``` + /// + /// [`park`]: Parker::park + /// [`park_timeout`]: Parker::park_timeout + pub fn unpark(&self) { + self.inner.unpark() + } + + /// Converts an `Unparker` into a raw pointer. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::{Parker, Unparker}; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// let raw = Unparker::into_raw(u); + /// # let _ = unsafe { Unparker::from_raw(raw) }; + /// ``` + pub fn into_raw(this: Unparker) -> *const () { + Arc::into_raw(this.inner).cast::<()>() + } + + /// Converts a raw pointer into an `Unparker`. + /// + /// # Safety + /// + /// This method is safe to use only with pointers returned by [`Unparker::into_raw`]. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::{Parker, Unparker}; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// let raw = Unparker::into_raw(u); + /// let u = unsafe { Unparker::from_raw(raw) }; + /// ``` + pub unsafe fn from_raw(ptr: *const ()) -> Unparker { + Unparker { + inner: Arc::from_raw(ptr.cast::<Inner>()), + } + } +} + +impl fmt::Debug for Unparker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Unparker { .. }") + } +} + +impl Clone for Unparker { + fn clone(&self) -> Unparker { + Unparker { + inner: self.inner.clone(), + } + } +} + +const EMPTY: usize = 0; +const PARKED: usize = 1; +const NOTIFIED: usize = 2; + +struct Inner { + state: AtomicUsize, + lock: Mutex<()>, + cvar: Condvar, +} + +impl Inner { + fn park(&self, deadline: Option<Instant>) { + // If we were previously notified then we consume this notification and return quickly. + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + return; + } + + // If the timeout is zero, then there is no need to actually block. + if let Some(deadline) = deadline { + if deadline <= Instant::now() { + return; + } + } + + // Otherwise we need to coordinate going to sleep. + let mut m = self.lock.lock().unwrap(); + + match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + // Consume this notification to avoid spurious wakeups in the next park. + Err(NOTIFIED) => { + // We must read `state` here, even though we know it will be `NOTIFIED`. This is + // because `unpark` may have been called again since we read `NOTIFIED` in the + // `compare_exchange` above. We must perform an acquire operation that synchronizes + // with that `unpark` to observe any writes it made before the call to `unpark`. To + // do that we must read from the write it made to `state`. + let old = self.state.swap(EMPTY, SeqCst); + assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + return; + } + Err(n) => panic!("inconsistent park_timeout state: {}", n), + } + + loop { + // Block the current thread on the conditional variable. + m = match deadline { + None => self.cvar.wait(m).unwrap(), + Some(deadline) => { + let now = Instant::now(); + if now < deadline { + // We could check for a timeout here, in the return value of wait_timeout, + // but in the case that a timeout and an unpark arrive simultaneously, we + // prefer to report the former. + self.cvar.wait_timeout(m, deadline - now).unwrap().0 + } else { + // We've timed out; swap out the state back to empty on our way out + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED | PARKED => return, + n => panic!("inconsistent park_timeout state: {}", n), + }; + } + } + }; + + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + // got a notification + return; + } + + // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught + // in the branch above, when we discover the deadline is in the past + } + } + + pub(crate) fn unpark(&self) { + // To ensure the unparked thread will observe any writes we made before this call, we must + // perform a release operation that `park` can synchronize with. To do that we must write + // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather + // than a compare-and-swap that returns if it reads `NOTIFIED` on failure. + match self.state.swap(NOTIFIED, SeqCst) { + EMPTY => return, // no one was waiting + NOTIFIED => return, // already unparked + PARKED => {} // gotta go wake someone up + _ => panic!("inconsistent state in unpark"), + } + + // There is a period between when the parked thread sets `state` to `PARKED` (or last + // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`. + // If we were to notify during this period it would be ignored and then when the parked + // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this + // stage so we can acquire `lock` to wait until it is ready to receive the notification. + // + // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes + // it doesn't get woken only to have to wait for us to release `lock`. + drop(self.lock.lock().unwrap()); + self.cvar.notify_one(); + } +} diff --git a/third_party/rust/crossbeam-utils/src/sync/sharded_lock.rs b/third_party/rust/crossbeam-utils/src/sync/sharded_lock.rs new file mode 100644 index 0000000000..b43c55ea42 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/sync/sharded_lock.rs @@ -0,0 +1,634 @@ +use std::cell::UnsafeCell; +use std::collections::HashMap; +use std::fmt; +use std::marker::PhantomData; +use std::mem; +use std::ops::{Deref, DerefMut}; +use std::panic::{RefUnwindSafe, UnwindSafe}; +use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult}; +use std::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use std::thread::{self, ThreadId}; + +use crate::sync::once_lock::OnceLock; +use crate::CachePadded; + +/// The number of shards per sharded lock. Must be a power of two. +const NUM_SHARDS: usize = 8; + +/// A shard containing a single reader-writer lock. +struct Shard { + /// The inner reader-writer lock. + lock: RwLock<()>, + + /// The write-guard keeping this shard locked. + /// + /// Write operations will lock each shard and store the guard here. These guards get dropped at + /// the same time the big guard is dropped. + write_guard: UnsafeCell<Option<RwLockWriteGuard<'static, ()>>>, +} + +/// A sharded reader-writer lock. +/// +/// This lock is equivalent to [`RwLock`], except read operations are faster and write operations +/// are slower. +/// +/// A `ShardedLock` is internally made of a list of *shards*, each being a [`RwLock`] occupying a +/// single cache line. Read operations will pick one of the shards depending on the current thread +/// and lock it. Write operations need to lock all shards in succession. +/// +/// By splitting the lock into shards, concurrent read operations will in most cases choose +/// different shards and thus update different cache lines, which is good for scalability. However, +/// write operations need to do more work and are therefore slower than usual. +/// +/// The priority policy of the lock is dependent on the underlying operating system's +/// implementation, and this type does not guarantee that any particular policy will be used. +/// +/// # Poisoning +/// +/// A `ShardedLock`, like [`RwLock`], will become poisoned on a panic. Note that it may only be +/// poisoned if a panic occurs while a write operation is in progress. If a panic occurs in any +/// read operation, the lock will not be poisoned. +/// +/// # Examples +/// +/// ``` +/// use crossbeam_utils::sync::ShardedLock; +/// +/// let lock = ShardedLock::new(5); +/// +/// // Any number of read locks can be held at once. +/// { +/// let r1 = lock.read().unwrap(); +/// let r2 = lock.read().unwrap(); +/// assert_eq!(*r1, 5); +/// assert_eq!(*r2, 5); +/// } // Read locks are dropped at this point. +/// +/// // However, only one write lock may be held. +/// { +/// let mut w = lock.write().unwrap(); +/// *w += 1; +/// assert_eq!(*w, 6); +/// } // Write lock is dropped here. +/// ``` +/// +/// [`RwLock`]: std::sync::RwLock +pub struct ShardedLock<T: ?Sized> { + /// A list of locks protecting the internal data. + shards: Box<[CachePadded<Shard>]>, + + /// The internal data. + value: UnsafeCell<T>, +} + +unsafe impl<T: ?Sized + Send> Send for ShardedLock<T> {} +unsafe impl<T: ?Sized + Send + Sync> Sync for ShardedLock<T> {} + +impl<T: ?Sized> UnwindSafe for ShardedLock<T> {} +impl<T: ?Sized> RefUnwindSafe for ShardedLock<T> {} + +impl<T> ShardedLock<T> { + /// Creates a new sharded reader-writer lock. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(5); + /// ``` + pub fn new(value: T) -> ShardedLock<T> { + ShardedLock { + shards: (0..NUM_SHARDS) + .map(|_| { + CachePadded::new(Shard { + lock: RwLock::new(()), + write_guard: UnsafeCell::new(None), + }) + }) + .collect::<Box<[_]>>(), + value: UnsafeCell::new(value), + } + } + + /// Consumes this lock, returning the underlying data. + /// + /// # Errors + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(String::new()); + /// { + /// let mut s = lock.write().unwrap(); + /// *s = "modified".to_owned(); + /// } + /// assert_eq!(lock.into_inner().unwrap(), "modified"); + /// ``` + pub fn into_inner(self) -> LockResult<T> { + let is_poisoned = self.is_poisoned(); + let inner = self.value.into_inner(); + + if is_poisoned { + Err(PoisonError::new(inner)) + } else { + Ok(inner) + } + } +} + +impl<T: ?Sized> ShardedLock<T> { + /// Returns `true` if the lock is poisoned. + /// + /// If another thread can still access the lock, it may become poisoned at any time. A `false` + /// result should not be trusted without additional synchronization. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// use std::sync::Arc; + /// use std::thread; + /// + /// let lock = Arc::new(ShardedLock::new(0)); + /// let c_lock = lock.clone(); + /// + /// let _ = thread::spawn(move || { + /// let _lock = c_lock.write().unwrap(); + /// panic!(); // the lock gets poisoned + /// }).join(); + /// assert_eq!(lock.is_poisoned(), true); + /// ``` + pub fn is_poisoned(&self) -> bool { + self.shards[0].lock.is_poisoned() + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the lock mutably, no actual locking needs to take place. + /// + /// # Errors + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let mut lock = ShardedLock::new(0); + /// *lock.get_mut().unwrap() = 10; + /// assert_eq!(*lock.read().unwrap(), 10); + /// ``` + pub fn get_mut(&mut self) -> LockResult<&mut T> { + let is_poisoned = self.is_poisoned(); + let inner = unsafe { &mut *self.value.get() }; + + if is_poisoned { + Err(PoisonError::new(inner)) + } else { + Ok(inner) + } + } + + /// Attempts to acquire this lock with shared read access. + /// + /// If the access could not be granted at this time, an error is returned. Otherwise, a guard + /// is returned which will release the shared access when it is dropped. This method does not + /// provide any guarantees with respect to the ordering of whether contentious readers or + /// writers will acquire the lock first. + /// + /// # Errors + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(1); + /// + /// match lock.try_read() { + /// Ok(n) => assert_eq!(*n, 1), + /// Err(_) => unreachable!(), + /// }; + /// ``` + pub fn try_read(&self) -> TryLockResult<ShardedLockReadGuard<'_, T>> { + // Take the current thread index and map it to a shard index. Thread indices will tend to + // distribute shards among threads equally, thus reducing contention due to read-locking. + let current_index = current_index().unwrap_or(0); + let shard_index = current_index & (self.shards.len() - 1); + + match self.shards[shard_index].lock.try_read() { + Ok(guard) => Ok(ShardedLockReadGuard { + lock: self, + _guard: guard, + _marker: PhantomData, + }), + Err(TryLockError::Poisoned(err)) => { + let guard = ShardedLockReadGuard { + lock: self, + _guard: err.into_inner(), + _marker: PhantomData, + }; + Err(TryLockError::Poisoned(PoisonError::new(guard))) + } + Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock), + } + } + + /// Locks with shared read access, blocking the current thread until it can be acquired. + /// + /// The calling thread will be blocked until there are no more writers which hold the lock. + /// There may be other readers currently inside the lock when this method returns. This method + /// does not provide any guarantees with respect to the ordering of whether contentious readers + /// or writers will acquire the lock first. + /// + /// Returns a guard which will release the shared access when dropped. + /// + /// # Errors + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Panics + /// + /// This method might panic when called if the lock is already held by the current thread. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// use std::sync::Arc; + /// use std::thread; + /// + /// let lock = Arc::new(ShardedLock::new(1)); + /// let c_lock = lock.clone(); + /// + /// let n = lock.read().unwrap(); + /// assert_eq!(*n, 1); + /// + /// thread::spawn(move || { + /// let r = c_lock.read(); + /// assert!(r.is_ok()); + /// }).join().unwrap(); + /// ``` + pub fn read(&self) -> LockResult<ShardedLockReadGuard<'_, T>> { + // Take the current thread index and map it to a shard index. Thread indices will tend to + // distribute shards among threads equally, thus reducing contention due to read-locking. + let current_index = current_index().unwrap_or(0); + let shard_index = current_index & (self.shards.len() - 1); + + match self.shards[shard_index].lock.read() { + Ok(guard) => Ok(ShardedLockReadGuard { + lock: self, + _guard: guard, + _marker: PhantomData, + }), + Err(err) => Err(PoisonError::new(ShardedLockReadGuard { + lock: self, + _guard: err.into_inner(), + _marker: PhantomData, + })), + } + } + + /// Attempts to acquire this lock with exclusive write access. + /// + /// If the access could not be granted at this time, an error is returned. Otherwise, a guard + /// is returned which will release the exclusive access when it is dropped. This method does + /// not provide any guarantees with respect to the ordering of whether contentious readers or + /// writers will acquire the lock first. + /// + /// # Errors + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(1); + /// + /// let n = lock.read().unwrap(); + /// assert_eq!(*n, 1); + /// + /// assert!(lock.try_write().is_err()); + /// ``` + pub fn try_write(&self) -> TryLockResult<ShardedLockWriteGuard<'_, T>> { + let mut poisoned = false; + let mut blocked = None; + + // Write-lock each shard in succession. + for (i, shard) in self.shards.iter().enumerate() { + let guard = match shard.lock.try_write() { + Ok(guard) => guard, + Err(TryLockError::Poisoned(err)) => { + poisoned = true; + err.into_inner() + } + Err(TryLockError::WouldBlock) => { + blocked = Some(i); + break; + } + }; + + // Store the guard into the shard. + unsafe { + let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard); + let dest: *mut _ = shard.write_guard.get(); + *dest = Some(guard); + } + } + + if let Some(i) = blocked { + // Unlock the shards in reverse order of locking. + for shard in self.shards[0..i].iter().rev() { + unsafe { + let dest: *mut _ = shard.write_guard.get(); + let guard = mem::replace(&mut *dest, None); + drop(guard); + } + } + Err(TryLockError::WouldBlock) + } else if poisoned { + let guard = ShardedLockWriteGuard { + lock: self, + _marker: PhantomData, + }; + Err(TryLockError::Poisoned(PoisonError::new(guard))) + } else { + Ok(ShardedLockWriteGuard { + lock: self, + _marker: PhantomData, + }) + } + } + + /// Locks with exclusive write access, blocking the current thread until it can be acquired. + /// + /// The calling thread will be blocked until there are no more writers which hold the lock. + /// There may be other readers currently inside the lock when this method returns. This method + /// does not provide any guarantees with respect to the ordering of whether contentious readers + /// or writers will acquire the lock first. + /// + /// Returns a guard which will release the exclusive access when dropped. + /// + /// # Errors + /// + /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write + /// operation panics. + /// + /// # Panics + /// + /// This method might panic when called if the lock is already held by the current thread. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::ShardedLock; + /// + /// let lock = ShardedLock::new(1); + /// + /// let mut n = lock.write().unwrap(); + /// *n = 2; + /// + /// assert!(lock.try_read().is_err()); + /// ``` + pub fn write(&self) -> LockResult<ShardedLockWriteGuard<'_, T>> { + let mut poisoned = false; + + // Write-lock each shard in succession. + for shard in self.shards.iter() { + let guard = match shard.lock.write() { + Ok(guard) => guard, + Err(err) => { + poisoned = true; + err.into_inner() + } + }; + + // Store the guard into the shard. + unsafe { + let guard: RwLockWriteGuard<'_, ()> = guard; + let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard); + let dest: *mut _ = shard.write_guard.get(); + *dest = Some(guard); + } + } + + if poisoned { + Err(PoisonError::new(ShardedLockWriteGuard { + lock: self, + _marker: PhantomData, + })) + } else { + Ok(ShardedLockWriteGuard { + lock: self, + _marker: PhantomData, + }) + } + } +} + +impl<T: ?Sized + fmt::Debug> fmt::Debug for ShardedLock<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.try_read() { + Ok(guard) => f + .debug_struct("ShardedLock") + .field("data", &&*guard) + .finish(), + Err(TryLockError::Poisoned(err)) => f + .debug_struct("ShardedLock") + .field("data", &&**err.get_ref()) + .finish(), + Err(TryLockError::WouldBlock) => { + struct LockedPlaceholder; + impl fmt::Debug for LockedPlaceholder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("<locked>") + } + } + f.debug_struct("ShardedLock") + .field("data", &LockedPlaceholder) + .finish() + } + } + } +} + +impl<T: Default> Default for ShardedLock<T> { + fn default() -> ShardedLock<T> { + ShardedLock::new(Default::default()) + } +} + +impl<T> From<T> for ShardedLock<T> { + fn from(t: T) -> Self { + ShardedLock::new(t) + } +} + +/// A guard used to release the shared read access of a [`ShardedLock`] when dropped. +pub struct ShardedLockReadGuard<'a, T: ?Sized> { + lock: &'a ShardedLock<T>, + _guard: RwLockReadGuard<'a, ()>, + _marker: PhantomData<RwLockReadGuard<'a, T>>, +} + +unsafe impl<T: ?Sized + Sync> Sync for ShardedLockReadGuard<'_, T> {} + +impl<T: ?Sized> Deref for ShardedLockReadGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.value.get() } + } +} + +impl<T: fmt::Debug> fmt::Debug for ShardedLockReadGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ShardedLockReadGuard") + .field("lock", &self.lock) + .finish() + } +} + +impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockReadGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +/// A guard used to release the exclusive write access of a [`ShardedLock`] when dropped. +pub struct ShardedLockWriteGuard<'a, T: ?Sized> { + lock: &'a ShardedLock<T>, + _marker: PhantomData<RwLockWriteGuard<'a, T>>, +} + +unsafe impl<T: ?Sized + Sync> Sync for ShardedLockWriteGuard<'_, T> {} + +impl<T: ?Sized> Drop for ShardedLockWriteGuard<'_, T> { + fn drop(&mut self) { + // Unlock the shards in reverse order of locking. + for shard in self.lock.shards.iter().rev() { + unsafe { + let dest: *mut _ = shard.write_guard.get(); + let guard = mem::replace(&mut *dest, None); + drop(guard); + } + } + } +} + +impl<T: fmt::Debug> fmt::Debug for ShardedLockWriteGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ShardedLockWriteGuard") + .field("lock", &self.lock) + .finish() + } +} + +impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockWriteGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +impl<T: ?Sized> Deref for ShardedLockWriteGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.value.get() } + } +} + +impl<T: ?Sized> DerefMut for ShardedLockWriteGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.lock.value.get() } + } +} + +/// Returns a `usize` that identifies the current thread. +/// +/// Each thread is associated with an 'index'. While there are no particular guarantees, indices +/// usually tend to be consecutive numbers between 0 and the number of running threads. +/// +/// Since this function accesses TLS, `None` might be returned if the current thread's TLS is +/// tearing down. +#[inline] +fn current_index() -> Option<usize> { + REGISTRATION.try_with(|reg| reg.index).ok() +} + +/// The global registry keeping track of registered threads and indices. +struct ThreadIndices { + /// Mapping from `ThreadId` to thread index. + mapping: HashMap<ThreadId, usize>, + + /// A list of free indices. + free_list: Vec<usize>, + + /// The next index to allocate if the free list is empty. + next_index: usize, +} + +fn thread_indices() -> &'static Mutex<ThreadIndices> { + static THREAD_INDICES: OnceLock<Mutex<ThreadIndices>> = OnceLock::new(); + fn init() -> Mutex<ThreadIndices> { + Mutex::new(ThreadIndices { + mapping: HashMap::new(), + free_list: Vec::new(), + next_index: 0, + }) + } + THREAD_INDICES.get_or_init(init) +} + +/// A registration of a thread with an index. +/// +/// When dropped, unregisters the thread and frees the reserved index. +struct Registration { + index: usize, + thread_id: ThreadId, +} + +impl Drop for Registration { + fn drop(&mut self) { + let mut indices = thread_indices().lock().unwrap(); + indices.mapping.remove(&self.thread_id); + indices.free_list.push(self.index); + } +} + +thread_local! { + static REGISTRATION: Registration = { + let thread_id = thread::current().id(); + let mut indices = thread_indices().lock().unwrap(); + + let index = match indices.free_list.pop() { + Some(i) => i, + None => { + let i = indices.next_index; + indices.next_index += 1; + i + } + }; + indices.mapping.insert(thread_id, index); + + Registration { + index, + thread_id, + } + }; +} diff --git a/third_party/rust/crossbeam-utils/src/sync/wait_group.rs b/third_party/rust/crossbeam-utils/src/sync/wait_group.rs new file mode 100644 index 0000000000..19d6074157 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/sync/wait_group.rs @@ -0,0 +1,145 @@ +use crate::primitive::sync::{Arc, Condvar, Mutex}; +use std::fmt; + +/// Enables threads to synchronize the beginning or end of some computation. +/// +/// # Wait groups vs barriers +/// +/// `WaitGroup` is very similar to [`Barrier`], but there are a few differences: +/// +/// * [`Barrier`] needs to know the number of threads at construction, while `WaitGroup` is cloned to +/// register more threads. +/// +/// * A [`Barrier`] can be reused even after all threads have synchronized, while a `WaitGroup` +/// synchronizes threads only once. +/// +/// * All threads wait for others to reach the [`Barrier`]. With `WaitGroup`, each thread can choose +/// to either wait for other threads or to continue without blocking. +/// +/// # Examples +/// +/// ``` +/// use crossbeam_utils::sync::WaitGroup; +/// use std::thread; +/// +/// // Create a new wait group. +/// let wg = WaitGroup::new(); +/// +/// for _ in 0..4 { +/// // Create another reference to the wait group. +/// let wg = wg.clone(); +/// +/// thread::spawn(move || { +/// // Do some work. +/// +/// // Drop the reference to the wait group. +/// drop(wg); +/// }); +/// } +/// +/// // Block until all threads have finished their work. +/// wg.wait(); +/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 +/// ``` +/// +/// [`Barrier`]: std::sync::Barrier +pub struct WaitGroup { + inner: Arc<Inner>, +} + +/// Inner state of a `WaitGroup`. +struct Inner { + cvar: Condvar, + count: Mutex<usize>, +} + +impl Default for WaitGroup { + fn default() -> Self { + Self { + inner: Arc::new(Inner { + cvar: Condvar::new(), + count: Mutex::new(1), + }), + } + } +} + +impl WaitGroup { + /// Creates a new wait group and returns the single reference to it. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::WaitGroup; + /// + /// let wg = WaitGroup::new(); + /// ``` + pub fn new() -> Self { + Self::default() + } + + /// Drops this reference and waits until all other references are dropped. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::WaitGroup; + /// use std::thread; + /// + /// let wg = WaitGroup::new(); + /// + /// thread::spawn({ + /// let wg = wg.clone(); + /// move || { + /// // Block until both threads have reached `wait()`. + /// wg.wait(); + /// } + /// }); + /// + /// // Block until both threads have reached `wait()`. + /// wg.wait(); + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 + /// ``` + pub fn wait(self) { + if *self.inner.count.lock().unwrap() == 1 { + return; + } + + let inner = self.inner.clone(); + drop(self); + + let mut count = inner.count.lock().unwrap(); + while *count > 0 { + count = inner.cvar.wait(count).unwrap(); + } + } +} + +impl Drop for WaitGroup { + fn drop(&mut self) { + let mut count = self.inner.count.lock().unwrap(); + *count -= 1; + + if *count == 0 { + self.inner.cvar.notify_all(); + } + } +} + +impl Clone for WaitGroup { + fn clone(&self) -> WaitGroup { + let mut count = self.inner.count.lock().unwrap(); + *count += 1; + + WaitGroup { + inner: self.inner.clone(), + } + } +} + +impl fmt::Debug for WaitGroup { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let count: &usize = &*self.inner.count.lock().unwrap(); + f.debug_struct("WaitGroup").field("count", count).finish() + } +} diff --git a/third_party/rust/crossbeam-utils/src/thread.rs b/third_party/rust/crossbeam-utils/src/thread.rs new file mode 100644 index 0000000000..f1086d9ec3 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/thread.rs @@ -0,0 +1,587 @@ +//! Threads that can borrow variables from the stack. +//! +//! Create a scope when spawned threads need to access variables on the stack: +//! +//! ``` +//! use crossbeam_utils::thread; +//! +//! let people = vec![ +//! "Alice".to_string(), +//! "Bob".to_string(), +//! "Carol".to_string(), +//! ]; +//! +//! thread::scope(|s| { +//! for person in &people { +//! s.spawn(move |_| { +//! println!("Hello, {}!", person); +//! }); +//! } +//! }).unwrap(); +//! ``` +//! +//! # Why scoped threads? +//! +//! Suppose we wanted to re-write the previous example using plain threads: +//! +//! ```compile_fail,E0597 +//! use std::thread; +//! +//! let people = vec![ +//! "Alice".to_string(), +//! "Bob".to_string(), +//! "Carol".to_string(), +//! ]; +//! +//! let mut threads = Vec::new(); +//! +//! for person in &people { +//! threads.push(thread::spawn(move || { +//! println!("Hello, {}!", person); +//! })); +//! } +//! +//! for thread in threads { +//! thread.join().unwrap(); +//! } +//! ``` +//! +//! This doesn't work because the borrow checker complains about `people` not living long enough: +//! +//! ```text +//! error[E0597]: `people` does not live long enough +//! --> src/main.rs:12:20 +//! | +//! 12 | for person in &people { +//! | ^^^^^^ borrowed value does not live long enough +//! ... +//! 21 | } +//! | - borrowed value only lives until here +//! | +//! = note: borrowed value must be valid for the static lifetime... +//! ``` +//! +//! The problem here is that spawned threads are not allowed to borrow variables on stack because +//! the compiler cannot prove they will be joined before `people` is destroyed. +//! +//! Scoped threads are a mechanism to guarantee to the compiler that spawned threads will be joined +//! before the scope ends. +//! +//! # How scoped threads work +//! +//! If a variable is borrowed by a thread, the thread must complete before the variable is +//! destroyed. Threads spawned using [`std::thread::spawn`] can only borrow variables with the +//! `'static` lifetime because the borrow checker cannot be sure when the thread will complete. +//! +//! A scope creates a clear boundary between variables outside the scope and threads inside the +//! scope. Whenever a scope spawns a thread, it promises to join the thread before the scope ends. +//! This way we guarantee to the borrow checker that scoped threads only live within the scope and +//! can safely access variables outside it. +//! +//! # Nesting scoped threads +//! +//! Sometimes scoped threads need to spawn more threads within the same scope. This is a little +//! tricky because argument `s` lives *inside* the invocation of `thread::scope()` and as such +//! cannot be borrowed by scoped threads: +//! +//! ```compile_fail,E0373,E0521 +//! use crossbeam_utils::thread; +//! +//! thread::scope(|s| { +//! s.spawn(|_| { +//! // Not going to compile because we're trying to borrow `s`, +//! // which lives *inside* the scope! :( +//! s.spawn(|_| println!("nested thread")); +//! }); +//! }); +//! ``` +//! +//! Fortunately, there is a solution. Every scoped thread is passed a reference to its scope as an +//! argument, which can be used for spawning nested threads: +//! +//! ``` +//! use crossbeam_utils::thread; +//! +//! thread::scope(|s| { +//! // Note the `|s|` here. +//! s.spawn(|s| { +//! // Yay, this works because we're using a fresh argument `s`! :) +//! s.spawn(|_| println!("nested thread")); +//! }); +//! }).unwrap(); +//! ``` + +use std::fmt; +use std::io; +use std::marker::PhantomData; +use std::mem; +use std::panic; +use std::sync::{Arc, Mutex}; +use std::thread; + +use crate::sync::WaitGroup; +use cfg_if::cfg_if; + +type SharedVec<T> = Arc<Mutex<Vec<T>>>; +type SharedOption<T> = Arc<Mutex<Option<T>>>; + +/// Creates a new scope for spawning threads. +/// +/// All child threads that haven't been manually joined will be automatically joined just before +/// this function invocation ends. If all joined threads have successfully completed, `Ok` is +/// returned with the return value of `f`. If any of the joined threads has panicked, an `Err` is +/// returned containing errors from panicked threads. Note that if panics are implemented by +/// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind]. +/// +/// # Examples +/// +/// ``` +/// use crossbeam_utils::thread; +/// +/// let var = vec![1, 2, 3]; +/// +/// thread::scope(|s| { +/// s.spawn(|_| { +/// println!("A child thread borrowing `var`: {:?}", var); +/// }); +/// }).unwrap(); +/// ``` +pub fn scope<'env, F, R>(f: F) -> thread::Result<R> +where + F: FnOnce(&Scope<'env>) -> R, +{ + let wg = WaitGroup::new(); + let scope = Scope::<'env> { + handles: SharedVec::default(), + wait_group: wg.clone(), + _marker: PhantomData, + }; + + // Execute the scoped function, but catch any panics. + let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&scope))); + + // Wait until all nested scopes are dropped. + drop(scope.wait_group); + wg.wait(); + + // Join all remaining spawned threads. + let panics: Vec<_> = scope + .handles + .lock() + .unwrap() + // Filter handles that haven't been joined, join them, and collect errors. + .drain(..) + .filter_map(|handle| handle.lock().unwrap().take()) + .filter_map(|handle| handle.join().err()) + .collect(); + + // If `f` has panicked, resume unwinding. + // If any of the child threads have panicked, return the panic errors. + // Otherwise, everything is OK and return the result of `f`. + match result { + Err(err) => panic::resume_unwind(err), + Ok(res) => { + if panics.is_empty() { + Ok(res) + } else { + Err(Box::new(panics)) + } + } + } +} + +/// A scope for spawning threads. +pub struct Scope<'env> { + /// The list of the thread join handles. + handles: SharedVec<SharedOption<thread::JoinHandle<()>>>, + + /// Used to wait until all subscopes all dropped. + wait_group: WaitGroup, + + /// Borrows data with invariant lifetime `'env`. + _marker: PhantomData<&'env mut &'env ()>, +} + +unsafe impl Sync for Scope<'_> {} + +impl<'env> Scope<'env> { + /// Spawns a scoped thread. + /// + /// This method is similar to the [`spawn`] function in Rust's standard library. The difference + /// is that this thread is scoped, meaning it's guaranteed to terminate before the scope exits, + /// allowing it to reference variables outside the scope. + /// + /// The scoped thread is passed a reference to this scope as an argument, which can be used for + /// spawning nested threads. + /// + /// The returned [handle](ScopedJoinHandle) can be used to manually + /// [join](ScopedJoinHandle::join) the thread before the scope exits. + /// + /// This will create a thread using default parameters of [`ScopedThreadBuilder`], if you want to specify the + /// stack size or the name of the thread, use this API instead. + /// + /// [`spawn`]: std::thread::spawn + /// + /// # Panics + /// + /// Panics if the OS fails to create a thread; use [`ScopedThreadBuilder::spawn`] + /// to recover from such errors. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// let handle = s.spawn(|_| { + /// println!("A child thread is running"); + /// 42 + /// }); + /// + /// // Join the thread and retrieve its result. + /// let res = handle.join().unwrap(); + /// assert_eq!(res, 42); + /// }).unwrap(); + /// ``` + pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> + where + F: FnOnce(&Scope<'env>) -> T, + F: Send + 'env, + T: Send + 'env, + { + self.builder() + .spawn(f) + .expect("failed to spawn scoped thread") + } + + /// Creates a builder that can configure a thread before spawning. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// s.builder() + /// .spawn(|_| println!("A child thread is running")) + /// .unwrap(); + /// }).unwrap(); + /// ``` + pub fn builder<'scope>(&'scope self) -> ScopedThreadBuilder<'scope, 'env> { + ScopedThreadBuilder { + scope: self, + builder: thread::Builder::new(), + } + } +} + +impl fmt::Debug for Scope<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Scope { .. }") + } +} + +/// Configures the properties of a new thread. +/// +/// The two configurable properties are: +/// +/// - [`name`]: Specifies an [associated name for the thread][naming-threads]. +/// - [`stack_size`]: Specifies the [desired stack size for the thread][stack-size]. +/// +/// The [`spawn`] method will take ownership of the builder and return an [`io::Result`] of the +/// thread handle with the given configuration. +/// +/// The [`Scope::spawn`] method uses a builder with default configuration and unwraps its return +/// value. You may want to use this builder when you want to recover from a failure to launch a +/// thread. +/// +/// # Examples +/// +/// ``` +/// use crossbeam_utils::thread; +/// +/// thread::scope(|s| { +/// s.builder() +/// .spawn(|_| println!("Running a child thread")) +/// .unwrap(); +/// }).unwrap(); +/// ``` +/// +/// [`name`]: ScopedThreadBuilder::name +/// [`stack_size`]: ScopedThreadBuilder::stack_size +/// [`spawn`]: ScopedThreadBuilder::spawn +/// [`io::Result`]: std::io::Result +/// [naming-threads]: std::thread#naming-threads +/// [stack-size]: std::thread#stack-size +#[derive(Debug)] +pub struct ScopedThreadBuilder<'scope, 'env> { + scope: &'scope Scope<'env>, + builder: thread::Builder, +} + +impl<'scope, 'env> ScopedThreadBuilder<'scope, 'env> { + /// Sets the name for the new thread. + /// + /// The name must not contain null bytes (`\0`). + /// + /// For more information about named threads, see [here][naming-threads]. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// use std::thread::current; + /// + /// thread::scope(|s| { + /// s.builder() + /// .name("my thread".to_string()) + /// .spawn(|_| assert_eq!(current().name(), Some("my thread"))) + /// .unwrap(); + /// }).unwrap(); + /// ``` + /// + /// [naming-threads]: std::thread#naming-threads + pub fn name(mut self, name: String) -> ScopedThreadBuilder<'scope, 'env> { + self.builder = self.builder.name(name); + self + } + + /// Sets the size of the stack for the new thread. + /// + /// The stack size is measured in bytes. + /// + /// For more information about the stack size for threads, see [here][stack-size]. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// s.builder() + /// .stack_size(32 * 1024) + /// .spawn(|_| println!("Running a child thread")) + /// .unwrap(); + /// }).unwrap(); + /// ``` + /// + /// [stack-size]: std::thread#stack-size + pub fn stack_size(mut self, size: usize) -> ScopedThreadBuilder<'scope, 'env> { + self.builder = self.builder.stack_size(size); + self + } + + /// Spawns a scoped thread with this configuration. + /// + /// The scoped thread is passed a reference to this scope as an argument, which can be used for + /// spawning nested threads. + /// + /// The returned handle can be used to manually join the thread before the scope exits. + /// + /// # Errors + /// + /// Unlike the [`Scope::spawn`] method, this method yields an + /// [`io::Result`] to capture any failure to create the thread at + /// the OS level. + /// + /// [`io::Result`]: std::io::Result + /// + /// # Panics + /// + /// Panics if a thread name was set and it contained null bytes. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// let handle = s.builder() + /// .spawn(|_| { + /// println!("A child thread is running"); + /// 42 + /// }) + /// .unwrap(); + /// + /// // Join the thread and retrieve its result. + /// let res = handle.join().unwrap(); + /// assert_eq!(res, 42); + /// }).unwrap(); + /// ``` + pub fn spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<'scope, T>> + where + F: FnOnce(&Scope<'env>) -> T, + F: Send + 'env, + T: Send + 'env, + { + // The result of `f` will be stored here. + let result = SharedOption::default(); + + // Spawn the thread and grab its join handle and thread handle. + let (handle, thread) = { + let result = Arc::clone(&result); + + // A clone of the scope that will be moved into the new thread. + let scope = Scope::<'env> { + handles: Arc::clone(&self.scope.handles), + wait_group: self.scope.wait_group.clone(), + _marker: PhantomData, + }; + + // Spawn the thread. + let handle = { + let closure = move || { + // Make sure the scope is inside the closure with the proper `'env` lifetime. + let scope: Scope<'env> = scope; + + // Run the closure. + let res = f(&scope); + + // Store the result if the closure didn't panic. + *result.lock().unwrap() = Some(res); + }; + + // Allocate `closure` on the heap and erase the `'env` bound. + let closure: Box<dyn FnOnce() + Send + 'env> = Box::new(closure); + let closure: Box<dyn FnOnce() + Send + 'static> = + unsafe { mem::transmute(closure) }; + + // Finally, spawn the closure. + self.builder.spawn(closure)? + }; + + let thread = handle.thread().clone(); + let handle = Arc::new(Mutex::new(Some(handle))); + (handle, thread) + }; + + // Add the handle to the shared list of join handles. + self.scope.handles.lock().unwrap().push(Arc::clone(&handle)); + + Ok(ScopedJoinHandle { + handle, + result, + thread, + _marker: PhantomData, + }) + } +} + +unsafe impl<T> Send for ScopedJoinHandle<'_, T> {} +unsafe impl<T> Sync for ScopedJoinHandle<'_, T> {} + +/// A handle that can be used to join its scoped thread. +/// +/// This struct is created by the [`Scope::spawn`] method and the +/// [`ScopedThreadBuilder::spawn`] method. +pub struct ScopedJoinHandle<'scope, T> { + /// A join handle to the spawned thread. + handle: SharedOption<thread::JoinHandle<()>>, + + /// Holds the result of the inner closure. + result: SharedOption<T>, + + /// A handle to the the spawned thread. + thread: thread::Thread, + + /// Borrows the parent scope with lifetime `'scope`. + _marker: PhantomData<&'scope ()>, +} + +impl<T> ScopedJoinHandle<'_, T> { + /// Waits for the thread to finish and returns its result. + /// + /// If the child thread panics, an error is returned. Note that if panics are implemented by + /// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind]. + /// + /// # Panics + /// + /// This function may panic on some platforms if a thread attempts to join itself or otherwise + /// may create a deadlock with joining threads. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// let handle1 = s.spawn(|_| println!("I'm a happy thread :)")); + /// let handle2 = s.spawn(|_| panic!("I'm a sad thread :(")); + /// + /// // Join the first thread and verify that it succeeded. + /// let res = handle1.join(); + /// assert!(res.is_ok()); + /// + /// // Join the second thread and verify that it panicked. + /// let res = handle2.join(); + /// assert!(res.is_err()); + /// }).unwrap(); + /// ``` + pub fn join(self) -> thread::Result<T> { + // Take out the handle. The handle will surely be available because the root scope waits + // for nested scopes before joining remaining threads. + let handle = self.handle.lock().unwrap().take().unwrap(); + + // Join the thread and then take the result out of its inner closure. + handle + .join() + .map(|()| self.result.lock().unwrap().take().unwrap()) + } + + /// Returns a handle to the underlying thread. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// let handle = s.spawn(|_| println!("A child thread is running")); + /// println!("The child thread ID: {:?}", handle.thread().id()); + /// }).unwrap(); + /// ``` + pub fn thread(&self) -> &thread::Thread { + &self.thread + } +} + +cfg_if! { + if #[cfg(unix)] { + use std::os::unix::thread::{JoinHandleExt, RawPthread}; + + impl<T> JoinHandleExt for ScopedJoinHandle<'_, T> { + fn as_pthread_t(&self) -> RawPthread { + // Borrow the handle. The handle will surely be available because the root scope waits + // for nested scopes before joining remaining threads. + let handle = self.handle.lock().unwrap(); + handle.as_ref().unwrap().as_pthread_t() + } + fn into_pthread_t(self) -> RawPthread { + self.as_pthread_t() + } + } + } else if #[cfg(windows)] { + use std::os::windows::io::{AsRawHandle, IntoRawHandle, RawHandle}; + + impl<T> AsRawHandle for ScopedJoinHandle<'_, T> { + fn as_raw_handle(&self) -> RawHandle { + // Borrow the handle. The handle will surely be available because the root scope waits + // for nested scopes before joining remaining threads. + let handle = self.handle.lock().unwrap(); + handle.as_ref().unwrap().as_raw_handle() + } + } + + impl<T> IntoRawHandle for ScopedJoinHandle<'_, T> { + fn into_raw_handle(self) -> RawHandle { + self.as_raw_handle() + } + } + } +} + +impl<T> fmt::Debug for ScopedJoinHandle<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("ScopedJoinHandle { .. }") + } +} diff --git a/third_party/rust/crossbeam-utils/tests/atomic_cell.rs b/third_party/rust/crossbeam-utils/tests/atomic_cell.rs new file mode 100644 index 0000000000..a1d102210b --- /dev/null +++ b/third_party/rust/crossbeam-utils/tests/atomic_cell.rs @@ -0,0 +1,380 @@ +use std::mem; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; + +use crossbeam_utils::atomic::AtomicCell; + +#[test] +fn is_lock_free() { + struct UsizeWrap(usize); + struct U8Wrap(bool); + struct I16Wrap(i16); + #[repr(align(8))] + struct U64Align8(u64); + + assert!(AtomicCell::<usize>::is_lock_free()); + assert!(AtomicCell::<isize>::is_lock_free()); + assert!(AtomicCell::<UsizeWrap>::is_lock_free()); + + assert!(AtomicCell::<()>::is_lock_free()); + + assert!(AtomicCell::<u8>::is_lock_free()); + assert!(AtomicCell::<i8>::is_lock_free()); + assert!(AtomicCell::<bool>::is_lock_free()); + assert!(AtomicCell::<U8Wrap>::is_lock_free()); + + assert!(AtomicCell::<u16>::is_lock_free()); + assert!(AtomicCell::<i16>::is_lock_free()); + assert!(AtomicCell::<I16Wrap>::is_lock_free()); + + assert!(AtomicCell::<u32>::is_lock_free()); + assert!(AtomicCell::<i32>::is_lock_free()); + + // Sizes of both types must be equal, and the alignment of `u64` must be greater or equal than + // that of `AtomicU64`. In i686-unknown-linux-gnu, the alignment of `u64` is `4` and alignment + // of `AtomicU64` is `8`, so `AtomicCell<u64>` is not lock-free. + assert_eq!( + AtomicCell::<u64>::is_lock_free(), + cfg!(not(crossbeam_no_atomic_64)) + && cfg!(any( + target_pointer_width = "64", + target_pointer_width = "128" + )) + ); + assert_eq!(mem::size_of::<U64Align8>(), 8); + assert_eq!(mem::align_of::<U64Align8>(), 8); + assert_eq!( + AtomicCell::<U64Align8>::is_lock_free(), + cfg!(not(crossbeam_no_atomic_64)) + ); + + // AtomicU128 is unstable + assert!(!AtomicCell::<u128>::is_lock_free()); +} + +#[test] +fn const_is_lock_free() { + const _U: bool = AtomicCell::<usize>::is_lock_free(); + const _I: bool = AtomicCell::<isize>::is_lock_free(); +} + +#[test] +fn drops_unit() { + static CNT: AtomicUsize = AtomicUsize::new(0); + CNT.store(0, SeqCst); + + #[derive(Debug, PartialEq, Eq)] + struct Foo(); + + impl Foo { + fn new() -> Foo { + CNT.fetch_add(1, SeqCst); + Foo() + } + } + + impl Drop for Foo { + fn drop(&mut self) { + CNT.fetch_sub(1, SeqCst); + } + } + + impl Default for Foo { + fn default() -> Foo { + Foo::new() + } + } + + let a = AtomicCell::new(Foo::new()); + + assert_eq!(a.swap(Foo::new()), Foo::new()); + assert_eq!(CNT.load(SeqCst), 1); + + a.store(Foo::new()); + assert_eq!(CNT.load(SeqCst), 1); + + assert_eq!(a.swap(Foo::default()), Foo::new()); + assert_eq!(CNT.load(SeqCst), 1); + + drop(a); + assert_eq!(CNT.load(SeqCst), 0); +} + +#[test] +fn drops_u8() { + static CNT: AtomicUsize = AtomicUsize::new(0); + CNT.store(0, SeqCst); + + #[derive(Debug, PartialEq, Eq)] + struct Foo(u8); + + impl Foo { + fn new(val: u8) -> Foo { + CNT.fetch_add(1, SeqCst); + Foo(val) + } + } + + impl Drop for Foo { + fn drop(&mut self) { + CNT.fetch_sub(1, SeqCst); + } + } + + impl Default for Foo { + fn default() -> Foo { + Foo::new(0) + } + } + + let a = AtomicCell::new(Foo::new(5)); + + assert_eq!(a.swap(Foo::new(6)), Foo::new(5)); + assert_eq!(a.swap(Foo::new(1)), Foo::new(6)); + assert_eq!(CNT.load(SeqCst), 1); + + a.store(Foo::new(2)); + assert_eq!(CNT.load(SeqCst), 1); + + assert_eq!(a.swap(Foo::default()), Foo::new(2)); + assert_eq!(CNT.load(SeqCst), 1); + + assert_eq!(a.swap(Foo::default()), Foo::new(0)); + assert_eq!(CNT.load(SeqCst), 1); + + drop(a); + assert_eq!(CNT.load(SeqCst), 0); +} + +#[test] +fn drops_usize() { + static CNT: AtomicUsize = AtomicUsize::new(0); + CNT.store(0, SeqCst); + + #[derive(Debug, PartialEq, Eq)] + struct Foo(usize); + + impl Foo { + fn new(val: usize) -> Foo { + CNT.fetch_add(1, SeqCst); + Foo(val) + } + } + + impl Drop for Foo { + fn drop(&mut self) { + CNT.fetch_sub(1, SeqCst); + } + } + + impl Default for Foo { + fn default() -> Foo { + Foo::new(0) + } + } + + let a = AtomicCell::new(Foo::new(5)); + + assert_eq!(a.swap(Foo::new(6)), Foo::new(5)); + assert_eq!(a.swap(Foo::new(1)), Foo::new(6)); + assert_eq!(CNT.load(SeqCst), 1); + + a.store(Foo::new(2)); + assert_eq!(CNT.load(SeqCst), 1); + + assert_eq!(a.swap(Foo::default()), Foo::new(2)); + assert_eq!(CNT.load(SeqCst), 1); + + assert_eq!(a.swap(Foo::default()), Foo::new(0)); + assert_eq!(CNT.load(SeqCst), 1); + + drop(a); + assert_eq!(CNT.load(SeqCst), 0); +} + +#[test] +fn modular_u8() { + #[derive(Clone, Copy, Eq, Debug, Default)] + struct Foo(u8); + + impl PartialEq for Foo { + fn eq(&self, other: &Foo) -> bool { + self.0 % 5 == other.0 % 5 + } + } + + let a = AtomicCell::new(Foo(1)); + + assert_eq!(a.load(), Foo(1)); + assert_eq!(a.swap(Foo(2)), Foo(11)); + assert_eq!(a.load(), Foo(52)); + + a.store(Foo(0)); + assert_eq!(a.compare_exchange(Foo(0), Foo(5)), Ok(Foo(100))); + assert_eq!(a.load().0, 5); + assert_eq!(a.compare_exchange(Foo(10), Foo(15)), Ok(Foo(100))); + assert_eq!(a.load().0, 15); +} + +#[test] +fn modular_usize() { + #[derive(Clone, Copy, Eq, Debug, Default)] + struct Foo(usize); + + impl PartialEq for Foo { + fn eq(&self, other: &Foo) -> bool { + self.0 % 5 == other.0 % 5 + } + } + + let a = AtomicCell::new(Foo(1)); + + assert_eq!(a.load(), Foo(1)); + assert_eq!(a.swap(Foo(2)), Foo(11)); + assert_eq!(a.load(), Foo(52)); + + a.store(Foo(0)); + assert_eq!(a.compare_exchange(Foo(0), Foo(5)), Ok(Foo(100))); + assert_eq!(a.load().0, 5); + assert_eq!(a.compare_exchange(Foo(10), Foo(15)), Ok(Foo(100))); + assert_eq!(a.load().0, 15); +} + +#[test] +fn garbage_padding() { + #[derive(Copy, Clone, Eq, PartialEq)] + struct Object { + a: i64, + b: i32, + } + + let cell = AtomicCell::new(Object { a: 0, b: 0 }); + let _garbage = [0xfe, 0xfe, 0xfe, 0xfe, 0xfe]; // Needed + let next = Object { a: 0, b: 0 }; + + let prev = cell.load(); + assert!(cell.compare_exchange(prev, next).is_ok()); + println!(); +} + +#[test] +fn const_atomic_cell_new() { + static CELL: AtomicCell<usize> = AtomicCell::new(0); + + CELL.store(1); + assert_eq!(CELL.load(), 1); +} + +// https://github.com/crossbeam-rs/crossbeam/pull/767 +macro_rules! test_arithmetic { + ($test_name:ident, $ty:ident) => { + #[test] + fn $test_name() { + let a: AtomicCell<$ty> = AtomicCell::new(7); + + assert_eq!(a.fetch_add(3), 7); + assert_eq!(a.load(), 10); + + assert_eq!(a.fetch_sub(3), 10); + assert_eq!(a.load(), 7); + + assert_eq!(a.fetch_and(3), 7); + assert_eq!(a.load(), 3); + + assert_eq!(a.fetch_or(16), 3); + assert_eq!(a.load(), 19); + + assert_eq!(a.fetch_xor(2), 19); + assert_eq!(a.load(), 17); + + assert_eq!(a.fetch_max(18), 17); + assert_eq!(a.load(), 18); + + assert_eq!(a.fetch_min(17), 18); + assert_eq!(a.load(), 17); + + assert_eq!(a.fetch_nand(7), 17); + assert_eq!(a.load(), !(17 & 7)); + } + }; +} +test_arithmetic!(arithmetic_u8, u8); +test_arithmetic!(arithmetic_i8, i8); +test_arithmetic!(arithmetic_u16, u16); +test_arithmetic!(arithmetic_i16, i16); +test_arithmetic!(arithmetic_u32, u32); +test_arithmetic!(arithmetic_i32, i32); +test_arithmetic!(arithmetic_u64, u64); +test_arithmetic!(arithmetic_i64, i64); +test_arithmetic!(arithmetic_u128, u128); +test_arithmetic!(arithmetic_i128, i128); + +// https://github.com/crossbeam-rs/crossbeam/issues/748 +#[cfg_attr(miri, ignore)] // TODO +#[rustversion::since(1.37)] // #[repr(align(N))] requires Rust 1.37 +#[test] +fn issue_748() { + #[allow(dead_code)] + #[repr(align(8))] + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + enum Test { + Field(u32), + FieldLess, + } + + assert_eq!(mem::size_of::<Test>(), 8); + assert_eq!( + AtomicCell::<Test>::is_lock_free(), + cfg!(not(crossbeam_no_atomic_64)) + ); + let x = AtomicCell::new(Test::FieldLess); + assert_eq!(x.load(), Test::FieldLess); +} + +// https://github.com/crossbeam-rs/crossbeam/issues/833 +#[rustversion::since(1.40)] // const_constructor requires Rust 1.40 +#[test] +fn issue_833() { + use std::num::NonZeroU128; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::thread; + + #[cfg(miri)] + const N: usize = 10_000; + #[cfg(not(miri))] + const N: usize = 1_000_000; + + #[allow(dead_code)] + enum Enum { + NeverConstructed, + Cell(AtomicCell<NonZeroU128>), + } + + static STATIC: Enum = Enum::Cell(AtomicCell::new(match NonZeroU128::new(1) { + Some(nonzero) => nonzero, + None => unreachable!(), + })); + static FINISHED: AtomicBool = AtomicBool::new(false); + + let handle = thread::spawn(|| { + let cell = match &STATIC { + Enum::NeverConstructed => unreachable!(), + Enum::Cell(cell) => cell, + }; + let x = NonZeroU128::new(0xFFFF_FFFF_FFFF_FFFF_0000_0000_0000_0000).unwrap(); + let y = NonZeroU128::new(0x0000_0000_0000_0000_FFFF_FFFF_FFFF_FFFF).unwrap(); + while !FINISHED.load(Ordering::Relaxed) { + cell.store(x); + cell.store(y); + } + }); + + for _ in 0..N { + if let Enum::NeverConstructed = STATIC { + unreachable!(":("); + } + } + + FINISHED.store(true, Ordering::Relaxed); + handle.join().unwrap(); +} diff --git a/third_party/rust/crossbeam-utils/tests/cache_padded.rs b/third_party/rust/crossbeam-utils/tests/cache_padded.rs new file mode 100644 index 0000000000..86e9a7709c --- /dev/null +++ b/third_party/rust/crossbeam-utils/tests/cache_padded.rs @@ -0,0 +1,113 @@ +use std::cell::Cell; +use std::mem; + +use crossbeam_utils::CachePadded; + +#[test] +fn default() { + let x: CachePadded<u64> = Default::default(); + assert_eq!(*x, 0); +} + +#[test] +fn store_u64() { + let x: CachePadded<u64> = CachePadded::new(17); + assert_eq!(*x, 17); +} + +#[test] +fn store_pair() { + let x: CachePadded<(u64, u64)> = CachePadded::new((17, 37)); + assert_eq!(x.0, 17); + assert_eq!(x.1, 37); +} + +#[test] +fn distance() { + let arr = [CachePadded::new(17u8), CachePadded::new(37u8)]; + let a = &*arr[0] as *const u8; + let b = &*arr[1] as *const u8; + let align = mem::align_of::<CachePadded<()>>(); + assert!(align >= 32); + assert_eq!(unsafe { a.add(align) }, b); +} + +#[test] +fn different_sizes() { + CachePadded::new(17u8); + CachePadded::new(17u16); + CachePadded::new(17u32); + CachePadded::new([17u64; 0]); + CachePadded::new([17u64; 1]); + CachePadded::new([17u64; 2]); + CachePadded::new([17u64; 3]); + CachePadded::new([17u64; 4]); + CachePadded::new([17u64; 5]); + CachePadded::new([17u64; 6]); + CachePadded::new([17u64; 7]); + CachePadded::new([17u64; 8]); +} + +#[test] +fn large() { + let a = [17u64; 9]; + let b = CachePadded::new(a); + assert!(mem::size_of_val(&a) <= mem::size_of_val(&b)); +} + +#[test] +fn debug() { + assert_eq!( + format!("{:?}", CachePadded::new(17u64)), + "CachePadded { value: 17 }" + ); +} + +#[test] +fn drops() { + let count = Cell::new(0); + + struct Foo<'a>(&'a Cell<usize>); + + impl<'a> Drop for Foo<'a> { + fn drop(&mut self) { + self.0.set(self.0.get() + 1); + } + } + + let a = CachePadded::new(Foo(&count)); + let b = CachePadded::new(Foo(&count)); + + assert_eq!(count.get(), 0); + drop(a); + assert_eq!(count.get(), 1); + drop(b); + assert_eq!(count.get(), 2); +} + +#[allow(clippy::clone_on_copy)] // This is intentional. +#[test] +fn clone() { + let a = CachePadded::new(17); + let b = a.clone(); + assert_eq!(*a, *b); +} + +#[test] +fn runs_custom_clone() { + let count = Cell::new(0); + + struct Foo<'a>(&'a Cell<usize>); + + impl<'a> Clone for Foo<'a> { + fn clone(&self) -> Foo<'a> { + self.0.set(self.0.get() + 1); + Foo::<'a>(self.0) + } + } + + let a = CachePadded::new(Foo(&count)); + let _ = a.clone(); + + assert_eq!(count.get(), 1); +} diff --git a/third_party/rust/crossbeam-utils/tests/parker.rs b/third_party/rust/crossbeam-utils/tests/parker.rs new file mode 100644 index 0000000000..2bf9c37d49 --- /dev/null +++ b/third_party/rust/crossbeam-utils/tests/parker.rs @@ -0,0 +1,41 @@ +use std::thread::sleep; +use std::time::Duration; +use std::u32; + +use crossbeam_utils::sync::Parker; +use crossbeam_utils::thread; + +#[test] +fn park_timeout_unpark_before() { + let p = Parker::new(); + for _ in 0..10 { + p.unparker().unpark(); + p.park_timeout(Duration::from_millis(u32::MAX as u64)); + } +} + +#[test] +fn park_timeout_unpark_not_called() { + let p = Parker::new(); + for _ in 0..10 { + p.park_timeout(Duration::from_millis(10)) + } +} + +#[test] +fn park_timeout_unpark_called_other_thread() { + for _ in 0..10 { + let p = Parker::new(); + let u = p.unparker().clone(); + + thread::scope(|scope| { + scope.spawn(move |_| { + sleep(Duration::from_millis(50)); + u.unpark(); + }); + + p.park_timeout(Duration::from_millis(u32::MAX as u64)) + }) + .unwrap(); + } +} diff --git a/third_party/rust/crossbeam-utils/tests/sharded_lock.rs b/third_party/rust/crossbeam-utils/tests/sharded_lock.rs new file mode 100644 index 0000000000..002f7f5e19 --- /dev/null +++ b/third_party/rust/crossbeam-utils/tests/sharded_lock.rs @@ -0,0 +1,252 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc::channel; +use std::sync::{Arc, TryLockError}; +use std::thread; + +use crossbeam_utils::sync::ShardedLock; +use rand::Rng; + +#[derive(Eq, PartialEq, Debug)] +struct NonCopy(i32); + +#[test] +fn smoke() { + let l = ShardedLock::new(()); + drop(l.read().unwrap()); + drop(l.write().unwrap()); + drop((l.read().unwrap(), l.read().unwrap())); + drop(l.write().unwrap()); +} + +#[test] +fn frob() { + const N: u32 = 10; + #[cfg(miri)] + const M: usize = 50; + #[cfg(not(miri))] + const M: usize = 1000; + + let r = Arc::new(ShardedLock::new(())); + + let (tx, rx) = channel::<()>(); + for _ in 0..N { + let tx = tx.clone(); + let r = r.clone(); + thread::spawn(move || { + let mut rng = rand::thread_rng(); + for _ in 0..M { + if rng.gen_bool(1.0 / (N as f64)) { + drop(r.write().unwrap()); + } else { + drop(r.read().unwrap()); + } + } + drop(tx); + }); + } + drop(tx); + let _ = rx.recv(); +} + +#[test] +fn arc_poison_wr() { + let arc = Arc::new(ShardedLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.write().unwrap(); + panic!(); + }) + .join(); + assert!(arc.read().is_err()); +} + +#[test] +fn arc_poison_ww() { + let arc = Arc::new(ShardedLock::new(1)); + assert!(!arc.is_poisoned()); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.write().unwrap(); + panic!(); + }) + .join(); + assert!(arc.write().is_err()); + assert!(arc.is_poisoned()); +} + +#[test] +fn arc_no_poison_rr() { + let arc = Arc::new(ShardedLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.read().unwrap(); + panic!(); + }) + .join(); + let lock = arc.read().unwrap(); + assert_eq!(*lock, 1); +} +#[test] +fn arc_no_poison_sl() { + let arc = Arc::new(ShardedLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.read().unwrap(); + panic!() + }) + .join(); + let lock = arc.write().unwrap(); + assert_eq!(*lock, 1); +} + +#[test] +fn arc() { + let arc = Arc::new(ShardedLock::new(0)); + let arc2 = arc.clone(); + let (tx, rx) = channel(); + + thread::spawn(move || { + let mut lock = arc2.write().unwrap(); + for _ in 0..10 { + let tmp = *lock; + *lock = -1; + thread::yield_now(); + *lock = tmp + 1; + } + tx.send(()).unwrap(); + }); + + // Readers try to catch the writer in the act + let mut children = Vec::new(); + for _ in 0..5 { + let arc3 = arc.clone(); + children.push(thread::spawn(move || { + let lock = arc3.read().unwrap(); + assert!(*lock >= 0); + })); + } + + // Wait for children to pass their asserts + for r in children { + assert!(r.join().is_ok()); + } + + // Wait for writer to finish + rx.recv().unwrap(); + let lock = arc.read().unwrap(); + assert_eq!(*lock, 10); +} + +#[test] +fn arc_access_in_unwind() { + let arc = Arc::new(ShardedLock::new(1)); + let arc2 = arc.clone(); + let _ = thread::spawn(move || { + struct Unwinder { + i: Arc<ShardedLock<isize>>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + let mut lock = self.i.write().unwrap(); + *lock += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }) + .join(); + let lock = arc.read().unwrap(); + assert_eq!(*lock, 2); +} + +#[test] +fn unsized_type() { + let sl: &ShardedLock<[i32]> = &ShardedLock::new([1, 2, 3]); + { + let b = &mut *sl.write().unwrap(); + b[0] = 4; + b[2] = 5; + } + let comp: &[i32] = &[4, 2, 5]; + assert_eq!(&*sl.read().unwrap(), comp); +} + +#[test] +fn try_write() { + let lock = ShardedLock::new(0isize); + let read_guard = lock.read().unwrap(); + + let write_result = lock.try_write(); + match write_result { + Err(TryLockError::WouldBlock) => (), + Ok(_) => panic!("try_write should not succeed while read_guard is in scope"), + Err(_) => panic!("unexpected error"), + } + + drop(read_guard); +} + +#[test] +fn test_into_inner() { + let m = ShardedLock::new(NonCopy(10)); + assert_eq!(m.into_inner().unwrap(), NonCopy(10)); +} + +#[test] +fn test_into_inner_drop() { + struct Foo(Arc<AtomicUsize>); + impl Drop for Foo { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let num_drops = Arc::new(AtomicUsize::new(0)); + let m = ShardedLock::new(Foo(num_drops.clone())); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + { + let _inner = m.into_inner().unwrap(); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + } + assert_eq!(num_drops.load(Ordering::SeqCst), 1); +} + +#[test] +fn test_into_inner_poison() { + let m = Arc::new(ShardedLock::new(NonCopy(10))); + let m2 = m.clone(); + let _ = thread::spawn(move || { + let _lock = m2.write().unwrap(); + panic!("test panic in inner thread to poison ShardedLock"); + }) + .join(); + + assert!(m.is_poisoned()); + match Arc::try_unwrap(m).unwrap().into_inner() { + Err(e) => assert_eq!(e.into_inner(), NonCopy(10)), + Ok(x) => panic!("into_inner of poisoned ShardedLock is Ok: {:?}", x), + } +} + +#[test] +fn test_get_mut() { + let mut m = ShardedLock::new(NonCopy(10)); + *m.get_mut().unwrap() = NonCopy(20); + assert_eq!(m.into_inner().unwrap(), NonCopy(20)); +} + +#[test] +fn test_get_mut_poison() { + let m = Arc::new(ShardedLock::new(NonCopy(10))); + let m2 = m.clone(); + let _ = thread::spawn(move || { + let _lock = m2.write().unwrap(); + panic!("test panic in inner thread to poison ShardedLock"); + }) + .join(); + + assert!(m.is_poisoned()); + match Arc::try_unwrap(m).unwrap().get_mut() { + Err(e) => assert_eq!(*e.into_inner(), NonCopy(10)), + Ok(x) => panic!("get_mut of poisoned ShardedLock is Ok: {:?}", x), + } +} diff --git a/third_party/rust/crossbeam-utils/tests/thread.rs b/third_party/rust/crossbeam-utils/tests/thread.rs new file mode 100644 index 0000000000..0dfad90bd6 --- /dev/null +++ b/third_party/rust/crossbeam-utils/tests/thread.rs @@ -0,0 +1,215 @@ +use std::any::Any; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::thread::sleep; +use std::time::Duration; + +use crossbeam_utils::thread; + +const THREADS: usize = 10; +const SMALL_STACK_SIZE: usize = 20; + +#[test] +fn join() { + let counter = AtomicUsize::new(0); + thread::scope(|scope| { + let handle = scope.spawn(|_| { + counter.store(1, Ordering::Relaxed); + }); + assert!(handle.join().is_ok()); + + let panic_handle = scope.spawn(|_| { + panic!("\"My honey is running out!\", said Pooh."); + }); + assert!(panic_handle.join().is_err()); + }) + .unwrap(); + + // There should be sufficient synchronization. + assert_eq!(1, counter.load(Ordering::Relaxed)); +} + +#[test] +fn counter() { + let counter = AtomicUsize::new(0); + thread::scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| { + counter.fetch_add(1, Ordering::Relaxed); + }); + } + }) + .unwrap(); + + assert_eq!(THREADS, counter.load(Ordering::Relaxed)); +} + +#[test] +fn counter_builder() { + let counter = AtomicUsize::new(0); + thread::scope(|scope| { + for i in 0..THREADS { + scope + .builder() + .name(format!("child-{}", i)) + .stack_size(SMALL_STACK_SIZE) + .spawn(|_| { + counter.fetch_add(1, Ordering::Relaxed); + }) + .unwrap(); + } + }) + .unwrap(); + + assert_eq!(THREADS, counter.load(Ordering::Relaxed)); +} + +#[test] +fn counter_panic() { + let counter = AtomicUsize::new(0); + let result = thread::scope(|scope| { + scope.spawn(|_| { + panic!("\"My honey is running out!\", said Pooh."); + }); + sleep(Duration::from_millis(100)); + + for _ in 0..THREADS { + scope.spawn(|_| { + counter.fetch_add(1, Ordering::Relaxed); + }); + } + }); + + assert_eq!(THREADS, counter.load(Ordering::Relaxed)); + assert!(result.is_err()); +} + +#[test] +fn panic_twice() { + let result = thread::scope(|scope| { + scope.spawn(|_| { + sleep(Duration::from_millis(500)); + panic!("thread #1"); + }); + scope.spawn(|_| { + panic!("thread #2"); + }); + }); + + let err = result.unwrap_err(); + let vec = err + .downcast_ref::<Vec<Box<dyn Any + Send + 'static>>>() + .unwrap(); + assert_eq!(2, vec.len()); + + let first = vec[0].downcast_ref::<&str>().unwrap(); + let second = vec[1].downcast_ref::<&str>().unwrap(); + assert_eq!("thread #1", *first); + assert_eq!("thread #2", *second) +} + +#[test] +fn panic_many() { + let result = thread::scope(|scope| { + scope.spawn(|_| panic!("deliberate panic #1")); + scope.spawn(|_| panic!("deliberate panic #2")); + scope.spawn(|_| panic!("deliberate panic #3")); + }); + + let err = result.unwrap_err(); + let vec = err + .downcast_ref::<Vec<Box<dyn Any + Send + 'static>>>() + .unwrap(); + assert_eq!(3, vec.len()); + + for panic in vec.iter() { + let panic = panic.downcast_ref::<&str>().unwrap(); + assert!( + *panic == "deliberate panic #1" + || *panic == "deliberate panic #2" + || *panic == "deliberate panic #3" + ); + } +} + +#[test] +fn nesting() { + let var = "foo".to_string(); + + struct Wrapper<'a> { + var: &'a String, + } + + impl<'a> Wrapper<'a> { + fn recurse(&'a self, scope: &thread::Scope<'a>, depth: usize) { + assert_eq!(self.var, "foo"); + + if depth > 0 { + scope.spawn(move |scope| { + self.recurse(scope, depth - 1); + }); + } + } + } + + let wrapper = Wrapper { var: &var }; + + thread::scope(|scope| { + scope.spawn(|scope| { + scope.spawn(|scope| { + wrapper.recurse(scope, 5); + }); + }); + }) + .unwrap(); +} + +#[test] +fn join_nested() { + thread::scope(|scope| { + scope.spawn(|scope| { + let handle = scope.spawn(|_| 7); + + sleep(Duration::from_millis(200)); + handle.join().unwrap(); + }); + + sleep(Duration::from_millis(100)); + }) + .unwrap(); +} + +#[test] +fn scope_returns_ok() { + let result = thread::scope(|scope| scope.spawn(|_| 1234).join().unwrap()).unwrap(); + assert_eq!(result, 1234); +} + +#[cfg(unix)] +#[test] +fn as_pthread_t() { + use std::os::unix::thread::JoinHandleExt; + thread::scope(|scope| { + let handle = scope.spawn(|_scope| { + sleep(Duration::from_millis(100)); + 42 + }); + let _pthread_t = handle.as_pthread_t(); + handle.join().unwrap(); + }) + .unwrap(); +} + +#[cfg(windows)] +#[test] +fn as_raw_handle() { + use std::os::windows::io::AsRawHandle; + thread::scope(|scope| { + let handle = scope.spawn(|_scope| { + sleep(Duration::from_millis(100)); + 42 + }); + let _raw_handle = handle.as_raw_handle(); + handle.join().unwrap(); + }) + .unwrap(); +} diff --git a/third_party/rust/crossbeam-utils/tests/wait_group.rs b/third_party/rust/crossbeam-utils/tests/wait_group.rs new file mode 100644 index 0000000000..0ec4a729ca --- /dev/null +++ b/third_party/rust/crossbeam-utils/tests/wait_group.rs @@ -0,0 +1,65 @@ +use std::sync::mpsc; +use std::thread; +use std::time::Duration; + +use crossbeam_utils::sync::WaitGroup; + +const THREADS: usize = 10; + +#[test] +fn wait() { + let wg = WaitGroup::new(); + let (tx, rx) = mpsc::channel(); + + for _ in 0..THREADS { + let wg = wg.clone(); + let tx = tx.clone(); + + thread::spawn(move || { + wg.wait(); + tx.send(()).unwrap(); + }); + } + + thread::sleep(Duration::from_millis(100)); + + // At this point, all spawned threads should be blocked, so we shouldn't get anything from the + // channel. + assert!(rx.try_recv().is_err()); + + wg.wait(); + + // Now, the wait group is cleared and we should receive messages. + for _ in 0..THREADS { + rx.recv().unwrap(); + } +} + +#[test] +#[cfg_attr(miri, ignore)] // this test makes timing assumptions, but Miri is so slow it violates them +fn wait_and_drop() { + let wg = WaitGroup::new(); + let (tx, rx) = mpsc::channel(); + + for _ in 0..THREADS { + let wg = wg.clone(); + let tx = tx.clone(); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(100)); + tx.send(()).unwrap(); + drop(wg); + }); + } + + // At this point, all spawned threads should be in `thread::sleep`, so we shouldn't get anything + // from the channel. + assert!(rx.try_recv().is_err()); + + wg.wait(); + + // Now, the wait group is cleared and we should receive messages. + for _ in 0..THREADS { + rx.try_recv().unwrap(); + } +} |