diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/ringbuf | |
parent | Initial commit. (diff) | |
download | firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/ringbuf')
21 files changed, 2695 insertions, 0 deletions
diff --git a/third_party/rust/ringbuf/.cargo-checksum.json b/third_party/rust/ringbuf/.cargo-checksum.json new file mode 100644 index 0000000000..48a1e10d41 --- /dev/null +++ b/third_party/rust/ringbuf/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"Cargo.lock":"a28327c8be306f9d8c873bd664c17120e932b405aa33c2f1ba6c1ac17a31670a","Cargo.toml":"da0d556a0c1cc18c888de9f1300c7c60e41e76dd4d994fe352b300bb0d45ce76","LICENSE-APACHE":"a60eea817514531668d7e00765731449fe14d059d3249e0bc93b36de45f759f2","LICENSE-MIT":"6683f8a2ba538c11948da65a5c0d03ba4a7c6bbbd430c77f950b477a67586729","README.md":"f7fe7daa134b2d6748ed5c2818b973a6ad49c8bf36246248ea04b1955cda6b39","appveyor.yml":"72c363ac4e1e39ec9776b197bb9964c7a8d89211a7a80070c5629d38ad94c3e1","examples/message.rs":"e89f1ceaa10e43e44ad455a2dcf30108a9fce0cbc2ab48d2bbfed775ed45d49d","examples/simple.rs":"47e919144ea0b0fba649d031e68168660e1ccc0700de7647edb2f3da258cd20e","src/benchmark.rs":"6fea88569a03e881e0ff0d530100cb1fec015c02b6e3b850e4f790ae305aa459","src/consumer.rs":"2e009d979b3198bf4a4578af3b717375e3060fcc91ca0d614935e4cbf2208de4","src/lib.rs":"9e7b5742b09d8d200b4620bdf64c1e25e73dae410f591975e46e6bf4decde8fe","src/producer.rs":"e4b5d108ab0cc16c0e0492ea8f0ed1695524b92c595c30498570b400eb19dff1","src/ring_buffer.rs":"52c009ef16e3fe72731fcc5bfd3d93884298e9629d9357875a11518fb22e2b3f","src/tests/access.rs":"6d5c62988d1ef39b6bc1cb24c41a9ab2b62e41ee1c493773bc152c1366cd2ff5","src/tests/drop.rs":"b0b8f647a3865e91abaf02d749c085e2d8f06a10d70cf27a7abc7fb375ba43b5","src/tests/message.rs":"b7a1fa117863c78d795c26cc94e9cb96b2c9afbf451e8a0b7c695707cddbd461","src/tests/mod.rs":"fb30be383ec9f17b7152eda1114829da3adb50f4fdaf04fe534bc97740683771","src/tests/multiple.rs":"841ec3fb02a30da57371075e6b536f0870cd787b9d4a596b702ffff50fdbc974","src/tests/read_write.rs":"f67abd613ae7fc85feed0aeac56ea1a9c6f864bef713a67774a3b9a920686046","src/tests/single.rs":"48e200f4ba1bb0fa6645bfa033fc163deb19ed22e8df5f2b6f526ac08853dc07"},"package":"7c2b29d87cfbdce39849012bb5020fff88b8f01f4f5b55846a0b6ef360774eae"}
\ No newline at end of file diff --git a/third_party/rust/ringbuf/Cargo.lock b/third_party/rust/ringbuf/Cargo.lock new file mode 100644 index 0000000000..db21ccd5f1 --- /dev/null +++ b/third_party/rust/ringbuf/Cargo.lock @@ -0,0 +1,6 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +[[package]] +name = "ringbuf" +version = "0.2.1" + diff --git a/third_party/rust/ringbuf/Cargo.toml b/third_party/rust/ringbuf/Cargo.toml new file mode 100644 index 0000000000..c2aa5ceb35 --- /dev/null +++ b/third_party/rust/ringbuf/Cargo.toml @@ -0,0 +1,31 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies +# +# If you believe there's an error in this file please file an +# issue against the rust-lang/cargo repository. If you're +# editing this file be aware that the upstream Cargo.toml +# will likely look very different (and much more reasonable) + +[package] +edition = "2018" +name = "ringbuf" +version = "0.2.1" +authors = ["Alexey Gerasev <alexey.gerasev@gmail.com>"] +description = "Lock-free SPSC FIFO ring buffer with direct access to inner data" +homepage = "https://github.com/nthend/ringbuf" +documentation = "https://docs.rs/ringbuf" +readme = "README.md" +keywords = ["lock-free", "spsc", "ring-buffer", "rb", "fifo"] +categories = ["concurrency", "data-structures"] +license = "MIT/Apache-2.0" +repository = "https://github.com/nthend/ringbuf.git" + +[dependencies] + +[features] +benchmark = [] +default = [] diff --git a/third_party/rust/ringbuf/LICENSE-APACHE b/third_party/rust/ringbuf/LICENSE-APACHE new file mode 100644 index 0000000000..16fe87b06e --- /dev/null +++ b/third_party/rust/ringbuf/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/third_party/rust/ringbuf/LICENSE-MIT b/third_party/rust/ringbuf/LICENSE-MIT new file mode 100644 index 0000000000..8ed7609e35 --- /dev/null +++ b/third_party/rust/ringbuf/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) 2019 Alexey Gerasev + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/third_party/rust/ringbuf/README.md b/third_party/rust/ringbuf/README.md new file mode 100644 index 0000000000..082cec5074 --- /dev/null +++ b/third_party/rust/ringbuf/README.md @@ -0,0 +1,147 @@ +# ringbuf + +[![Crates.io][crates_badge]][crates] +[![Docs.rs][docs_badge]][docs] +[![Travis CI][travis_badge]][travis] +[![Appveyor][appveyor_badge]][appveyor] +[![Codecov.io][codecov_badge]][codecov] +[![License][license_badge]][license] + +[crates_badge]: https://img.shields.io/crates/v/ringbuf.svg +[docs_badge]: https://docs.rs/ringbuf/badge.svg +[travis_badge]: https://api.travis-ci.org/agerasev/ringbuf.svg +[appveyor_badge]: https://ci.appveyor.com/api/projects/status/github/agerasev/ringbuf?branch=master&svg=true +[codecov_badge]: https://codecov.io/gh/agerasev/ringbuf/graphs/badge.svg +[license_badge]: https://img.shields.io/crates/l/ringbuf.svg + +[crates]: https://crates.io/crates/ringbuf +[docs]: https://docs.rs/ringbuf +[travis]: https://travis-ci.org/agerasev/ringbuf +[appveyor]: https://ci.appveyor.com/project/agerasev/ringbuf +[codecov]: https://codecov.io/gh/agerasev/ringbuf +[license]: #license + +Lock-free single-producer single-consumer (SPSC) FIFO ring buffer with direct access to inner data. + +# Overview + +`RingBuffer` is the initial structure representing ring buffer itself. +Ring buffer can be splitted into pair of `Producer` and `Consumer`. + +`Producer` and `Consumer` are used to append/remove elements to/from the ring buffer accordingly. They can be safely transfered between threads. +Operations with `Producer` and `Consumer` are lock-free - they're succeded or failed immediately without blocking or waiting. + +Elements can be effectively appended/removed one by one or many at once. +Also data could be loaded/stored directly into/from [`Read`]/[`Write`] instances. +And finally, there are `unsafe` methods allowing thread-safe direct access in place to the inner memory being appended/removed. + +[`Read`]: https://doc.rust-lang.org/std/io/trait.Read.html +[`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html + +When building with nightly toolchain it is possible to run benchmarks via `cargo bench --features benchmark`. + +# Examples + +## Simple example + +```rust +use ringbuf::RingBuffer; + +let rb = RingBuffer::<i32>::new(2); +let (mut prod, mut cons) = rb.split(); + +prod.push(0).unwrap(); +prod.push(1).unwrap(); +assert_eq!(prod.push(2), Err(2)); + +assert_eq!(cons.pop().unwrap(), 0); + +prod.push(2).unwrap(); + +assert_eq!(cons.pop().unwrap(), 1); +assert_eq!(cons.pop().unwrap(), 2); +assert_eq!(cons.pop(), None); +``` + +## Message transfer + +This is more complicated example of transfering text message between threads. + +```rust +use std::io::Read; +use std::thread; +use std::time::Duration; + +use ringbuf::RingBuffer; + +let buf = RingBuffer::<u8>::new(10); +let (mut prod, mut cons) = buf.split(); + +let smsg = "The quick brown fox jumps over the lazy dog"; + +let pjh = thread::spawn(move || { + println!("-> sending message: '{}'", smsg); + + let zero = [0 as u8]; + let mut bytes = smsg.as_bytes().chain(&zero[..]); + loop { + if prod.is_full() { + println!("-> buffer is full, waiting"); + thread::sleep(Duration::from_millis(1)); + } else { + let n = prod.read_from(&mut bytes, None).unwrap(); + if n == 0 { + break; + } + println!("-> {} bytes sent", n); + } + } + + println!("-> message sent"); +}); + +let cjh = thread::spawn(move || { + println!("<- receiving message"); + + let mut bytes = Vec::<u8>::new(); + loop { + if cons.is_empty() { + if bytes.ends_with(&[0]) { + break; + } else { + println!("<- buffer is empty, waiting"); + thread::sleep(Duration::from_millis(1)); + } + } else { + let n = cons.write_into(&mut bytes, None).unwrap(); + println!("<- {} bytes received", n); + } + } + + assert_eq!(bytes.pop().unwrap(), 0); + let msg = String::from_utf8(bytes).unwrap(); + println!("<- message received: '{}'", msg); + + msg +}); + +pjh.join().unwrap(); +let rmsg = cjh.join().unwrap(); + +assert_eq!(smsg, rmsg); +``` + +## License + +Licensed under either of + + * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) + +at your option. + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any +additional terms or conditions. diff --git a/third_party/rust/ringbuf/appveyor.yml b/third_party/rust/ringbuf/appveyor.yml new file mode 100644 index 0000000000..91026af6d2 --- /dev/null +++ b/third_party/rust/ringbuf/appveyor.yml @@ -0,0 +1,50 @@ +os: Visual Studio 2015 + +environment: + matrix: + - channel: stable + target: x86_64-pc-windows-msvc + - channel: stable + target: i686-pc-windows-msvc + - channel: beta + target: x86_64-pc-windows-msvc + - channel: beta + target: i686-pc-windows-msvc + - channel: nightly + target: x86_64-pc-windows-msvc + #cargoflags: --features "unstable" + - channel: nightly + target: i686-pc-windows-msvc + #cargoflags: --features "unstable" + + - channel: stable + target: x86_64-pc-windows-gnu + - channel: stable + target: i686-pc-windows-gnu + - channel: beta + target: x86_64-pc-windows-gnu + - channel: beta + target: i686-pc-windows-gnu + - channel: nightly + target: x86_64-pc-windows-gnu + #cargoflags: --features "unstable" + - channel: nightly + target: i686-pc-windows-gnu + #cargoflags: --features "unstable" + +matrix: + allow_failures: + - channel: nightly + +install: + - appveyor DownloadFile https://win.rustup.rs/ -FileName rustup-init.exe + - rustup-init -yv --default-toolchain %channel% --default-host %target% + - set PATH=%PATH%;%USERPROFILE%\.cargo\bin + - rustc -vV + - cargo -vV + +build: false + +test_script: +- cargo test --verbose %cargoflags% +- bash -c "! rustc -V | grep nightly || cargo bench --features benchmark --verbose %cargoflags%" diff --git a/third_party/rust/ringbuf/examples/message.rs b/third_party/rust/ringbuf/examples/message.rs new file mode 100644 index 0000000000..e5345dfe62 --- /dev/null +++ b/third_party/rust/ringbuf/examples/message.rs @@ -0,0 +1,65 @@ +extern crate ringbuf; + +use std::io::Read; +use std::thread; +use std::time::Duration; + +use ringbuf::RingBuffer; + +fn main() { + let buf = RingBuffer::<u8>::new(10); + let (mut prod, mut cons) = buf.split(); + + let smsg = "The quick brown fox jumps over the lazy dog"; + + let pjh = thread::spawn(move || { + println!("-> sending message: '{}'", smsg); + + let zero = [0 as u8]; + let mut bytes = smsg.as_bytes().chain(&zero[..]); + loop { + if prod.is_full() { + println!("-> buffer is full, waiting"); + thread::sleep(Duration::from_millis(1)); + } else { + let n = prod.read_from(&mut bytes, None).unwrap(); + if n == 0 { + break; + } + println!("-> {} bytes sent", n); + } + } + + println!("-> message sent"); + }); + + let cjh = thread::spawn(move || { + println!("<- receiving message"); + + let mut bytes = Vec::<u8>::new(); + loop { + if cons.is_empty() { + if bytes.ends_with(&[0]) { + break; + } else { + println!("<- buffer is empty, waiting"); + thread::sleep(Duration::from_millis(1)); + } + } else { + let n = cons.write_into(&mut bytes, None).unwrap(); + println!("<- {} bytes received", n); + } + } + + assert_eq!(bytes.pop().unwrap(), 0); + let msg = String::from_utf8(bytes).unwrap(); + println!("<- message received: '{}'", msg); + + msg + }); + + pjh.join().unwrap(); + let rmsg = cjh.join().unwrap(); + + assert_eq!(smsg, rmsg); +} diff --git a/third_party/rust/ringbuf/examples/simple.rs b/third_party/rust/ringbuf/examples/simple.rs new file mode 100644 index 0000000000..7ee372d4d1 --- /dev/null +++ b/third_party/rust/ringbuf/examples/simple.rs @@ -0,0 +1,20 @@ +extern crate ringbuf; + +use ringbuf::RingBuffer; + +fn main() { + let rb = RingBuffer::<i32>::new(2); + let (mut prod, mut cons) = rb.split(); + + prod.push(0).unwrap(); + prod.push(1).unwrap(); + assert_eq!(prod.push(2), Err(2)); + + assert_eq!(cons.pop().unwrap(), 0); + + prod.push(2).unwrap(); + + assert_eq!(cons.pop().unwrap(), 1); + assert_eq!(cons.pop().unwrap(), 2); + assert_eq!(cons.pop(), None); +} diff --git a/third_party/rust/ringbuf/src/benchmark.rs b/third_party/rust/ringbuf/src/benchmark.rs new file mode 100644 index 0000000000..e145f580c1 --- /dev/null +++ b/third_party/rust/ringbuf/src/benchmark.rs @@ -0,0 +1,51 @@ +use super::*; + +use test::Bencher; + +const RB_SIZE: usize = 0x400; + +#[bench] +fn single_item(b: &mut Bencher) { + let buf = RingBuffer::<u64>::new(RB_SIZE); + let (mut prod, mut cons) = buf.split(); + prod.push_slice(&[1; RB_SIZE / 2]); + b.iter(|| { + prod.push(1).unwrap(); + cons.pop().unwrap(); + }); +} + +#[bench] +fn slice_x10(b: &mut Bencher) { + let buf = RingBuffer::<u64>::new(RB_SIZE); + let (mut prod, mut cons) = buf.split(); + prod.push_slice(&[1; RB_SIZE / 2]); + let mut data = [1; 10]; + b.iter(|| { + prod.push_slice(&data); + cons.pop_slice(&mut data); + }); +} + +#[bench] +fn slice_x100(b: &mut Bencher) { + let buf = RingBuffer::<u64>::new(RB_SIZE); + let (mut prod, mut cons) = buf.split(); + prod.push_slice(&[1; RB_SIZE / 2]); + let mut data = [1; 100]; + b.iter(|| { + prod.push_slice(&data); + cons.pop_slice(&mut data); + }); +} +#[bench] +fn slice_x1000(b: &mut Bencher) { + let buf = RingBuffer::<u64>::new(RB_SIZE); + let (mut prod, mut cons) = buf.split(); + prod.push_slice(&[1; 16]); + let mut data = [1; 1000]; + b.iter(|| { + prod.push_slice(&data); + cons.pop_slice(&mut data); + }); +} diff --git a/third_party/rust/ringbuf/src/consumer.rs b/third_party/rust/ringbuf/src/consumer.rs new file mode 100644 index 0000000000..f367fdd851 --- /dev/null +++ b/third_party/rust/ringbuf/src/consumer.rs @@ -0,0 +1,356 @@ +use std::{ + cmp::min, + io::{self, Read, Write}, + mem::{self, MaybeUninit}, + ops::Range, + ptr::copy_nonoverlapping, + sync::{atomic::Ordering, Arc}, +}; + +use crate::{producer::Producer, ring_buffer::*}; + +/// Consumer part of ring buffer. +pub struct Consumer<T> { + pub(crate) rb: Arc<RingBuffer<T>>, +} + +impl<T: Sized> Consumer<T> { + /// Returns capacity of the ring buffer. + /// + /// The capacity of the buffer is constant. + pub fn capacity(&self) -> usize { + self.rb.capacity() + } + + /// Checks if the ring buffer is empty. + /// + /// *The result may become irrelevant at any time because of concurring activity of the producer.* + pub fn is_empty(&self) -> bool { + self.rb.is_empty() + } + + /// Checks if the ring buffer is full. + /// + /// The result is relevant until you remove items from the consumer. + pub fn is_full(&self) -> bool { + self.rb.is_full() + } + + /// The length of the data stored in the buffer + /// + /// Actual length may be equal to or greater than the returned value. + pub fn len(&self) -> usize { + self.rb.len() + } + + /// The remaining space in the buffer. + /// + /// Actual remaining space may be equal to or less than the returning value. + pub fn remaining(&self) -> usize { + self.rb.remaining() + } + + fn get_ranges(&self) -> (Range<usize>, Range<usize>) { + let head = self.rb.head.load(Ordering::Acquire); + let tail = self.rb.tail.load(Ordering::Acquire); + let len = unsafe { self.rb.data.get_ref().len() }; + + if head < tail { + (head..tail, 0..0) + } else if head > tail { + (head..len, 0..tail) + } else { + (0..0, 0..0) + } + } + + /// Gives immutable access to the elements contained by the ring buffer without removing them. + /// + /// The method takes a function `f` as argument. + /// `f` takes two slices of ring buffer content (the second one or both of them may be empty). + /// First slice contains older elements. + /// + /// *The slices may not include elements pushed to the buffer by concurring producer after the method call.* + pub fn access<F: FnOnce(&[T], &[T])>(&self, f: F) { + let ranges = self.get_ranges(); + + unsafe { + let left = &self.rb.data.get_ref()[ranges.0]; + let right = &self.rb.data.get_ref()[ranges.1]; + + f( + &*(left as *const [MaybeUninit<T>] as *const [T]), + &*(right as *const [MaybeUninit<T>] as *const [T]), + ); + } + } + + /// Gives mutable access to the elements contained by the ring buffer without removing them. + /// + /// The method takes a function `f` as argument. + /// `f` takes two slices of ring buffer content (the second one or both of them may be empty). + /// First slice contains older elements. + /// + /// *The iteration may not include elements pushed to the buffer by concurring producer after the method call.* + pub fn access_mut<F: FnOnce(&mut [T], &mut [T])>(&mut self, f: F) { + let ranges = self.get_ranges(); + + unsafe { + let left = &mut self.rb.data.get_mut()[ranges.0]; + let right = &mut self.rb.data.get_mut()[ranges.1]; + + f( + &mut *(left as *mut [MaybeUninit<T>] as *mut [T]), + &mut *(right as *mut [MaybeUninit<T>] as *mut [T]), + ); + } + } + + /// Allows to read from ring buffer memory directry. + /// + /// *This function is unsafe because it gives access to possibly uninitialized memory* + /// + /// The method takes a function `f` as argument. + /// `f` takes two slices of ring buffer content (the second one or both of them may be empty). + /// First slice contains older elements. + /// + /// `f` should return number of elements been read. + /// *There is no checks for returned number - it remains on the developer's conscience.* + /// + /// The method **always** calls `f` even if ring buffer is empty. + /// + /// The method returns number returned from `f`. + pub unsafe fn pop_access<F>(&mut self, f: F) -> usize + where + F: FnOnce(&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) -> usize, + { + let head = self.rb.head.load(Ordering::Acquire); + let tail = self.rb.tail.load(Ordering::Acquire); + let len = self.rb.data.get_ref().len(); + + let ranges = if head < tail { + (head..tail, 0..0) + } else if head > tail { + (head..len, 0..tail) + } else { + (0..0, 0..0) + }; + + let slices = ( + &mut self.rb.data.get_mut()[ranges.0], + &mut self.rb.data.get_mut()[ranges.1], + ); + + let n = f(slices.0, slices.1); + + if n > 0 { + let new_head = (head + n) % len; + self.rb.head.store(new_head, Ordering::Release); + } + n + } + + /// Copies data from the ring buffer to the slice in byte-to-byte manner. + /// + /// The `elems` slice should contain **un-initialized** data before the method call. + /// After the call the copied part of data in `elems` should be interpreted as **initialized**. + /// The remaining part is still **un-iniitilized**. + /// + /// Returns the number of items been copied. + pub unsafe fn pop_copy(&mut self, elems: &mut [MaybeUninit<T>]) -> usize { + self.pop_access(|left, right| { + if elems.len() < left.len() { + copy_nonoverlapping(left.as_ptr(), elems.as_mut_ptr(), elems.len()); + elems.len() + } else { + copy_nonoverlapping(left.as_ptr(), elems.as_mut_ptr(), left.len()); + if elems.len() < left.len() + right.len() { + copy_nonoverlapping( + right.as_ptr(), + elems.as_mut_ptr().add(left.len()), + elems.len() - left.len(), + ); + elems.len() + } else { + copy_nonoverlapping( + right.as_ptr(), + elems.as_mut_ptr().add(left.len()), + right.len(), + ); + left.len() + right.len() + } + } + }) + } + + /// Removes latest element from the ring buffer and returns it. + /// Returns `None` if the ring buffer is empty. + pub fn pop(&mut self) -> Option<T> { + let mut elem_mu = MaybeUninit::uninit(); + let n = unsafe { + self.pop_access(|slice, _| { + if !slice.is_empty() { + mem::swap(slice.get_unchecked_mut(0), &mut elem_mu); + 1 + } else { + 0 + } + }) + }; + match n { + 0 => None, + 1 => Some(unsafe { elem_mu.assume_init() }), + _ => unreachable!(), + } + } + + /// Repeatedly calls the closure `f` passing elements removed from the ring buffer to it. + /// + /// The closure is called until it returns `false` or the ring buffer is empty. + /// + /// The method returns number of elements been removed from the buffer. + pub fn pop_each<F: FnMut(T) -> bool>(&mut self, mut f: F, count: Option<usize>) -> usize { + unsafe { + self.pop_access(|left, right| { + let lb = match count { + Some(n) => min(n, left.len()), + None => left.len(), + }; + for (i, dst) in left[0..lb].iter_mut().enumerate() { + if !f(mem::replace(dst, MaybeUninit::uninit()).assume_init()) { + return i + 1; + } + } + if lb < left.len() { + return lb; + } + + let rb = match count { + Some(n) => min(n - lb, right.len()), + None => right.len(), + }; + for (i, dst) in right[0..rb].iter_mut().enumerate() { + if !f(mem::replace(dst, MaybeUninit::uninit()).assume_init()) { + return i + lb + 1; + } + } + left.len() + right.len() + }) + } + } + + /// Iterate immutably over the elements contained by the ring buffer without removing them. + /// + /// *The iteration may not include elements pushed to the buffer by concurring producer after the method call.* + pub fn for_each<F: FnMut(&T)>(&self, mut f: F) { + self.access(|left, right| { + for c in left.iter() { + f(c); + } + for c in right.iter() { + f(c); + } + }); + } + + /// Iterate mutably over the elements contained by the ring buffer without removing them. + /// + /// *The iteration may not include elements pushed to the buffer by concurring producer after the method call.* + pub fn for_each_mut<F: FnMut(&mut T)>(&mut self, mut f: F) { + self.access_mut(|left, right| { + for c in left.iter_mut() { + f(c); + } + for c in right.iter_mut() { + f(c); + } + }); + } + + /// Removes at most `count` elements from the consumer and appends them to the producer. + /// If `count` is `None` then as much as possible elements will be moved. + /// The producer and consumer parts may be of different buffers as well as of the same one. + /// + /// On success returns count of elements been moved. + pub fn move_to(&mut self, other: &mut Producer<T>, count: Option<usize>) -> usize { + move_items(self, other, count) + } +} + +impl<T: Sized + Copy> Consumer<T> { + /// Removes first elements from the ring buffer and writes them into a slice. + /// Elements should be [`Copy`](https://doc.rust-lang.org/std/marker/trait.Copy.html). + /// + /// On success returns count of elements been removed from the ring buffer. + pub fn pop_slice(&mut self, elems: &mut [T]) -> usize { + unsafe { self.pop_copy(&mut *(elems as *mut [T] as *mut [MaybeUninit<T>])) } + } +} + +impl Consumer<u8> { + /// Removes at most first `count` bytes from the ring buffer and writes them into + /// a [`Write`](https://doc.rust-lang.org/std/io/trait.Write.html) instance. + /// If `count` is `None` then as much as possible bytes will be written. + /// + /// Returns `Ok(n)` if `write` is succeded. `n` is number of bytes been written. + /// `n == 0` means that either `write` returned zero or ring buffer is empty. + /// + /// If `write` is failed then error is returned. + pub fn write_into( + &mut self, + writer: &mut dyn Write, + count: Option<usize>, + ) -> io::Result<usize> { + let mut err = None; + let n = unsafe { + self.pop_access(|left, _| -> usize { + let left = match count { + Some(c) => { + if c < left.len() { + &mut left[0..c] + } else { + left + } + } + None => left, + }; + match writer + .write(&*(left as *const [MaybeUninit<u8>] as *const [u8])) + .and_then(|n| { + if n <= left.len() { + Ok(n) + } else { + Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Write operation returned invalid number", + )) + } + }) { + Ok(n) => n, + Err(e) => { + err = Some(e); + 0 + } + } + }) + }; + match err { + Some(e) => Err(e), + None => Ok(n), + } + } +} + +impl Read for Consumer<u8> { + fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> { + let n = self.pop_slice(buffer); + if n == 0 && !buffer.is_empty() { + Err(io::Error::new( + io::ErrorKind::WouldBlock, + "Ring buffer is empty", + )) + } else { + Ok(n) + } + } +} diff --git a/third_party/rust/ringbuf/src/lib.rs b/third_party/rust/ringbuf/src/lib.rs new file mode 100644 index 0000000000..5b45f9017a --- /dev/null +++ b/third_party/rust/ringbuf/src/lib.rs @@ -0,0 +1,134 @@ +//! Lock-free single-producer single-consumer (SPSC) FIFO ring buffer with direct access to inner data. +//! +//! # Overview +//! +//! `RingBuffer` is the initial structure representing ring buffer itself. +//! Ring buffer can be splitted into pair of `Producer` and `Consumer`. +//! +//! `Producer` and `Consumer` are used to append/remove elements to/from the ring buffer accordingly. They can be safely transfered between threads. +//! Operations with `Producer` and `Consumer` are lock-free - they're succeded or failed immediately without blocking or waiting. +//! +//! Elements can be effectively appended/removed one by one or many at once. +//! Also data could be loaded/stored directly into/from [`Read`]/[`Write`] instances. +//! And finally, there are `unsafe` methods allowing thread-safe direct access in place to the inner memory being appended/removed. +//! +//! [`Read`]: https://doc.rust-lang.org/std/io/trait.Read.html +//! [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html +//! +//! When building with nightly toolchain it is possible to run benchmarks via `cargo bench --features benchmark`. +//! +//! # Examples +//! +//! ## Simple example +//! +//! ```rust +//! # extern crate ringbuf; +//! use ringbuf::RingBuffer; +//! # fn main() { +//! let rb = RingBuffer::<i32>::new(2); +//! let (mut prod, mut cons) = rb.split(); +//! +//! prod.push(0).unwrap(); +//! prod.push(1).unwrap(); +//! assert_eq!(prod.push(2), Err(2)); +//! +//! assert_eq!(cons.pop().unwrap(), 0); +//! +//! prod.push(2).unwrap(); +//! +//! assert_eq!(cons.pop().unwrap(), 1); +//! assert_eq!(cons.pop().unwrap(), 2); +//! assert_eq!(cons.pop(), None); +//! # } +//! ``` +//! +//! ## Message transfer +//! +//! This is more complicated example of transfering text message between threads. +//! +//! ```rust +//! # extern crate ringbuf; +//! use std::io::Read; +//! use std::thread; +//! use std::time::Duration; +//! +//! use ringbuf::RingBuffer; +//! +//! # fn main() { +//! let buf = RingBuffer::<u8>::new(10); +//! let (mut prod, mut cons) = buf.split(); +//! +//! let smsg = "The quick brown fox jumps over the lazy dog"; +//! +//! let pjh = thread::spawn(move || { +//! println!("-> sending message: '{}'", smsg); +//! +//! let zero = [0 as u8]; +//! let mut bytes = smsg.as_bytes().chain(&zero[..]); +//! loop { +//! if prod.is_full() { +//! println!("-> buffer is full, waiting"); +//! thread::sleep(Duration::from_millis(1)); +//! } else { +//! let n = prod.read_from(&mut bytes, None).unwrap(); +//! if n == 0 { +//! break; +//! } +//! println!("-> {} bytes sent", n); +//! } +//! } +//! +//! println!("-> message sent"); +//! }); +//! +//! let cjh = thread::spawn(move || { +//! println!("<- receiving message"); +//! +//! let mut bytes = Vec::<u8>::new(); +//! loop { +//! if cons.is_empty() { +//! if bytes.ends_with(&[0]) { +//! break; +//! } else { +//! println!("<- buffer is empty, waiting"); +//! thread::sleep(Duration::from_millis(1)); +//! } +//! } else { +//! let n = cons.write_into(&mut bytes, None).unwrap(); +//! println!("<- {} bytes received", n); +//! } +//! } +//! +//! assert_eq!(bytes.pop().unwrap(), 0); +//! let msg = String::from_utf8(bytes).unwrap(); +//! println!("<- message received: '{}'", msg); +//! +//! msg +//! }); +//! +//! pjh.join().unwrap(); +//! let rmsg = cjh.join().unwrap(); +//! +//! assert_eq!(smsg, rmsg); +//! # } +//! ``` +//! + +#![cfg_attr(feature = "benchmark", feature(test))] + +#[cfg(feature = "benchmark")] +extern crate test; + +#[cfg(feature = "benchmark")] +mod benchmark; + +#[cfg(test)] +mod tests; + +mod consumer; +mod producer; +mod ring_buffer; + +pub use consumer::*; +pub use producer::*; +pub use ring_buffer::*; diff --git a/third_party/rust/ringbuf/src/producer.rs b/third_party/rust/ringbuf/src/producer.rs new file mode 100644 index 0000000000..e77e7528f5 --- /dev/null +++ b/third_party/rust/ringbuf/src/producer.rs @@ -0,0 +1,274 @@ +use std::{ + io::{self, Read, Write}, + mem::{self, MaybeUninit}, + ptr::copy_nonoverlapping, + sync::{atomic::Ordering, Arc}, +}; + +use crate::{consumer::Consumer, ring_buffer::*}; + +/// Producer part of ring buffer. +pub struct Producer<T> { + pub(crate) rb: Arc<RingBuffer<T>>, +} + +impl<T: Sized> Producer<T> { + /// Returns capacity of the ring buffer. + /// + /// The capacity of the buffer is constant. + pub fn capacity(&self) -> usize { + self.rb.capacity() + } + + /// Checks if the ring buffer is empty. + /// + /// The result is relevant until you push items to the producer. + pub fn is_empty(&self) -> bool { + self.rb.is_empty() + } + + /// Checks if the ring buffer is full. + /// + /// *The result may become irrelevant at any time because of concurring activity of the consumer.* + pub fn is_full(&self) -> bool { + self.rb.is_full() + } + + /// The length of the data stored in the buffer. + /// + /// Actual length may be equal to or less than the returned value. + pub fn len(&self) -> usize { + self.rb.len() + } + + /// The remaining space in the buffer. + /// + /// Actual remaining space may be equal to or greater than the returning value. + pub fn remaining(&self) -> usize { + self.rb.remaining() + } + + /// Allows to write into ring buffer memory directry. + /// + /// *This function is unsafe because it gives access to possibly uninitialized memory* + /// + /// The method takes a function `f` as argument. + /// `f` takes two slices of ring buffer content (the second one or both of them may be empty). + /// First slice contains older elements. + /// + /// `f` should return number of elements been written. + /// *There is no checks for returned number - it remains on the developer's conscience.* + /// + /// The method **always** calls `f` even if ring buffer is full. + /// + /// The method returns number returned from `f`. + pub unsafe fn push_access<F>(&mut self, f: F) -> usize + where + F: FnOnce(&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) -> usize, + { + let head = self.rb.head.load(Ordering::Acquire); + let tail = self.rb.tail.load(Ordering::Acquire); + let len = self.rb.data.get_ref().len(); + + let ranges = if tail >= head { + if head > 0 { + (tail..len, 0..(head - 1)) + } else if tail < len - 1 { + (tail..(len - 1), 0..0) + } else { + (0..0, 0..0) + } + } else if tail < head - 1 { + (tail..(head - 1), 0..0) + } else { + (0..0, 0..0) + }; + + let slices = ( + &mut self.rb.data.get_mut()[ranges.0], + &mut self.rb.data.get_mut()[ranges.1], + ); + + let n = f(slices.0, slices.1); + + if n > 0 { + let new_tail = (tail + n) % len; + self.rb.tail.store(new_tail, Ordering::Release); + } + n + } + + /// Copies data from the slice to the ring buffer in byte-to-byte manner. + /// + /// The `elems` slice should contain **initialized** data before the method call. + /// After the call the copied part of data in `elems` should be interpreted as **un-initialized**. + /// + /// Returns the number of items been copied. + pub unsafe fn push_copy(&mut self, elems: &[MaybeUninit<T>]) -> usize { + self.push_access(|left, right| -> usize { + if elems.len() < left.len() { + copy_nonoverlapping(elems.as_ptr(), left.as_mut_ptr(), elems.len()); + elems.len() + } else { + copy_nonoverlapping(elems.as_ptr(), left.as_mut_ptr(), left.len()); + if elems.len() < left.len() + right.len() { + copy_nonoverlapping( + elems.as_ptr().add(left.len()), + right.as_mut_ptr(), + elems.len() - left.len(), + ); + elems.len() + } else { + copy_nonoverlapping( + elems.as_ptr().add(left.len()), + right.as_mut_ptr(), + right.len(), + ); + left.len() + right.len() + } + } + }) + } + + /// Appends an element to the ring buffer. + /// On failure returns an error containing the element that hasn't beed appended. + pub fn push(&mut self, elem: T) -> Result<(), T> { + let mut elem_mu = MaybeUninit::new(elem); + let n = unsafe { + self.push_access(|slice, _| { + if !slice.is_empty() { + mem::swap(slice.get_unchecked_mut(0), &mut elem_mu); + 1 + } else { + 0 + } + }) + }; + match n { + 0 => Err(unsafe { elem_mu.assume_init() }), + 1 => Ok(()), + _ => unreachable!(), + } + } + + /// Repeatedly calls the closure `f` and pushes elements returned from it to the ring buffer. + /// + /// The closure is called until it returns `None` or the ring buffer is full. + /// + /// The method returns number of elements been put into the buffer. + pub fn push_each<F: FnMut() -> Option<T>>(&mut self, mut f: F) -> usize { + unsafe { + self.push_access(|left, right| { + for (i, dst) in left.iter_mut().enumerate() { + match f() { + Some(e) => mem::replace(dst, MaybeUninit::new(e)), + None => return i, + }; + } + for (i, dst) in right.iter_mut().enumerate() { + match f() { + Some(e) => mem::replace(dst, MaybeUninit::new(e)), + None => return i + left.len(), + }; + } + left.len() + right.len() + }) + } + } + + /// Appends elements from an iterator to the ring buffer. + /// Elements that haven't been added to the ring buffer remain in the iterator. + /// + /// Returns count of elements been appended to the ring buffer. + pub fn push_iter<I: Iterator<Item = T>>(&mut self, elems: &mut I) -> usize { + self.push_each(|| elems.next()) + } + + /// Removes at most `count` elements from the consumer and appends them to the producer. + /// If `count` is `None` then as much as possible elements will be moved. + /// The producer and consumer parts may be of different buffers as well as of the same one. + /// + /// On success returns number of elements been moved. + pub fn move_from(&mut self, other: &mut Consumer<T>, count: Option<usize>) -> usize { + move_items(other, self, count) + } +} + +impl<T: Sized + Copy> Producer<T> { + /// Appends elements from slice to the ring buffer. + /// Elements should be [`Copy`](https://doc.rust-lang.org/std/marker/trait.Copy.html). + /// + /// Returns count of elements been appended to the ring buffer. + pub fn push_slice(&mut self, elems: &[T]) -> usize { + unsafe { self.push_copy(&*(elems as *const [T] as *const [MaybeUninit<T>])) } + } +} + +impl Producer<u8> { + /// Reads at most `count` bytes + /// from [`Read`](https://doc.rust-lang.org/std/io/trait.Read.html) instance + /// and appends them to the ring buffer. + /// If `count` is `None` then as much as possible bytes will be read. + /// + /// Returns `Ok(n)` if `read` is succeded. `n` is number of bytes been read. + /// `n == 0` means that either `read` returned zero or ring buffer is full. + /// + /// If `read` is failed then error is returned. + pub fn read_from(&mut self, reader: &mut dyn Read, count: Option<usize>) -> io::Result<usize> { + let mut err = None; + let n = unsafe { + self.push_access(|left, _| -> usize { + let left = match count { + Some(c) => { + if c < left.len() { + &mut left[0..c] + } else { + left + } + } + None => left, + }; + match reader + .read(&mut *(left as *mut [MaybeUninit<u8>] as *mut [u8])) + .and_then(|n| { + if n <= left.len() { + Ok(n) + } else { + Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Read operation returned invalid number", + )) + } + }) { + Ok(n) => n, + Err(e) => { + err = Some(e); + 0 + } + } + }) + }; + match err { + Some(e) => Err(e), + None => Ok(n), + } + } +} + +impl Write for Producer<u8> { + fn write(&mut self, buffer: &[u8]) -> io::Result<usize> { + let n = self.push_slice(buffer); + if n == 0 && !buffer.is_empty() { + Err(io::Error::new( + io::ErrorKind::WouldBlock, + "Ring buffer is full", + )) + } else { + Ok(n) + } + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} diff --git a/third_party/rust/ringbuf/src/ring_buffer.rs b/third_party/rust/ringbuf/src/ring_buffer.rs new file mode 100644 index 0000000000..8ae68afa51 --- /dev/null +++ b/third_party/rust/ringbuf/src/ring_buffer.rs @@ -0,0 +1,187 @@ +use std::{ + cell::UnsafeCell, + cmp::min, + mem::{self, MaybeUninit}, + ptr::{self, copy}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; + +use crate::{consumer::Consumer, producer::Producer}; + +pub(crate) struct SharedVec<T: Sized> { + cell: UnsafeCell<Vec<T>>, +} + +unsafe impl<T: Sized> Sync for SharedVec<T> {} + +impl<T: Sized> SharedVec<T> { + pub fn new(data: Vec<T>) -> Self { + Self { + cell: UnsafeCell::new(data), + } + } + pub unsafe fn get_ref(&self) -> &Vec<T> { + &*self.cell.get() + } + #[allow(clippy::mut_from_ref)] + pub unsafe fn get_mut(&self) -> &mut Vec<T> { + &mut *self.cell.get() + } +} + +/// Ring buffer itself. +pub struct RingBuffer<T: Sized> { + pub(crate) data: SharedVec<MaybeUninit<T>>, + pub(crate) head: AtomicUsize, + pub(crate) tail: AtomicUsize, +} + +impl<T: Sized> RingBuffer<T> { + /// Creates a new instance of a ring buffer. + pub fn new(capacity: usize) -> Self { + let mut data = Vec::new(); + data.resize_with(capacity + 1, MaybeUninit::uninit); + Self { + data: SharedVec::new(data), + head: AtomicUsize::new(0), + tail: AtomicUsize::new(0), + } + } + + /// Splits ring buffer into producer and consumer. + pub fn split(self) -> (Producer<T>, Consumer<T>) { + let arc = Arc::new(self); + (Producer { rb: arc.clone() }, Consumer { rb: arc }) + } + + /// Returns capacity of the ring buffer. + pub fn capacity(&self) -> usize { + unsafe { self.data.get_ref() }.len() - 1 + } + + /// Checks if the ring buffer is empty. + pub fn is_empty(&self) -> bool { + let head = self.head.load(Ordering::Acquire); + let tail = self.tail.load(Ordering::Acquire); + head == tail + } + + /// Checks if the ring buffer is full. + pub fn is_full(&self) -> bool { + let head = self.head.load(Ordering::Acquire); + let tail = self.tail.load(Ordering::Acquire); + (tail + 1) % (self.capacity() + 1) == head + } + + /// The length of the data in the buffer. + pub fn len(&self) -> usize { + let head = self.head.load(Ordering::Acquire); + let tail = self.tail.load(Ordering::Acquire); + (tail + self.capacity() + 1 - head) % (self.capacity() + 1) + } + + /// The remaining space in the buffer. + pub fn remaining(&self) -> usize { + self.capacity() - self.len() + } +} + +impl<T: Sized> Drop for RingBuffer<T> { + fn drop(&mut self) { + let data = unsafe { self.data.get_mut() }; + + let head = self.head.load(Ordering::Acquire); + let tail = self.tail.load(Ordering::Acquire); + let len = data.len(); + + let slices = if head <= tail { + (head..tail, 0..0) + } else { + (head..len, 0..tail) + }; + + let drop = |elem_ref: &mut MaybeUninit<T>| unsafe { + mem::replace(elem_ref, MaybeUninit::uninit()).assume_init(); + }; + for elem in data[slices.0].iter_mut() { + drop(elem); + } + for elem in data[slices.1].iter_mut() { + drop(elem); + } + } +} + +struct SlicePtr<T: Sized> { + pub ptr: *mut T, + pub len: usize, +} + +impl<T> SlicePtr<T> { + fn null() -> Self { + Self { + ptr: ptr::null_mut(), + len: 0, + } + } + fn new(slice: &mut [T]) -> Self { + Self { + ptr: slice.as_mut_ptr(), + len: slice.len(), + } + } + unsafe fn shift(&mut self, count: usize) { + self.ptr = self.ptr.add(count); + self.len -= count; + } +} + +/// Moves at most `count` items from the `src` consumer to the `dst` producer. +/// Consumer and producer may be of different buffers as well as of the same one. +/// +/// `count` is the number of items being moved, if `None` - as much as possible items will be moved. +/// +/// Returns number of items been moved. +pub fn move_items<T>(src: &mut Consumer<T>, dst: &mut Producer<T>, count: Option<usize>) -> usize { + unsafe { + src.pop_access(|src_left, src_right| -> usize { + dst.push_access(|dst_left, dst_right| -> usize { + let n = count.unwrap_or_else(|| { + min( + src_left.len() + src_right.len(), + dst_left.len() + dst_right.len(), + ) + }); + let mut m = 0; + let mut src = (SlicePtr::new(src_left), SlicePtr::new(src_right)); + let mut dst = (SlicePtr::new(dst_left), SlicePtr::new(dst_right)); + + loop { + let k = min(n - m, min(src.0.len, dst.0.len)); + if k == 0 { + break; + } + copy(src.0.ptr, dst.0.ptr, k); + if src.0.len == k { + src.0 = src.1; + src.1 = SlicePtr::null(); + } else { + src.0.shift(k); + } + if dst.0.len == k { + dst.0 = dst.1; + dst.1 = SlicePtr::null(); + } else { + dst.0.shift(k); + } + m += k + } + + m + }) + }) + } +} diff --git a/third_party/rust/ringbuf/src/tests/access.rs b/third_party/rust/ringbuf/src/tests/access.rs new file mode 100644 index 0000000000..22cba9efee --- /dev/null +++ b/third_party/rust/ringbuf/src/tests/access.rs @@ -0,0 +1,234 @@ +use std::mem::MaybeUninit; + +use crate::RingBuffer; + +#[test] +fn push() { + let cap = 2; + let buf = RingBuffer::<i32>::new(cap); + let (mut prod, mut cons) = buf.split(); + + let vs_20 = (123, 456); + let push_fn_20 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize { + assert_eq!(left.len(), 2); + assert_eq!(right.len(), 0); + left[0] = MaybeUninit::new(vs_20.0); + left[1] = MaybeUninit::new(vs_20.1); + 2 + }; + + assert_eq!(unsafe { prod.push_access(push_fn_20) }, 2); + + assert_eq!(cons.pop().unwrap(), vs_20.0); + assert_eq!(cons.pop().unwrap(), vs_20.1); + assert_eq!(cons.pop(), None); + + let vs_11 = (123, 456); + let push_fn_11 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize { + assert_eq!(left.len(), 1); + assert_eq!(right.len(), 1); + left[0] = MaybeUninit::new(vs_11.0); + right[0] = MaybeUninit::new(vs_11.1); + 2 + }; + + assert_eq!(unsafe { prod.push_access(push_fn_11) }, 2); + + assert_eq!(cons.pop().unwrap(), vs_11.0); + assert_eq!(cons.pop().unwrap(), vs_11.1); + assert_eq!(cons.pop(), None); +} + +#[test] +fn pop_full() { + let cap = 2; + let buf = RingBuffer::<i32>::new(cap); + let (_, mut cons) = buf.split(); + + let dummy_fn = |_l: &mut [MaybeUninit<i32>], _r: &mut [MaybeUninit<i32>]| -> usize { 0 }; + assert_eq!(unsafe { cons.pop_access(dummy_fn) }, 0); +} + +#[test] +fn pop_empty() { + let cap = 2; + let buf = RingBuffer::<i32>::new(cap); + let (_, mut cons) = buf.split(); + + let dummy_fn = |_l: &mut [MaybeUninit<i32>], _r: &mut [MaybeUninit<i32>]| -> usize { 0 }; + assert_eq!(unsafe { cons.pop_access(dummy_fn) }, 0); +} + +#[test] +fn pop() { + let cap = 2; + let buf = RingBuffer::<i32>::new(cap); + let (mut prod, mut cons) = buf.split(); + + let vs_20 = (123, 456); + + assert_eq!(prod.push(vs_20.0), Ok(())); + assert_eq!(prod.push(vs_20.1), Ok(())); + assert_eq!(prod.push(0), Err(0)); + + let pop_fn_20 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize { + unsafe { + assert_eq!(left.len(), 2); + assert_eq!(right.len(), 0); + assert_eq!(left[0].assume_init(), vs_20.0); + assert_eq!(left[1].assume_init(), vs_20.1); + 2 + } + }; + + assert_eq!(unsafe { cons.pop_access(pop_fn_20) }, 2); + + let vs_11 = (123, 456); + + assert_eq!(prod.push(vs_11.0), Ok(())); + assert_eq!(prod.push(vs_11.1), Ok(())); + assert_eq!(prod.push(0), Err(0)); + + let pop_fn_11 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize { + unsafe { + assert_eq!(left.len(), 1); + assert_eq!(right.len(), 1); + assert_eq!(left[0].assume_init(), vs_11.0); + assert_eq!(right[0].assume_init(), vs_11.1); + 2 + } + }; + + assert_eq!(unsafe { cons.pop_access(pop_fn_11) }, 2); +} + +#[test] +fn push_return() { + let cap = 2; + let buf = RingBuffer::<i32>::new(cap); + let (mut prod, mut cons) = buf.split(); + + let push_fn_0 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize { + assert_eq!(left.len(), 2); + assert_eq!(right.len(), 0); + 0 + }; + + assert_eq!(unsafe { prod.push_access(push_fn_0) }, 0); + + let push_fn_1 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize { + assert_eq!(left.len(), 2); + assert_eq!(right.len(), 0); + left[0] = MaybeUninit::new(12); + 1 + }; + + assert_eq!(unsafe { prod.push_access(push_fn_1) }, 1); + + let push_fn_2 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize { + assert_eq!(left.len(), 1); + assert_eq!(right.len(), 0); + left[0] = MaybeUninit::new(34); + 1 + }; + + assert_eq!(unsafe { prod.push_access(push_fn_2) }, 1); + + assert_eq!(cons.pop().unwrap(), 12); + assert_eq!(cons.pop().unwrap(), 34); + assert_eq!(cons.pop(), None); +} + +#[test] +fn pop_return() { + let cap = 2; + let buf = RingBuffer::<i32>::new(cap); + let (mut prod, mut cons) = buf.split(); + + assert_eq!(prod.push(12), Ok(())); + assert_eq!(prod.push(34), Ok(())); + assert_eq!(prod.push(0), Err(0)); + + let pop_fn_0 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize { + assert_eq!(left.len(), 2); + assert_eq!(right.len(), 0); + 0 + }; + + assert_eq!(unsafe { cons.pop_access(pop_fn_0) }, 0); + + let pop_fn_1 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize { + unsafe { + assert_eq!(left.len(), 2); + assert_eq!(right.len(), 0); + assert_eq!(left[0].assume_init(), 12); + 1 + } + }; + + assert_eq!(unsafe { cons.pop_access(pop_fn_1) }, 1); + + let pop_fn_2 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize { + unsafe { + assert_eq!(left.len(), 1); + assert_eq!(right.len(), 0); + assert_eq!(left[0].assume_init(), 34); + 1 + } + }; + + assert_eq!(unsafe { cons.pop_access(pop_fn_2) }, 1); +} + +#[test] +fn push_pop() { + let cap = 2; + let buf = RingBuffer::<i32>::new(cap); + let (mut prod, mut cons) = buf.split(); + + let vs_20 = (123, 456); + let push_fn_20 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize { + assert_eq!(left.len(), 2); + assert_eq!(right.len(), 0); + left[0] = MaybeUninit::new(vs_20.0); + left[1] = MaybeUninit::new(vs_20.1); + 2 + }; + + assert_eq!(unsafe { prod.push_access(push_fn_20) }, 2); + + let pop_fn_20 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize { + unsafe { + assert_eq!(left.len(), 2); + assert_eq!(right.len(), 0); + assert_eq!(left[0].assume_init(), vs_20.0); + assert_eq!(left[1].assume_init(), vs_20.1); + 2 + } + }; + + assert_eq!(unsafe { cons.pop_access(pop_fn_20) }, 2); + + let vs_11 = (123, 456); + let push_fn_11 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize { + assert_eq!(left.len(), 1); + assert_eq!(right.len(), 1); + left[0] = MaybeUninit::new(vs_11.0); + right[0] = MaybeUninit::new(vs_11.1); + 2 + }; + + assert_eq!(unsafe { prod.push_access(push_fn_11) }, 2); + + let pop_fn_11 = |left: &mut [MaybeUninit<i32>], right: &mut [MaybeUninit<i32>]| -> usize { + unsafe { + assert_eq!(left.len(), 1); + assert_eq!(right.len(), 1); + assert_eq!(left[0].assume_init(), vs_11.0); + assert_eq!(right[0].assume_init(), vs_11.1); + 2 + } + }; + + assert_eq!(unsafe { cons.pop_access(pop_fn_11) }, 2); +} diff --git a/third_party/rust/ringbuf/src/tests/drop.rs b/third_party/rust/ringbuf/src/tests/drop.rs new file mode 100644 index 0000000000..d07b9b1ae5 --- /dev/null +++ b/third_party/rust/ringbuf/src/tests/drop.rs @@ -0,0 +1,240 @@ +use std::{cell::RefCell, collections::HashSet}; + +use crate::RingBuffer; + +#[derive(Debug)] +struct Dropper<'a> { + id: i32, + set: &'a RefCell<HashSet<i32>>, +} + +impl<'a> Dropper<'a> { + fn new(set: &'a RefCell<HashSet<i32>>, id: i32) -> Self { + if !set.borrow_mut().insert(id) { + panic!("value {} already exists", id); + } + Self { set, id } + } +} + +impl<'a> Drop for Dropper<'a> { + fn drop(&mut self) { + if !self.set.borrow_mut().remove(&self.id) { + panic!("value {} already removed", self.id); + } + } +} + +#[test] +fn single() { + let set = RefCell::new(HashSet::new()); + + let cap = 3; + let buf = RingBuffer::new(cap); + + assert_eq!(set.borrow().len(), 0); + + { + let (mut prod, mut cons) = buf.split(); + + prod.push(Dropper::new(&set, 1)).unwrap(); + assert_eq!(set.borrow().len(), 1); + prod.push(Dropper::new(&set, 2)).unwrap(); + assert_eq!(set.borrow().len(), 2); + prod.push(Dropper::new(&set, 3)).unwrap(); + assert_eq!(set.borrow().len(), 3); + + cons.pop().unwrap(); + assert_eq!(set.borrow().len(), 2); + cons.pop().unwrap(); + assert_eq!(set.borrow().len(), 1); + + prod.push(Dropper::new(&set, 4)).unwrap(); + assert_eq!(set.borrow().len(), 2); + } + + assert_eq!(set.borrow().len(), 0); +} + +#[test] +fn multiple_each() { + let set = RefCell::new(HashSet::new()); + + let cap = 5; + let buf = RingBuffer::new(cap); + + assert_eq!(set.borrow().len(), 0); + + { + let (mut prod, mut cons) = buf.split(); + let mut id = 0; + let mut cnt = 0; + + assert_eq!( + prod.push_each(|| { + if cnt < 4 { + id += 1; + cnt += 1; + Some(Dropper::new(&set, id)) + } else { + None + } + }), + 4 + ); + assert_eq!(cnt, 4); + assert_eq!(cnt, set.borrow().len()); + + assert_eq!( + cons.pop_each( + |_| { + cnt -= 1; + true + }, + Some(2) + ), + 2 + ); + assert_eq!(cnt, 2); + assert_eq!(cnt, set.borrow().len()); + + assert_eq!( + prod.push_each(|| { + id += 1; + cnt += 1; + Some(Dropper::new(&set, id)) + }), + 3 + ); + assert_eq!(cnt, 5); + assert_eq!(cnt, set.borrow().len()); + + assert_eq!( + cons.pop_each( + |_| { + cnt -= 1; + true + }, + None + ), + 5 + ); + assert_eq!(cnt, 0); + assert_eq!(cnt, set.borrow().len()); + + assert_eq!( + prod.push_each(|| { + id += 1; + cnt += 1; + Some(Dropper::new(&set, id)) + }), + 5 + ); + assert_eq!(cnt, 5); + assert_eq!(cnt, set.borrow().len()); + } + + assert_eq!(set.borrow().len(), 0); +} + +/// Test the `pop_each` with internal function that returns false +#[test] +fn pop_each_test1() { + let cap = 10usize; + let (mut producer, mut consumer) = RingBuffer::new(cap).split(); + + for i in 0..cap { + producer.push((i, vec![0u8; 1000])).unwrap(); + } + + for _ in 0..cap { + let removed = consumer.pop_each(|_val| -> bool { false }, None); + assert_eq!(removed, 1); + } + + assert_eq!(consumer.len(), 0); +} + +/// Test the `pop_each` with capped pop +#[test] +fn pop_each_test2() { + let cap = 10usize; + let (mut producer, mut consumer) = RingBuffer::new(cap).split(); + + for i in 0..cap { + producer.push((i, vec![0u8; 1000])).unwrap(); + } + + for _ in 0..cap { + let removed = consumer.pop_each(|_val| -> bool { true }, Some(1)); + assert_eq!(removed, 1); + } + + assert_eq!(consumer.len(), 0); +} + +/// Test the `push_each` with internal function that adds only 1 element. +#[test] +fn push_each_test1() { + let cap = 10usize; + let (mut producer, mut consumer) = RingBuffer::new(cap).split(); + + for i in 0..cap { + let mut count = 0; + // Add 1 element at a time + let added = producer.push_each(|| -> Option<(usize, Vec<u8>)> { + if count == 0 { + count += 1; + Some((i, vec![0u8; 1000])) + } else { + None + } + }); + assert_eq!(added, 1); + } + + for _ in 0..cap { + consumer.pop().unwrap(); + } + + assert_eq!(consumer.len(), 0); +} + +/// Test the `push_each` with split internal buffer +#[test] +fn push_each_test2() { + let cap = 10usize; + let cap_half = 5usize; + let (mut producer, mut consumer) = RingBuffer::new(cap).split(); + + // Fill the entire buffer + for i in 0..cap { + producer.push((i, vec![0u8; 1000])).unwrap(); + } + + // Remove half elements + for _ in 0..cap_half { + consumer.pop().unwrap(); + } + + // Re add half elements one by one and check the return count. + for i in 0..cap_half { + let mut count = 0; + // Add 1 element at a time + let added = producer.push_each(|| -> Option<(usize, Vec<u8>)> { + if count == 0 { + count += 1; + Some((i, vec![0u8; 1000])) + } else { + None + } + }); + assert_eq!(added, 1); + } + + for _ in 0..cap { + consumer.pop().unwrap(); + } + + assert_eq!(consumer.len(), 0); +} diff --git a/third_party/rust/ringbuf/src/tests/message.rs b/third_party/rust/ringbuf/src/tests/message.rs new file mode 100644 index 0000000000..0a20a6137b --- /dev/null +++ b/third_party/rust/ringbuf/src/tests/message.rs @@ -0,0 +1,167 @@ +use std::{ + io::{self, Read, Write}, + thread, + time::Duration, +}; + +use crate::RingBuffer; + +const THE_BOOK_FOREWORD: &'static str = " +It wasn’t always so clear, but the Rust programming language is fundamentally about empowerment: no matter what kind of code you are writing now, Rust empowers you to reach farther, to program with confidence in a wider variety of domains than you did before. +Take, for example, “systems-level” work that deals with low-level details of memory management, data representation, and concurrency. Traditionally, this realm of programming is seen as arcane, accessible only to a select few who have devoted the necessary years learning to avoid its infamous pitfalls. And even those who practice it do so with caution, lest their code be open to exploits, crashes, or corruption. +Rust breaks down these barriers by eliminating the old pitfalls and providing a friendly, polished set of tools to help you along the way. Programmers who need to “dip down” into lower-level control can do so with Rust, without taking on the customary risk of crashes or security holes, and without having to learn the fine points of a fickle toolchain. Better yet, the language is designed to guide you naturally towards reliable code that is efficient in terms of speed and memory usage. +Programmers who are already working with low-level code can use Rust to raise their ambitions. For example, introducing parallelism in Rust is a relatively low-risk operation: the compiler will catch the classical mistakes for you. And you can tackle more aggressive optimizations in your code with the confidence that you won’t accidentally introduce crashes or vulnerabilities. +But Rust isn’t limited to low-level systems programming. It’s expressive and ergonomic enough to make CLI apps, web servers, and many other kinds of code quite pleasant to write — you’ll find simple examples of both later in the book. Working with Rust allows you to build skills that transfer from one domain to another; you can learn Rust by writing a web app, then apply those same skills to target your Raspberry Pi. +This book fully embraces the potential of Rust to empower its users. It’s a friendly and approachable text intended to help you level up not just your knowledge of Rust, but also your reach and confidence as a programmer in general. So dive in, get ready to learn—and welcome to the Rust community! + +— Nicholas Matsakis and Aaron Turon +"; + +#[test] +fn push_pop_slice() { + let buf = RingBuffer::<u8>::new(7); + let (mut prod, mut cons) = buf.split(); + + let smsg = THE_BOOK_FOREWORD; + + let pjh = thread::spawn(move || { + let mut bytes = smsg.as_bytes(); + while bytes.len() > 0 { + let n = prod.push_slice(bytes); + if n > 0 { + bytes = &bytes[n..bytes.len()] + } else { + thread::sleep(Duration::from_millis(1)) + } + } + loop { + match prod.push(0) { + Ok(()) => break, + Err(_) => thread::sleep(Duration::from_millis(1)), + } + } + }); + + let cjh = thread::spawn(move || { + let mut bytes = Vec::<u8>::new(); + let mut buffer = [0; 5]; + loop { + let n = cons.pop_slice(&mut buffer); + if n > 0 { + bytes.extend_from_slice(&buffer[0..n]) + } else { + if bytes.ends_with(&[0]) { + break; + } else { + thread::sleep(Duration::from_millis(1)); + } + } + } + + assert_eq!(bytes.pop().unwrap(), 0); + String::from_utf8(bytes).unwrap() + }); + + pjh.join().unwrap(); + let rmsg = cjh.join().unwrap(); + + assert_eq!(smsg, rmsg); +} + +#[test] +fn read_from_write_into() { + let buf = RingBuffer::<u8>::new(7); + let (mut prod, mut cons) = buf.split(); + + let smsg = THE_BOOK_FOREWORD; + + let pjh = thread::spawn(move || { + let zero = [0 as u8]; + let mut bytes = smsg.as_bytes().chain(&zero[..]); + loop { + if prod.is_full() { + thread::sleep(Duration::from_millis(1)); + } else { + if prod.read_from(&mut bytes, None).unwrap() == 0 { + break; + } + } + } + }); + + let cjh = thread::spawn(move || { + let mut bytes = Vec::<u8>::new(); + loop { + if cons.is_empty() { + if bytes.ends_with(&[0]) { + break; + } else { + thread::sleep(Duration::from_millis(1)); + } + } else { + cons.write_into(&mut bytes, None).unwrap(); + } + } + + assert_eq!(bytes.pop().unwrap(), 0); + String::from_utf8(bytes).unwrap() + }); + + pjh.join().unwrap(); + let rmsg = cjh.join().unwrap(); + + assert_eq!(smsg, rmsg); +} + +#[test] +fn read_write() { + let buf = RingBuffer::<u8>::new(7); + let (mut prod, mut cons) = buf.split(); + + let smsg = THE_BOOK_FOREWORD; + + let pjh = thread::spawn(move || { + let mut bytes = smsg.as_bytes(); + while bytes.len() > 0 { + match prod.write(bytes) { + Ok(n) => bytes = &bytes[n..bytes.len()], + Err(err) => { + assert_eq!(err.kind(), io::ErrorKind::WouldBlock); + thread::sleep(Duration::from_millis(1)); + } + } + } + loop { + match prod.push(0) { + Ok(()) => break, + Err(_) => thread::sleep(Duration::from_millis(1)), + } + } + }); + + let cjh = thread::spawn(move || { + let mut bytes = Vec::<u8>::new(); + let mut buffer = [0; 5]; + loop { + match cons.read(&mut buffer) { + Ok(n) => bytes.extend_from_slice(&buffer[0..n]), + Err(err) => { + assert_eq!(err.kind(), io::ErrorKind::WouldBlock); + if bytes.ends_with(&[0]) { + break; + } else { + thread::sleep(Duration::from_millis(1)); + } + } + } + } + + assert_eq!(bytes.pop().unwrap(), 0); + String::from_utf8(bytes).unwrap() + }); + + pjh.join().unwrap(); + let rmsg = cjh.join().unwrap(); + + assert_eq!(smsg, rmsg); +} diff --git a/third_party/rust/ringbuf/src/tests/mod.rs b/third_party/rust/ringbuf/src/tests/mod.rs new file mode 100644 index 0000000000..c07e9d8316 --- /dev/null +++ b/third_party/rust/ringbuf/src/tests/mod.rs @@ -0,0 +1,6 @@ +mod access; +mod drop; +mod message; +mod multiple; +mod read_write; +mod single; diff --git a/third_party/rust/ringbuf/src/tests/multiple.rs b/third_party/rust/ringbuf/src/tests/multiple.rs new file mode 100644 index 0000000000..002076f0fa --- /dev/null +++ b/third_party/rust/ringbuf/src/tests/multiple.rs @@ -0,0 +1,140 @@ +use crate::RingBuffer; + +#[test] +fn for_each() { + let cap = 2; + let buf = RingBuffer::<i32>::new(cap); + let (mut prod, mut cons) = buf.split(); + + prod.push(10).unwrap(); + prod.push(20).unwrap(); + + let mut sum_1 = 0; + cons.for_each(|v| { + sum_1 += *v; + }); + + let first = cons.pop().expect("First element not available"); + let second = cons.pop().expect("Second element not available"); + + assert_eq!(sum_1, first + second); +} + +#[test] +fn for_each_mut() { + let cap = 2; + let buf = RingBuffer::<i32>::new(cap); + let (mut prod, mut cons) = buf.split(); + + prod.push(10).unwrap(); + prod.push(20).unwrap(); + + cons.for_each_mut(|v| { + *v *= 2; + }); + + let mut sum_1 = 0; + cons.for_each_mut(|v| { + sum_1 += *v; + }); + + let first = cons.pop().expect("First element not available"); + let second = cons.pop().expect("Second element not available"); + + assert_eq!(sum_1, first + second); +} + +#[test] +fn push_pop_slice() { + let buf = RingBuffer::<i32>::new(4); + let (mut prod, mut cons) = buf.split(); + + let mut tmp = [0; 5]; + + assert_eq!(prod.push_slice(&[]), 0); + assert_eq!(prod.push_slice(&[0, 1, 2]), 3); + + assert_eq!(cons.pop_slice(&mut tmp[0..2]), 2); + assert_eq!(tmp[0..2], [0, 1]); + + assert_eq!(prod.push_slice(&[3, 4]), 2); + assert_eq!(prod.push_slice(&[5, 6]), 1); + + assert_eq!(cons.pop_slice(&mut tmp[0..3]), 3); + assert_eq!(tmp[0..3], [2, 3, 4]); + + assert_eq!(prod.push_slice(&[6, 7, 8, 9]), 3); + + assert_eq!(cons.pop_slice(&mut tmp), 4); + assert_eq!(tmp[0..4], [5, 6, 7, 8]); +} + +#[test] +fn move_slice() { + let buf0 = RingBuffer::<i32>::new(4); + let buf1 = RingBuffer::<i32>::new(4); + let (mut prod0, mut cons0) = buf0.split(); + let (mut prod1, mut cons1) = buf1.split(); + + let mut tmp = [0; 5]; + + assert_eq!(prod0.push_slice(&[0, 1, 2]), 3); + + assert_eq!(prod1.move_from(&mut cons0, None), 3); + assert_eq!(prod1.move_from(&mut cons0, None), 0); + + assert_eq!(cons1.pop_slice(&mut tmp), 3); + assert_eq!(tmp[0..3], [0, 1, 2]); + + assert_eq!(prod0.push_slice(&[3, 4, 5]), 3); + + assert_eq!(prod1.move_from(&mut cons0, None), 3); + + assert_eq!(cons1.pop_slice(&mut tmp), 3); + assert_eq!(tmp[0..3], [3, 4, 5]); + + assert_eq!(prod1.push_slice(&[6, 7, 8]), 3); + assert_eq!(prod0.push_slice(&[9, 10]), 2); + + assert_eq!(prod1.move_from(&mut cons0, None), 1); + assert_eq!(prod1.move_from(&mut cons0, None), 0); + + assert_eq!(cons1.pop_slice(&mut tmp), 4); + assert_eq!(tmp[0..4], [6, 7, 8, 9]); +} + +#[test] +fn move_slice_count() { + let buf0 = RingBuffer::<i32>::new(4); + let buf1 = RingBuffer::<i32>::new(4); + let (mut prod0, mut cons0) = buf0.split(); + let (mut prod1, mut cons1) = buf1.split(); + + let mut tmp = [0; 5]; + + assert_eq!(prod0.push_slice(&[0, 1, 2]), 3); + + assert_eq!(prod1.move_from(&mut cons0, Some(2)), 2); + + assert_eq!(cons1.pop_slice(&mut tmp), 2); + assert_eq!(tmp[0..2], [0, 1]); + + assert_eq!(prod1.move_from(&mut cons0, Some(2)), 1); + + assert_eq!(cons1.pop_slice(&mut tmp), 1); + assert_eq!(tmp[0..1], [2]); + + assert_eq!(prod0.push_slice(&[3, 4, 5, 6]), 4); + + assert_eq!(prod1.move_from(&mut cons0, Some(3)), 3); + + assert_eq!(cons1.pop_slice(&mut tmp), 3); + assert_eq!(tmp[0..3], [3, 4, 5]); + + assert_eq!(prod0.push_slice(&[7, 8, 9]), 3); + + assert_eq!(prod1.move_from(&mut cons0, Some(5)), 4); + + assert_eq!(cons1.pop_slice(&mut tmp), 4); + assert_eq!(tmp[0..4], [6, 7, 8, 9]); +} diff --git a/third_party/rust/ringbuf/src/tests/read_write.rs b/third_party/rust/ringbuf/src/tests/read_write.rs new file mode 100644 index 0000000000..08270416af --- /dev/null +++ b/third_party/rust/ringbuf/src/tests/read_write.rs @@ -0,0 +1,159 @@ +use std::io::{self}; + +use crate::RingBuffer; + +#[test] +fn from() { + let buf0 = RingBuffer::<u8>::new(4); + let buf1 = RingBuffer::<u8>::new(4); + let (mut prod0, mut cons0) = buf0.split(); + let (mut prod1, mut cons1) = buf1.split(); + + let mut tmp = [0; 5]; + + assert_eq!(prod0.push_slice(&[0, 1, 2]), 3); + + match prod1.read_from(&mut cons0, None) { + Ok(n) => assert_eq!(n, 3), + other => panic!("{:?}", other), + } + match prod1.read_from(&mut cons0, None) { + Err(e) => { + assert_eq!(e.kind(), io::ErrorKind::WouldBlock); + } + other => panic!("{:?}", other), + } + + assert_eq!(cons1.pop_slice(&mut tmp), 3); + assert_eq!(tmp[0..3], [0, 1, 2]); + + assert_eq!(prod0.push_slice(&[3, 4, 5]), 3); + + match prod1.read_from(&mut cons0, None) { + Ok(n) => assert_eq!(n, 2), + other => panic!("{:?}", other), + } + assert_eq!(cons1.pop_slice(&mut tmp), 2); + assert_eq!(tmp[0..2], [3, 4]); + + match prod1.read_from(&mut cons0, None) { + Ok(n) => assert_eq!(n, 1), + other => panic!("{:?}", other), + } + assert_eq!(cons1.pop_slice(&mut tmp), 1); + assert_eq!(tmp[0..1], [5]); + + assert_eq!(prod1.push_slice(&[6, 7, 8]), 3); + assert_eq!(prod0.push_slice(&[9, 10]), 2); + + match prod1.read_from(&mut cons0, None) { + Ok(n) => assert_eq!(n, 1), + other => panic!("{:?}", other), + } + match prod1.read_from(&mut cons0, None) { + Ok(n) => assert_eq!(n, 0), + other => panic!("{:?}", other), + } + + assert_eq!(cons1.pop_slice(&mut tmp), 4); + assert_eq!(tmp[0..4], [6, 7, 8, 9]); +} + +#[test] +fn into() { + let buf0 = RingBuffer::<u8>::new(4); + let buf1 = RingBuffer::<u8>::new(4); + let (mut prod0, mut cons0) = buf0.split(); + let (mut prod1, mut cons1) = buf1.split(); + + let mut tmp = [0; 5]; + + assert_eq!(prod0.push_slice(&[0, 1, 2]), 3); + + match cons0.write_into(&mut prod1, None) { + Ok(n) => assert_eq!(n, 3), + other => panic!("{:?}", other), + } + match cons0.write_into(&mut prod1, None) { + Ok(n) => assert_eq!(n, 0), + other => panic!("{:?}", other), + } + + assert_eq!(cons1.pop_slice(&mut tmp), 3); + assert_eq!(tmp[0..3], [0, 1, 2]); + + assert_eq!(prod0.push_slice(&[3, 4, 5]), 3); + + match cons0.write_into(&mut prod1, None) { + Ok(n) => assert_eq!(n, 2), + other => panic!("{:?}", other), + } + assert_eq!(cons1.pop_slice(&mut tmp), 2); + assert_eq!(tmp[0..2], [3, 4]); + + match cons0.write_into(&mut prod1, None) { + Ok(n) => assert_eq!(n, 1), + other => panic!("{:?}", other), + } + assert_eq!(cons1.pop_slice(&mut tmp), 1); + assert_eq!(tmp[0..1], [5]); + + assert_eq!(prod1.push_slice(&[6, 7, 8]), 3); + assert_eq!(prod0.push_slice(&[9, 10]), 2); + + match cons0.write_into(&mut prod1, None) { + Ok(n) => assert_eq!(n, 1), + other => panic!("{:?}", other), + } + match cons0.write_into(&mut prod1, None) { + Err(e) => { + assert_eq!(e.kind(), io::ErrorKind::WouldBlock); + } + other => panic!("{:?}", other), + } + + assert_eq!(cons1.pop_slice(&mut tmp), 4); + assert_eq!(tmp[0..4], [6, 7, 8, 9]); +} + +#[test] +fn count() { + let buf0 = RingBuffer::<u8>::new(4); + let buf1 = RingBuffer::<u8>::new(4); + let (mut prod0, mut cons0) = buf0.split(); + let (mut prod1, mut cons1) = buf1.split(); + + let mut tmp = [0; 5]; + + assert_eq!(prod0.push_slice(&[0, 1, 2, 3]), 4); + + match prod1.read_from(&mut cons0, Some(3)) { + Ok(n) => assert_eq!(n, 3), + other => panic!("{:?}", other), + } + match prod1.read_from(&mut cons0, Some(2)) { + Ok(n) => assert_eq!(n, 1), + other => panic!("{:?}", other), + } + + assert_eq!(cons1.pop_slice(&mut tmp), 4); + assert_eq!(tmp[0..4], [0, 1, 2, 3]); + + assert_eq!(prod0.push_slice(&[4, 5, 6, 7]), 4); + + match cons0.write_into(&mut prod1, Some(3)) { + Ok(n) => assert_eq!(n, 1), + other => panic!("{:?}", other), + } + match cons0.write_into(&mut prod1, Some(2)) { + Ok(n) => assert_eq!(n, 2), + other => panic!("{:?}", other), + } + match cons0.write_into(&mut prod1, Some(2)) { + Ok(n) => assert_eq!(n, 1), + other => panic!("{:?}", other), + } + + assert_eq!(cons1.pop_slice(&mut tmp), 4); + assert_eq!(tmp[0..4], [4, 5, 6, 7]); +} diff --git a/third_party/rust/ringbuf/src/tests/single.rs b/third_party/rust/ringbuf/src/tests/single.rs new file mode 100644 index 0000000000..29958861fe --- /dev/null +++ b/third_party/rust/ringbuf/src/tests/single.rs @@ -0,0 +1,201 @@ +use std::{sync::atomic::Ordering, thread}; + +use crate::RingBuffer; + +fn head_tail<T>(rb: &RingBuffer<T>) -> (usize, usize) { + ( + rb.head.load(Ordering::Acquire), + rb.tail.load(Ordering::Acquire), + ) +} + +#[test] +fn capacity() { + let cap = 13; + let buf = RingBuffer::<i32>::new(cap); + assert_eq!(buf.capacity(), cap); +} +#[test] +fn split_capacity() { + let cap = 13; + let buf = RingBuffer::<i32>::new(cap); + let (prod, cons) = buf.split(); + + assert_eq!(prod.capacity(), cap); + assert_eq!(cons.capacity(), cap); +} + +#[test] +fn split_threads() { + let buf = RingBuffer::<i32>::new(10); + let (prod, cons) = buf.split(); + + let pjh = thread::spawn(move || { + let _ = prod; + }); + + let cjh = thread::spawn(move || { + let _ = cons; + }); + + pjh.join().unwrap(); + cjh.join().unwrap(); +} + +#[test] +fn push() { + let cap = 2; + let buf = RingBuffer::<i32>::new(cap); + let (mut prod, _) = buf.split(); + + assert_eq!(head_tail(&prod.rb), (0, 0)); + + assert_eq!(prod.push(123), Ok(())); + assert_eq!(head_tail(&prod.rb), (0, 1)); + + assert_eq!(prod.push(234), Ok(())); + assert_eq!(head_tail(&prod.rb), (0, 2)); + + assert_eq!(prod.push(345), Err(345)); + assert_eq!(head_tail(&prod.rb), (0, 2)); +} + +#[test] +fn pop_empty() { + let cap = 2; + let buf = RingBuffer::<i32>::new(cap); + let (_, mut cons) = buf.split(); + + assert_eq!(head_tail(&cons.rb), (0, 0)); + + assert_eq!(cons.pop(), None); + assert_eq!(head_tail(&cons.rb), (0, 0)); +} + +#[test] +fn push_pop_one() { + let cap = 2; + let buf = RingBuffer::<i32>::new(cap); + let (mut prod, mut cons) = buf.split(); + + let vcap = cap + 1; + let values = [12, 34, 56, 78, 90]; + assert_eq!(head_tail(&cons.rb), (0, 0)); + + for (i, v) in values.iter().enumerate() { + assert_eq!(prod.push(*v), Ok(())); + assert_eq!(head_tail(&cons.rb), (i % vcap, (i + 1) % vcap)); + + assert_eq!(cons.pop().unwrap(), *v); + assert_eq!(head_tail(&cons.rb), ((i + 1) % vcap, (i + 1) % vcap)); + + assert_eq!(cons.pop(), None); + assert_eq!(head_tail(&cons.rb), ((i + 1) % vcap, (i + 1) % vcap)); + } +} + +#[test] +fn push_pop_all() { + let cap = 2; + let buf = RingBuffer::<i32>::new(cap); + let (mut prod, mut cons) = buf.split(); + + let vcap = cap + 1; + let values = [(12, 34, 13), (56, 78, 57), (90, 10, 91)]; + assert_eq!(head_tail(&cons.rb), (0, 0)); + + for (i, v) in values.iter().enumerate() { + assert_eq!(prod.push(v.0), Ok(())); + assert_eq!(head_tail(&cons.rb), (cap * i % vcap, (cap * i + 1) % vcap)); + + assert_eq!(prod.push(v.1), Ok(())); + assert_eq!(head_tail(&cons.rb), (cap * i % vcap, (cap * i + 2) % vcap)); + + assert_eq!(prod.push(v.2).unwrap_err(), v.2); + assert_eq!(head_tail(&cons.rb), (cap * i % vcap, (cap * i + 2) % vcap)); + + assert_eq!(cons.pop().unwrap(), v.0); + assert_eq!( + head_tail(&cons.rb), + ((cap * i + 1) % vcap, (cap * i + 2) % vcap) + ); + + assert_eq!(cons.pop().unwrap(), v.1); + assert_eq!( + head_tail(&cons.rb), + ((cap * i + 2) % vcap, (cap * i + 2) % vcap) + ); + + assert_eq!(cons.pop(), None); + assert_eq!( + head_tail(&cons.rb), + ((cap * i + 2) % vcap, (cap * i + 2) % vcap) + ); + } +} + +#[test] +fn empty_full() { + let buf = RingBuffer::<i32>::new(1); + let (mut prod, cons) = buf.split(); + + assert!(prod.is_empty()); + assert!(cons.is_empty()); + assert!(!prod.is_full()); + assert!(!cons.is_full()); + + assert_eq!(prod.push(123), Ok(())); + + assert!(!prod.is_empty()); + assert!(!cons.is_empty()); + assert!(prod.is_full()); + assert!(cons.is_full()); +} + +#[test] +fn len_remaining() { + let buf = RingBuffer::<i32>::new(2); + let (mut prod, mut cons) = buf.split(); + + assert_eq!(prod.len(), 0); + assert_eq!(cons.len(), 0); + assert_eq!(prod.remaining(), 2); + assert_eq!(cons.remaining(), 2); + + assert_eq!(prod.push(123), Ok(())); + + assert_eq!(prod.len(), 1); + assert_eq!(cons.len(), 1); + assert_eq!(prod.remaining(), 1); + assert_eq!(cons.remaining(), 1); + + assert_eq!(prod.push(456), Ok(())); + + assert_eq!(prod.len(), 2); + assert_eq!(cons.len(), 2); + assert_eq!(prod.remaining(), 0); + assert_eq!(cons.remaining(), 0); + + assert_eq!(cons.pop(), Some(123)); + + assert_eq!(prod.len(), 1); + assert_eq!(cons.len(), 1); + assert_eq!(prod.remaining(), 1); + assert_eq!(cons.remaining(), 1); + + assert_eq!(cons.pop(), Some(456)); + + assert_eq!(prod.len(), 0); + assert_eq!(cons.len(), 0); + assert_eq!(prod.remaining(), 2); + assert_eq!(cons.remaining(), 2); + + // now head is at 2, so tail will be at 0. This caught an overflow error + // when tail+1 < head because of the substraction of usize. + assert_eq!(prod.push(789), Ok(())); + + assert_eq!(prod.len(), 1); + assert_eq!(cons.len(), 1); + assert_eq!(prod.remaining(), 1); + assert_eq!(cons.remaining(), 1); +} |