diff options
Diffstat (limited to 'third_party/rust/oneshot-uniffi')
27 files changed, 4030 insertions, 0 deletions
diff --git a/third_party/rust/oneshot-uniffi/.cargo-checksum.json b/third_party/rust/oneshot-uniffi/.cargo-checksum.json new file mode 100644 index 0000000000..6b252c6773 --- /dev/null +++ b/third_party/rust/oneshot-uniffi/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"CHANGELOG.md":"e1165d97c283b915d87e22f209494be39933723a0b544e725f69cfa5cef3876c","Cargo.lock":"7625529900ca1e3626b90e74ef268b31e06da85e8334a885711fdfd80821ddda","Cargo.toml":"81dde8ad3180c7b97325a6a67bfbefb145590606de9008d881c4e04808865f0a","README.md":"811ea1c958d5a65583d0223b7ab09bb282e7a51ed60f9a2cb90ef6d555325a68","benches/benches.rs":"67dcc916d0b7e28e396c28dac0499726366e1cb10e9991948d1c881a5abf5faa","check_mem_leaks.sh":"c1ab6ef27997c7f971352ab1c86a184004843c499bc24925da953aefcf1c624c","examples/recv_before_send.rs":"9a3cabcc2878990b61787d0048061b382555a8cd1a08b1ddec63a6e8a4a31e56","examples/recv_before_send_then_drop_sender.rs":"14706c6b4308a690662ceaa47f1699588bd833b3ec020eb9f42f220f3ffc7ae7","examples/recv_ref_before_send.rs":"43699f4720c46b5f138c260b866eb708ddf616e2b442ffa74a97373f4f48d4d0","examples/recv_ref_before_send_then_drop_sender.rs":"a190ed220cb4288d4965485365c9afaed30535cbfad5f8cb7389071b82d67cac","examples/recv_timeout_before_send.rs":"2262aa6531afce7816d43182ad9cbec2c04f3dc129064e11e89452278ce8b163","examples/recv_timeout_before_send_then_drop_sender.rs":"4cc8eade4c211f52f5b9be0f72a5906689b894490f4cb5255525e44106e7a4a8","examples/recv_with_dropped_sender.rs":"7906685053ce1c53ff6c26ce11d3221d4bf5ca3429d1d4d2c28de9237cb151c6","examples/send_before_recv.rs":"5555bd61ad52273b663007794128d8f012fc54272bd3225259b5546221bcd591","examples/send_then_drop_receiver.rs":"c3612de207309098404b057468687a2d2311d07f354b7e046398e35e93c4cdcf","examples/send_with_dropped_receiver.rs":"f5a7762b231a24a0db4397c5139437cba155d09b9dbb59872d662c7923080706","src/errors.rs":"df6a1db663fdb1c54d6941d737f6591bfe0dc6f01bd627ba0a94d67ed50b27a9","src/lib.rs":"86893f56e8e762b41ee079b42f4248608e9efb68bd76aa9550fce61e7466bbb0","src/loombox.rs":"fc85d1c2d3fda432be60f0c4d1d528e5998ec2b738a5b395a242285051b94d65","tests/assert_mem.rs":"b1e5190af01af22e55c7c1cd1ff2711807591f788e4eb8b6c6d89123e146105e","tests/async.rs":"6fd2826e589b94677d4eeed1080deda8bcc429aa05a20d843d1442a3a48ea757","tests/future.rs":"0e71f0293cd5a8c44210e8882aca20cfbf1e3771ecd4e4f6b59b924c0d01dd97","tests/helpers/mod.rs":"19161ed33e0ba8862746f04678d0606dee90205896083f85d8c1dcd4d211ccb0","tests/helpers/waker.rs":"77494d49f62d0d320df3830643c306e06e6e20751d210cf6fa58b238bd96c3f9","tests/loom.rs":"ea350fa424a95581e1871bc0037badecc5a090f28fd10532917abbaf561218ab","tests/sync.rs":"1186fa6cdb5a180944fa7d793ccb8be412c4a4e88bb504daa70bc097ee081b06"},"package":"9ae4988774e7a7e6a0783d119bdc683ea8c1d01a24d4fff9b4bdc280e07bd99e"}
\ No newline at end of file diff --git a/third_party/rust/oneshot-uniffi/CHANGELOG.md b/third_party/rust/oneshot-uniffi/CHANGELOG.md new file mode 100644 index 0000000000..e12f6cc979 --- /dev/null +++ b/third_party/rust/oneshot-uniffi/CHANGELOG.md @@ -0,0 +1,62 @@ +# Changelog +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) +and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). + +### Categories each change fall into + +* **Added**: for new features. +* **Changed**: for changes in existing functionality. +* **Deprecated**: for soon-to-be removed features. +* **Removed**: for now removed features. +* **Fixed**: for any bug fixes. +* **Security**: in case of vulnerabilities. + + +## [Unreleased] + +## [0.1.5] - 2022-09-01 +### Fixed +- Handle the UNPARKING state correctly in all recv methods. `try_recv` will now not panic + if used on a `Receiver` that is being unparked from an async wait. The other `recv` methods + will still panic (as they should), but with a better error message. + + +## [0.1.4] - 2022-08-30 +### Changed +- Upgrade to Rust edition 2021. Also increases the MSRV to Rust 1.60. +- Add null-pointer optimization to `Sender`, `Receiver` and `SendError`. + This reduces the call stack size of Sender::send and it makes + `Option<Sender>` and `Option<Receiver>` pointer sized (#18). +- Relax the memory ordering of all atomic operations from `SeqCst` to the most appropriate + lower ordering (#17 + #20). + +### Fixed +- Fix undefined behavior due to multiple mutable references to the same channel instance (#18). +- Fix race condition that could happen during unparking of a receiving `Receiver` (#17 + #20). + + +## [0.1.3] - 2021-11-23 +### Fixed +- Keep the *last* `Waker` in `Future::poll`, not the *first* one. Stops breaking the contract + on how futures should work. + + +## [0.1.2] - 2020-08-11 +### Fixed +- Fix unreachable code panic that happened if the `Receiver` of an empty but open channel was + polled and then dropped. + + +## [0.1.1] - 2020-05-10 +Initial implementation. Supports basically all the (for now) intended functionality. +Sender is as lock-free as I think it can get and the receiver can both do thread blocking +and be awaited asynchronously. The receiver also has a wait-free `try_recv` method. + +The crate has two features. They are activated by default, but the user can opt out of async +support as well as usage of libstd (making the crate `no_std` but still requiring liballoc) + + +## [0.1.0] - 2019-05-30 +Name reserved on crate.io by someone other than the author of this crate. diff --git a/third_party/rust/oneshot-uniffi/Cargo.lock b/third_party/rust/oneshot-uniffi/Cargo.lock new file mode 100644 index 0000000000..1b75d43675 --- /dev/null +++ b/third_party/rust/oneshot-uniffi/Cargo.lock @@ -0,0 +1,1118 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "addr2line" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + +[[package]] +name = "aho-corasick" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6748e8def348ed4d14996fa801f4122cd763fff530258cdc03f64b25f89d3a5a" +dependencies = [ + "memchr", +] + +[[package]] +name = "async-attributes" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" +dependencies = [ + "quote", + "syn 1.0.109", +] + +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + +[[package]] +name = "async-executor" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fa3dc5f2a8564f07759c008b9109dc0d39de92a88d5588b8a5036d286383afb" +dependencies = [ + "async-lock", + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix", + "slab", + "socket2", + "waker-fn", +] + +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-attributes", + "async-channel", + "async-global-executor", + "async-io", + "async-lock", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-task" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" + +[[package]] +name = "atomic-waker" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3" + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi 0.1.19", + "libc", + "winapi", +] + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "backtrace" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "blocking" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "atomic-waker", + "fastrand", + "futures-lite", + "log", +] + +[[package]] +name = "bumpalo" +version = "3.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" + +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + +[[package]] +name = "cc" +version = "1.0.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" +dependencies = [ + "libc", +] + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "clap" +version = "2.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c" +dependencies = [ + "bitflags", + "textwrap", + "unicode-width", +] + +[[package]] +name = "concurrent-queue" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "criterion" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b01d6de93b2b6c65e17c634a26653a29d107b3c98c607c765bf38d041531cd8f" +dependencies = [ + "atty", + "cast", + "clap", + "criterion-plot", + "csv", + "itertools", + "lazy_static", + "num-traits", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_cbor", + "serde_derive", + "serde_json", + "tinytemplate", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2673cc8207403546f45f5fd319a974b1e6983ad1a3ee7e6041650013be041876" +dependencies = [ + "cast", + "itertools", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "csv" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "626ae34994d3d8d668f4269922248239db4ae42d538b14c398b74a52208e8086" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" +dependencies = [ + "memchr", +] + +[[package]] +name = "either" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" + +[[package]] +name = "errno" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136526188508e25c6fef639d7927dfb3e0e3084488bf202267829cf7fc23dbdd" +dependencies = [ + "errno-dragonfly", + "libc", + "windows-sys", +] + +[[package]] +name = "errno-dragonfly" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + +[[package]] +name = "futures-channel" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +dependencies = [ + "futures-core", +] + +[[package]] +name = "futures-core" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" + +[[package]] +name = "futures-io" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" + +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + +[[package]] +name = "gimli" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" + +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "half" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "hermit-abi" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" + +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi 0.3.2", + "libc", + "windows-sys", +] + +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + +[[package]] +name = "itoa" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" + +[[package]] +name = "js-sys" +version = "0.3.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5f195fe497f702db0f318b07fdd68edb16955aed830df8363d837542f8f935a" +dependencies = [ + "wasm-bindgen", +] + +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.147" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" + +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + +[[package]] +name = "log" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +dependencies = [ + "value-bag", +] + +[[package]] +name = "memchr" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76fc44e2588d5b436dbc3c6cf62aef290f90dab6235744a93dfe1cc18f451e2c" + +[[package]] +name = "memoffset" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" +dependencies = [ + "autocfg", +] + +[[package]] +name = "miniz_oxide" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +dependencies = [ + "adler", +] + +[[package]] +name = "num-traits" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +dependencies = [ + "autocfg", +] + +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi 0.3.2", + "libc", +] + +[[package]] +name = "object" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ac5bbd07aea88c60a577a1ce218075ffd59208b2d7ca97adf9bfc5aeb21ebe" +dependencies = [ + "memchr", +] + +[[package]] +name = "once_cell" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" + +[[package]] +name = "oneshot-uniffi" +version = "0.1.5" +dependencies = [ + "async-std", + "criterion", + "tokio", +] + +[[package]] +name = "oorandom" +version = "11.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" + +[[package]] +name = "parking" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" + +[[package]] +name = "pin-project-lite" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "plotters" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2c224ba00d7cadd4d5c660deaf2098e5e80e07846537c51f9cfa4be50c1fd45" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e76628b4d3a7581389a35d5b6e2139607ad7c75b17aed325f210aa91f4a9609" + +[[package]] +name = "plotters-svg" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38f6d39893cca0701371e3c27294f09797214b86f1fb951b89ade8ec04e2abab" +dependencies = [ + "plotters-backend", +] + +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys", +] + +[[package]] +name = "proc-macro2" +version = "1.0.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rayon" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils", + "num_cpus", +] + +[[package]] +name = "regex" +version = "1.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12de2eff854e5fa4b1295edd650e227e9d8fb0c9e90b12e7f36d6a6811791a29" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49530408a136e16e5b486e883fbb6ba058e8e4e8ae6621a77b048b314336e629" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" + +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + +[[package]] +name = "rustix" +version = "0.37.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06" +dependencies = [ + "bitflags", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys", + "windows-sys", +] + +[[package]] +name = "ryu" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" + +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "serde" +version = "1.0.188" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_cbor" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5" +dependencies = [ + "half", + "serde", +] + +[[package]] +name = "serde_derive" +version = "1.0.188" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", +] + +[[package]] +name = "serde_json" +version = "1.0.105" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + +[[package]] +name = "socket2" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "syn" +version = "2.0.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width", +] + +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + +[[package]] +name = "tokio" +version = "1.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +dependencies = [ + "backtrace", + "num_cpus", + "pin-project-lite", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", +] + +[[package]] +name = "unicode-ident" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" + +[[package]] +name = "unicode-width" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" + +[[package]] +name = "value-bag" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d92ccd67fb88503048c01b59152a04effd0782d035a83a6d256ce6085f08f4a3" + +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + +[[package]] +name = "walkdir" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36df944cda56c7d8d8b7496af378e6b16de9284591917d307c9b4d313c44e698" +dependencies = [ + "same-file", + "winapi-util", +] + +[[package]] +name = "wasm-bindgen" +version = "0.2.87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.29", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" + +[[package]] +name = "web-sys" +version = "0.3.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" diff --git a/third_party/rust/oneshot-uniffi/Cargo.toml b/third_party/rust/oneshot-uniffi/Cargo.toml new file mode 100644 index 0000000000..078a477fb2 --- /dev/null +++ b/third_party/rust/oneshot-uniffi/Cargo.toml @@ -0,0 +1,64 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. +# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +edition = "2021" +rust-version = "1.60.0" +name = "oneshot-uniffi" +version = "0.1.5" +authors = ["Linus Färnstrand <faern@faern.net>"] +description = """ +Patched version of oneshot specifically for the UniFFI project. + +This removes the `loom` target and dependency which helps with UniFFI's downstream consumers. +""" +readme = "README.md" +keywords = [ + "oneshot", + "spsc", + "async", + "sync", + "channel", +] +categories = [ + "asynchronous", + "concurrency", +] +license = "MIT OR Apache-2.0" +repository = "https://github.com/faern/oneshot" + +[[bench]] +name = "benches" +harness = false + +[dev-dependencies.async-std] +version = "1" +features = ["attributes"] + +[dev-dependencies.criterion] +version = "0.3" + +[dev-dependencies.tokio] +version = "1" +features = [ + "rt", + "rt-multi-thread", + "macros", + "time", +] + +[features] +async = [] +default = [ + "std", + "async", +] +std = [] diff --git a/third_party/rust/oneshot-uniffi/README.md b/third_party/rust/oneshot-uniffi/README.md new file mode 100644 index 0000000000..535f011778 --- /dev/null +++ b/third_party/rust/oneshot-uniffi/README.md @@ -0,0 +1,94 @@ +# oneshot + +Oneshot spsc (single producer, single consumer) channel. Meaning each channel instance +can only transport a single message. This has a few nice outcomes. One thing is that +the implementation can be very efficient, utilizing the knowledge that there will +only be one message. But more importantly, it allows the API to be expressed in such +a way that certain edge cases that you don't want to care about when only sending a +single message on a channel does not exist. For example: The sender can't be copied +or cloned, and the send method takes ownership and consumes the sender. +So you are guaranteed, at the type level, that there can only be one message sent. + +The sender's send method is non-blocking, and potentially lock- and wait-free. +See documentation on [Sender::send] for situations where it might not be fully wait-free. +The receiver supports both lock- and wait-free `try_recv` as well as indefinite and time +limited thread blocking receive operations. The receiver also implements `Future` and +supports asynchronously awaiting the message. + + +## Examples + +This example sets up a background worker that processes requests coming in on a standard +mpsc channel and replies on a oneshot channel provided with each request. The worker can +be interacted with both from sync and async contexts since the oneshot receiver +can receive both blocking and async. + +```rust +use std::sync::mpsc; +use std::thread; +use std::time::Duration; + +type Request = String; + +// Starts a background thread performing some computation on requests sent to it. +// Delivers the response back over a oneshot channel. +fn spawn_processing_thread() -> mpsc::Sender<(Request, oneshot::Sender<usize>)> { + let (request_sender, request_receiver) = mpsc::channel::<(Request, oneshot::Sender<usize>)>(); + thread::spawn(move || { + for (request_data, response_sender) in request_receiver.iter() { + let compute_operation = || request_data.len(); + let _ = response_sender.send(compute_operation()); // <- Send on the oneshot channel + } + }); + request_sender +} + +let processor = spawn_processing_thread(); + +// If compiled with `std` the library can receive messages with timeout on regular threads +#[cfg(feature = "std")] { + let (response_sender, response_receiver) = oneshot::channel(); + let request = Request::from("data from sync thread"); + + processor.send((request, response_sender)).expect("Processor down"); + match response_receiver.recv_timeout(Duration::from_secs(1)) { // <- Receive on the oneshot channel + Ok(result) => println!("Processor returned {}", result), + Err(oneshot::RecvTimeoutError::Timeout) => eprintln!("Processor was too slow"), + Err(oneshot::RecvTimeoutError::Disconnected) => panic!("Processor exited"), + } +} + +// If compiled with the `async` feature, the `Receiver` can be awaited in an async context +#[cfg(feature = "async")] { + tokio::runtime::Runtime::new() + .unwrap() + .block_on(async move { + let (response_sender, response_receiver) = oneshot::channel(); + let request = Request::from("data from sync thread"); + + processor.send((request, response_sender)).expect("Processor down"); + match response_receiver.await { // <- Receive on the oneshot channel asynchronously + Ok(result) => println!("Processor returned {}", result), + Err(_e) => panic!("Processor exited"), + } + }); +} +``` + +## Sync vs async + +The main motivation for writing this library was that there were no (known to me) channel +implementations allowing you to seamlessly send messages between a normal thread and an async +task, or the other way around. If message passing is the way you are communicating, of course +that should work smoothly between the sync and async parts of the program! + +This library achieves that by having a fast and cheap send operation that can +be used in both sync threads and async tasks. The receiver has both thread blocking +receive methods for synchronous usage, and implements `Future` for asynchronous usage. + +The receiving endpoint of this channel implements Rust's `Future` trait and can be waited on +in an asynchronous task. This implementation is completely executor/runtime agnostic. It should +be possible to use this library with any executor. + + +License: MIT OR Apache-2.0 diff --git a/third_party/rust/oneshot-uniffi/benches/benches.rs b/third_party/rust/oneshot-uniffi/benches/benches.rs new file mode 100644 index 0000000000..438d46a498 --- /dev/null +++ b/third_party/rust/oneshot-uniffi/benches/benches.rs @@ -0,0 +1,122 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use std::mem; +use std::time::{Duration, Instant}; + +criterion_group!(benches, bench); +criterion_main!(benches); + +macro_rules! bench_send_and_recv { + ($c:expr, $($type:ty => $value:expr);+) => { + // Sanity check that all $values are of $type. + $(let _: $type = $value;)* + { + let mut group = $c.benchmark_group("create_channel"); + $(group.bench_function(stringify!($type), |b| { + b.iter(oneshot::channel::<$type>) + });)* + group.finish(); + } + { + let mut group = $c.benchmark_group("create_and_send"); + $(group.bench_function(stringify!($type), |b| { + b.iter(|| { + let (sender, _receiver) = oneshot::channel(); + sender.send(black_box($value)).unwrap() + }); + });)* + group.finish(); + } + { + let mut group = $c.benchmark_group("create_and_send_on_closed"); + $(group.bench_function(stringify!($type), |b| { + b.iter(|| { + let (sender, _) = oneshot::channel(); + sender.send(black_box($value)).unwrap_err() + }); + });)* + group.finish(); + } + { + let mut group = $c.benchmark_group("create_send_and_recv"); + $(group.bench_function(stringify!($type), |b| { + b.iter(|| { + let (sender, receiver) = oneshot::channel(); + sender.send(black_box($value)).unwrap(); + receiver.recv().unwrap() + }); + });)* + group.finish(); + } + { + let mut group = $c.benchmark_group("create_send_and_recv_ref"); + $(group.bench_function(stringify!($type), |b| { + b.iter(|| { + let (sender, receiver) = oneshot::channel(); + sender.send(black_box($value)).unwrap(); + receiver.recv_ref().unwrap() + }); + });)* + group.finish(); + } + }; +} + +fn bench(c: &mut Criterion) { + bench_send_and_recv!(c, + () => (); + u8 => 7u8; + usize => 9876usize; + u128 => 1234567u128; + [u8; 64] => [0b10101010u8; 64]; + [u8; 4096] => [0b10101010u8; 4096] + ); + + bench_try_recv(c); + bench_recv_deadline_now(c); + bench_recv_timeout_zero(c); +} + +fn bench_try_recv(c: &mut Criterion) { + let (sender, receiver) = oneshot::channel::<u128>(); + c.bench_function("try_recv_empty", |b| { + b.iter(|| receiver.try_recv().unwrap_err()) + }); + mem::drop(sender); + c.bench_function("try_recv_empty_closed", |b| { + b.iter(|| receiver.try_recv().unwrap_err()) + }); +} + +fn bench_recv_deadline_now(c: &mut Criterion) { + let now = Instant::now(); + { + let (_sender, receiver) = oneshot::channel::<u128>(); + c.bench_function("recv_deadline_now", |b| { + b.iter(|| receiver.recv_deadline(now).unwrap_err()) + }); + } + { + let (sender, receiver) = oneshot::channel::<u128>(); + mem::drop(sender); + c.bench_function("recv_deadline_now_closed", |b| { + b.iter(|| receiver.recv_deadline(now).unwrap_err()) + }); + } +} + +fn bench_recv_timeout_zero(c: &mut Criterion) { + let zero = Duration::from_nanos(0); + { + let (_sender, receiver) = oneshot::channel::<u128>(); + c.bench_function("recv_timeout_zero", |b| { + b.iter(|| receiver.recv_timeout(zero).unwrap_err()) + }); + } + { + let (sender, receiver) = oneshot::channel::<u128>(); + mem::drop(sender); + c.bench_function("recv_timeout_zero_closed", |b| { + b.iter(|| receiver.recv_timeout(zero).unwrap_err()) + }); + } +} diff --git a/third_party/rust/oneshot-uniffi/check_mem_leaks.sh b/third_party/rust/oneshot-uniffi/check_mem_leaks.sh new file mode 100755 index 0000000000..5a10835a0f --- /dev/null +++ b/third_party/rust/oneshot-uniffi/check_mem_leaks.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +set -eu + +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +cd "$SCRIPT_DIR" + +for example_path in examples/*.rs; do + example_filename=$(basename -- $example_path) + example=${example_filename%.*} + echo $example + cargo valgrind run --example "$example" +done diff --git a/third_party/rust/oneshot-uniffi/examples/recv_before_send.rs b/third_party/rust/oneshot-uniffi/examples/recv_before_send.rs new file mode 100644 index 0000000000..2eda3dd610 --- /dev/null +++ b/third_party/rust/oneshot-uniffi/examples/recv_before_send.rs @@ -0,0 +1,18 @@ +#[cfg(feature = "std")] +fn main() { + use std::thread; + use std::time::Duration; + + let (sender, receiver) = oneshot::channel(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(2)); + sender.send(9u128).unwrap(); + }); + assert_eq!(receiver.recv(), Ok(9)); + t.join().unwrap(); +} + +#[cfg(not(feature = "std"))] +fn main() { + panic!("This example is only for when the \"sync\" feature is used"); +} diff --git a/third_party/rust/oneshot-uniffi/examples/recv_before_send_then_drop_sender.rs b/third_party/rust/oneshot-uniffi/examples/recv_before_send_then_drop_sender.rs new file mode 100644 index 0000000000..aea7d66b90 --- /dev/null +++ b/third_party/rust/oneshot-uniffi/examples/recv_before_send_then_drop_sender.rs @@ -0,0 +1,18 @@ +#[cfg(feature = "std")] +fn main() { + use std::thread; + use std::time::Duration; + + let (sender, receiver) = oneshot::channel::<u128>(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(2)); + std::mem::drop(sender); + }); + assert!(receiver.recv().is_err()); + t.join().unwrap(); +} + +#[cfg(not(feature = "std"))] +fn main() { + panic!("This example is only for when the \"sync\" feature is used"); +} diff --git a/third_party/rust/oneshot-uniffi/examples/recv_ref_before_send.rs b/third_party/rust/oneshot-uniffi/examples/recv_ref_before_send.rs new file mode 100644 index 0000000000..6ed74ddfca --- /dev/null +++ b/third_party/rust/oneshot-uniffi/examples/recv_ref_before_send.rs @@ -0,0 +1,18 @@ +#[cfg(feature = "std")] +fn main() { + use std::thread; + use std::time::Duration; + + let (sender, receiver) = oneshot::channel(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(2)); + sender.send(9u128).unwrap(); + }); + assert_eq!(receiver.recv_ref(), Ok(9)); + t.join().unwrap(); +} + +#[cfg(not(feature = "std"))] +fn main() { + panic!("This example is only for when the \"sync\" feature is used"); +} diff --git a/third_party/rust/oneshot-uniffi/examples/recv_ref_before_send_then_drop_sender.rs b/third_party/rust/oneshot-uniffi/examples/recv_ref_before_send_then_drop_sender.rs new file mode 100644 index 0000000000..75ff3d6006 --- /dev/null +++ b/third_party/rust/oneshot-uniffi/examples/recv_ref_before_send_then_drop_sender.rs @@ -0,0 +1,18 @@ +#[cfg(feature = "std")] +fn main() { + use std::thread; + use std::time::Duration; + + let (sender, receiver) = oneshot::channel::<u128>(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(2)); + std::mem::drop(sender); + }); + assert!(receiver.recv_ref().is_err()); + t.join().unwrap(); +} + +#[cfg(not(feature = "std"))] +fn main() { + panic!("This example is only for when the \"sync\" feature is used"); +} diff --git a/third_party/rust/oneshot-uniffi/examples/recv_timeout_before_send.rs b/third_party/rust/oneshot-uniffi/examples/recv_timeout_before_send.rs new file mode 100644 index 0000000000..85a2ac88be --- /dev/null +++ b/third_party/rust/oneshot-uniffi/examples/recv_timeout_before_send.rs @@ -0,0 +1,18 @@ +#[cfg(feature = "std")] +fn main() { + use std::thread; + use std::time::Duration; + + let (sender, receiver) = oneshot::channel(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(2)); + sender.send(9u128).unwrap(); + }); + assert_eq!(receiver.recv_timeout(Duration::from_millis(100)), Ok(9)); + t.join().unwrap(); +} + +#[cfg(not(feature = "std"))] +fn main() { + panic!("This example is only for when the \"sync\" feature is used"); +} diff --git a/third_party/rust/oneshot-uniffi/examples/recv_timeout_before_send_then_drop_sender.rs b/third_party/rust/oneshot-uniffi/examples/recv_timeout_before_send_then_drop_sender.rs new file mode 100644 index 0000000000..32c31fcd9c --- /dev/null +++ b/third_party/rust/oneshot-uniffi/examples/recv_timeout_before_send_then_drop_sender.rs @@ -0,0 +1,18 @@ +#[cfg(feature = "std")] +fn main() { + use std::thread; + use std::time::Duration; + + let (sender, receiver) = oneshot::channel::<u128>(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(2)); + std::mem::drop(sender); + }); + assert!(receiver.recv_timeout(Duration::from_millis(100)).is_err()); + t.join().unwrap(); +} + +#[cfg(not(feature = "std"))] +fn main() { + panic!("This example is only for when the \"sync\" feature is used"); +} diff --git a/third_party/rust/oneshot-uniffi/examples/recv_with_dropped_sender.rs b/third_party/rust/oneshot-uniffi/examples/recv_with_dropped_sender.rs new file mode 100644 index 0000000000..f7a7171e1b --- /dev/null +++ b/third_party/rust/oneshot-uniffi/examples/recv_with_dropped_sender.rs @@ -0,0 +1,11 @@ +#[cfg(feature = "std")] +fn main() { + let (sender, receiver) = oneshot::channel::<u128>(); + std::mem::drop(sender); + receiver.recv().unwrap_err(); +} + +#[cfg(not(feature = "std"))] +fn main() { + panic!("This example is only for when the \"sync\" feature is used"); +} diff --git a/third_party/rust/oneshot-uniffi/examples/send_before_recv.rs b/third_party/rust/oneshot-uniffi/examples/send_before_recv.rs new file mode 100644 index 0000000000..c31ba658d3 --- /dev/null +++ b/third_party/rust/oneshot-uniffi/examples/send_before_recv.rs @@ -0,0 +1,11 @@ +#[cfg(feature = "std")] +fn main() { + let (sender, receiver) = oneshot::channel(); + assert!(sender.send(19i128).is_ok()); + assert_eq!(receiver.recv(), Ok(19i128)); +} + +#[cfg(not(feature = "std"))] +fn main() { + panic!("This example is only for when the \"sync\" feature is used"); +} diff --git a/third_party/rust/oneshot-uniffi/examples/send_then_drop_receiver.rs b/third_party/rust/oneshot-uniffi/examples/send_then_drop_receiver.rs new file mode 100644 index 0000000000..941c508d5b --- /dev/null +++ b/third_party/rust/oneshot-uniffi/examples/send_then_drop_receiver.rs @@ -0,0 +1,7 @@ +use std::mem; + +fn main() { + let (sender, receiver) = oneshot::channel(); + assert!(sender.send(19i128).is_ok()); + mem::drop(receiver); +} diff --git a/third_party/rust/oneshot-uniffi/examples/send_with_dropped_receiver.rs b/third_party/rust/oneshot-uniffi/examples/send_with_dropped_receiver.rs new file mode 100644 index 0000000000..19bfa385f0 --- /dev/null +++ b/third_party/rust/oneshot-uniffi/examples/send_with_dropped_receiver.rs @@ -0,0 +1,8 @@ +use std::mem; + +fn main() { + let (sender, receiver) = oneshot::channel(); + mem::drop(receiver); + let send_error = sender.send(5u128).unwrap_err(); + assert_eq!(send_error.into_inner(), 5); +} diff --git a/third_party/rust/oneshot-uniffi/src/errors.rs b/third_party/rust/oneshot-uniffi/src/errors.rs new file mode 100644 index 0000000000..afc48acd03 --- /dev/null +++ b/third_party/rust/oneshot-uniffi/src/errors.rs @@ -0,0 +1,144 @@ +use super::{dealloc, Channel}; +use core::fmt; +use core::mem; +use core::ptr::NonNull; + +/// An error returned when trying to send on a closed channel. Returned from +/// [`Sender::send`] if the corresponding [`Receiver`] has already been dropped. +/// +/// The message that could not be sent can be retreived again with [`SendError::into_inner`]. +pub struct SendError<T> { + channel_ptr: NonNull<Channel<T>>, +} + +unsafe impl<T: Send> Send for SendError<T> {} +unsafe impl<T: Sync> Sync for SendError<T> {} + +impl<T> SendError<T> { + /// # Safety + /// + /// By calling this function, the caller semantically transfers ownership of the + /// channel's resources to the created `SendError`. Thus the caller must ensure that the + /// pointer is not used in a way which would violate this ownership transfer. Moreover, + /// the caller must assert that the channel contains a valid, initialized message. + pub(crate) const unsafe fn new(channel_ptr: NonNull<Channel<T>>) -> Self { + Self { channel_ptr } + } + + /// Consumes the error and returns the message that failed to be sent. + #[inline] + pub fn into_inner(self) -> T { + let channel_ptr = self.channel_ptr; + + // Don't run destructor if we consumed ourselves. Freeing happens here. + mem::forget(self); + + // SAFETY: we have ownership of the channel + let channel: &Channel<T> = unsafe { channel_ptr.as_ref() }; + + // SAFETY: we know that the message is initialized according to the safety requirements of + // `new` + let message = unsafe { channel.take_message() }; + + // SAFETY: we own the channel + unsafe { dealloc(channel_ptr) }; + + message + } + + /// Get a reference to the message that failed to be sent. + #[inline] + pub fn as_inner(&self) -> &T { + unsafe { self.channel_ptr.as_ref().message().assume_init_ref() } + } +} + +impl<T> Drop for SendError<T> { + fn drop(&mut self) { + // SAFETY: we have ownership of the channel and require that the message is initialized + // upon construction + unsafe { + self.channel_ptr.as_ref().drop_message(); + dealloc(self.channel_ptr); + } + } +} + +impl<T> fmt::Display for SendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "sending on a closed channel".fmt(f) + } +} + +impl<T> fmt::Debug for SendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SendError<{}>(_)", stringify!(T)) + } +} + +#[cfg(feature = "std")] +impl<T> std::error::Error for SendError<T> {} + +/// An error returned from the indefinitely blocking recv functions on a [`Receiver`]. +/// +/// The recv operation can only fail if the corresponding [`Sender`] was dropped before sending +/// any message. Or if a message has already been sent and received on the channel. +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] +pub struct RecvError; + +impl fmt::Display for RecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "receiving on a closed channel".fmt(f) + } +} + +#[cfg(feature = "std")] +impl std::error::Error for RecvError {} + +/// An error returned when trying a non blocking receive on a [`Receiver`]. +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] +pub enum TryRecvError { + /// The channel is still open, but there was no message present in it. + Empty, + + /// The channel is closed. Either the sender was dropped before sending any message, or the + /// message has already been extracted from the receiver. + Disconnected, +} + +impl fmt::Display for TryRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let msg = match self { + TryRecvError::Empty => "receiving on an empty channel", + TryRecvError::Disconnected => "receiving on a closed channel", + }; + msg.fmt(f) + } +} + +#[cfg(feature = "std")] +impl std::error::Error for TryRecvError {} + +/// An error returned when trying a time limited blocking receive on a [`Receiver`]. +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] +pub enum RecvTimeoutError { + /// No message arrived on the channel before the timeout was reached. The channel is still open. + Timeout, + + /// The channel is closed. Either the sender was dropped before sending any message, or the + /// message has already been extracted from the receiver. + Disconnected, +} + +impl fmt::Display for RecvTimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let msg = match self { + RecvTimeoutError::Timeout => "timed out waiting on channel", + RecvTimeoutError::Disconnected => "channel is empty and sending half is closed", + }; + msg.fmt(f) + } +} + +#[cfg(feature = "std")] +impl std::error::Error for RecvTimeoutError {} diff --git a/third_party/rust/oneshot-uniffi/src/lib.rs b/third_party/rust/oneshot-uniffi/src/lib.rs new file mode 100644 index 0000000000..94bb35d12a --- /dev/null +++ b/third_party/rust/oneshot-uniffi/src/lib.rs @@ -0,0 +1,1193 @@ +//! Oneshot spsc (single producer, single consumer) channel. Meaning each channel instance +//! can only transport a single message. This has a few nice outcomes. One thing is that +//! the implementation can be very efficient, utilizing the knowledge that there will +//! only be one message. But more importantly, it allows the API to be expressed in such +//! a way that certain edge cases that you don't want to care about when only sending a +//! single message on a channel does not exist. For example: The sender can't be copied +//! or cloned, and the send method takes ownership and consumes the sender. +//! So you are guaranteed, at the type level, that there can only be one message sent. +//! +//! The sender's send method is non-blocking, and potentially lock- and wait-free. +//! See documentation on [Sender::send] for situations where it might not be fully wait-free. +//! The receiver supports both lock- and wait-free `try_recv` as well as indefinite and time +//! limited thread blocking receive operations. The receiver also implements `Future` and +//! supports asynchronously awaiting the message. +//! +//! +//! # Examples +//! +//! This example sets up a background worker that processes requests coming in on a standard +//! mpsc channel and replies on a oneshot channel provided with each request. The worker can +//! be interacted with both from sync and async contexts since the oneshot receiver +//! can receive both blocking and async. +//! +//! ```rust +//! use std::sync::mpsc; +//! use std::thread; +//! use std::time::Duration; +//! +//! type Request = String; +//! +//! // Starts a background thread performing some computation on requests sent to it. +//! // Delivers the response back over a oneshot channel. +//! fn spawn_processing_thread() -> mpsc::Sender<(Request, oneshot::Sender<usize>)> { +//! let (request_sender, request_receiver) = mpsc::channel::<(Request, oneshot::Sender<usize>)>(); +//! thread::spawn(move || { +//! for (request_data, response_sender) in request_receiver.iter() { +//! let compute_operation = || request_data.len(); +//! let _ = response_sender.send(compute_operation()); // <- Send on the oneshot channel +//! } +//! }); +//! request_sender +//! } +//! +//! let processor = spawn_processing_thread(); +//! +//! // If compiled with `std` the library can receive messages with timeout on regular threads +//! #[cfg(feature = "std")] { +//! let (response_sender, response_receiver) = oneshot::channel(); +//! let request = Request::from("data from sync thread"); +//! +//! processor.send((request, response_sender)).expect("Processor down"); +//! match response_receiver.recv_timeout(Duration::from_secs(1)) { // <- Receive on the oneshot channel +//! Ok(result) => println!("Processor returned {}", result), +//! Err(oneshot::RecvTimeoutError::Timeout) => eprintln!("Processor was too slow"), +//! Err(oneshot::RecvTimeoutError::Disconnected) => panic!("Processor exited"), +//! } +//! } +//! +//! // If compiled with the `async` feature, the `Receiver` can be awaited in an async context +//! #[cfg(feature = "async")] { +//! tokio::runtime::Runtime::new() +//! .unwrap() +//! .block_on(async move { +//! let (response_sender, response_receiver) = oneshot::channel(); +//! let request = Request::from("data from sync thread"); +//! +//! processor.send((request, response_sender)).expect("Processor down"); +//! match response_receiver.await { // <- Receive on the oneshot channel asynchronously +//! Ok(result) => println!("Processor returned {}", result), +//! Err(_e) => panic!("Processor exited"), +//! } +//! }); +//! } +//! ``` +//! +//! # Sync vs async +//! +//! The main motivation for writing this library was that there were no (known to me) channel +//! implementations allowing you to seamlessly send messages between a normal thread and an async +//! task, or the other way around. If message passing is the way you are communicating, of course +//! that should work smoothly between the sync and async parts of the program! +//! +//! This library achieves that by having a fast and cheap send operation that can +//! be used in both sync threads and async tasks. The receiver has both thread blocking +//! receive methods for synchronous usage, and implements `Future` for asynchronous usage. +//! +//! The receiving endpoint of this channel implements Rust's `Future` trait and can be waited on +//! in an asynchronous task. This implementation is completely executor/runtime agnostic. It should +//! be possible to use this library with any executor. +//! + +// # Implementation description +// +// When a channel is created via the channel function, it creates a single heap allocation +// containing: +// * A one byte atomic integer that represents the current channel state, +// * Uninitialized memory to fit the message, +// * Uninitialized memory to fit the waker that can wake the receiving task or thread up. +// +// The size of the waker depends on which features are activated, it ranges from 0 to 24 bytes[1]. +// So with all features enabled (the default) each channel allocates 25 bytes plus the size of the +// message, plus any padding needed to get correct memory alignment. +// +// The Sender and Receiver only holds a raw pointer to the heap channel object. The last endpoint +// to be consumed or dropped is responsible for freeing the heap memory. The first endpoint to +// be consumed or dropped signal via the state that it is gone. And the second one see this and +// frees the memory. +// +// ## Footnotes +// +// [1]: Mind that the waker only takes zero bytes when all features are disabled, making it +// impossible to *wait* for the message. `try_recv` the only available method in this scenario. + +#![deny(rust_2018_idioms)] +#![cfg_attr(not(feature = "std"), no_std)] + +#[cfg(not(loom))] +extern crate alloc; + +use core::{ + marker::PhantomData, + mem::{self, MaybeUninit}, + ptr::{self, NonNull}, +}; + +#[cfg(not(loom))] +use core::{ + cell::UnsafeCell, + sync::atomic::{fence, AtomicU8, Ordering::*}, +}; +#[cfg(loom)] +use loom::{ + cell::UnsafeCell, + sync::atomic::{fence, AtomicU8, Ordering::*}, +}; + +#[cfg(all(feature = "async", not(loom)))] +use core::hint; +#[cfg(all(feature = "async", loom))] +use loom::hint; + +#[cfg(feature = "async")] +use core::{ + pin::Pin, + task::{self, Poll}, +}; +#[cfg(feature = "std")] +use std::time::{Duration, Instant}; + +#[cfg(feature = "std")] +mod thread { + #[cfg(not(loom))] + pub use std::thread::{current, park, park_timeout, yield_now, Thread}; + + #[cfg(loom)] + pub use loom::thread::{current, park, yield_now, Thread}; + + // loom does not support parking with a timeout. So we just + // yield. This means that the "park" will "spuriously" wake up + // way too early. But the code should properly handle this. + // One thing to note is that very short timeouts are needed + // when using loom, since otherwise the looping will cause + // an overflow in loom. + #[cfg(loom)] + pub fn park_timeout(_timeout: std::time::Duration) { + loom::thread::yield_now() + } +} + +#[cfg(loom)] +mod loombox; +#[cfg(not(loom))] +use alloc::boxed::Box; +#[cfg(loom)] +use loombox::Box; + +mod errors; +pub use errors::{RecvError, RecvTimeoutError, SendError, TryRecvError}; + +/// Creates a new oneshot channel and returns the two endpoints, [`Sender`] and [`Receiver`]. +pub fn channel<T>() -> (Sender<T>, Receiver<T>) { + // Allocate the channel on the heap and get the pointer. + // The last endpoint of the channel to be alive is responsible for freeing the channel + // and dropping any object that might have been written to it. + + let channel_ptr = Box::into_raw(Box::new(Channel::new())); + + // SAFETY: `channel_ptr` came from a Box and thus is not null + let channel_ptr = unsafe { NonNull::new_unchecked(channel_ptr) }; + + ( + Sender { + channel_ptr, + _invariant: PhantomData, + }, + Receiver { channel_ptr }, + ) +} + +#[derive(Debug)] +pub struct Sender<T> { + channel_ptr: NonNull<Channel<T>>, + // In reality we want contravariance, however we can't obtain that. + // + // Consider the following scenario: + // ``` + // let (mut tx, rx) = channel::<&'short u8>(); + // let (tx2, rx2) = channel::<&'long u8>(); + // + // tx = tx2; + // + // // Pretend short_ref is some &'short u8 + // tx.send(short_ref).unwrap(); + // let long_ref = rx2.recv().unwrap(); + // ``` + // + // If this type were covariant then we could safely extend lifetimes, which is not okay. + // Hence, we enforce invariance. + _invariant: PhantomData<fn(T) -> T>, +} + +#[derive(Debug)] +pub struct Receiver<T> { + // Covariance is the right choice here. Consider the example presented in Sender, and you'll + // see that if we replaced `rx` instead then we would get the expected behavior + channel_ptr: NonNull<Channel<T>>, +} + +unsafe impl<T: Send> Send for Sender<T> {} +unsafe impl<T: Send> Send for Receiver<T> {} +impl<T> Unpin for Receiver<T> {} + +impl<T> Sender<T> { + /// Sends `message` over the channel to the corresponding [`Receiver`]. + /// + /// Returns an error if the receiver has already been dropped. The message can + /// be extracted from the error. + /// + /// This method is lock-free and wait-free when sending on a channel that the + /// receiver is currently not receiving on. If the receiver is receiving during the send + /// operation this method includes waking up the thread/task. Unparking a thread involves + /// a mutex in Rust's standard library at the time of writing this. + /// How lock-free waking up an async task is + /// depends on your executor. If this method returns a `SendError`, please mind that dropping + /// the error involves running any drop implementation on the message type, and freeing the + /// channel's heap allocation, which might or might not be lock-free. + pub fn send(self, message: T) -> Result<(), SendError<T>> { + let channel_ptr = self.channel_ptr; + + // Don't run our Drop implementation if send was called, any cleanup now happens here + mem::forget(self); + + // SAFETY: The channel exists on the heap for the entire duration of this method and we + // only ever acquire shared references to it. Note that if the receiver disconnects it + // does not free the channel. + let channel = unsafe { channel_ptr.as_ref() }; + + // Write the message into the channel on the heap. + // SAFETY: The receiver only ever accesses this memory location if we are in the MESSAGE + // state, and since we're responsible for setting that state, we can guarantee that we have + // exclusive access to this memory location to perform this write. + unsafe { channel.write_message(message) }; + + // Set the state to signal there is a message on the channel. + // ORDERING: we use release ordering to ensure the write of the message is visible to the + // receiving thread. The EMPTY and DISCONNECTED branches do not observe any shared state, + // and thus we do not need acquire orderng. The RECEIVING branch manages synchronization + // independent of this operation. + // + // EMPTY + 1 = MESSAGE + // RECEIVING + 1 = UNPARKING + // DISCONNECTED + 1 = invalid, however this state is never observed + match channel.state.fetch_add(1, Release) { + // The receiver is alive and has not started waiting. Send done. + EMPTY => Ok(()), + // The receiver is waiting. Wake it up so it can return the message. + RECEIVING => { + // ORDERING: Synchronizes with the write of the waker to memory, and prevents the + // taking of the waker from being ordered before this operation. + fence(Acquire); + + // Take the waker, but critically do not unpark it. If we unparked now, then the + // receiving thread could still observe the UNPARKING state and re-park, meaning + // that after we change to the MESSAGE state, it would remain parked indefinitely + // or until a spurious wakeup. + // SAFETY: at this point we are in the UNPARKING state, and the receiving thread + // does not access the waker while in this state, nor does it free the channel + // allocation in this state. + let waker = unsafe { channel.take_waker() }; + + // ORDERING: this ordering serves two-fold: it synchronizes with the acquire load + // in the receiving thread, ensuring that both our read of the waker and write of + // the message happen-before the taking of the message and freeing of the channel. + // Furthermore, we need acquire ordering to ensure the unparking of the receiver + // happens after the channel state is updated. + channel.state.swap(MESSAGE, AcqRel); + + // Note: it is possible that between the store above and this statement that + // the receiving thread is spuriously unparked, takes the message, and frees + // the channel allocation. However, we took ownership of the channel out of + // that allocation, and freeing the channel does not drop the waker since the + // waker is wrapped in MaybeUninit. Therefore this data is valid regardless of + // whether or not the receive has completed by this point. + waker.unpark(); + + Ok(()) + } + // The receiver was already dropped. The error is responsible for freeing the channel. + // SAFETY: since the receiver disconnected it will no longer access `channel_ptr`, so + // we can transfer exclusive ownership of the channel's resources to the error. + // Moreover, since we just placed the message in the channel, the channel contains a + // valid message. + DISCONNECTED => Err(unsafe { SendError::new(channel_ptr) }), + _ => unreachable!(), + } + } +} + +impl<T> Drop for Sender<T> { + fn drop(&mut self) { + // SAFETY: The receiver only ever frees the channel if we are in the MESSAGE or + // DISCONNECTED states. If we are in the MESSAGE state, then we called + // mem::forget(self), so we should not be in this function call. If we are in the + // DISCONNECTED state, then the receiver either received a MESSAGE so this statement is + // unreachable, or was dropped and observed that our side was still alive, and thus didn't + // free the channel. + let channel = unsafe { self.channel_ptr.as_ref() }; + + // Set the channel state to disconnected and read what state the receiver was in + // ORDERING: we don't need release ordering here since there are no modifications we + // need to make visible to other thread, and the Err(RECEIVING) branch handles + // synchronization independent of this cmpxchg + // + // EMPTY ^ 001 = DISCONNECTED + // RECEIVING ^ 001 = UNPARKING + // DISCONNECTED ^ 001 = EMPTY (invalid), but this state is never observed + match channel.state.fetch_xor(0b001, Relaxed) { + // The receiver has not started waiting, nor is it dropped. + EMPTY => (), + // The receiver is waiting. Wake it up so it can detect that the channel disconnected. + RECEIVING => { + // See comments in Sender::send + + fence(Acquire); + + let waker = unsafe { channel.take_waker() }; + + // We still need release ordering here to make sure our read of the waker happens + // before this, and acquire ordering to ensure the unparking of the receiver + // happens after this. + channel.state.swap(DISCONNECTED, AcqRel); + + // The Acquire ordering above ensures that the write of the DISCONNECTED state + // happens-before unparking the receiver. + waker.unpark(); + } + // The receiver was already dropped. We are responsible for freeing the channel. + DISCONNECTED => { + // SAFETY: when the receiver switches the state to DISCONNECTED they have received + // the message or will no longer be trying to receive the message, and have + // observed that the sender is still alive, meaning that we're responsible for + // freeing the channel allocation. + unsafe { dealloc(self.channel_ptr) }; + } + _ => unreachable!(), + } + } +} + +impl<T> Receiver<T> { + /// Checks if there is a message in the channel without blocking. Returns: + /// * `Ok(message)` if there was a message in the channel. + /// * `Err(Empty)` if the [`Sender`] is alive, but has not yet sent a message. + /// * `Err(Disconnected)` if the [`Sender`] was dropped before sending anything or if the + /// message has already been extracted by a previous receive call. + /// + /// If a message is returned, the channel is disconnected and any subsequent receive operation + /// using this receiver will return an error. + /// + /// This method is completely lock-free and wait-free. The only thing it does is an atomic + /// integer load of the channel state. And if there is a message in the channel it additionally + /// performs one atomic integer store and copies the message from the heap to the stack for + /// returning it. + pub fn try_recv(&self) -> Result<T, TryRecvError> { + // SAFETY: The channel will not be freed while this method is still running. + let channel = unsafe { self.channel_ptr.as_ref() }; + + // ORDERING: we use acquire ordering to synchronize with the store of the message. + match channel.state.load(Acquire) { + MESSAGE => { + // It's okay to break up the load and store since once we're in the message state + // the sender no longer modifies the state + // ORDERING: at this point the sender has done its job and is no longer active, so + // we don't need to make any side effects visible to it + channel.state.store(DISCONNECTED, Relaxed); + + // SAFETY: we are in the MESSAGE state so the message is present + Ok(unsafe { channel.take_message() }) + } + EMPTY => Err(TryRecvError::Empty), + DISCONNECTED => Err(TryRecvError::Disconnected), + #[cfg(feature = "async")] + RECEIVING | UNPARKING => Err(TryRecvError::Empty), + _ => unreachable!(), + } + } + + /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is + /// disconnected. + /// + /// This method will always block the current thread if there is no data available and it is + /// still possible for the message to be sent. Once the message is sent to the corresponding + /// [`Sender`], then this receiver will wake up and return that message. + /// + /// If the corresponding [`Sender`] has disconnected (been dropped), or it disconnects while + /// this call is blocking, this call will wake up and return `Err` to indicate that the message + /// can never be received on this channel. + /// + /// If a sent message has already been extracted from this channel this method will return an + /// error. + /// + /// # Panics + /// + /// Panics if called after this receiver has been polled asynchronously. + #[cfg(feature = "std")] + pub fn recv(self) -> Result<T, RecvError> { + // Note that we don't need to worry about changing the state to disconnected or setting the + // state to an invalid value at any point in this function because we take ownership of + // self, and this function does not exit until the message has been received or both side + // of the channel are inactive and cleaned up. + + let channel_ptr = self.channel_ptr; + + // Don't run our Drop implementation if we are receiving consuming ourselves. + mem::forget(self); + + // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver + // is still alive, meaning that even if the sender was dropped then it would have observed + // the fact that we're still alive and left the responsibility of deallocating the + // channel to us, so channel_ptr is valid + let channel = unsafe { channel_ptr.as_ref() }; + + // ORDERING: we use acquire ordering to synchronize with the write of the message in the + // case that it's available + match channel.state.load(Acquire) { + // The sender is alive but has not sent anything yet. We prepare to park. + EMPTY => { + // Conditionally add a delay here to help the tests trigger the edge cases where + // the sender manages to be dropped or send something before we are able to store + // our waker object in the channel. + #[cfg(oneshot_test_delay)] + std::thread::sleep(std::time::Duration::from_millis(10)); + + // Write our waker instance to the channel. + // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not + // try to access the waker until it sees the state set to RECEIVING below + unsafe { channel.write_waker(ReceiverWaker::current_thread()) }; + + // Switch the state to RECEIVING. We need to do this in one atomic step in case the + // sender disconnected or sent the message while we wrote the waker to memory. We + // don't need to do a compare exchange here however because if the original state + // was not EMPTY, then the sender has either finished sending the message or is + // being dropped, so the RECEIVING state will never be observed after we return. + // ORDERING: we use release ordering so the sender can synchronize with our writing + // of the waker to memory. The individual branches handle any additional + // synchronizaton + match channel.state.swap(RECEIVING, Release) { + // We stored our waker, now we park until the sender has changed the state + EMPTY => loop { + thread::park(); + + // ORDERING: synchronize with the write of the message + match channel.state.load(Acquire) { + // The sender sent the message while we were parked. + MESSAGE => { + // SAFETY: we are in the message state so the message is valid + let message = unsafe { channel.take_message() }; + + // SAFETY: the Sender delegates the responsibility of deallocating + // the channel to us upon sending the message + unsafe { dealloc(channel_ptr) }; + + break Ok(message); + } + // The sender was dropped while we were parked. + DISCONNECTED => { + // SAFETY: the Sender doesn't deallocate the channel allocation in + // its drop implementation if we're receiving + unsafe { dealloc(channel_ptr) }; + + break Err(RecvError); + } + // State did not change, spurious wakeup, park again. + RECEIVING | UNPARKING => (), + _ => unreachable!(), + } + }, + // The sender sent the message while we prepared to park. + MESSAGE => { + // ORDERING: Synchronize with the write of the message. This branch is + // unlikely to be taken, so it's likely more efficient to use a fence here + // instead of AcqRel ordering on the RMW operation + fence(Acquire); + + // SAFETY: we started in the empty state and the sender switched us to the + // message state. This means that it did not take the waker, so we're + // responsible for dropping it. + unsafe { channel.drop_waker() }; + + // SAFETY: we are in the message state so the message is valid + let message = unsafe { channel.take_message() }; + + // SAFETY: the Sender delegates the responsibility of deallocating the + // channel to us upon sending the message + unsafe { dealloc(channel_ptr) }; + + Ok(message) + } + // The sender was dropped before sending anything while we prepared to park. + DISCONNECTED => { + // SAFETY: we started in the empty state and the sender switched us to the + // disconnected state. It does not take the waker when it does this so we + // need to drop it. + unsafe { channel.drop_waker() }; + + // SAFETY: the sender does not deallocate the channel if it switches from + // empty to disconnected so we need to free the allocation + unsafe { dealloc(channel_ptr) }; + + Err(RecvError) + } + _ => unreachable!(), + } + } + // The sender already sent the message. + MESSAGE => { + // SAFETY: we are in the message state so the message is valid + let message = unsafe { channel.take_message() }; + + // SAFETY: we are already in the message state so the sender has been forgotten + // and it's our job to clean up resources + unsafe { dealloc(channel_ptr) }; + + Ok(message) + } + // The sender was dropped before sending anything, or we already received the message. + DISCONNECTED => { + // SAFETY: the sender does not deallocate the channel if it switches from empty to + // disconnected so we need to free the allocation + unsafe { dealloc(channel_ptr) }; + + Err(RecvError) + } + // The receiver must have been `Future::poll`ed prior to this call. + #[cfg(feature = "async")] + RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR), + _ => unreachable!(), + } + } + + /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is + /// disconnected. This is a non consuming version of [`Receiver::recv`], but with a bit + /// worse performance. Prefer `[`Receiver::recv`]` if your code allows consuming the receiver. + /// + /// If a message is returned, the channel is disconnected and any subsequent receive operation + /// using this receiver will return an error. + /// + /// # Panics + /// + /// Panics if called after this receiver has been polled asynchronously. + #[cfg(feature = "std")] + pub fn recv_ref(&self) -> Result<T, RecvError> { + self.start_recv_ref(RecvError, |channel| { + loop { + thread::park(); + + // ORDERING: we use acquire ordering to synchronize with the write of the message + match channel.state.load(Acquire) { + // The sender sent the message while we were parked. + // We take the message and mark the channel disconnected. + MESSAGE => { + // ORDERING: the sender is inactive at this point so we don't need to make + // any reads or writes visible to the sending thread + channel.state.store(DISCONNECTED, Relaxed); + + // SAFETY: we were just in the message state so the message is valid + break Ok(unsafe { channel.take_message() }); + } + // The sender was dropped while we were parked. + DISCONNECTED => break Err(RecvError), + // State did not change, spurious wakeup, park again. + RECEIVING | UNPARKING => (), + _ => unreachable!(), + } + } + }) + } + + /// Like [`Receiver::recv`], but will not block longer than `timeout`. Returns: + /// * `Ok(message)` if there was a message in the channel before the timeout was reached. + /// * `Err(Timeout)` if no message arrived on the channel before the timeout was reached. + /// * `Err(Disconnected)` if the sender was dropped before sending anything or if the message + /// has already been extracted by a previous receive call. + /// + /// If a message is returned, the channel is disconnected and any subsequent receive operation + /// using this receiver will return an error. + /// + /// If the supplied `timeout` is so large that Rust's `Instant` type can't represent this point + /// in the future this falls back to an indefinitely blocking receive operation. + /// + /// # Panics + /// + /// Panics if called after this receiver has been polled asynchronously. + #[cfg(feature = "std")] + pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { + match Instant::now().checked_add(timeout) { + Some(deadline) => self.recv_deadline(deadline), + None => self.recv_ref().map_err(|_| RecvTimeoutError::Disconnected), + } + } + + /// Like [`Receiver::recv`], but will not block longer than until `deadline`. Returns: + /// * `Ok(message)` if there was a message in the channel before the deadline was reached. + /// * `Err(Timeout)` if no message arrived on the channel before the deadline was reached. + /// * `Err(Disconnected)` if the sender was dropped before sending anything or if the message + /// has already been extracted by a previous receive call. + /// + /// If a message is returned, the channel is disconnected and any subsequent receive operation + /// using this receiver will return an error. + /// + /// # Panics + /// + /// Panics if called after this receiver has been polled asynchronously. + #[cfg(feature = "std")] + pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> { + /// # Safety + /// + /// If the sender is unparking us after a message send, the message must already have been + /// written to the channel and an acquire memory barrier issued before calling this function + #[cold] + unsafe fn wait_for_unpark<T>(channel: &Channel<T>) -> Result<T, RecvTimeoutError> { + loop { + thread::park(); + + // ORDERING: The callee has already synchronized with any message write + match channel.state.load(Relaxed) { + MESSAGE => { + // ORDERING: the sender has been dropped, so this update only + // needs to be visible to us + channel.state.store(DISCONNECTED, Relaxed); + break Ok(channel.take_message()); + } + DISCONNECTED => break Err(RecvTimeoutError::Disconnected), + // The sender is still unparking us. We continue on the empty state here since + // the current implementation eagerly sets the state to EMPTY upon timeout. + EMPTY => (), + _ => unreachable!(), + } + } + } + + self.start_recv_ref(RecvTimeoutError::Disconnected, |channel| { + loop { + match deadline.checked_duration_since(Instant::now()) { + Some(timeout) => { + thread::park_timeout(timeout); + + // ORDERING: synchronize with the write of the message + match channel.state.load(Acquire) { + // The sender sent the message while we were parked. + MESSAGE => { + // ORDERING: the sender has been `mem::forget`-ed so this update + // only needs to be visible to us. + channel.state.store(DISCONNECTED, Relaxed); + + // SAFETY: we either are in the message state or were just in the + // message state + break Ok(unsafe { channel.take_message() }); + } + // The sender was dropped while we were parked. + DISCONNECTED => break Err(RecvTimeoutError::Disconnected), + // State did not change, spurious wakeup, park again. + RECEIVING | UNPARKING => (), + _ => unreachable!(), + } + } + None => { + // ORDERING: synchronize with the write of the message + match channel.state.swap(EMPTY, Acquire) { + // We reached the end of the timeout without receiving a message + RECEIVING => { + // SAFETY: we were in the receiving state and are now in the empty + // state, so the sender has not and will not try to read the waker, + // so we have exclusive access to drop it. + unsafe { channel.drop_waker() }; + + break Err(RecvTimeoutError::Timeout); + } + // The sender sent the message while we were parked. + MESSAGE => { + // Same safety and ordering as the Some branch + + channel.state.store(DISCONNECTED, Relaxed); + break Ok(unsafe { channel.take_message() }); + } + // The sender was dropped while we were parked. + DISCONNECTED => { + // ORDERING: we were originally in the disconnected state meaning + // that the sender is inactive and no longer observing the state, + // so we only need to change it back to DISCONNECTED for if the + // receiver is dropped or a recv* method is called again + channel.state.store(DISCONNECTED, Relaxed); + + break Err(RecvTimeoutError::Disconnected); + } + // The sender sent the message and started unparking us + UNPARKING => { + // We were in the UNPARKING state and are now in the EMPTY state. + // We wait to be properly unparked and to observe if the sender + // sets MESSAGE or DISCONNECTED state. + // SAFETY: The load above has synchronized with any message write. + break unsafe { wait_for_unpark(channel) }; + } + _ => unreachable!(), + } + } + } + } + }) + } + + /// Begins the process of receiving on the channel by reference. If the message is already + /// ready, or the sender has disconnected, then this function will return the appropriate + /// Result immediately. Otherwise, it will write the waker to memory, check to see if the + /// sender has finished or disconnected again, and then will call `finish`. `finish` is + /// thus responsible for cleaning up the channel's resources appropriately before it returns, + /// such as destroying the waker, for instance. + #[cfg(feature = "std")] + #[inline] + fn start_recv_ref<E>( + &self, + disconnected_error: E, + finish: impl FnOnce(&Channel<T>) -> Result<T, E>, + ) -> Result<T, E> { + // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver + // is still alive, meaning that even if the sender was dropped then it would have observed + // the fact that we're still alive and left the responsibility of deallocating the + // channel to us, so `self.channel` is valid + let channel = unsafe { self.channel_ptr.as_ref() }; + + // ORDERING: synchronize with the write of the message + match channel.state.load(Acquire) { + // The sender is alive but has not sent anything yet. We prepare to park. + EMPTY => { + // Conditionally add a delay here to help the tests trigger the edge cases where + // the sender manages to be dropped or send something before we are able to store + // our waker object in the channel. + #[cfg(oneshot_test_delay)] + std::thread::sleep(std::time::Duration::from_millis(10)); + + // Write our waker instance to the channel. + // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not + // try to access the waker until it sees the state set to RECEIVING below + unsafe { channel.write_waker(ReceiverWaker::current_thread()) }; + + // ORDERING: we use release ordering on success so the sender can synchronize with + // our write of the waker. We use relaxed ordering on failure since the sender does + // not need to synchronize with our write and the individual match arms handle any + // additional synchronization + match channel + .state + .compare_exchange(EMPTY, RECEIVING, Release, Relaxed) + { + // We stored our waker, now we delegate to the callback to finish the receive + // operation + Ok(_) => finish(channel), + // The sender sent the message while we prepared to finish + Err(MESSAGE) => { + // See comments in `recv` for ordering and safety + + fence(Acquire); + + unsafe { channel.drop_waker() }; + + // ORDERING: the sender has been `mem::forget`-ed so this update only + // needs to be visible to us + channel.state.store(DISCONNECTED, Relaxed); + + // SAFETY: The MESSAGE state tells us there is a correctly initialized + // message + Ok(unsafe { channel.take_message() }) + } + // The sender was dropped before sending anything while we prepared to park. + Err(DISCONNECTED) => { + // See comments in `recv` for safety + unsafe { channel.drop_waker() }; + Err(disconnected_error) + } + _ => unreachable!(), + } + } + // The sender sent the message. We take the message and mark the channel disconnected. + MESSAGE => { + // ORDERING: the sender has been `mem::forget`-ed so this update only needs to be + // visible to us + channel.state.store(DISCONNECTED, Relaxed); + + // SAFETY: we are in the message state so the message is valid + Ok(unsafe { channel.take_message() }) + } + // The sender was dropped before sending anything, or we already received the message. + DISCONNECTED => Err(disconnected_error), + // The receiver must have been `Future::poll`ed prior to this call. + #[cfg(feature = "async")] + RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR), + _ => unreachable!(), + } + } +} + +#[cfg(feature = "async")] +impl<T> core::future::Future for Receiver<T> { + type Output = Result<T, RecvError>; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver + // is still alive, meaning that even if the sender was dropped then it would have observed + // the fact that we're still alive and left the responsibility of deallocating the + // channel to us, so `self.channel` is valid + let channel = unsafe { self.channel_ptr.as_ref() }; + + // ORDERING: we use acquire ordering to synchronize with the store of the message. + match channel.state.load(Acquire) { + // The sender is alive but has not sent anything yet. + EMPTY => { + // SAFETY: We can't be in the forbidden states, and no waker in the channel. + unsafe { channel.write_async_waker(cx) } + } + // We were polled again while waiting for the sender. Replace the waker with the new one. + RECEIVING => { + // ORDERING: We use relaxed ordering on both success and failure since we have not + // written anything above that must be released, and the individual match arms + // handle any additional synchronization. + match channel + .state + .compare_exchange(RECEIVING, EMPTY, Relaxed, Relaxed) + { + // We successfully changed the state back to EMPTY. Replace the waker. + // This is the most likely branch to be taken, which is why we don't use any + // memory barriers in the compare_exchange above. + Ok(_) => { + // SAFETY: We wrote the waker in a previous call to poll. We do not need + // a memory barrier since the previous write here was by ourselves. + unsafe { channel.drop_waker() }; + // SAFETY: We can't be in the forbidden states, and no waker in the channel. + unsafe { channel.write_async_waker(cx) } + } + // The sender sent the message while we prepared to replace the waker. + // We take the message and mark the channel disconnected. + // The sender has already taken the waker. + Err(MESSAGE) => { + // ORDERING: Synchronize with the write of the message. This branch is + // unlikely to be taken. + channel.state.swap(DISCONNECTED, Acquire); + // SAFETY: The state tells us the sender has initialized the message. + Poll::Ready(Ok(unsafe { channel.take_message() })) + } + // The sender was dropped before sending anything while we prepared to park. + // The sender has taken the waker already. + Err(DISCONNECTED) => Poll::Ready(Err(RecvError)), + // The sender is currently waking us up. + Err(UNPARKING) => { + // We can't trust that the old waker that the sender has access to + // is honored by the async runtime at this point. So we wake ourselves + // up to get polled instantly again. + cx.waker().wake_by_ref(); + Poll::Pending + } + _ => unreachable!(), + } + } + // The sender sent the message. + MESSAGE => { + // ORDERING: the sender has been dropped so this update only needs to be + // visible to us + channel.state.store(DISCONNECTED, Relaxed); + Poll::Ready(Ok(unsafe { channel.take_message() })) + } + // The sender was dropped before sending anything, or we already received the message. + DISCONNECTED => Poll::Ready(Err(RecvError)), + // The sender has observed the RECEIVING state and is currently reading the waker from + // a previous poll. We need to loop here until we observe the MESSAGE or DISCONNECTED + // state. We busy loop here since we know the sender is done very soon. + UNPARKING => loop { + hint::spin_loop(); + // ORDERING: The load above has already synchronized with the write of the message. + match channel.state.load(Relaxed) { + MESSAGE => { + // ORDERING: the sender has been dropped, so this update only + // needs to be visible to us + channel.state.store(DISCONNECTED, Relaxed); + // SAFETY: We observed the MESSAGE state + break Poll::Ready(Ok(unsafe { channel.take_message() })); + } + DISCONNECTED => break Poll::Ready(Err(RecvError)), + UNPARKING => (), + _ => unreachable!(), + } + }, + _ => unreachable!(), + } + } +} + +impl<T> Drop for Receiver<T> { + fn drop(&mut self) { + // SAFETY: since the receiving side is still alive the sender would have observed that and + // left deallocating the channel allocation to us. + let channel = unsafe { self.channel_ptr.as_ref() }; + + // Set the channel state to disconnected and read what state the receiver was in + match channel.state.swap(DISCONNECTED, Acquire) { + // The sender has not sent anything, nor is it dropped. + EMPTY => (), + // The sender already sent something. We must drop it, and free the channel. + MESSAGE => { + // SAFETY: we are in the message state so the message is initialized + unsafe { channel.drop_message() }; + + // SAFETY: see safety comment at top of function + unsafe { dealloc(self.channel_ptr) }; + } + // The receiver has been polled. + #[cfg(feature = "async")] + RECEIVING => { + // TODO: figure this out when async is fixed + unsafe { channel.drop_waker() }; + } + // The sender was already dropped. We are responsible for freeing the channel. + DISCONNECTED => { + // SAFETY: see safety comment at top of function + unsafe { dealloc(self.channel_ptr) }; + } + _ => unreachable!(), + } + } +} + +/// All the values that the `Channel::state` field can have during the lifetime of a channel. +mod states { + // These values are very explicitly chosen so that we can replace some cmpxchg calls with + // fetch_* calls. + + /// The initial channel state. Active while both endpoints are still alive, no message has been + /// sent, and the receiver is not receiving. + pub const EMPTY: u8 = 0b011; + /// A message has been sent to the channel, but the receiver has not yet read it. + pub const MESSAGE: u8 = 0b100; + /// No message has yet been sent on the channel, but the receiver is currently receiving. + pub const RECEIVING: u8 = 0b000; + #[cfg(any(feature = "std", feature = "async"))] + pub const UNPARKING: u8 = 0b001; + /// The channel has been closed. This means that either the sender or receiver has been dropped, + /// or the message sent to the channel has already been received. Since this is a oneshot + /// channel, it is disconnected after the one message it is supposed to hold has been + /// transmitted. + pub const DISCONNECTED: u8 = 0b010; +} +use states::*; + +/// Internal channel data structure structure. the `channel` method allocates and puts one instance +/// of this struct on the heap for each oneshot channel instance. The struct holds: +/// * The current state of the channel. +/// * The message in the channel. This memory is uninitialized until the message is sent. +/// * The waker instance for the thread or task that is currently receiving on this channel. +/// This memory is uninitialized until the receiver starts receiving. +struct Channel<T> { + state: AtomicU8, + message: UnsafeCell<MaybeUninit<T>>, + waker: UnsafeCell<MaybeUninit<ReceiverWaker>>, +} + +impl<T> Channel<T> { + pub fn new() -> Self { + Self { + state: AtomicU8::new(EMPTY), + message: UnsafeCell::new(MaybeUninit::uninit()), + waker: UnsafeCell::new(MaybeUninit::uninit()), + } + } + + #[inline(always)] + unsafe fn message(&self) -> &MaybeUninit<T> { + #[cfg(loom)] + { + self.message.with(|ptr| &*ptr) + } + + #[cfg(not(loom))] + { + &*self.message.get() + } + } + + #[inline(always)] + unsafe fn with_message_mut<F>(&self, op: F) + where + F: FnOnce(&mut MaybeUninit<T>), + { + #[cfg(loom)] + { + self.message.with_mut(|ptr| op(&mut *ptr)) + } + + #[cfg(not(loom))] + { + op(&mut *self.message.get()) + } + } + + #[inline(always)] + #[cfg(any(feature = "std", feature = "async"))] + unsafe fn with_waker_mut<F>(&self, op: F) + where + F: FnOnce(&mut MaybeUninit<ReceiverWaker>), + { + #[cfg(loom)] + { + self.waker.with_mut(|ptr| op(&mut *ptr)) + } + + #[cfg(not(loom))] + { + op(&mut *self.waker.get()) + } + } + + #[inline(always)] + unsafe fn write_message(&self, message: T) { + self.with_message_mut(|slot| slot.as_mut_ptr().write(message)); + } + + #[inline(always)] + unsafe fn take_message(&self) -> T { + #[cfg(loom)] + { + self.message.with(|ptr| ptr::read(ptr)).assume_init() + } + + #[cfg(not(loom))] + { + ptr::read(self.message.get()).assume_init() + } + } + + #[inline(always)] + unsafe fn drop_message(&self) { + self.with_message_mut(|slot| slot.assume_init_drop()); + } + + #[cfg(any(feature = "std", feature = "async"))] + #[inline(always)] + unsafe fn write_waker(&self, waker: ReceiverWaker) { + self.with_waker_mut(|slot| slot.as_mut_ptr().write(waker)); + } + + #[inline(always)] + unsafe fn take_waker(&self) -> ReceiverWaker { + #[cfg(loom)] + { + self.waker.with(|ptr| ptr::read(ptr)).assume_init() + } + + #[cfg(not(loom))] + { + ptr::read(self.waker.get()).assume_init() + } + } + + #[cfg(any(feature = "std", feature = "async"))] + #[inline(always)] + unsafe fn drop_waker(&self) { + self.with_waker_mut(|slot| slot.assume_init_drop()); + } + + /// # Safety + /// + /// * `Channel::waker` must not have a waker stored in it when calling this method. + /// * Channel state must not be RECEIVING or UNPARKING when calling this method. + #[cfg(feature = "async")] + unsafe fn write_async_waker(&self, cx: &mut task::Context<'_>) -> Poll<Result<T, RecvError>> { + // Write our thread instance to the channel. + // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not + // try to access the waker until it sees the state set to RECEIVING below + self.write_waker(ReceiverWaker::task_waker(cx)); + + // ORDERING: we use release ordering on success so the sender can synchronize with + // our write of the waker. We use relaxed ordering on failure since the sender does + // not need to synchronize with our write and the individual match arms handle any + // additional synchronization + match self + .state + .compare_exchange(EMPTY, RECEIVING, Release, Relaxed) + { + // We stored our waker, now we return and let the sender wake us up + Ok(_) => Poll::Pending, + // The sender sent the message while we prepared to park. + // We take the message and mark the channel disconnected. + Err(MESSAGE) => { + // ORDERING: Synchronize with the write of the message. This branch is + // unlikely to be taken, so it's likely more efficient to use a fence here + // instead of AcqRel ordering on the compare_exchange operation + fence(Acquire); + + // SAFETY: we started in the EMPTY state and the sender switched us to the + // MESSAGE state. This means that it did not take the waker, so we're + // responsible for dropping it. + self.drop_waker(); + + // ORDERING: sender does not exist, so this update only needs to be visible to us + self.state.store(DISCONNECTED, Relaxed); + + // SAFETY: The MESSAGE state tells us there is a correctly initialized message + Poll::Ready(Ok(self.take_message())) + } + // The sender was dropped before sending anything while we prepared to park. + Err(DISCONNECTED) => { + // SAFETY: we started in the EMPTY state and the sender switched us to the + // DISCONNECTED state. This means that it did not take the waker, so we're + // responsible for dropping it. + self.drop_waker(); + Poll::Ready(Err(RecvError)) + } + _ => unreachable!(), + } + } +} + +enum ReceiverWaker { + /// The receiver is waiting synchronously. Its thread is parked. + #[cfg(feature = "std")] + Thread(thread::Thread), + /// The receiver is waiting asynchronously. Its task can be woken up with this `Waker`. + #[cfg(feature = "async")] + Task(task::Waker), + /// A little hack to not make this enum an uninhibitable type when no features are enabled. + #[cfg(not(any(feature = "async", feature = "std")))] + _Uninhabited, +} + +impl ReceiverWaker { + #[cfg(feature = "std")] + pub fn current_thread() -> Self { + Self::Thread(thread::current()) + } + + #[cfg(feature = "async")] + pub fn task_waker(cx: &task::Context<'_>) -> Self { + Self::Task(cx.waker().clone()) + } + + pub fn unpark(self) { + match self { + #[cfg(feature = "std")] + ReceiverWaker::Thread(thread) => thread.unpark(), + #[cfg(feature = "async")] + ReceiverWaker::Task(waker) => waker.wake(), + #[cfg(not(any(feature = "async", feature = "std")))] + ReceiverWaker::_Uninhabited => unreachable!(), + } + } +} + +#[cfg(not(loom))] +#[test] +fn receiver_waker_size() { + let expected: usize = match (cfg!(feature = "std"), cfg!(feature = "async")) { + (false, false) => 0, + (false, true) => 16, + (true, false) => 8, + (true, true) => 24, + }; + assert_eq!(mem::size_of::<ReceiverWaker>(), expected); +} + +#[cfg(all(feature = "std", feature = "async"))] +const RECEIVER_USED_SYNC_AND_ASYNC_ERROR: &str = + "Invalid to call a blocking receive method on oneshot::Receiver after it has been polled"; + +#[inline] +pub(crate) unsafe fn dealloc<T>(channel: NonNull<Channel<T>>) { + drop(Box::from_raw(channel.as_ptr())) +} diff --git a/third_party/rust/oneshot-uniffi/src/loombox.rs b/third_party/rust/oneshot-uniffi/src/loombox.rs new file mode 100644 index 0000000000..615db30174 --- /dev/null +++ b/third_party/rust/oneshot-uniffi/src/loombox.rs @@ -0,0 +1,151 @@ +use core::{borrow, fmt, hash, mem, ptr}; +use loom::alloc; + +pub struct Box<T: ?Sized> { + ptr: *mut T, +} + +impl<T> Box<T> { + pub fn new(value: T) -> Self { + let layout = alloc::Layout::new::<T>(); + let ptr = unsafe { alloc::alloc(layout) } as *mut T; + unsafe { ptr::write(ptr, value) }; + Self { ptr } + } +} + +impl<T: ?Sized> Box<T> { + #[inline] + pub fn into_raw(b: Box<T>) -> *mut T { + let ptr = b.ptr; + mem::forget(b); + ptr + } + + pub const unsafe fn from_raw(ptr: *mut T) -> Box<T> { + Self { ptr } + } +} + +impl<T: ?Sized> Drop for Box<T> { + fn drop(&mut self) { + unsafe { + let size = mem::size_of_val(&*self.ptr); + let align = mem::align_of_val(&*self.ptr); + let layout = alloc::Layout::from_size_align(size, align).unwrap(); + ptr::drop_in_place(self.ptr); + alloc::dealloc(self.ptr as *mut u8, layout); + } + } +} + +unsafe impl<T: Send> Send for Box<T> {} +unsafe impl<T: Sync> Sync for Box<T> {} + +impl<T: ?Sized> core::ops::Deref for Box<T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.ptr } + } +} + +impl<T: ?Sized> core::ops::DerefMut for Box<T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.ptr } + } +} + +impl<T: ?Sized> borrow::Borrow<T> for Box<T> { + fn borrow(&self) -> &T { + &**self + } +} + +impl<T: ?Sized> borrow::BorrowMut<T> for Box<T> { + fn borrow_mut(&mut self) -> &mut T { + &mut **self + } +} + +impl<T: ?Sized> AsRef<T> for Box<T> { + fn as_ref(&self) -> &T { + &**self + } +} + +impl<T: ?Sized> AsMut<T> for Box<T> { + fn as_mut(&mut self) -> &mut T { + &mut **self + } +} + +impl<T: fmt::Display + ?Sized> fmt::Display for Box<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +impl<T: fmt::Debug + ?Sized> fmt::Debug for Box<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<T: Clone> Clone for Box<T> { + #[inline] + fn clone(&self) -> Box<T> { + Self::new(self.as_ref().clone()) + } +} + +impl<T: ?Sized + PartialEq> PartialEq for Box<T> { + #[inline] + fn eq(&self, other: &Box<T>) -> bool { + PartialEq::eq(&**self, &**other) + } + + #[allow(clippy::partialeq_ne_impl)] + #[inline] + fn ne(&self, other: &Box<T>) -> bool { + PartialEq::ne(&**self, &**other) + } +} + +impl<T: ?Sized + Eq> Eq for Box<T> {} + +impl<T: ?Sized + PartialOrd> PartialOrd for Box<T> { + #[inline] + fn partial_cmp(&self, other: &Box<T>) -> Option<core::cmp::Ordering> { + PartialOrd::partial_cmp(&**self, &**other) + } + #[inline] + fn lt(&self, other: &Box<T>) -> bool { + PartialOrd::lt(&**self, &**other) + } + #[inline] + fn le(&self, other: &Box<T>) -> bool { + PartialOrd::le(&**self, &**other) + } + #[inline] + fn ge(&self, other: &Box<T>) -> bool { + PartialOrd::ge(&**self, &**other) + } + #[inline] + fn gt(&self, other: &Box<T>) -> bool { + PartialOrd::gt(&**self, &**other) + } +} + +impl<T: ?Sized + Ord> Ord for Box<T> { + #[inline] + fn cmp(&self, other: &Box<T>) -> core::cmp::Ordering { + Ord::cmp(&**self, &**other) + } +} + +impl<T: ?Sized + hash::Hash> hash::Hash for Box<T> { + fn hash<H: hash::Hasher>(&self, state: &mut H) { + (**self).hash(state); + } +} diff --git a/third_party/rust/oneshot-uniffi/tests/assert_mem.rs b/third_party/rust/oneshot-uniffi/tests/assert_mem.rs new file mode 100644 index 0000000000..a993ad715a --- /dev/null +++ b/third_party/rust/oneshot-uniffi/tests/assert_mem.rs @@ -0,0 +1,37 @@ +use oneshot::{Receiver, Sender}; +use std::mem; + +/// Just sanity check that both channel endpoints stay the size of a single pointer. +#[test] +fn channel_endpoints_single_pointer() { + const PTR_SIZE: usize = mem::size_of::<*const ()>(); + + assert_eq!(mem::size_of::<Sender<()>>(), PTR_SIZE); + assert_eq!(mem::size_of::<Receiver<()>>(), PTR_SIZE); + + assert_eq!(mem::size_of::<Sender<u8>>(), PTR_SIZE); + assert_eq!(mem::size_of::<Receiver<u8>>(), PTR_SIZE); + + assert_eq!(mem::size_of::<Sender<[u8; 1024]>>(), PTR_SIZE); + assert_eq!(mem::size_of::<Receiver<[u8; 1024]>>(), PTR_SIZE); + + assert_eq!(mem::size_of::<Option<Sender<[u8; 1024]>>>(), PTR_SIZE); + assert_eq!(mem::size_of::<Option<Receiver<[u8; 1024]>>>(), PTR_SIZE); +} + +/// Check that the `SendError` stays small. Useful to automatically detect if it is refactored +/// to become large. We do not want the stack requirement for calling `Sender::send` to grow. +#[test] +fn error_sizes() { + const PTR_SIZE: usize = mem::size_of::<usize>(); + + assert_eq!(mem::size_of::<oneshot::SendError<()>>(), PTR_SIZE); + assert_eq!(mem::size_of::<oneshot::SendError<u8>>(), PTR_SIZE); + assert_eq!(mem::size_of::<oneshot::SendError<[u8; 1024]>>(), PTR_SIZE); + + // The type returned from `Sender::send` is also just pointer sized + assert_eq!( + mem::size_of::<Result<(), oneshot::SendError<[u8; 1024]>>>(), + PTR_SIZE + ); +} diff --git a/third_party/rust/oneshot-uniffi/tests/async.rs b/third_party/rust/oneshot-uniffi/tests/async.rs new file mode 100644 index 0000000000..e7633aad82 --- /dev/null +++ b/third_party/rust/oneshot-uniffi/tests/async.rs @@ -0,0 +1,128 @@ +#![cfg(all(feature = "async", not(loom)))] + +use core::mem; +use core::time::Duration; + +mod helpers; +use helpers::DropCounter; + +#[tokio::test] +async fn send_before_await_tokio() { + let (sender, receiver) = oneshot::channel(); + assert!(sender.send(19i128).is_ok()); + assert_eq!(receiver.await, Ok(19i128)); +} + +#[async_std::test] +async fn send_before_await_async_std() { + let (sender, receiver) = oneshot::channel(); + assert!(sender.send(19i128).is_ok()); + assert_eq!(receiver.await, Ok(19i128)); +} + +#[tokio::test] +async fn await_with_dropped_sender_tokio() { + let (sender, receiver) = oneshot::channel::<u128>(); + mem::drop(sender); + receiver.await.unwrap_err(); +} + +#[async_std::test] +async fn await_with_dropped_sender_async_std() { + let (sender, receiver) = oneshot::channel::<u128>(); + mem::drop(sender); + receiver.await.unwrap_err(); +} + +#[tokio::test] +async fn await_before_send_tokio() { + let (sender, receiver) = oneshot::channel(); + let (message, counter) = DropCounter::new(79u128); + let t = tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(10)).await; + sender.send(message) + }); + let returned_message = receiver.await.unwrap(); + assert_eq!(counter.count(), 0); + assert_eq!(*returned_message.value(), 79u128); + mem::drop(returned_message); + assert_eq!(counter.count(), 1); + t.await.unwrap().unwrap(); +} + +#[async_std::test] +async fn await_before_send_async_std() { + let (sender, receiver) = oneshot::channel(); + let (message, counter) = DropCounter::new(79u128); + let t = async_std::task::spawn(async move { + async_std::task::sleep(Duration::from_millis(10)).await; + sender.send(message) + }); + let returned_message = receiver.await.unwrap(); + assert_eq!(counter.count(), 0); + assert_eq!(*returned_message.value(), 79u128); + mem::drop(returned_message); + assert_eq!(counter.count(), 1); + t.await.unwrap(); +} + +#[tokio::test] +async fn await_before_send_then_drop_sender_tokio() { + let (sender, receiver) = oneshot::channel::<u128>(); + let t = tokio::spawn(async { + tokio::time::sleep(Duration::from_millis(10)).await; + mem::drop(sender); + }); + assert!(receiver.await.is_err()); + t.await.unwrap(); +} + +#[async_std::test] +async fn await_before_send_then_drop_sender_async_std() { + let (sender, receiver) = oneshot::channel::<u128>(); + let t = async_std::task::spawn(async { + async_std::task::sleep(Duration::from_millis(10)).await; + mem::drop(sender); + }); + assert!(receiver.await.is_err()); + t.await; +} + +// Tests that the Receiver handles being used synchronously even after being polled +#[tokio::test] +async fn poll_future_and_then_try_recv() { + use core::future::Future; + use core::pin::Pin; + use core::task::{self, Poll}; + + struct StupidReceiverFuture(oneshot::Receiver<()>); + + impl Future for StupidReceiverFuture { + type Output = Result<(), oneshot::RecvError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + let poll_result = Future::poll(Pin::new(&mut self.0), cx); + self.0.try_recv().expect_err("Should never be a message"); + poll_result + } + } + + let (sender, receiver) = oneshot::channel(); + let t = tokio::spawn(async { + tokio::time::sleep(Duration::from_millis(20)).await; + mem::drop(sender); + }); + StupidReceiverFuture(receiver).await.unwrap_err(); + t.await.unwrap(); +} + +#[tokio::test] +async fn poll_receiver_then_drop_it() { + let (sender, receiver) = oneshot::channel::<()>(); + // This will poll the receiver and then give up after 100 ms. + tokio::time::timeout(Duration::from_millis(100), receiver) + .await + .unwrap_err(); + // Make sure the receiver has been dropped by the runtime. + assert!(sender.send(()).is_err()); +} diff --git a/third_party/rust/oneshot-uniffi/tests/future.rs b/third_party/rust/oneshot-uniffi/tests/future.rs new file mode 100644 index 0000000000..3895946bec --- /dev/null +++ b/third_party/rust/oneshot-uniffi/tests/future.rs @@ -0,0 +1,65 @@ +#![cfg(feature = "async")] + +use core::{future, mem, pin, task}; + +#[cfg(loom)] +pub use loom::sync::{Arc, Mutex}; +#[cfg(not(loom))] +pub use std::sync::{Arc, Mutex}; + +mod helpers; +use helpers::maybe_loom_model; + +#[test] +fn multiple_receiver_polls_keeps_only_latest_waker() { + #[derive(Default)] + struct MockWaker { + cloned: usize, + dropped: usize, + } + + fn clone_mock_waker(waker: *const ()) -> task::RawWaker { + let mock_waker = unsafe { Arc::from_raw(waker as *const Mutex<MockWaker>) }; + mock_waker.lock().unwrap().cloned += 1; + let new_waker = + task::RawWaker::new(Arc::into_raw(mock_waker.clone()) as *const (), &VTABLE); + mem::forget(mock_waker); + new_waker + } + + fn drop_mock_waker(waker: *const ()) { + let mock_waker = unsafe { Arc::from_raw(waker as *const Mutex<MockWaker>) }; + mock_waker.lock().unwrap().dropped += 1; + } + + const VTABLE: task::RawWakerVTable = + task::RawWakerVTable::new(clone_mock_waker, |_| (), |_| (), drop_mock_waker); + + maybe_loom_model(|| { + let mock_waker1 = Arc::new(Mutex::new(MockWaker::default())); + let raw_waker1 = + task::RawWaker::new(Arc::into_raw(mock_waker1.clone()) as *const (), &VTABLE); + let waker1 = unsafe { task::Waker::from_raw(raw_waker1) }; + let mut context1 = task::Context::from_waker(&waker1); + + let (_sender, mut receiver) = oneshot::channel::<()>(); + + let poll_result = future::Future::poll(pin::Pin::new(&mut receiver), &mut context1); + assert_eq!(poll_result, task::Poll::Pending); + assert_eq!(mock_waker1.lock().unwrap().cloned, 1); + assert_eq!(mock_waker1.lock().unwrap().dropped, 0); + + let mock_waker2 = Arc::new(Mutex::new(MockWaker::default())); + let raw_waker2 = + task::RawWaker::new(Arc::into_raw(mock_waker2.clone()) as *const (), &VTABLE); + let waker2 = unsafe { task::Waker::from_raw(raw_waker2) }; + let mut context2 = task::Context::from_waker(&waker2); + + let poll_result = future::Future::poll(pin::Pin::new(&mut receiver), &mut context2); + assert_eq!(poll_result, task::Poll::Pending); + assert_eq!(mock_waker2.lock().unwrap().cloned, 1); + assert_eq!(mock_waker2.lock().unwrap().dropped, 0); + assert_eq!(mock_waker1.lock().unwrap().cloned, 1); + assert_eq!(mock_waker1.lock().unwrap().dropped, 1); + }); +} diff --git a/third_party/rust/oneshot-uniffi/tests/helpers/mod.rs b/third_party/rust/oneshot-uniffi/tests/helpers/mod.rs new file mode 100644 index 0000000000..1b145396e7 --- /dev/null +++ b/third_party/rust/oneshot-uniffi/tests/helpers/mod.rs @@ -0,0 +1,63 @@ +#![allow(dead_code)] + +extern crate alloc; + +#[cfg(not(loom))] +use alloc::sync::Arc; +#[cfg(not(loom))] +use core::sync::atomic::{AtomicUsize, Ordering::SeqCst}; +#[cfg(loom)] +use loom::sync::{ + atomic::{AtomicUsize, Ordering::SeqCst}, + Arc, +}; + +#[cfg(loom)] +pub mod waker; + +pub fn maybe_loom_model(test: impl Fn() + Sync + Send + 'static) { + #[cfg(loom)] + loom::model(test); + #[cfg(not(loom))] + test(); +} + +pub struct DropCounter<T> { + drop_count: Arc<AtomicUsize>, + value: Option<T>, +} + +pub struct DropCounterHandle(Arc<AtomicUsize>); + +impl<T> DropCounter<T> { + pub fn new(value: T) -> (Self, DropCounterHandle) { + let drop_count = Arc::new(AtomicUsize::new(0)); + ( + Self { + drop_count: drop_count.clone(), + value: Some(value), + }, + DropCounterHandle(drop_count), + ) + } + + pub fn value(&self) -> &T { + self.value.as_ref().unwrap() + } + + pub fn into_value(mut self) -> T { + self.value.take().unwrap() + } +} + +impl DropCounterHandle { + pub fn count(&self) -> usize { + self.0.load(SeqCst) + } +} + +impl<T> Drop for DropCounter<T> { + fn drop(&mut self) { + self.drop_count.fetch_add(1, SeqCst); + } +} diff --git a/third_party/rust/oneshot-uniffi/tests/helpers/waker.rs b/third_party/rust/oneshot-uniffi/tests/helpers/waker.rs new file mode 100644 index 0000000000..2e3f1bee19 --- /dev/null +++ b/third_party/rust/oneshot-uniffi/tests/helpers/waker.rs @@ -0,0 +1,64 @@ +//! Creates a Waker that can be observed from tests. + +use std::mem::forget; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; +use std::task::{RawWaker, RawWakerVTable, Waker}; + +#[derive(Default)] +pub struct WakerHandle { + clone_count: AtomicU32, + drop_count: AtomicU32, + wake_count: AtomicU32, +} + +impl WakerHandle { + pub fn clone_count(&self) -> u32 { + self.clone_count.load(Ordering::Relaxed) + } + + pub fn drop_count(&self) -> u32 { + self.drop_count.load(Ordering::Relaxed) + } + + pub fn wake_count(&self) -> u32 { + self.wake_count.load(Ordering::Relaxed) + } +} + +pub fn waker() -> (Waker, Arc<WakerHandle>) { + let waker_handle = Arc::new(WakerHandle::default()); + let waker_handle_ptr = Arc::into_raw(waker_handle.clone()); + let raw_waker = RawWaker::new(waker_handle_ptr as *const _, waker_vtable()); + (unsafe { Waker::from_raw(raw_waker) }, waker_handle) +} + +pub(super) fn waker_vtable() -> &'static RawWakerVTable { + &RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw) +} + +unsafe fn clone_raw(data: *const ()) -> RawWaker { + let handle: Arc<WakerHandle> = Arc::from_raw(data as *const _); + handle.clone_count.fetch_add(1, Ordering::Relaxed); + forget(handle.clone()); + forget(handle); + RawWaker::new(data, waker_vtable()) +} + +unsafe fn wake_raw(data: *const ()) { + let handle: Arc<WakerHandle> = Arc::from_raw(data as *const _); + handle.wake_count.fetch_add(1, Ordering::Relaxed); + handle.drop_count.fetch_add(1, Ordering::Relaxed); +} + +unsafe fn wake_by_ref_raw(data: *const ()) { + let handle: Arc<WakerHandle> = Arc::from_raw(data as *const _); + handle.wake_count.fetch_add(1, Ordering::Relaxed); + forget(handle) +} + +unsafe fn drop_raw(data: *const ()) { + let handle: Arc<WakerHandle> = Arc::from_raw(data as *const _); + handle.drop_count.fetch_add(1, Ordering::Relaxed); + drop(handle) +} diff --git a/third_party/rust/oneshot-uniffi/tests/loom.rs b/third_party/rust/oneshot-uniffi/tests/loom.rs new file mode 100644 index 0000000000..a7625a494d --- /dev/null +++ b/third_party/rust/oneshot-uniffi/tests/loom.rs @@ -0,0 +1,223 @@ +#![cfg(loom)] + +use oneshot::TryRecvError; + +use loom::hint; +use loom::thread; +#[cfg(feature = "async")] +use std::future::Future; +#[cfg(feature = "async")] +use std::pin::Pin; +#[cfg(feature = "async")] +use std::task::{self, Poll}; +#[cfg(feature = "std")] +use std::time::Duration; + +mod helpers; + +#[test] +fn try_recv() { + loom::model(|| { + let (sender, receiver) = oneshot::channel::<u128>(); + + let t = thread::spawn(move || loop { + match receiver.try_recv() { + Ok(msg) => break msg, + Err(TryRecvError::Empty) => hint::spin_loop(), + Err(TryRecvError::Disconnected) => panic!("Should not be disconnected"), + } + }); + + assert!(sender.send(19).is_ok()); + assert_eq!(t.join().unwrap(), 19); + }) +} + +#[cfg(feature = "std")] +#[test] +fn send_recv_different_threads() { + loom::model(|| { + let (sender, receiver) = oneshot::channel(); + let t2 = thread::spawn(move || { + assert_eq!(receiver.recv_timeout(Duration::from_millis(1)), Ok(9)); + }); + let t1 = thread::spawn(move || { + sender.send(9u128).unwrap(); + }); + t1.join().unwrap(); + t2.join().unwrap(); + }) +} + +#[cfg(feature = "std")] +#[test] +fn recv_drop_sender_different_threads() { + loom::model(|| { + let (sender, receiver) = oneshot::channel::<u128>(); + let t2 = thread::spawn(move || { + assert!(receiver.recv_timeout(Duration::from_millis(0)).is_err()); + }); + let t1 = thread::spawn(move || { + drop(sender); + }); + t1.join().unwrap(); + t2.join().unwrap(); + }) +} + +#[cfg(feature = "async")] +#[test] +fn async_recv() { + loom::model(|| { + let (sender, receiver) = oneshot::channel::<u128>(); + let t1 = thread::spawn(move || { + sender.send(987).unwrap(); + }); + assert_eq!(loom::future::block_on(receiver), Ok(987)); + t1.join().unwrap(); + }) +} + +#[cfg(feature = "async")] +#[test] +fn send_then_poll() { + loom::model(|| { + let (sender, mut receiver) = oneshot::channel::<u128>(); + sender.send(1234).unwrap(); + + let (waker, waker_handle) = helpers::waker::waker(); + let mut context = task::Context::from_waker(&waker); + + assert_eq!( + Pin::new(&mut receiver).poll(&mut context), + Poll::Ready(Ok(1234)) + ); + assert_eq!(waker_handle.clone_count(), 0); + assert_eq!(waker_handle.drop_count(), 0); + assert_eq!(waker_handle.wake_count(), 0); + }) +} + +#[cfg(feature = "async")] +#[test] +fn poll_then_send() { + loom::model(|| { + let (sender, mut receiver) = oneshot::channel::<u128>(); + + let (waker, waker_handle) = helpers::waker::waker(); + let mut context = task::Context::from_waker(&waker); + + assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending); + assert_eq!(waker_handle.clone_count(), 1); + assert_eq!(waker_handle.drop_count(), 0); + assert_eq!(waker_handle.wake_count(), 0); + + sender.send(1234).unwrap(); + assert_eq!(waker_handle.clone_count(), 1); + assert_eq!(waker_handle.drop_count(), 1); + assert_eq!(waker_handle.wake_count(), 1); + + assert_eq!( + Pin::new(&mut receiver).poll(&mut context), + Poll::Ready(Ok(1234)) + ); + assert_eq!(waker_handle.clone_count(), 1); + assert_eq!(waker_handle.drop_count(), 1); + assert_eq!(waker_handle.wake_count(), 1); + }) +} + +#[cfg(feature = "async")] +#[test] +fn poll_with_different_wakers() { + loom::model(|| { + let (sender, mut receiver) = oneshot::channel::<u128>(); + + let (waker1, waker_handle1) = helpers::waker::waker(); + let mut context1 = task::Context::from_waker(&waker1); + + assert_eq!(Pin::new(&mut receiver).poll(&mut context1), Poll::Pending); + assert_eq!(waker_handle1.clone_count(), 1); + assert_eq!(waker_handle1.drop_count(), 0); + assert_eq!(waker_handle1.wake_count(), 0); + + let (waker2, waker_handle2) = helpers::waker::waker(); + let mut context2 = task::Context::from_waker(&waker2); + + assert_eq!(Pin::new(&mut receiver).poll(&mut context2), Poll::Pending); + assert_eq!(waker_handle1.clone_count(), 1); + assert_eq!(waker_handle1.drop_count(), 1); + assert_eq!(waker_handle1.wake_count(), 0); + + assert_eq!(waker_handle2.clone_count(), 1); + assert_eq!(waker_handle2.drop_count(), 0); + assert_eq!(waker_handle2.wake_count(), 0); + + // Sending should cause the waker from the latest poll to be woken up + sender.send(1234).unwrap(); + assert_eq!(waker_handle1.clone_count(), 1); + assert_eq!(waker_handle1.drop_count(), 1); + assert_eq!(waker_handle1.wake_count(), 0); + + assert_eq!(waker_handle2.clone_count(), 1); + assert_eq!(waker_handle2.drop_count(), 1); + assert_eq!(waker_handle2.wake_count(), 1); + }) +} + +#[cfg(feature = "async")] +#[test] +fn poll_then_try_recv() { + loom::model(|| { + let (_sender, mut receiver) = oneshot::channel::<u128>(); + + let (waker, waker_handle) = helpers::waker::waker(); + let mut context = task::Context::from_waker(&waker); + + assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending); + assert_eq!(waker_handle.clone_count(), 1); + assert_eq!(waker_handle.drop_count(), 0); + assert_eq!(waker_handle.wake_count(), 0); + + assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty)); + + assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending); + assert_eq!(waker_handle.clone_count(), 2); + assert_eq!(waker_handle.drop_count(), 1); + assert_eq!(waker_handle.wake_count(), 0); + }) +} + +#[cfg(feature = "async")] +#[test] +fn poll_then_try_recv_while_sending() { + loom::model(|| { + let (sender, mut receiver) = oneshot::channel::<u128>(); + + let (waker, waker_handle) = helpers::waker::waker(); + let mut context = task::Context::from_waker(&waker); + + assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending); + assert_eq!(waker_handle.clone_count(), 1); + assert_eq!(waker_handle.drop_count(), 0); + assert_eq!(waker_handle.wake_count(), 0); + + let t = thread::spawn(move || { + sender.send(1234).unwrap(); + }); + + let msg = loop { + match receiver.try_recv() { + Ok(msg) => break msg, + Err(TryRecvError::Empty) => hint::spin_loop(), + Err(TryRecvError::Disconnected) => panic!("Should not be disconnected"), + } + }; + assert_eq!(msg, 1234); + assert_eq!(waker_handle.clone_count(), 1); + assert_eq!(waker_handle.drop_count(), 1); + assert_eq!(waker_handle.wake_count(), 1); + + t.join().unwrap(); + }) +} diff --git a/third_party/rust/oneshot-uniffi/tests/sync.rs b/third_party/rust/oneshot-uniffi/tests/sync.rs new file mode 100644 index 0000000000..c6ba081c66 --- /dev/null +++ b/third_party/rust/oneshot-uniffi/tests/sync.rs @@ -0,0 +1,343 @@ +use core::mem; +use oneshot::TryRecvError; + +#[cfg(feature = "std")] +use oneshot::{RecvError, RecvTimeoutError}; +#[cfg(feature = "std")] +use std::time::{Duration, Instant}; + +#[cfg(feature = "std")] +mod thread { + #[cfg(loom)] + pub use loom::thread::spawn; + #[cfg(not(loom))] + pub use std::thread::{sleep, spawn}; + + #[cfg(loom)] + pub fn sleep(_timeout: core::time::Duration) { + loom::thread::yield_now() + } +} + +mod helpers; +use helpers::{maybe_loom_model, DropCounter}; + +#[test] +fn send_before_try_recv() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel(); + assert!(sender.send(19i128).is_ok()); + + assert_eq!(receiver.try_recv(), Ok(19i128)); + assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected)); + #[cfg(feature = "std")] + { + assert_eq!(receiver.recv_ref(), Err(RecvError)); + assert!(receiver.recv_timeout(Duration::from_secs(1)).is_err()); + } + }) +} + +#[cfg(feature = "std")] +#[test] +fn send_before_recv() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::<()>(); + assert!(sender.send(()).is_ok()); + assert_eq!(receiver.recv(), Ok(())); + }); + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::<u8>(); + assert!(sender.send(19).is_ok()); + assert_eq!(receiver.recv(), Ok(19)); + }); + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::<u64>(); + assert!(sender.send(21).is_ok()); + assert_eq!(receiver.recv(), Ok(21)); + }); + // FIXME: This test does not work with loom. There is something that happens after the + // channel object becomes larger than ~500 bytes and that makes an atomic read from the state + // result in "signal: 10, SIGBUS: access to undefined memory" + #[cfg(not(loom))] + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::<[u8; 4096]>(); + assert!(sender.send([0b10101010; 4096]).is_ok()); + assert!(receiver.recv().unwrap()[..] == [0b10101010; 4096][..]); + }); +} + +#[cfg(feature = "std")] +#[test] +fn send_before_recv_ref() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel(); + assert!(sender.send(19i128).is_ok()); + + assert_eq!(receiver.recv_ref(), Ok(19i128)); + assert_eq!(receiver.recv_ref(), Err(RecvError)); + assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected)); + assert!(receiver.recv_timeout(Duration::from_secs(1)).is_err()); + }) +} + +#[cfg(feature = "std")] +#[test] +fn send_before_recv_timeout() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel(); + assert!(sender.send(19i128).is_ok()); + + let start = Instant::now(); + let timeout = Duration::from_secs(1); + assert_eq!(receiver.recv_timeout(timeout), Ok(19i128)); + assert!(start.elapsed() < Duration::from_millis(100)); + + assert!(receiver.recv_timeout(timeout).is_err()); + assert!(receiver.try_recv().is_err()); + assert!(receiver.recv().is_err()); + }) +} + +#[test] +fn send_then_drop_receiver() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel(); + assert!(sender.send(19i128).is_ok()); + mem::drop(receiver); + }) +} + +#[test] +fn send_with_dropped_receiver() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel(); + mem::drop(receiver); + let send_error = sender.send(5u128).unwrap_err(); + assert_eq!(*send_error.as_inner(), 5); + assert_eq!(send_error.into_inner(), 5); + }) +} + +#[test] +fn try_recv_with_dropped_sender() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::<u128>(); + mem::drop(sender); + receiver.try_recv().unwrap_err(); + }) +} + +#[cfg(feature = "std")] +#[test] +fn recv_with_dropped_sender() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::<u128>(); + mem::drop(sender); + receiver.recv().unwrap_err(); + }) +} + +#[cfg(feature = "std")] +#[test] +fn recv_before_send() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(2)); + sender.send(9u128).unwrap(); + }); + assert_eq!(receiver.recv(), Ok(9)); + t.join().unwrap(); + }) +} + +#[cfg(feature = "std")] +#[test] +fn recv_timeout_before_send() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(2)); + sender.send(9u128).unwrap(); + }); + assert_eq!(receiver.recv_timeout(Duration::from_secs(1)), Ok(9)); + t.join().unwrap(); + }) +} + +#[cfg(feature = "std")] +#[test] +fn recv_before_send_then_drop_sender() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::<u128>(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(10)); + mem::drop(sender); + }); + assert!(receiver.recv().is_err()); + t.join().unwrap(); + }) +} + +#[cfg(feature = "std")] +#[test] +fn recv_timeout_before_send_then_drop_sender() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::<u128>(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(10)); + mem::drop(sender); + }); + assert!(receiver.recv_timeout(Duration::from_secs(1)).is_err()); + t.join().unwrap(); + }) +} + +#[test] +fn try_recv() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::<u128>(); + assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty)); + mem::drop(sender) + }) +} + +#[cfg(feature = "std")] +#[test] +fn try_recv_then_drop_receiver() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::<u128>(); + let t1 = thread::spawn(move || { + let _ = sender.send(42); + }); + let t2 = thread::spawn(move || { + assert!(matches!( + receiver.try_recv(), + Ok(42) | Err(TryRecvError::Empty) + )); + mem::drop(receiver); + }); + t1.join().unwrap(); + t2.join().unwrap(); + }) +} + +#[cfg(feature = "std")] +#[test] +fn recv_deadline_and_timeout_no_time() { + maybe_loom_model(|| { + let (_sender, receiver) = oneshot::channel::<u128>(); + + let start = Instant::now(); + assert_eq!( + receiver.recv_deadline(start), + Err(RecvTimeoutError::Timeout) + ); + assert!(start.elapsed() < Duration::from_millis(200)); + + let start = Instant::now(); + assert_eq!( + receiver.recv_timeout(Duration::from_millis(0)), + Err(RecvTimeoutError::Timeout) + ); + assert!(start.elapsed() < Duration::from_millis(200)); + }) +} + +// This test doesn't give meaningful results when run with oneshot_test_delay and loom +#[cfg(all(feature = "std", not(all(oneshot_test_delay, loom))))] +#[test] +fn recv_deadline_time_should_elapse() { + maybe_loom_model(|| { + let (_sender, receiver) = oneshot::channel::<u128>(); + + let start = Instant::now(); + #[cfg(not(loom))] + let timeout = Duration::from_millis(100); + #[cfg(loom)] + let timeout = Duration::from_millis(1); + assert_eq!( + receiver.recv_deadline(start + timeout), + Err(RecvTimeoutError::Timeout) + ); + assert!(start.elapsed() > timeout); + assert!(start.elapsed() < timeout * 3); + }) +} + +#[cfg(all(feature = "std", not(all(oneshot_test_delay, loom))))] +#[test] +fn recv_timeout_time_should_elapse() { + maybe_loom_model(|| { + let (_sender, receiver) = oneshot::channel::<u128>(); + + let start = Instant::now(); + #[cfg(not(loom))] + let timeout = Duration::from_millis(100); + #[cfg(loom)] + let timeout = Duration::from_millis(1); + + assert_eq!( + receiver.recv_timeout(timeout), + Err(RecvTimeoutError::Timeout) + ); + assert!(start.elapsed() > timeout); + assert!(start.elapsed() < timeout * 3); + }) +} + +#[cfg(not(loom))] +#[test] +fn non_send_type_can_be_used_on_same_thread() { + use std::ptr; + + #[derive(Debug, Eq, PartialEq)] + struct NotSend(*mut ()); + + let (sender, receiver) = oneshot::channel(); + sender.send(NotSend(ptr::null_mut())).unwrap(); + let reply = receiver.try_recv().unwrap(); + assert_eq!(reply, NotSend(ptr::null_mut())); +} + +#[test] +fn message_in_channel_dropped_on_receiver_drop() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel(); + let (message, counter) = DropCounter::new(()); + assert_eq!(counter.count(), 0); + sender.send(message).unwrap(); + assert_eq!(counter.count(), 0); + mem::drop(receiver); + assert_eq!(counter.count(), 1); + }) +} + +#[test] +fn send_error_drops_message_correctly() { + maybe_loom_model(|| { + let (sender, _) = oneshot::channel(); + let (message, counter) = DropCounter::new(()); + + let send_error = sender.send(message).unwrap_err(); + assert_eq!(counter.count(), 0); + mem::drop(send_error); + assert_eq!(counter.count(), 1); + }); +} + +#[test] +fn send_error_drops_message_correctly_on_into_inner() { + maybe_loom_model(|| { + let (sender, _) = oneshot::channel(); + let (message, counter) = DropCounter::new(()); + + let send_error = sender.send(message).unwrap_err(); + assert_eq!(counter.count(), 0); + let message = send_error.into_inner(); + assert_eq!(counter.count(), 0); + mem::drop(message); + assert_eq!(counter.count(), 1); + }); +} |