diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/futures-channel | |
parent | Initial commit. (diff) | |
download | firefox-upstream.tar.xz firefox-upstream.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures-channel')
18 files changed, 3941 insertions, 0 deletions
diff --git a/third_party/rust/futures-channel/.cargo-checksum.json b/third_party/rust/futures-channel/.cargo-checksum.json new file mode 100644 index 0000000000..6d5256b624 --- /dev/null +++ b/third_party/rust/futures-channel/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"Cargo.toml":"2e3760b2b6299bb98bbbb290e567e2cf3247b31079bdaa8eec5064d7917cd7bb","LICENSE-APACHE":"275c491d6d1160553c32fd6127061d7f9606c3ea25abfad6ca3f6ed088785427","LICENSE-MIT":"6652c868f35dfe5e8ef636810a4e576b9d663f3a17fb0f5613ad73583e1b88fd","README.md":"fb9330147e41a15b5e569b8bad7692628be89b5fc219a5323a57fa63024c1684","benches/sync_mpsc.rs":"1019dd027f104f58883f396ff70efc3dd69b3a7d62df17af090e07b2b05eaf66","build.rs":"5b263bd2bd587511a9c8daef580b05e0613c15a6c5f800b1e5bc145fa013d99e","no_atomic_cas.rs":"7ae747b83b08dd926c1696faf4ecab9399c652ae77d5179221258c73b8eecb6f","src/lib.rs":"2955e70d292208747fbb29810ef88f390f0f1b22b112fa59d60f95480d470e75","src/lock.rs":"38655a797456ea4f67d132c42055cf74f18195e875c3b337fc81a12901f79292","src/mpsc/mod.rs":"71c8fb3ac645bc587684a9e115b8859044acbade540299a1f9dd952aa27d6ba5","src/mpsc/queue.rs":"8822f466e7fe5a8d25ba994b7022ad7c14bcfd473d354a6cd0490240d3e170e7","src/mpsc/sink_impl.rs":"c9977b530187e82c912fcd46e08316e48ed246e77bb2419d53020e69e403d086","src/oneshot.rs":"d1170289b39656ea5f0d5f42b905ddbd5fa9c1202aa3297c9f25280a48229910","tests/channel.rs":"88f4a41d82b5c1b01e153d071a2bf48e0697355908c55ca42342ed45e63fdec8","tests/mpsc-close.rs":"cb3a427403051a731701de5d2a489f8a7b7a5eaceb5edfafef4a539e63588d3c","tests/mpsc.rs":"8b0caa7a6c45c0878e0384485b848ac496e4bcd75a0de598e0aee5246348a71b","tests/oneshot.rs":"0f97d28852a1fd1327211772f43322c93916a639be3f2581e49ad37c9f8a2f88"},"package":"52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed"}
\ No newline at end of file diff --git a/third_party/rust/futures-channel/Cargo.toml b/third_party/rust/futures-channel/Cargo.toml new file mode 100644 index 0000000000..155347dacd --- /dev/null +++ b/third_party/rust/futures-channel/Cargo.toml @@ -0,0 +1,52 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. +# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +edition = "2018" +rust-version = "1.45" +name = "futures-channel" +version = "0.3.25" +description = """ +Channels for asynchronous communication using futures-rs. +""" +homepage = "https://rust-lang.github.io/futures-rs" +readme = "README.md" +license = "MIT OR Apache-2.0" +repository = "https://github.com/rust-lang/futures-rs" + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = [ + "--cfg", + "docsrs", +] + +[dependencies.futures-core] +version = "0.3.25" +default-features = false + +[dependencies.futures-sink] +version = "0.3.25" +optional = true +default-features = false + +[dev-dependencies] + +[features] +alloc = ["futures-core/alloc"] +cfg-target-has-atomic = [] +default = ["std"] +sink = ["futures-sink"] +std = [ + "alloc", + "futures-core/std", +] +unstable = [] diff --git a/third_party/rust/futures-channel/LICENSE-APACHE b/third_party/rust/futures-channel/LICENSE-APACHE new file mode 100644 index 0000000000..9eb0b097f5 --- /dev/null +++ b/third_party/rust/futures-channel/LICENSE-APACHE @@ -0,0 +1,202 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright (c) 2016 Alex Crichton +Copyright (c) 2017 The Tokio Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/third_party/rust/futures-channel/LICENSE-MIT b/third_party/rust/futures-channel/LICENSE-MIT new file mode 100644 index 0000000000..8ad082ec4f --- /dev/null +++ b/third_party/rust/futures-channel/LICENSE-MIT @@ -0,0 +1,26 @@ +Copyright (c) 2016 Alex Crichton +Copyright (c) 2017 The Tokio Authors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/third_party/rust/futures-channel/README.md b/third_party/rust/futures-channel/README.md new file mode 100644 index 0000000000..3287be924c --- /dev/null +++ b/third_party/rust/futures-channel/README.md @@ -0,0 +1,23 @@ +# futures-channel + +Channels for asynchronous communication using futures-rs. + +## Usage + +Add this to your `Cargo.toml`: + +```toml +[dependencies] +futures-channel = "0.3" +``` + +The current `futures-channel` requires Rust 1.45 or later. + +## License + +Licensed under either of [Apache License, Version 2.0](LICENSE-APACHE) or +[MIT license](LICENSE-MIT) at your option. + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall +be dual licensed as above, without any additional terms or conditions. diff --git a/third_party/rust/futures-channel/benches/sync_mpsc.rs b/third_party/rust/futures-channel/benches/sync_mpsc.rs new file mode 100644 index 0000000000..7c3c3d3a80 --- /dev/null +++ b/third_party/rust/futures-channel/benches/sync_mpsc.rs @@ -0,0 +1,135 @@ +#![feature(test)] + +extern crate test; +use crate::test::Bencher; + +use { + futures::{ + channel::mpsc::{self, Sender, UnboundedSender}, + ready, + sink::Sink, + stream::{Stream, StreamExt}, + task::{Context, Poll}, + }, + futures_test::task::noop_context, + std::pin::Pin, +}; + +/// Single producer, single consumer +#[bench] +fn unbounded_1_tx(b: &mut Bencher) { + let mut cx = noop_context(); + b.iter(|| { + let (tx, mut rx) = mpsc::unbounded(); + + // 1000 iterations to avoid measuring overhead of initialization + // Result should be divided by 1000 + for i in 0..1000 { + // Poll, not ready, park + assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx)); + + UnboundedSender::unbounded_send(&tx, i).unwrap(); + + // Now poll ready + assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx)); + } + }) +} + +/// 100 producers, single consumer +#[bench] +fn unbounded_100_tx(b: &mut Bencher) { + let mut cx = noop_context(); + b.iter(|| { + let (tx, mut rx) = mpsc::unbounded(); + + let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect(); + + // 1000 send/recv operations total, result should be divided by 1000 + for _ in 0..10 { + for (i, x) in tx.iter().enumerate() { + assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx)); + + UnboundedSender::unbounded_send(x, i).unwrap(); + + assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx)); + } + } + }) +} + +#[bench] +fn unbounded_uncontended(b: &mut Bencher) { + let mut cx = noop_context(); + b.iter(|| { + let (tx, mut rx) = mpsc::unbounded(); + + for i in 0..1000 { + UnboundedSender::unbounded_send(&tx, i).expect("send"); + // No need to create a task, because poll is not going to park. + assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx)); + } + }) +} + +/// A Stream that continuously sends incrementing number of the queue +struct TestSender { + tx: Sender<u32>, + last: u32, // Last number sent +} + +// Could be a Future, it doesn't matter +impl Stream for TestSender { + type Item = u32; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let this = &mut *self; + let mut tx = Pin::new(&mut this.tx); + + ready!(tx.as_mut().poll_ready(cx)).unwrap(); + tx.as_mut().start_send(this.last + 1).unwrap(); + this.last += 1; + assert_eq!(Poll::Pending, tx.as_mut().poll_flush(cx)); + Poll::Ready(Some(this.last)) + } +} + +/// Single producers, single consumer +#[bench] +fn bounded_1_tx(b: &mut Bencher) { + let mut cx = noop_context(); + b.iter(|| { + let (tx, mut rx) = mpsc::channel(0); + + let mut tx = TestSender { tx, last: 0 }; + + for i in 0..1000 { + assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Pending, tx.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx)); + } + }) +} + +/// 100 producers, single consumer +#[bench] +fn bounded_100_tx(b: &mut Bencher) { + let mut cx = noop_context(); + b.iter(|| { + // Each sender can send one item after specified capacity + let (tx, mut rx) = mpsc::channel(0); + + let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect(); + + for i in 0..10 { + for x in &mut tx { + // Send an item + assert_eq!(Poll::Ready(Some(i + 1)), x.poll_next_unpin(&mut cx)); + // Then block + assert_eq!(Poll::Pending, x.poll_next_unpin(&mut cx)); + // Recv the item + assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx)); + } + } + }) +} diff --git a/third_party/rust/futures-channel/build.rs b/third_party/rust/futures-channel/build.rs new file mode 100644 index 0000000000..05e0496d94 --- /dev/null +++ b/third_party/rust/futures-channel/build.rs @@ -0,0 +1,41 @@ +// The rustc-cfg listed below are considered public API, but it is *unstable* +// and outside of the normal semver guarantees: +// +// - `futures_no_atomic_cas` +// Assume the target does *not* support atomic CAS operations. +// This is usually detected automatically by the build script, but you may +// need to enable it manually when building for custom targets or using +// non-cargo build systems that don't run the build script. +// +// With the exceptions mentioned above, the rustc-cfg emitted by the build +// script are *not* public API. + +#![warn(rust_2018_idioms, single_use_lifetimes)] + +use std::env; + +include!("no_atomic_cas.rs"); + +fn main() { + let target = match env::var("TARGET") { + Ok(target) => target, + Err(e) => { + println!( + "cargo:warning={}: unable to get TARGET environment variable: {}", + env!("CARGO_PKG_NAME"), + e + ); + return; + } + }; + + // Note that this is `no_*`, not `has_*`. This allows treating + // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't + // run. This is needed for compatibility with non-cargo build systems that + // don't run the build script. + if NO_ATOMIC_CAS.contains(&&*target) { + println!("cargo:rustc-cfg=futures_no_atomic_cas"); + } + + println!("cargo:rerun-if-changed=no_atomic_cas.rs"); +} diff --git a/third_party/rust/futures-channel/no_atomic_cas.rs b/third_party/rust/futures-channel/no_atomic_cas.rs new file mode 100644 index 0000000000..16ec628cdf --- /dev/null +++ b/third_party/rust/futures-channel/no_atomic_cas.rs @@ -0,0 +1,17 @@ +// This file is @generated by no_atomic_cas.sh. +// It is not intended for manual editing. + +const NO_ATOMIC_CAS: &[&str] = &[ + "armv4t-none-eabi", + "armv5te-none-eabi", + "avr-unknown-gnu-atmega328", + "bpfeb-unknown-none", + "bpfel-unknown-none", + "msp430-none-elf", + "riscv32i-unknown-none-elf", + "riscv32im-unknown-none-elf", + "riscv32imc-unknown-none-elf", + "thumbv4t-none-eabi", + "thumbv5te-none-eabi", + "thumbv6m-none-eabi", +]; diff --git a/third_party/rust/futures-channel/src/lib.rs b/third_party/rust/futures-channel/src/lib.rs new file mode 100644 index 0000000000..4cd936d552 --- /dev/null +++ b/third_party/rust/futures-channel/src/lib.rs @@ -0,0 +1,42 @@ +//! Asynchronous channels. +//! +//! Like threads, concurrent tasks sometimes need to communicate with each +//! other. This module contains two basic abstractions for doing so: +//! +//! - [oneshot], a way of sending a single value from one task to another. +//! - [mpsc], a multi-producer, single-consumer channel for sending values +//! between tasks, analogous to the similarly-named structure in the standard +//! library. +//! +//! All items are only available when the `std` or `alloc` feature of this +//! library is activated, and it is activated by default. + +#![cfg_attr(not(feature = "std"), no_std)] +#![warn( + missing_debug_implementations, + missing_docs, + rust_2018_idioms, + single_use_lifetimes, + unreachable_pub +)] +#![doc(test( + no_crate_inject, + attr( + deny(warnings, rust_2018_idioms, single_use_lifetimes), + allow(dead_code, unused_assignments, unused_variables) + ) +))] + +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +extern crate alloc; + +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +mod lock; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "std")] +pub mod mpsc; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +pub mod oneshot; diff --git a/third_party/rust/futures-channel/src/lock.rs b/third_party/rust/futures-channel/src/lock.rs new file mode 100644 index 0000000000..b328d0f7dd --- /dev/null +++ b/third_party/rust/futures-channel/src/lock.rs @@ -0,0 +1,102 @@ +//! A "mutex" which only supports `try_lock` +//! +//! As a futures library the eventual call to an event loop should be the only +//! thing that ever blocks, so this is assisted with a fast user-space +//! implementation of a lock that can only have a `try_lock` operation. + +use core::cell::UnsafeCell; +use core::ops::{Deref, DerefMut}; +use core::sync::atomic::AtomicBool; +use core::sync::atomic::Ordering::SeqCst; + +/// A "mutex" around a value, similar to `std::sync::Mutex<T>`. +/// +/// This lock only supports the `try_lock` operation, however, and does not +/// implement poisoning. +#[derive(Debug)] +pub(crate) struct Lock<T> { + locked: AtomicBool, + data: UnsafeCell<T>, +} + +/// Sentinel representing an acquired lock through which the data can be +/// accessed. +pub(crate) struct TryLock<'a, T> { + __ptr: &'a Lock<T>, +} + +// The `Lock` structure is basically just a `Mutex<T>`, and these two impls are +// intended to mirror the standard library's corresponding impls for `Mutex<T>`. +// +// If a `T` is sendable across threads, so is the lock, and `T` must be sendable +// across threads to be `Sync` because it allows mutable access from multiple +// threads. +unsafe impl<T: Send> Send for Lock<T> {} +unsafe impl<T: Send> Sync for Lock<T> {} + +impl<T> Lock<T> { + /// Creates a new lock around the given value. + pub(crate) fn new(t: T) -> Self { + Self { locked: AtomicBool::new(false), data: UnsafeCell::new(t) } + } + + /// Attempts to acquire this lock, returning whether the lock was acquired or + /// not. + /// + /// If `Some` is returned then the data this lock protects can be accessed + /// through the sentinel. This sentinel allows both mutable and immutable + /// access. + /// + /// If `None` is returned then the lock is already locked, either elsewhere + /// on this thread or on another thread. + pub(crate) fn try_lock(&self) -> Option<TryLock<'_, T>> { + if !self.locked.swap(true, SeqCst) { + Some(TryLock { __ptr: self }) + } else { + None + } + } +} + +impl<T> Deref for TryLock<'_, T> { + type Target = T; + fn deref(&self) -> &T { + // The existence of `TryLock` represents that we own the lock, so we + // can safely access the data here. + unsafe { &*self.__ptr.data.get() } + } +} + +impl<T> DerefMut for TryLock<'_, T> { + fn deref_mut(&mut self) -> &mut T { + // The existence of `TryLock` represents that we own the lock, so we + // can safely access the data here. + // + // Additionally, we're the *only* `TryLock` in existence so mutable + // access should be ok. + unsafe { &mut *self.__ptr.data.get() } + } +} + +impl<T> Drop for TryLock<'_, T> { + fn drop(&mut self) { + self.__ptr.locked.store(false, SeqCst); + } +} + +#[cfg(test)] +mod tests { + use super::Lock; + + #[test] + fn smoke() { + let a = Lock::new(1); + let mut a1 = a.try_lock().unwrap(); + assert!(a.try_lock().is_none()); + assert_eq!(*a1, 1); + *a1 = 2; + drop(a1); + assert_eq!(*a.try_lock().unwrap(), 2); + assert_eq!(*a.try_lock().unwrap(), 2); + } +} diff --git a/third_party/rust/futures-channel/src/mpsc/mod.rs b/third_party/rust/futures-channel/src/mpsc/mod.rs new file mode 100644 index 0000000000..44834b7c95 --- /dev/null +++ b/third_party/rust/futures-channel/src/mpsc/mod.rs @@ -0,0 +1,1308 @@ +//! A multi-producer, single-consumer queue for sending values across +//! asynchronous tasks. +//! +//! Similarly to the `std`, channel creation provides [`Receiver`] and +//! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to +//! read values out of the channel. If there is no message to read from the +//! channel, the current task will be notified when a new value is sent. +//! [`Sender`] implements the `Sink` trait and allows a task to send messages into +//! the channel. If the channel is at capacity, the send will be rejected and +//! the task will be notified when additional capacity is available. In other +//! words, the channel provides backpressure. +//! +//! Unbounded channels are also available using the `unbounded` constructor. +//! +//! # Disconnection +//! +//! When all [`Sender`] handles have been dropped, it is no longer +//! possible to send values into the channel. This is considered the termination +//! event of the stream. As such, [`Receiver::poll_next`] +//! will return `Ok(Ready(None))`. +//! +//! If the [`Receiver`] handle is dropped, then messages can no longer +//! be read out of the channel. In this case, all further attempts to send will +//! result in an error. +//! +//! # Clean Shutdown +//! +//! If the [`Receiver`] is simply dropped, then it is possible for +//! there to be messages still in the channel that will not be processed. As +//! such, it is usually desirable to perform a "clean" shutdown. To do this, the +//! receiver will first call `close`, which will prevent any further messages to +//! be sent into the channel. Then, the receiver consumes the channel to +//! completion, at which point the receiver can be dropped. +//! +//! [`Sender`]: struct.Sender.html +//! [`Receiver`]: struct.Receiver.html +//! [`Stream`]: ../../futures_core/stream/trait.Stream.html +//! [`Receiver::poll_next`]: +//! ../../futures_core/stream/trait.Stream.html#tymethod.poll_next + +// At the core, the channel uses an atomic FIFO queue for message passing. This +// queue is used as the primary coordination primitive. In order to enforce +// capacity limits and handle back pressure, a secondary FIFO queue is used to +// send parked task handles. +// +// The general idea is that the channel is created with a `buffer` size of `n`. +// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed" +// slot to hold a message. This allows `Sender` to know for a fact that a send +// will succeed *before* starting to do the actual work of sending the value. +// Since most of this work is lock-free, once the work starts, it is impossible +// to safely revert. +// +// If the sender is unable to process a send operation, then the current +// task is parked and the handle is sent on the parked task queue. +// +// Note that the implementation guarantees that the channel capacity will never +// exceed the configured limit, however there is no *strict* guarantee that the +// receiver will wake up a parked task *immediately* when a slot becomes +// available. However, it will almost always unpark a task when a slot becomes +// available and it is *guaranteed* that a sender will be unparked when the +// message that caused the sender to become parked is read out of the channel. +// +// The steps for sending a message are roughly: +// +// 1) Increment the channel message count +// 2) If the channel is at capacity, push the task handle onto the wait queue +// 3) Push the message onto the message queue. +// +// The steps for receiving a message are roughly: +// +// 1) Pop a message from the message queue +// 2) Pop a task handle from the wait queue +// 3) Decrement the channel message count. +// +// It's important for the order of operations on lock-free structures to happen +// in reverse order between the sender and receiver. This makes the message +// queue the primary coordination structure and establishes the necessary +// happens-before semantics required for the acquire / release semantics used +// by the queue structure. + +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::__internal::AtomicWaker; +use futures_core::task::{Context, Poll, Waker}; +use std::fmt; +use std::pin::Pin; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::{Arc, Mutex}; +use std::thread; + +use crate::mpsc::queue::Queue; + +mod queue; +#[cfg(feature = "sink")] +mod sink_impl; + +#[derive(Debug)] +struct UnboundedSenderInner<T> { + // Channel state shared between the sender and receiver. + inner: Arc<UnboundedInner<T>>, +} + +#[derive(Debug)] +struct BoundedSenderInner<T> { + // Channel state shared between the sender and receiver. + inner: Arc<BoundedInner<T>>, + + // Handle to the task that is blocked on this sender. This handle is sent + // to the receiver half in order to be notified when the sender becomes + // unblocked. + sender_task: Arc<Mutex<SenderTask>>, + + // `true` if the sender might be blocked. This is an optimization to avoid + // having to lock the mutex most of the time. + maybe_parked: bool, +} + +// We never project Pin<&mut SenderInner> to `Pin<&mut T>` +impl<T> Unpin for UnboundedSenderInner<T> {} +impl<T> Unpin for BoundedSenderInner<T> {} + +/// The transmission end of a bounded mpsc channel. +/// +/// This value is created by the [`channel`](channel) function. +#[derive(Debug)] +pub struct Sender<T>(Option<BoundedSenderInner<T>>); + +/// The transmission end of an unbounded mpsc channel. +/// +/// This value is created by the [`unbounded`](unbounded) function. +#[derive(Debug)] +pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>); + +trait AssertKinds: Send + Sync + Clone {} +impl AssertKinds for UnboundedSender<u32> {} + +/// The receiving end of a bounded mpsc channel. +/// +/// This value is created by the [`channel`](channel) function. +#[derive(Debug)] +pub struct Receiver<T> { + inner: Option<Arc<BoundedInner<T>>>, +} + +/// The receiving end of an unbounded mpsc channel. +/// +/// This value is created by the [`unbounded`](unbounded) function. +#[derive(Debug)] +pub struct UnboundedReceiver<T> { + inner: Option<Arc<UnboundedInner<T>>>, +} + +// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>` +impl<T> Unpin for UnboundedReceiver<T> {} + +/// The error type for [`Sender`s](Sender) used as `Sink`s. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct SendError { + kind: SendErrorKind, +} + +/// The error type returned from [`try_send`](Sender::try_send). +#[derive(Clone, PartialEq, Eq)] +pub struct TrySendError<T> { + err: SendError, + val: T, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +enum SendErrorKind { + Full, + Disconnected, +} + +/// The error type returned from [`try_next`](Receiver::try_next). +pub struct TryRecvError { + _priv: (), +} + +impl fmt::Display for SendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.is_full() { + write!(f, "send failed because channel is full") + } else { + write!(f, "send failed because receiver is gone") + } + } +} + +impl std::error::Error for SendError {} + +impl SendError { + /// Returns `true` if this error is a result of the channel being full. + pub fn is_full(&self) -> bool { + match self.kind { + SendErrorKind::Full => true, + _ => false, + } + } + + /// Returns `true` if this error is a result of the receiver being dropped. + pub fn is_disconnected(&self) -> bool { + match self.kind { + SendErrorKind::Disconnected => true, + _ => false, + } + } +} + +impl<T> fmt::Debug for TrySendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TrySendError").field("kind", &self.err.kind).finish() + } +} + +impl<T> fmt::Display for TrySendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.is_full() { + write!(f, "send failed because channel is full") + } else { + write!(f, "send failed because receiver is gone") + } + } +} + +impl<T: core::any::Any> std::error::Error for TrySendError<T> {} + +impl<T> TrySendError<T> { + /// Returns `true` if this error is a result of the channel being full. + pub fn is_full(&self) -> bool { + self.err.is_full() + } + + /// Returns `true` if this error is a result of the receiver being dropped. + pub fn is_disconnected(&self) -> bool { + self.err.is_disconnected() + } + + /// Returns the message that was attempted to be sent but failed. + pub fn into_inner(self) -> T { + self.val + } + + /// Drops the message and converts into a `SendError`. + pub fn into_send_error(self) -> SendError { + self.err + } +} + +impl fmt::Debug for TryRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("TryRecvError").finish() + } +} + +impl fmt::Display for TryRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "receiver channel is empty") + } +} + +impl std::error::Error for TryRecvError {} + +#[derive(Debug)] +struct UnboundedInner<T> { + // Internal channel state. Consists of the number of messages stored in the + // channel as well as a flag signalling that the channel is closed. + state: AtomicUsize, + + // Atomic, FIFO queue used to send messages to the receiver + message_queue: Queue<T>, + + // Number of senders in existence + num_senders: AtomicUsize, + + // Handle to the receiver's task. + recv_task: AtomicWaker, +} + +#[derive(Debug)] +struct BoundedInner<T> { + // Max buffer size of the channel. If `None` then the channel is unbounded. + buffer: usize, + + // Internal channel state. Consists of the number of messages stored in the + // channel as well as a flag signalling that the channel is closed. + state: AtomicUsize, + + // Atomic, FIFO queue used to send messages to the receiver + message_queue: Queue<T>, + + // Atomic, FIFO queue used to send parked task handles to the receiver. + parked_queue: Queue<Arc<Mutex<SenderTask>>>, + + // Number of senders in existence + num_senders: AtomicUsize, + + // Handle to the receiver's task. + recv_task: AtomicWaker, +} + +// Struct representation of `Inner::state`. +#[derive(Debug, Clone, Copy)] +struct State { + // `true` when the channel is open + is_open: bool, + + // Number of messages in the channel + num_messages: usize, +} + +// The `is_open` flag is stored in the left-most bit of `Inner::state` +const OPEN_MASK: usize = usize::max_value() - (usize::max_value() >> 1); + +// When a new channel is created, it is created in the open state with no +// pending messages. +const INIT_STATE: usize = OPEN_MASK; + +// The maximum number of messages that a channel can track is `usize::max_value() >> 1` +const MAX_CAPACITY: usize = !(OPEN_MASK); + +// The maximum requested buffer size must be less than the maximum capacity of +// a channel. This is because each sender gets a guaranteed slot. +const MAX_BUFFER: usize = MAX_CAPACITY >> 1; + +// Sent to the consumer to wake up blocked producers +#[derive(Debug)] +struct SenderTask { + task: Option<Waker>, + is_parked: bool, +} + +impl SenderTask { + fn new() -> Self { + Self { task: None, is_parked: false } + } + + fn notify(&mut self) { + self.is_parked = false; + + if let Some(task) = self.task.take() { + task.wake(); + } + } +} + +/// Creates a bounded mpsc channel for communicating between asynchronous tasks. +/// +/// Being bounded, this channel provides backpressure to ensure that the sender +/// outpaces the receiver by only a limited amount. The channel's capacity is +/// equal to `buffer + num-senders`. In other words, each sender gets a +/// guaranteed slot in the channel capacity, and on top of that there are +/// `buffer` "first come, first serve" slots available to all senders. +/// +/// The [`Receiver`](Receiver) returned implements the +/// [`Stream`](futures_core::stream::Stream) trait, while [`Sender`](Sender) implements +/// `Sink`. +pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { + // Check that the requested buffer size does not exceed the maximum buffer + // size permitted by the system. + assert!(buffer < MAX_BUFFER, "requested buffer size too large"); + + let inner = Arc::new(BoundedInner { + buffer, + state: AtomicUsize::new(INIT_STATE), + message_queue: Queue::new(), + parked_queue: Queue::new(), + num_senders: AtomicUsize::new(1), + recv_task: AtomicWaker::new(), + }); + + let tx = BoundedSenderInner { + inner: inner.clone(), + sender_task: Arc::new(Mutex::new(SenderTask::new())), + maybe_parked: false, + }; + + let rx = Receiver { inner: Some(inner) }; + + (Sender(Some(tx)), rx) +} + +/// Creates an unbounded mpsc channel for communicating between asynchronous +/// tasks. +/// +/// A `send` on this channel will always succeed as long as the receive half has +/// not been closed. If the receiver falls behind, messages will be arbitrarily +/// buffered. +/// +/// **Note** that the amount of available system memory is an implicit bound to +/// the channel. Using an `unbounded` channel has the ability of causing the +/// process to run out of memory. In this case, the process will be aborted. +pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { + let inner = Arc::new(UnboundedInner { + state: AtomicUsize::new(INIT_STATE), + message_queue: Queue::new(), + num_senders: AtomicUsize::new(1), + recv_task: AtomicWaker::new(), + }); + + let tx = UnboundedSenderInner { inner: inner.clone() }; + + let rx = UnboundedReceiver { inner: Some(inner) }; + + (UnboundedSender(Some(tx)), rx) +} + +/* + * + * ===== impl Sender ===== + * + */ + +impl<T> UnboundedSenderInner<T> { + fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> { + let state = decode_state(self.inner.state.load(SeqCst)); + if state.is_open { + Poll::Ready(Ok(())) + } else { + Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })) + } + } + + // Push message to the queue and signal to the receiver + fn queue_push_and_signal(&self, msg: T) { + // Push the message onto the message queue + self.inner.message_queue.push(msg); + + // Signal to the receiver that a message has been enqueued. If the + // receiver is parked, this will unpark the task. + self.inner.recv_task.wake(); + } + + // Increment the number of queued messages. Returns the resulting number. + fn inc_num_messages(&self) -> Option<usize> { + let mut curr = self.inner.state.load(SeqCst); + + loop { + let mut state = decode_state(curr); + + // The receiver end closed the channel. + if !state.is_open { + return None; + } + + // This probably is never hit? Odds are the process will run out of + // memory first. It may be worth to return something else in this + // case? + assert!( + state.num_messages < MAX_CAPACITY, + "buffer space \ + exhausted; sending this messages would overflow the state" + ); + + state.num_messages += 1; + + let next = encode_state(&state); + match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { + Ok(_) => return Some(state.num_messages), + Err(actual) => curr = actual, + } + } + } + + /// Returns whether the senders send to the same receiver. + fn same_receiver(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.inner, &other.inner) + } + + /// Returns whether the sender send to this receiver. + fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool { + Arc::ptr_eq(&self.inner, inner) + } + + /// Returns pointer to the Arc containing sender + /// + /// The returned pointer is not referenced and should be only used for hashing! + fn ptr(&self) -> *const UnboundedInner<T> { + &*self.inner + } + + /// Returns whether this channel is closed without needing a context. + fn is_closed(&self) -> bool { + !decode_state(self.inner.state.load(SeqCst)).is_open + } + + /// Closes this channel from the sender side, preventing any new messages. + fn close_channel(&self) { + // There's no need to park this sender, its dropping, + // and we don't want to check for capacity, so skip + // that stuff from `do_send`. + + self.inner.set_closed(); + self.inner.recv_task.wake(); + } +} + +impl<T> BoundedSenderInner<T> { + /// Attempts to send a message on this `Sender`, returning the message + /// if there was an error. + fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { + // If the sender is currently blocked, reject the message + if !self.poll_unparked(None).is_ready() { + return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg }); + } + + // The channel has capacity to accept the message, so send it + self.do_send_b(msg) + } + + // Do the send without failing. + // Can be called only by bounded sender. + fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> { + // Anyone calling do_send *should* make sure there is room first, + // but assert here for tests as a sanity check. + debug_assert!(self.poll_unparked(None).is_ready()); + + // First, increment the number of messages contained by the channel. + // This operation will also atomically determine if the sender task + // should be parked. + // + // `None` is returned in the case that the channel has been closed by the + // receiver. This happens when `Receiver::close` is called or the + // receiver is dropped. + let park_self = match self.inc_num_messages() { + Some(num_messages) => { + // Block if the current number of pending messages has exceeded + // the configured buffer size + num_messages > self.inner.buffer + } + None => { + return Err(TrySendError { + err: SendError { kind: SendErrorKind::Disconnected }, + val: msg, + }) + } + }; + + // If the channel has reached capacity, then the sender task needs to + // be parked. This will send the task handle on the parked task queue. + // + // However, when `do_send` is called while dropping the `Sender`, + // `task::current()` can't be called safely. In this case, in order to + // maintain internal consistency, a blank message is pushed onto the + // parked task queue. + if park_self { + self.park(); + } + + self.queue_push_and_signal(msg); + + Ok(()) + } + + // Push message to the queue and signal to the receiver + fn queue_push_and_signal(&self, msg: T) { + // Push the message onto the message queue + self.inner.message_queue.push(msg); + + // Signal to the receiver that a message has been enqueued. If the + // receiver is parked, this will unpark the task. + self.inner.recv_task.wake(); + } + + // Increment the number of queued messages. Returns the resulting number. + fn inc_num_messages(&self) -> Option<usize> { + let mut curr = self.inner.state.load(SeqCst); + + loop { + let mut state = decode_state(curr); + + // The receiver end closed the channel. + if !state.is_open { + return None; + } + + // This probably is never hit? Odds are the process will run out of + // memory first. It may be worth to return something else in this + // case? + assert!( + state.num_messages < MAX_CAPACITY, + "buffer space \ + exhausted; sending this messages would overflow the state" + ); + + state.num_messages += 1; + + let next = encode_state(&state); + match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { + Ok(_) => return Some(state.num_messages), + Err(actual) => curr = actual, + } + } + } + + fn park(&mut self) { + { + let mut sender = self.sender_task.lock().unwrap(); + sender.task = None; + sender.is_parked = true; + } + + // Send handle over queue + let t = self.sender_task.clone(); + self.inner.parked_queue.push(t); + + // Check to make sure we weren't closed after we sent our task on the + // queue + let state = decode_state(self.inner.state.load(SeqCst)); + self.maybe_parked = state.is_open; + } + + /// Polls the channel to determine if there is guaranteed capacity to send + /// at least one item without waiting. + /// + /// # Return value + /// + /// This method returns: + /// + /// - `Poll::Ready(Ok(_))` if there is sufficient capacity; + /// - `Poll::Pending` if the channel may not have + /// capacity, in which case the current task is queued to be notified once + /// capacity is available; + /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { + let state = decode_state(self.inner.state.load(SeqCst)); + if !state.is_open { + return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })); + } + + self.poll_unparked(Some(cx)).map(Ok) + } + + /// Returns whether the senders send to the same receiver. + fn same_receiver(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.inner, &other.inner) + } + + /// Returns whether the sender send to this receiver. + fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool { + Arc::ptr_eq(&self.inner, receiver) + } + + /// Returns pointer to the Arc containing sender + /// + /// The returned pointer is not referenced and should be only used for hashing! + fn ptr(&self) -> *const BoundedInner<T> { + &*self.inner + } + + /// Returns whether this channel is closed without needing a context. + fn is_closed(&self) -> bool { + !decode_state(self.inner.state.load(SeqCst)).is_open + } + + /// Closes this channel from the sender side, preventing any new messages. + fn close_channel(&self) { + // There's no need to park this sender, its dropping, + // and we don't want to check for capacity, so skip + // that stuff from `do_send`. + + self.inner.set_closed(); + self.inner.recv_task.wake(); + } + + fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> { + // First check the `maybe_parked` variable. This avoids acquiring the + // lock in most cases + if self.maybe_parked { + // Get a lock on the task handle + let mut task = self.sender_task.lock().unwrap(); + + if !task.is_parked { + self.maybe_parked = false; + return Poll::Ready(()); + } + + // At this point, an unpark request is pending, so there will be an + // unpark sometime in the future. We just need to make sure that + // the correct task will be notified. + // + // Update the task in case the `Sender` has been moved to another + // task + task.task = cx.map(|cx| cx.waker().clone()); + + Poll::Pending + } else { + Poll::Ready(()) + } + } +} + +impl<T> Sender<T> { + /// Attempts to send a message on this `Sender`, returning the message + /// if there was an error. + pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { + if let Some(inner) = &mut self.0 { + inner.try_send(msg) + } else { + Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) + } + } + + /// Send a message on the channel. + /// + /// This function should only be called after + /// [`poll_ready`](Sender::poll_ready) has reported that the channel is + /// ready to receive a message. + pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { + self.try_send(msg).map_err(|e| e.err) + } + + /// Polls the channel to determine if there is guaranteed capacity to send + /// at least one item without waiting. + /// + /// # Return value + /// + /// This method returns: + /// + /// - `Poll::Ready(Ok(_))` if there is sufficient capacity; + /// - `Poll::Pending` if the channel may not have + /// capacity, in which case the current task is queued to be notified once + /// capacity is available; + /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { + let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?; + inner.poll_ready(cx) + } + + /// Returns whether this channel is closed without needing a context. + pub fn is_closed(&self) -> bool { + self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true) + } + + /// Closes this channel from the sender side, preventing any new messages. + pub fn close_channel(&mut self) { + if let Some(inner) = &mut self.0 { + inner.close_channel(); + } + } + + /// Disconnects this sender from the channel, closing it if there are no more senders left. + pub fn disconnect(&mut self) { + self.0 = None; + } + + /// Returns whether the senders send to the same receiver. + pub fn same_receiver(&self, other: &Self) -> bool { + match (&self.0, &other.0) { + (Some(inner), Some(other)) => inner.same_receiver(other), + _ => false, + } + } + + /// Returns whether the sender send to this receiver. + pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool { + match (&self.0, &receiver.inner) { + (Some(inner), Some(receiver)) => inner.is_connected_to(receiver), + _ => false, + } + } + + /// Hashes the receiver into the provided hasher + pub fn hash_receiver<H>(&self, hasher: &mut H) + where + H: std::hash::Hasher, + { + use std::hash::Hash; + + let ptr = self.0.as_ref().map(|inner| inner.ptr()); + ptr.hash(hasher); + } +} + +impl<T> UnboundedSender<T> { + /// Check if the channel is ready to receive a message. + pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> { + let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?; + inner.poll_ready_nb() + } + + /// Returns whether this channel is closed without needing a context. + pub fn is_closed(&self) -> bool { + self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true) + } + + /// Closes this channel from the sender side, preventing any new messages. + pub fn close_channel(&self) { + if let Some(inner) = &self.0 { + inner.close_channel(); + } + } + + /// Disconnects this sender from the channel, closing it if there are no more senders left. + pub fn disconnect(&mut self) { + self.0 = None; + } + + // Do the send without parking current task. + fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> { + if let Some(inner) = &self.0 { + if inner.inc_num_messages().is_some() { + inner.queue_push_and_signal(msg); + return Ok(()); + } + } + + Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) + } + + /// Send a message on the channel. + /// + /// This method should only be called after `poll_ready` has been used to + /// verify that the channel is ready to receive a message. + pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { + self.do_send_nb(msg).map_err(|e| e.err) + } + + /// Sends a message along this channel. + /// + /// This is an unbounded sender, so this function differs from `Sink::send` + /// by ensuring the return type reflects that the channel is always ready to + /// receive messages. + pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> { + self.do_send_nb(msg) + } + + /// Returns whether the senders send to the same receiver. + pub fn same_receiver(&self, other: &Self) -> bool { + match (&self.0, &other.0) { + (Some(inner), Some(other)) => inner.same_receiver(other), + _ => false, + } + } + + /// Returns whether the sender send to this receiver. + pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool { + match (&self.0, &receiver.inner) { + (Some(inner), Some(receiver)) => inner.is_connected_to(receiver), + _ => false, + } + } + + /// Hashes the receiver into the provided hasher + pub fn hash_receiver<H>(&self, hasher: &mut H) + where + H: std::hash::Hasher, + { + use std::hash::Hash; + + let ptr = self.0.as_ref().map(|inner| inner.ptr()); + ptr.hash(hasher); + } +} + +impl<T> Clone for Sender<T> { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl<T> Clone for UnboundedSender<T> { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl<T> Clone for UnboundedSenderInner<T> { + fn clone(&self) -> Self { + // Since this atomic op isn't actually guarding any memory and we don't + // care about any orderings besides the ordering on the single atomic + // variable, a relaxed ordering is acceptable. + let mut curr = self.inner.num_senders.load(SeqCst); + + loop { + // If the maximum number of senders has been reached, then fail + if curr == MAX_BUFFER { + panic!("cannot clone `Sender` -- too many outstanding senders"); + } + + debug_assert!(curr < MAX_BUFFER); + + let next = curr + 1; + match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) { + Ok(_) => { + // The ABA problem doesn't matter here. We only care that the + // number of senders never exceeds the maximum. + return Self { inner: self.inner.clone() }; + } + Err(actual) => curr = actual, + } + } + } +} + +impl<T> Clone for BoundedSenderInner<T> { + fn clone(&self) -> Self { + // Since this atomic op isn't actually guarding any memory and we don't + // care about any orderings besides the ordering on the single atomic + // variable, a relaxed ordering is acceptable. + let mut curr = self.inner.num_senders.load(SeqCst); + + loop { + // If the maximum number of senders has been reached, then fail + if curr == self.inner.max_senders() { + panic!("cannot clone `Sender` -- too many outstanding senders"); + } + + debug_assert!(curr < self.inner.max_senders()); + + let next = curr + 1; + match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) { + Ok(_) => { + // The ABA problem doesn't matter here. We only care that the + // number of senders never exceeds the maximum. + return Self { + inner: self.inner.clone(), + sender_task: Arc::new(Mutex::new(SenderTask::new())), + maybe_parked: false, + }; + } + Err(actual) => curr = actual, + } + } + } +} + +impl<T> Drop for UnboundedSenderInner<T> { + fn drop(&mut self) { + // Ordering between variables don't matter here + let prev = self.inner.num_senders.fetch_sub(1, SeqCst); + + if prev == 1 { + self.close_channel(); + } + } +} + +impl<T> Drop for BoundedSenderInner<T> { + fn drop(&mut self) { + // Ordering between variables don't matter here + let prev = self.inner.num_senders.fetch_sub(1, SeqCst); + + if prev == 1 { + self.close_channel(); + } + } +} + +/* + * + * ===== impl Receiver ===== + * + */ + +impl<T> Receiver<T> { + /// Closes the receiving half of a channel, without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + if let Some(inner) = &mut self.inner { + inner.set_closed(); + + // Wake up any threads waiting as they'll see that we've closed the + // channel and will continue on their merry way. + while let Some(task) = unsafe { inner.parked_queue.pop_spin() } { + task.lock().unwrap().notify(); + } + } + } + + /// Tries to receive the next message without notifying a context if empty. + /// + /// It is not recommended to call this function from inside of a future, + /// only when you've otherwise arranged to be notified when the channel is + /// no longer empty. + /// + /// This function returns: + /// * `Ok(Some(t))` when message is fetched + /// * `Ok(None)` when channel is closed and no messages left in the queue + /// * `Err(e)` when there are no messages available, but channel is not yet closed + pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { + match self.next_message() { + Poll::Ready(msg) => Ok(msg), + Poll::Pending => Err(TryRecvError { _priv: () }), + } + } + + fn next_message(&mut self) -> Poll<Option<T>> { + let inner = match self.inner.as_mut() { + None => return Poll::Ready(None), + Some(inner) => inner, + }; + // Pop off a message + match unsafe { inner.message_queue.pop_spin() } { + Some(msg) => { + // If there are any parked task handles in the parked queue, + // pop one and unpark it. + self.unpark_one(); + + // Decrement number of messages + self.dec_num_messages(); + + Poll::Ready(Some(msg)) + } + None => { + let state = decode_state(inner.state.load(SeqCst)); + if state.is_closed() { + // If closed flag is set AND there are no pending messages + // it means end of stream + self.inner = None; + Poll::Ready(None) + } else { + // If queue is open, we need to return Pending + // to be woken up when new messages arrive. + // If queue is closed but num_messages is non-zero, + // it means that senders updated the state, + // but didn't put message to queue yet, + // so we need to park until sender unparks the task + // after queueing the message. + Poll::Pending + } + } + } + } + + // Unpark a single task handle if there is one pending in the parked queue + fn unpark_one(&mut self) { + if let Some(inner) = &mut self.inner { + if let Some(task) = unsafe { inner.parked_queue.pop_spin() } { + task.lock().unwrap().notify(); + } + } + } + + fn dec_num_messages(&self) { + if let Some(inner) = &self.inner { + // OPEN_MASK is highest bit, so it's unaffected by subtraction + // unless there's underflow, and we know there's no underflow + // because number of messages at this point is always > 0. + inner.state.fetch_sub(1, SeqCst); + } + } +} + +// The receiver does not ever take a Pin to the inner T +impl<T> Unpin for Receiver<T> {} + +impl<T> FusedStream for Receiver<T> { + fn is_terminated(&self) -> bool { + self.inner.is_none() + } +} + +impl<T> Stream for Receiver<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + // Try to read a message off of the message queue. + match self.next_message() { + Poll::Ready(msg) => { + if msg.is_none() { + self.inner = None; + } + Poll::Ready(msg) + } + Poll::Pending => { + // There are no messages to read, in this case, park. + self.inner.as_ref().unwrap().recv_task.register(cx.waker()); + // Check queue again after parking to prevent race condition: + // a message could be added to the queue after previous `next_message` + // before `register` call. + self.next_message() + } + } + } +} + +impl<T> Drop for Receiver<T> { + fn drop(&mut self) { + // Drain the channel of all pending messages + self.close(); + if self.inner.is_some() { + loop { + match self.next_message() { + Poll::Ready(Some(_)) => {} + Poll::Ready(None) => break, + Poll::Pending => { + let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst)); + + // If the channel is closed, then there is no need to park. + if state.is_closed() { + break; + } + + // TODO: Spinning isn't ideal, it might be worth + // investigating using a condvar or some other strategy + // here. That said, if this case is hit, then another thread + // is about to push the value into the queue and this isn't + // the only spinlock in the impl right now. + thread::yield_now(); + } + } + } + } + } +} + +impl<T> UnboundedReceiver<T> { + /// Closes the receiving half of a channel, without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + if let Some(inner) = &mut self.inner { + inner.set_closed(); + } + } + + /// Tries to receive the next message without notifying a context if empty. + /// + /// It is not recommended to call this function from inside of a future, + /// only when you've otherwise arranged to be notified when the channel is + /// no longer empty. + /// + /// This function returns: + /// * `Ok(Some(t))` when message is fetched + /// * `Ok(None)` when channel is closed and no messages left in the queue + /// * `Err(e)` when there are no messages available, but channel is not yet closed + pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { + match self.next_message() { + Poll::Ready(msg) => Ok(msg), + Poll::Pending => Err(TryRecvError { _priv: () }), + } + } + + fn next_message(&mut self) -> Poll<Option<T>> { + let inner = match self.inner.as_mut() { + None => return Poll::Ready(None), + Some(inner) => inner, + }; + // Pop off a message + match unsafe { inner.message_queue.pop_spin() } { + Some(msg) => { + // Decrement number of messages + self.dec_num_messages(); + + Poll::Ready(Some(msg)) + } + None => { + let state = decode_state(inner.state.load(SeqCst)); + if state.is_closed() { + // If closed flag is set AND there are no pending messages + // it means end of stream + self.inner = None; + Poll::Ready(None) + } else { + // If queue is open, we need to return Pending + // to be woken up when new messages arrive. + // If queue is closed but num_messages is non-zero, + // it means that senders updated the state, + // but didn't put message to queue yet, + // so we need to park until sender unparks the task + // after queueing the message. + Poll::Pending + } + } + } + } + + fn dec_num_messages(&self) { + if let Some(inner) = &self.inner { + // OPEN_MASK is highest bit, so it's unaffected by subtraction + // unless there's underflow, and we know there's no underflow + // because number of messages at this point is always > 0. + inner.state.fetch_sub(1, SeqCst); + } + } +} + +impl<T> FusedStream for UnboundedReceiver<T> { + fn is_terminated(&self) -> bool { + self.inner.is_none() + } +} + +impl<T> Stream for UnboundedReceiver<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + // Try to read a message off of the message queue. + match self.next_message() { + Poll::Ready(msg) => { + if msg.is_none() { + self.inner = None; + } + Poll::Ready(msg) + } + Poll::Pending => { + // There are no messages to read, in this case, park. + self.inner.as_ref().unwrap().recv_task.register(cx.waker()); + // Check queue again after parking to prevent race condition: + // a message could be added to the queue after previous `next_message` + // before `register` call. + self.next_message() + } + } + } +} + +impl<T> Drop for UnboundedReceiver<T> { + fn drop(&mut self) { + // Drain the channel of all pending messages + self.close(); + if self.inner.is_some() { + loop { + match self.next_message() { + Poll::Ready(Some(_)) => {} + Poll::Ready(None) => break, + Poll::Pending => { + let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst)); + + // If the channel is closed, then there is no need to park. + if state.is_closed() { + break; + } + + // TODO: Spinning isn't ideal, it might be worth + // investigating using a condvar or some other strategy + // here. That said, if this case is hit, then another thread + // is about to push the value into the queue and this isn't + // the only spinlock in the impl right now. + thread::yield_now(); + } + } + } + } + } +} + +/* + * + * ===== impl Inner ===== + * + */ + +impl<T> UnboundedInner<T> { + // Clear `open` flag in the state, keep `num_messages` intact. + fn set_closed(&self) { + let curr = self.state.load(SeqCst); + if !decode_state(curr).is_open { + return; + } + + self.state.fetch_and(!OPEN_MASK, SeqCst); + } +} + +impl<T> BoundedInner<T> { + // The return value is such that the total number of messages that can be + // enqueued into the channel will never exceed MAX_CAPACITY + fn max_senders(&self) -> usize { + MAX_CAPACITY - self.buffer + } + + // Clear `open` flag in the state, keep `num_messages` intact. + fn set_closed(&self) { + let curr = self.state.load(SeqCst); + if !decode_state(curr).is_open { + return; + } + + self.state.fetch_and(!OPEN_MASK, SeqCst); + } +} + +unsafe impl<T: Send> Send for UnboundedInner<T> {} +unsafe impl<T: Send> Sync for UnboundedInner<T> {} + +unsafe impl<T: Send> Send for BoundedInner<T> {} +unsafe impl<T: Send> Sync for BoundedInner<T> {} + +impl State { + fn is_closed(&self) -> bool { + !self.is_open && self.num_messages == 0 + } +} + +/* + * + * ===== Helpers ===== + * + */ + +fn decode_state(num: usize) -> State { + State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY } +} + +fn encode_state(state: &State) -> usize { + let mut num = state.num_messages; + + if state.is_open { + num |= OPEN_MASK; + } + + num +} diff --git a/third_party/rust/futures-channel/src/mpsc/queue.rs b/third_party/rust/futures-channel/src/mpsc/queue.rs new file mode 100644 index 0000000000..57dc7f5654 --- /dev/null +++ b/third_party/rust/futures-channel/src/mpsc/queue.rs @@ -0,0 +1,176 @@ +/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are + * those of the authors and should not be interpreted as representing official + * policies, either expressed or implied, of Dmitry Vyukov. + */ + +//! A mostly lock-free multi-producer, single consumer queue for sending +//! messages between asynchronous tasks. +//! +//! The queue implementation is essentially the same one used for mpsc channels +//! in the standard library. +//! +//! Note that the current implementation of this queue has a caveat of the `pop` +//! method, and see the method for more information about it. Due to this +//! caveat, this queue may not be appropriate for all use-cases. + +// http://www.1024cores.net/home/lock-free-algorithms +// /queues/non-intrusive-mpsc-node-based-queue + +// NOTE: this implementation is lifted from the standard library and only +// slightly modified + +pub(super) use self::PopResult::*; + +use std::cell::UnsafeCell; +use std::ptr; +use std::sync::atomic::{AtomicPtr, Ordering}; +use std::thread; + +/// A result of the `pop` function. +pub(super) enum PopResult<T> { + /// Some data has been popped + Data(T), + /// The queue is empty + Empty, + /// The queue is in an inconsistent state. Popping data should succeed, but + /// some pushers have yet to make enough progress in order allow a pop to + /// succeed. It is recommended that a pop() occur "in the near future" in + /// order to see if the sender has made progress or not + Inconsistent, +} + +#[derive(Debug)] +struct Node<T> { + next: AtomicPtr<Self>, + value: Option<T>, +} + +/// The multi-producer single-consumer structure. This is not cloneable, but it +/// may be safely shared so long as it is guaranteed that there is only one +/// popper at a time (many pushers are allowed). +#[derive(Debug)] +pub(super) struct Queue<T> { + head: AtomicPtr<Node<T>>, + tail: UnsafeCell<*mut Node<T>>, +} + +unsafe impl<T: Send> Send for Queue<T> {} +unsafe impl<T: Send> Sync for Queue<T> {} + +impl<T> Node<T> { + unsafe fn new(v: Option<T>) -> *mut Self { + Box::into_raw(Box::new(Self { next: AtomicPtr::new(ptr::null_mut()), value: v })) + } +} + +impl<T> Queue<T> { + /// Creates a new queue that is safe to share among multiple producers and + /// one consumer. + pub(super) fn new() -> Self { + let stub = unsafe { Node::new(None) }; + Self { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) } + } + + /// Pushes a new value onto this queue. + pub(super) fn push(&self, t: T) { + unsafe { + let n = Node::new(Some(t)); + let prev = self.head.swap(n, Ordering::AcqRel); + (*prev).next.store(n, Ordering::Release); + } + } + + /// Pops some data from this queue. + /// + /// Note that the current implementation means that this function cannot + /// return `Option<T>`. It is possible for this queue to be in an + /// inconsistent state where many pushes have succeeded and completely + /// finished, but pops cannot return `Some(t)`. This inconsistent state + /// happens when a pusher is preempted at an inopportune moment. + /// + /// This inconsistent state means that this queue does indeed have data, but + /// it does not currently have access to it at this time. + /// + /// This function is unsafe because only one thread can call it at a time. + pub(super) unsafe fn pop(&self) -> PopResult<T> { + let tail = *self.tail.get(); + let next = (*tail).next.load(Ordering::Acquire); + + if !next.is_null() { + *self.tail.get() = next; + assert!((*tail).value.is_none()); + assert!((*next).value.is_some()); + let ret = (*next).value.take().unwrap(); + drop(Box::from_raw(tail)); + return Data(ret); + } + + if self.head.load(Ordering::Acquire) == tail { + Empty + } else { + Inconsistent + } + } + + /// Pop an element similarly to `pop` function, but spin-wait on inconsistent + /// queue state instead of returning `Inconsistent`. + /// + /// This function is unsafe because only one thread can call it at a time. + pub(super) unsafe fn pop_spin(&self) -> Option<T> { + loop { + match self.pop() { + Empty => return None, + Data(t) => return Some(t), + // Inconsistent means that there will be a message to pop + // in a short time. This branch can only be reached if + // values are being produced from another thread, so there + // are a few ways that we can deal with this: + // + // 1) Spin + // 2) thread::yield_now() + // 3) task::current().unwrap() & return Pending + // + // For now, thread::yield_now() is used, but it would + // probably be better to spin a few times then yield. + Inconsistent => { + thread::yield_now(); + } + } + } + } +} + +impl<T> Drop for Queue<T> { + fn drop(&mut self) { + unsafe { + let mut cur = *self.tail.get(); + while !cur.is_null() { + let next = (*cur).next.load(Ordering::Relaxed); + drop(Box::from_raw(cur)); + cur = next; + } + } + } +} diff --git a/third_party/rust/futures-channel/src/mpsc/sink_impl.rs b/third_party/rust/futures-channel/src/mpsc/sink_impl.rs new file mode 100644 index 0000000000..1be20162c2 --- /dev/null +++ b/third_party/rust/futures-channel/src/mpsc/sink_impl.rs @@ -0,0 +1,73 @@ +use super::{SendError, Sender, TrySendError, UnboundedSender}; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; +use std::pin::Pin; + +impl<T> Sink<T> for Sender<T> { + type Error = SendError; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + (*self).poll_ready(cx) + } + + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + (*self).start_send(msg) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + match (*self).poll_ready(cx) { + Poll::Ready(Err(ref e)) if e.is_disconnected() => { + // If the receiver disconnected, we consider the sink to be flushed. + Poll::Ready(Ok(())) + } + x => x, + } + } + + fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.disconnect(); + Poll::Ready(Ok(())) + } +} + +impl<T> Sink<T> for UnboundedSender<T> { + type Error = SendError; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Self::poll_ready(&*self, cx) + } + + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + Self::start_send(&mut *self, msg) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.disconnect(); + Poll::Ready(Ok(())) + } +} + +impl<T> Sink<T> for &UnboundedSender<T> { + type Error = SendError; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + UnboundedSender::poll_ready(*self, cx) + } + + fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + self.unbounded_send(msg).map_err(TrySendError::into_send_error) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.close_channel(); + Poll::Ready(Ok(())) + } +} diff --git a/third_party/rust/futures-channel/src/oneshot.rs b/third_party/rust/futures-channel/src/oneshot.rs new file mode 100644 index 0000000000..5af651b913 --- /dev/null +++ b/third_party/rust/futures-channel/src/oneshot.rs @@ -0,0 +1,488 @@ +//! A channel for sending a single message between asynchronous tasks. +//! +//! This is a single-producer, single-consumer channel. + +use alloc::sync::Arc; +use core::fmt; +use core::pin::Pin; +use core::sync::atomic::AtomicBool; +use core::sync::atomic::Ordering::SeqCst; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll, Waker}; + +use crate::lock::Lock; + +/// A future for a value that will be provided by another asynchronous task. +/// +/// This is created by the [`channel`](channel) function. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Receiver<T> { + inner: Arc<Inner<T>>, +} + +/// A means of transmitting a single value to another task. +/// +/// This is created by the [`channel`](channel) function. +pub struct Sender<T> { + inner: Arc<Inner<T>>, +} + +// The channels do not ever project Pin to the inner T +impl<T> Unpin for Receiver<T> {} +impl<T> Unpin for Sender<T> {} + +/// Internal state of the `Receiver`/`Sender` pair above. This is all used as +/// the internal synchronization between the two for send/recv operations. +struct Inner<T> { + /// Indicates whether this oneshot is complete yet. This is filled in both + /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it + /// appropriately. + /// + /// For `Receiver`, if this is `true`, then it's guaranteed that `data` is + /// unlocked and ready to be inspected. + /// + /// For `Sender` if this is `true` then the oneshot has gone away and it + /// can return ready from `poll_canceled`. + complete: AtomicBool, + + /// The actual data being transferred as part of this `Receiver`. This is + /// filled in by `Sender::complete` and read by `Receiver::poll`. + /// + /// Note that this is protected by `Lock`, but it is in theory safe to + /// replace with an `UnsafeCell` as it's actually protected by `complete` + /// above. I wouldn't recommend doing this, however, unless someone is + /// supremely confident in the various atomic orderings here and there. + data: Lock<Option<T>>, + + /// Field to store the task which is blocked in `Receiver::poll`. + /// + /// This is filled in when a oneshot is polled but not ready yet. Note that + /// the `Lock` here, unlike in `data` above, is important to resolve races. + /// Both the `Receiver` and the `Sender` halves understand that if they + /// can't acquire the lock then some important interference is happening. + rx_task: Lock<Option<Waker>>, + + /// Like `rx_task` above, except for the task blocked in + /// `Sender::poll_canceled`. Additionally, `Lock` cannot be `UnsafeCell`. + tx_task: Lock<Option<Waker>>, +} + +/// Creates a new one-shot channel for sending a single value across asynchronous tasks. +/// +/// The channel works for a spsc (single-producer, single-consumer) scheme. +/// +/// This function is similar to Rust's channel constructor found in the standard +/// library. Two halves are returned, the first of which is a `Sender` handle, +/// used to signal the end of a computation and provide its value. The second +/// half is a `Receiver` which implements the `Future` trait, resolving to the +/// value that was given to the `Sender` handle. +/// +/// Each half can be separately owned and sent across tasks. +/// +/// # Examples +/// +/// ``` +/// use futures::channel::oneshot; +/// use std::{thread, time::Duration}; +/// +/// let (sender, receiver) = oneshot::channel::<i32>(); +/// +/// thread::spawn(|| { +/// println!("THREAD: sleeping zzz..."); +/// thread::sleep(Duration::from_millis(1000)); +/// println!("THREAD: i'm awake! sending."); +/// sender.send(3).unwrap(); +/// }); +/// +/// println!("MAIN: doing some useful stuff"); +/// +/// futures::executor::block_on(async { +/// println!("MAIN: waiting for msg..."); +/// println!("MAIN: got: {:?}", receiver.await) +/// }); +/// ``` +pub fn channel<T>() -> (Sender<T>, Receiver<T>) { + let inner = Arc::new(Inner::new()); + let receiver = Receiver { inner: inner.clone() }; + let sender = Sender { inner }; + (sender, receiver) +} + +impl<T> Inner<T> { + fn new() -> Self { + Self { + complete: AtomicBool::new(false), + data: Lock::new(None), + rx_task: Lock::new(None), + tx_task: Lock::new(None), + } + } + + fn send(&self, t: T) -> Result<(), T> { + if self.complete.load(SeqCst) { + return Err(t); + } + + // Note that this lock acquisition may fail if the receiver + // is closed and sets the `complete` flag to `true`, whereupon + // the receiver may call `poll()`. + if let Some(mut slot) = self.data.try_lock() { + assert!(slot.is_none()); + *slot = Some(t); + drop(slot); + + // If the receiver called `close()` between the check at the + // start of the function, and the lock being released, then + // the receiver may not be around to receive it, so try to + // pull it back out. + if self.complete.load(SeqCst) { + // If lock acquisition fails, then receiver is actually + // receiving it, so we're good. + if let Some(mut slot) = self.data.try_lock() { + if let Some(t) = slot.take() { + return Err(t); + } + } + } + Ok(()) + } else { + // Must have been closed + Err(t) + } + } + + fn poll_canceled(&self, cx: &mut Context<'_>) -> Poll<()> { + // Fast path up first, just read the flag and see if our other half is + // gone. This flag is set both in our destructor and the oneshot + // destructor, but our destructor hasn't run yet so if it's set then the + // oneshot is gone. + if self.complete.load(SeqCst) { + return Poll::Ready(()); + } + + // If our other half is not gone then we need to park our current task + // and move it into the `tx_task` slot to get notified when it's + // actually gone. + // + // If `try_lock` fails, then the `Receiver` is in the process of using + // it, so we can deduce that it's now in the process of going away and + // hence we're canceled. If it succeeds then we just store our handle. + // + // Crucially we then check `complete` *again* before we return. + // While we were storing our handle inside `tx_task` the + // `Receiver` may have been dropped. The first thing it does is set the + // flag, and if it fails to acquire the lock it assumes that we'll see + // the flag later on. So... we then try to see the flag later on! + let handle = cx.waker().clone(); + match self.tx_task.try_lock() { + Some(mut p) => *p = Some(handle), + None => return Poll::Ready(()), + } + if self.complete.load(SeqCst) { + Poll::Ready(()) + } else { + Poll::Pending + } + } + + fn is_canceled(&self) -> bool { + self.complete.load(SeqCst) + } + + fn drop_tx(&self) { + // Flag that we're a completed `Sender` and try to wake up a receiver. + // Whether or not we actually stored any data will get picked up and + // translated to either an item or cancellation. + // + // Note that if we fail to acquire the `rx_task` lock then that means + // we're in one of two situations: + // + // 1. The receiver is trying to block in `poll` + // 2. The receiver is being dropped + // + // In the first case it'll check the `complete` flag after it's done + // blocking to see if it succeeded. In the latter case we don't need to + // wake up anyone anyway. So in both cases it's ok to ignore the `None` + // case of `try_lock` and bail out. + // + // The first case crucially depends on `Lock` using `SeqCst` ordering + // under the hood. If it instead used `Release` / `Acquire` ordering, + // then it would not necessarily synchronize with `inner.complete` + // and deadlock might be possible, as was observed in + // https://github.com/rust-lang/futures-rs/pull/219. + self.complete.store(true, SeqCst); + + if let Some(mut slot) = self.rx_task.try_lock() { + if let Some(task) = slot.take() { + drop(slot); + task.wake(); + } + } + + // If we registered a task for cancel notification drop it to reduce + // spurious wakeups + if let Some(mut slot) = self.tx_task.try_lock() { + drop(slot.take()); + } + } + + fn close_rx(&self) { + // Flag our completion and then attempt to wake up the sender if it's + // blocked. See comments in `drop` below for more info + self.complete.store(true, SeqCst); + if let Some(mut handle) = self.tx_task.try_lock() { + if let Some(task) = handle.take() { + drop(handle); + task.wake() + } + } + } + + fn try_recv(&self) -> Result<Option<T>, Canceled> { + // If we're complete, either `::close_rx` or `::drop_tx` was called. + // We can assume a successful send if data is present. + if self.complete.load(SeqCst) { + if let Some(mut slot) = self.data.try_lock() { + if let Some(data) = slot.take() { + return Ok(Some(data)); + } + } + Err(Canceled) + } else { + Ok(None) + } + } + + fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> { + // Check to see if some data has arrived. If it hasn't then we need to + // block our task. + // + // Note that the acquisition of the `rx_task` lock might fail below, but + // the only situation where this can happen is during `Sender::drop` + // when we are indeed completed already. If that's happening then we + // know we're completed so keep going. + let done = if self.complete.load(SeqCst) { + true + } else { + let task = cx.waker().clone(); + match self.rx_task.try_lock() { + Some(mut slot) => { + *slot = Some(task); + false + } + None => true, + } + }; + + // If we're `done` via one of the paths above, then look at the data and + // figure out what the answer is. If, however, we stored `rx_task` + // successfully above we need to check again if we're completed in case + // a message was sent while `rx_task` was locked and couldn't notify us + // otherwise. + // + // If we're not done, and we're not complete, though, then we've + // successfully blocked our task and we return `Pending`. + if done || self.complete.load(SeqCst) { + // If taking the lock fails, the sender will realise that the we're + // `done` when it checks the `complete` flag on the way out, and + // will treat the send as a failure. + if let Some(mut slot) = self.data.try_lock() { + if let Some(data) = slot.take() { + return Poll::Ready(Ok(data)); + } + } + Poll::Ready(Err(Canceled)) + } else { + Poll::Pending + } + } + + fn drop_rx(&self) { + // Indicate to the `Sender` that we're done, so any future calls to + // `poll_canceled` are weeded out. + self.complete.store(true, SeqCst); + + // If we've blocked a task then there's no need for it to stick around, + // so we need to drop it. If this lock acquisition fails, though, then + // it's just because our `Sender` is trying to take the task, so we + // let them take care of that. + if let Some(mut slot) = self.rx_task.try_lock() { + let task = slot.take(); + drop(slot); + drop(task); + } + + // Finally, if our `Sender` wants to get notified of us going away, it + // would have stored something in `tx_task`. Here we try to peel that + // out and unpark it. + // + // Note that the `try_lock` here may fail, but only if the `Sender` is + // in the process of filling in the task. If that happens then we + // already flagged `complete` and they'll pick that up above. + if let Some(mut handle) = self.tx_task.try_lock() { + if let Some(task) = handle.take() { + drop(handle); + task.wake() + } + } + } +} + +impl<T> Sender<T> { + /// Completes this oneshot with a successful result. + /// + /// This function will consume `self` and indicate to the other end, the + /// [`Receiver`](Receiver), that the value provided is the result of the + /// computation this represents. + /// + /// If the value is successfully enqueued for the remote end to receive, + /// then `Ok(())` is returned. If the receiving end was dropped before + /// this function was called, however, then `Err(t)` is returned. + pub fn send(self, t: T) -> Result<(), T> { + self.inner.send(t) + } + + /// Polls this `Sender` half to detect whether its associated + /// [`Receiver`](Receiver) has been dropped. + /// + /// # Return values + /// + /// If `Ready(())` is returned then the associated `Receiver` has been + /// dropped, which means any work required for sending should be canceled. + /// + /// If `Pending` is returned then the associated `Receiver` is still + /// alive and may be able to receive a message if sent. The current task, + /// however, is scheduled to receive a notification if the corresponding + /// `Receiver` goes away. + pub fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> { + self.inner.poll_canceled(cx) + } + + /// Creates a future that resolves when this `Sender`'s corresponding + /// [`Receiver`](Receiver) half has hung up. + /// + /// This is a utility wrapping [`poll_canceled`](Sender::poll_canceled) + /// to expose a [`Future`](core::future::Future). + pub fn cancellation(&mut self) -> Cancellation<'_, T> { + Cancellation { inner: self } + } + + /// Tests to see whether this `Sender`'s corresponding `Receiver` + /// has been dropped. + /// + /// Unlike [`poll_canceled`](Sender::poll_canceled), this function does not + /// enqueue a task for wakeup upon cancellation, but merely reports the + /// current state, which may be subject to concurrent modification. + pub fn is_canceled(&self) -> bool { + self.inner.is_canceled() + } + + /// Tests to see whether this `Sender` is connected to the given `Receiver`. That is, whether + /// they were created by the same call to `channel`. + pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool { + Arc::ptr_eq(&self.inner, &receiver.inner) + } +} + +impl<T> Drop for Sender<T> { + fn drop(&mut self) { + self.inner.drop_tx() + } +} + +impl<T: fmt::Debug> fmt::Debug for Sender<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Sender").field("complete", &self.inner.complete).finish() + } +} + +/// A future that resolves when the receiving end of a channel has hung up. +/// +/// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled). +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] +pub struct Cancellation<'a, T> { + inner: &'a mut Sender<T>, +} + +impl<T> Future for Cancellation<'_, T> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + self.inner.poll_canceled(cx) + } +} + +/// Error returned from a [`Receiver`](Receiver) when the corresponding +/// [`Sender`](Sender) is dropped. +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub struct Canceled; + +impl fmt::Display for Canceled { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "oneshot canceled") + } +} + +#[cfg(feature = "std")] +impl std::error::Error for Canceled {} + +impl<T> Receiver<T> { + /// Gracefully close this receiver, preventing any subsequent attempts to + /// send to it. + /// + /// Any `send` operation which happens after this method returns is + /// guaranteed to fail. After calling this method, you can use + /// [`Receiver::poll`](core::future::Future::poll) to determine whether a + /// message had previously been sent. + pub fn close(&mut self) { + self.inner.close_rx() + } + + /// Attempts to receive a message outside of the context of a task. + /// + /// Does not schedule a task wakeup or have any other side effects. + /// + /// A return value of `None` must be considered immediately stale (out of + /// date) unless [`close`](Receiver::close) has been called first. + /// + /// Returns an error if the sender was dropped. + pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> { + self.inner.try_recv() + } +} + +impl<T> Future for Receiver<T> { + type Output = Result<T, Canceled>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> { + self.inner.recv(cx) + } +} + +impl<T> FusedFuture for Receiver<T> { + fn is_terminated(&self) -> bool { + if self.inner.complete.load(SeqCst) { + if let Some(slot) = self.inner.data.try_lock() { + if slot.is_some() { + return false; + } + } + true + } else { + false + } + } +} + +impl<T> Drop for Receiver<T> { + fn drop(&mut self) { + self.inner.drop_rx() + } +} + +impl<T: fmt::Debug> fmt::Debug for Receiver<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Receiver").field("complete", &self.inner.complete).finish() + } +} diff --git a/third_party/rust/futures-channel/tests/channel.rs b/third_party/rust/futures-channel/tests/channel.rs new file mode 100644 index 0000000000..5f01a8ef4c --- /dev/null +++ b/third_party/rust/futures-channel/tests/channel.rs @@ -0,0 +1,66 @@ +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::future::poll_fn; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::thread; + +#[test] +fn sequence() { + let (tx, rx) = mpsc::channel(1); + + let amt = 20; + let t = thread::spawn(move || block_on(send_sequence(amt, tx))); + let list: Vec<_> = block_on(rx.collect()); + let mut list = list.into_iter(); + for i in (1..=amt).rev() { + assert_eq!(list.next(), Some(i)); + } + assert_eq!(list.next(), None); + + t.join().unwrap(); +} + +async fn send_sequence(n: u32, mut sender: mpsc::Sender<u32>) { + for x in 0..n { + sender.send(n - x).await.unwrap(); + } +} + +#[test] +fn drop_sender() { + let (tx, mut rx) = mpsc::channel::<u32>(1); + drop(tx); + let f = poll_fn(|cx| rx.poll_next_unpin(cx)); + assert_eq!(block_on(f), None) +} + +#[test] +fn drop_rx() { + let (mut tx, rx) = mpsc::channel::<u32>(1); + block_on(tx.send(1)).unwrap(); + drop(rx); + assert!(block_on(tx.send(1)).is_err()); +} + +#[test] +fn drop_order() { + static DROPS: AtomicUsize = AtomicUsize::new(0); + let (mut tx, rx) = mpsc::channel(1); + + struct A; + + impl Drop for A { + fn drop(&mut self) { + DROPS.fetch_add(1, Ordering::SeqCst); + } + } + + block_on(tx.send(A)).unwrap(); + assert_eq!(DROPS.load(Ordering::SeqCst), 0); + drop(rx); + assert_eq!(DROPS.load(Ordering::SeqCst), 1); + assert!(block_on(tx.send(A)).is_err()); + assert_eq!(DROPS.load(Ordering::SeqCst), 2); +} diff --git a/third_party/rust/futures-channel/tests/mpsc-close.rs b/third_party/rust/futures-channel/tests/mpsc-close.rs new file mode 100644 index 0000000000..1a14067eca --- /dev/null +++ b/third_party/rust/futures-channel/tests/mpsc-close.rs @@ -0,0 +1,299 @@ +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::future::Future; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use futures::task::{Context, Poll}; +use std::pin::Pin; +use std::sync::{Arc, Weak}; +use std::thread; +use std::time::{Duration, Instant}; + +#[test] +fn smoke() { + let (mut sender, receiver) = mpsc::channel(1); + + let t = thread::spawn(move || while let Ok(()) = block_on(sender.send(42)) {}); + + // `receiver` needs to be dropped for `sender` to stop sending and therefore before the join. + block_on(receiver.take(3).for_each(|_| futures::future::ready(()))); + + t.join().unwrap() +} + +#[test] +fn multiple_senders_disconnect() { + { + let (mut tx1, mut rx) = mpsc::channel(1); + let (tx2, mut tx3, mut tx4) = (tx1.clone(), tx1.clone(), tx1.clone()); + + // disconnect, dropping and Sink::poll_close should all close this sender but leave the + // channel open for other senders + tx1.disconnect(); + drop(tx2); + block_on(tx3.close()).unwrap(); + + assert!(tx1.is_closed()); + assert!(tx3.is_closed()); + assert!(!tx4.is_closed()); + + block_on(tx4.send(5)).unwrap(); + assert_eq!(block_on(rx.next()), Some(5)); + + // dropping the final sender will close the channel + drop(tx4); + assert_eq!(block_on(rx.next()), None); + } + + { + let (mut tx1, mut rx) = mpsc::unbounded(); + let (tx2, mut tx3, mut tx4) = (tx1.clone(), tx1.clone(), tx1.clone()); + + // disconnect, dropping and Sink::poll_close should all close this sender but leave the + // channel open for other senders + tx1.disconnect(); + drop(tx2); + block_on(tx3.close()).unwrap(); + + assert!(tx1.is_closed()); + assert!(tx3.is_closed()); + assert!(!tx4.is_closed()); + + block_on(tx4.send(5)).unwrap(); + assert_eq!(block_on(rx.next()), Some(5)); + + // dropping the final sender will close the channel + drop(tx4); + assert_eq!(block_on(rx.next()), None); + } +} + +#[test] +fn multiple_senders_close_channel() { + { + let (mut tx1, mut rx) = mpsc::channel(1); + let mut tx2 = tx1.clone(); + + // close_channel should shut down the whole channel + tx1.close_channel(); + + assert!(tx1.is_closed()); + assert!(tx2.is_closed()); + + let err = block_on(tx2.send(5)).unwrap_err(); + assert!(err.is_disconnected()); + + assert_eq!(block_on(rx.next()), None); + } + + { + let (tx1, mut rx) = mpsc::unbounded(); + let mut tx2 = tx1.clone(); + + // close_channel should shut down the whole channel + tx1.close_channel(); + + assert!(tx1.is_closed()); + assert!(tx2.is_closed()); + + let err = block_on(tx2.send(5)).unwrap_err(); + assert!(err.is_disconnected()); + + assert_eq!(block_on(rx.next()), None); + } +} + +#[test] +fn single_receiver_drop_closes_channel_and_drains() { + { + let ref_count = Arc::new(0); + let weak_ref = Arc::downgrade(&ref_count); + + let (sender, receiver) = mpsc::unbounded(); + sender.unbounded_send(ref_count).expect("failed to send"); + + // Verify that the sent message is still live. + assert!(weak_ref.upgrade().is_some()); + + drop(receiver); + + // The sender should know the channel is closed. + assert!(sender.is_closed()); + + // Verify that the sent message has been dropped. + assert!(weak_ref.upgrade().is_none()); + } + + { + let ref_count = Arc::new(0); + let weak_ref = Arc::downgrade(&ref_count); + + let (mut sender, receiver) = mpsc::channel(1); + sender.try_send(ref_count).expect("failed to send"); + + // Verify that the sent message is still live. + assert!(weak_ref.upgrade().is_some()); + + drop(receiver); + + // The sender should know the channel is closed. + assert!(sender.is_closed()); + + // Verify that the sent message has been dropped. + assert!(weak_ref.upgrade().is_none()); + assert!(sender.is_closed()); + } +} + +// Stress test that `try_send()`s occurring concurrently with receiver +// close/drops don't appear as successful sends. +#[cfg_attr(miri, ignore)] // Miri is too slow +#[test] +fn stress_try_send_as_receiver_closes() { + const AMT: usize = 10000; + // To provide variable timing characteristics (in the hopes of + // reproducing the collision that leads to a race), we busy-re-poll + // the test MPSC receiver a variable number of times before actually + // stopping. We vary this countdown between 1 and the following + // value. + const MAX_COUNTDOWN: usize = 20; + // When we detect that a successfully sent item is still in the + // queue after a disconnect, we spin for up to 100ms to confirm that + // it is a persistent condition and not a concurrency illusion. + const SPIN_TIMEOUT_S: u64 = 10; + const SPIN_SLEEP_MS: u64 = 10; + struct TestRx { + rx: mpsc::Receiver<Arc<()>>, + // The number of times to query `rx` before dropping it. + poll_count: usize, + } + struct TestTask { + command_rx: mpsc::Receiver<TestRx>, + test_rx: Option<mpsc::Receiver<Arc<()>>>, + countdown: usize, + } + impl TestTask { + /// Create a new TestTask + fn new() -> (TestTask, mpsc::Sender<TestRx>) { + let (command_tx, command_rx) = mpsc::channel::<TestRx>(0); + ( + TestTask { + command_rx, + test_rx: None, + countdown: 0, // 0 means no countdown is in progress. + }, + command_tx, + ) + } + } + impl Future for TestTask { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + // Poll the test channel, if one is present. + if let Some(rx) = &mut self.test_rx { + if let Poll::Ready(v) = rx.poll_next_unpin(cx) { + let _ = v.expect("test finished unexpectedly!"); + } + self.countdown -= 1; + // Busy-poll until the countdown is finished. + cx.waker().wake_by_ref(); + } + // Accept any newly submitted MPSC channels for testing. + match self.command_rx.poll_next_unpin(cx) { + Poll::Ready(Some(TestRx { rx, poll_count })) => { + self.test_rx = Some(rx); + self.countdown = poll_count; + cx.waker().wake_by_ref(); + } + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => {} + } + if self.countdown == 0 { + // Countdown complete -- drop the Receiver. + self.test_rx = None; + } + Poll::Pending + } + } + let (f, mut cmd_tx) = TestTask::new(); + let bg = thread::spawn(move || block_on(f)); + for i in 0..AMT { + let (mut test_tx, rx) = mpsc::channel(0); + let poll_count = i % MAX_COUNTDOWN; + cmd_tx.try_send(TestRx { rx, poll_count }).unwrap(); + let mut prev_weak: Option<Weak<()>> = None; + let mut attempted_sends = 0; + let mut successful_sends = 0; + loop { + // Create a test item. + let item = Arc::new(()); + let weak = Arc::downgrade(&item); + match test_tx.try_send(item) { + Ok(_) => { + prev_weak = Some(weak); + successful_sends += 1; + } + Err(ref e) if e.is_full() => {} + Err(ref e) if e.is_disconnected() => { + // Test for evidence of the race condition. + if let Some(prev_weak) = prev_weak { + if prev_weak.upgrade().is_some() { + // The previously sent item is still allocated. + // However, there appears to be some aspect of the + // concurrency that can legitimately cause the Arc + // to be momentarily valid. Spin for up to 100ms + // waiting for the previously sent item to be + // dropped. + let t0 = Instant::now(); + let mut spins = 0; + loop { + if prev_weak.upgrade().is_none() { + break; + } + assert!( + t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S), + "item not dropped on iteration {} after \ + {} sends ({} successful). spin=({})", + i, + attempted_sends, + successful_sends, + spins + ); + spins += 1; + thread::sleep(Duration::from_millis(SPIN_SLEEP_MS)); + } + } + } + break; + } + Err(ref e) => panic!("unexpected error: {}", e), + } + attempted_sends += 1; + } + } + drop(cmd_tx); + bg.join().expect("background thread join"); +} + +#[test] +fn unbounded_try_next_after_none() { + let (tx, mut rx) = mpsc::unbounded::<String>(); + // Drop the sender, close the channel. + drop(tx); + // Receive the end of channel. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); + // None received, check we can call `try_next` again. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); +} + +#[test] +fn bounded_try_next_after_none() { + let (tx, mut rx) = mpsc::channel::<String>(17); + // Drop the sender, close the channel. + drop(tx); + // Receive the end of channel. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); + // None received, check we can call `try_next` again. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); +} diff --git a/third_party/rust/futures-channel/tests/mpsc.rs b/third_party/rust/futures-channel/tests/mpsc.rs new file mode 100644 index 0000000000..444c8e10fd --- /dev/null +++ b/third_party/rust/futures-channel/tests/mpsc.rs @@ -0,0 +1,634 @@ +use futures::channel::{mpsc, oneshot}; +use futures::executor::{block_on, block_on_stream}; +use futures::future::{poll_fn, FutureExt}; +use futures::pin_mut; +use futures::sink::{Sink, SinkExt}; +use futures::stream::{Stream, StreamExt}; +use futures::task::{Context, Poll}; +use futures_test::task::{new_count_waker, noop_context}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::thread; + +trait AssertSend: Send {} +impl AssertSend for mpsc::Sender<i32> {} +impl AssertSend for mpsc::Receiver<i32> {} + +#[test] +fn send_recv() { + let (mut tx, rx) = mpsc::channel::<i32>(16); + + block_on(tx.send(1)).unwrap(); + drop(tx); + let v: Vec<_> = block_on(rx.collect()); + assert_eq!(v, vec![1]); +} + +#[test] +fn send_recv_no_buffer() { + // Run on a task context + block_on(poll_fn(move |cx| { + let (tx, rx) = mpsc::channel::<i32>(0); + pin_mut!(tx, rx); + + assert!(tx.as_mut().poll_flush(cx).is_ready()); + assert!(tx.as_mut().poll_ready(cx).is_ready()); + + // Send first message + assert!(tx.as_mut().start_send(1).is_ok()); + assert!(tx.as_mut().poll_ready(cx).is_pending()); + + // poll_ready said Pending, so no room in buffer, therefore new sends + // should get rejected with is_full. + assert!(tx.as_mut().start_send(0).unwrap_err().is_full()); + assert!(tx.as_mut().poll_ready(cx).is_pending()); + + // Take the value + assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(1))); + assert!(tx.as_mut().poll_ready(cx).is_ready()); + + // Send second message + assert!(tx.as_mut().poll_ready(cx).is_ready()); + assert!(tx.as_mut().start_send(2).is_ok()); + assert!(tx.as_mut().poll_ready(cx).is_pending()); + + // Take the value + assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(2))); + assert!(tx.as_mut().poll_ready(cx).is_ready()); + + Poll::Ready(()) + })); +} + +#[test] +fn send_shared_recv() { + let (mut tx1, rx) = mpsc::channel::<i32>(16); + let mut rx = block_on_stream(rx); + let mut tx2 = tx1.clone(); + + block_on(tx1.send(1)).unwrap(); + assert_eq!(rx.next(), Some(1)); + + block_on(tx2.send(2)).unwrap(); + assert_eq!(rx.next(), Some(2)); +} + +#[test] +fn send_recv_threads() { + let (mut tx, rx) = mpsc::channel::<i32>(16); + + let t = thread::spawn(move || { + block_on(tx.send(1)).unwrap(); + }); + + let v: Vec<_> = block_on(rx.take(1).collect()); + assert_eq!(v, vec![1]); + + t.join().unwrap(); +} + +#[test] +fn send_recv_threads_no_capacity() { + let (mut tx, rx) = mpsc::channel::<i32>(0); + + let t = thread::spawn(move || { + block_on(tx.send(1)).unwrap(); + block_on(tx.send(2)).unwrap(); + }); + + let v: Vec<_> = block_on(rx.collect()); + assert_eq!(v, vec![1, 2]); + + t.join().unwrap(); +} + +#[test] +fn recv_close_gets_none() { + let (mut tx, mut rx) = mpsc::channel::<i32>(10); + + // Run on a task context + block_on(poll_fn(move |cx| { + rx.close(); + + assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None)); + match tx.poll_ready(cx) { + Poll::Pending | Poll::Ready(Ok(_)) => panic!(), + Poll::Ready(Err(e)) => assert!(e.is_disconnected()), + }; + + Poll::Ready(()) + })); +} + +#[test] +fn tx_close_gets_none() { + let (_, mut rx) = mpsc::channel::<i32>(10); + + // Run on a task context + block_on(poll_fn(move |cx| { + assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None)); + Poll::Ready(()) + })); +} + +// #[test] +// fn spawn_sends_items() { +// let core = local_executor::Core::new(); +// let stream = unfold(0, |i| Some(ok::<_,u8>((i, i + 1)))); +// let rx = mpsc::spawn(stream, &core, 1); +// assert_eq!(core.run(rx.take(4).collect()).unwrap(), +// [0, 1, 2, 3]); +// } + +// #[test] +// fn spawn_kill_dead_stream() { +// use std::thread; +// use std::time::Duration; +// use futures::future::Either; +// use futures::sync::oneshot; +// +// // a stream which never returns anything (maybe a remote end isn't +// // responding), but dropping it leads to observable side effects +// // (like closing connections, releasing limited resources, ...) +// #[derive(Debug)] +// struct Dead { +// // when dropped you should get Err(oneshot::Canceled) on the +// // receiving end +// done: oneshot::Sender<()>, +// } +// impl Stream for Dead { +// type Item = (); +// type Error = (); +// +// fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { +// Ok(Poll::Pending) +// } +// } +// +// // need to implement a timeout for the test, as it would hang +// // forever right now +// let (timeout_tx, timeout_rx) = oneshot::channel(); +// thread::spawn(move || { +// thread::sleep(Duration::from_millis(1000)); +// let _ = timeout_tx.send(()); +// }); +// +// let core = local_executor::Core::new(); +// let (done_tx, done_rx) = oneshot::channel(); +// let stream = Dead{done: done_tx}; +// let rx = mpsc::spawn(stream, &core, 1); +// let res = core.run( +// Ok::<_, ()>(()) +// .into_future() +// .then(move |_| { +// // now drop the spawned stream: maybe some timeout exceeded, +// // or some connection on this end was closed by the remote +// // end. +// drop(rx); +// // and wait for the spawned stream to release its resources +// done_rx +// }) +// .select2(timeout_rx) +// ); +// match res { +// Err(Either::A((oneshot::Canceled, _))) => (), +// _ => { +// panic!("dead stream wasn't canceled"); +// }, +// } +// } + +#[test] +fn stress_shared_unbounded() { + const AMT: u32 = if cfg!(miri) { 100 } else { 10000 }; + const NTHREADS: u32 = 8; + let (tx, rx) = mpsc::unbounded::<i32>(); + + let t = thread::spawn(move || { + let result: Vec<_> = block_on(rx.collect()); + assert_eq!(result.len(), (AMT * NTHREADS) as usize); + for item in result { + assert_eq!(item, 1); + } + }); + + for _ in 0..NTHREADS { + let tx = tx.clone(); + + thread::spawn(move || { + for _ in 0..AMT { + tx.unbounded_send(1).unwrap(); + } + }); + } + + drop(tx); + + t.join().ok().unwrap(); +} + +#[test] +fn stress_shared_bounded_hard() { + const AMT: u32 = if cfg!(miri) { 100 } else { 10000 }; + const NTHREADS: u32 = 8; + let (tx, rx) = mpsc::channel::<i32>(0); + + let t = thread::spawn(move || { + let result: Vec<_> = block_on(rx.collect()); + assert_eq!(result.len(), (AMT * NTHREADS) as usize); + for item in result { + assert_eq!(item, 1); + } + }); + + for _ in 0..NTHREADS { + let mut tx = tx.clone(); + + thread::spawn(move || { + for _ in 0..AMT { + block_on(tx.send(1)).unwrap(); + } + }); + } + + drop(tx); + + t.join().unwrap(); +} + +#[allow(clippy::same_item_push)] +#[test] +fn stress_receiver_multi_task_bounded_hard() { + const AMT: usize = if cfg!(miri) { 100 } else { 10_000 }; + const NTHREADS: u32 = 2; + + let (mut tx, rx) = mpsc::channel::<usize>(0); + let rx = Arc::new(Mutex::new(Some(rx))); + let n = Arc::new(AtomicUsize::new(0)); + + let mut th = vec![]; + + for _ in 0..NTHREADS { + let rx = rx.clone(); + let n = n.clone(); + + let t = thread::spawn(move || { + let mut i = 0; + + loop { + i += 1; + let mut rx_opt = rx.lock().unwrap(); + if let Some(rx) = &mut *rx_opt { + if i % 5 == 0 { + let item = block_on(rx.next()); + + if item.is_none() { + *rx_opt = None; + break; + } + + n.fetch_add(1, Ordering::Relaxed); + } else { + // Just poll + let n = n.clone(); + match rx.poll_next_unpin(&mut noop_context()) { + Poll::Ready(Some(_)) => { + n.fetch_add(1, Ordering::Relaxed); + } + Poll::Ready(None) => { + *rx_opt = None; + break; + } + Poll::Pending => {} + } + } + } else { + break; + } + } + }); + + th.push(t); + } + + for i in 0..AMT { + block_on(tx.send(i)).unwrap(); + } + drop(tx); + + for t in th { + t.join().unwrap(); + } + + assert_eq!(AMT, n.load(Ordering::Relaxed)); +} + +/// Stress test that receiver properly receives all the messages +/// after sender dropped. +#[test] +fn stress_drop_sender() { + const ITER: usize = if cfg!(miri) { 100 } else { 10000 }; + + fn list() -> impl Stream<Item = i32> { + let (tx, rx) = mpsc::channel(1); + thread::spawn(move || { + block_on(send_one_two_three(tx)); + }); + rx + } + + for _ in 0..ITER { + let v: Vec<_> = block_on(list().collect()); + assert_eq!(v, vec![1, 2, 3]); + } +} + +async fn send_one_two_three(mut tx: mpsc::Sender<i32>) { + for i in 1..=3 { + tx.send(i).await.unwrap(); + } +} + +/// Stress test that after receiver dropped, +/// no messages are lost. +fn stress_close_receiver_iter() { + let (tx, rx) = mpsc::unbounded(); + let mut rx = block_on_stream(rx); + let (unwritten_tx, unwritten_rx) = std::sync::mpsc::channel(); + let th = thread::spawn(move || { + for i in 1.. { + if tx.unbounded_send(i).is_err() { + unwritten_tx.send(i).expect("unwritten_tx"); + return; + } + } + }); + + // Read one message to make sure thread effectively started + assert_eq!(Some(1), rx.next()); + + rx.close(); + + for i in 2.. { + match rx.next() { + Some(r) => assert!(i == r), + None => { + let unwritten = unwritten_rx.recv().expect("unwritten_rx"); + assert_eq!(unwritten, i); + th.join().unwrap(); + return; + } + } + } +} + +#[test] +fn stress_close_receiver() { + const ITER: usize = if cfg!(miri) { 50 } else { 10000 }; + + for _ in 0..ITER { + stress_close_receiver_iter(); + } +} + +async fn stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32) { + for i in (1..=count).rev() { + sender.send(i).await.unwrap(); + } +} + +/// Tests that after `poll_ready` indicates capacity a channel can always send without waiting. +#[allow(clippy::same_item_push)] +#[test] +fn stress_poll_ready() { + const AMT: u32 = if cfg!(miri) { 100 } else { 1000 }; + const NTHREADS: u32 = 8; + + /// Run a stress test using the specified channel capacity. + fn stress(capacity: usize) { + let (tx, rx) = mpsc::channel(capacity); + let mut threads = Vec::new(); + for _ in 0..NTHREADS { + let sender = tx.clone(); + threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT)))); + } + drop(tx); + + let result: Vec<_> = block_on(rx.collect()); + assert_eq!(result.len() as u32, AMT * NTHREADS); + + for thread in threads { + thread.join().unwrap(); + } + } + + stress(0); + stress(1); + stress(8); + stress(16); +} + +#[test] +fn try_send_1() { + const N: usize = if cfg!(miri) { 100 } else { 3000 }; + let (mut tx, rx) = mpsc::channel(0); + + let t = thread::spawn(move || { + for i in 0..N { + loop { + if tx.try_send(i).is_ok() { + break; + } + } + } + }); + + let result: Vec<_> = block_on(rx.collect()); + for (i, j) in result.into_iter().enumerate() { + assert_eq!(i, j); + } + + t.join().unwrap(); +} + +#[test] +fn try_send_2() { + let (mut tx, rx) = mpsc::channel(0); + let mut rx = block_on_stream(rx); + + tx.try_send("hello").unwrap(); + + let (readytx, readyrx) = oneshot::channel::<()>(); + + let th = thread::spawn(move || { + block_on(poll_fn(|cx| { + assert!(tx.poll_ready(cx).is_pending()); + Poll::Ready(()) + })); + + drop(readytx); + block_on(tx.send("goodbye")).unwrap(); + }); + + let _ = block_on(readyrx); + assert_eq!(rx.next(), Some("hello")); + assert_eq!(rx.next(), Some("goodbye")); + assert_eq!(rx.next(), None); + + th.join().unwrap(); +} + +#[test] +fn try_send_fail() { + let (mut tx, rx) = mpsc::channel(0); + let mut rx = block_on_stream(rx); + + tx.try_send("hello").unwrap(); + + // This should fail + assert!(tx.try_send("fail").is_err()); + + assert_eq!(rx.next(), Some("hello")); + + tx.try_send("goodbye").unwrap(); + drop(tx); + + assert_eq!(rx.next(), Some("goodbye")); + assert_eq!(rx.next(), None); +} + +#[test] +fn try_send_recv() { + let (mut tx, mut rx) = mpsc::channel(1); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap_err(); // should be full + rx.try_next().unwrap(); + rx.try_next().unwrap(); + rx.try_next().unwrap_err(); // should be empty + tx.try_send("hello").unwrap(); + rx.try_next().unwrap(); + rx.try_next().unwrap_err(); // should be empty +} + +#[test] +fn same_receiver() { + let (mut txa1, _) = mpsc::channel::<i32>(1); + let txa2 = txa1.clone(); + + let (mut txb1, _) = mpsc::channel::<i32>(1); + let txb2 = txb1.clone(); + + assert!(txa1.same_receiver(&txa2)); + assert!(txb1.same_receiver(&txb2)); + assert!(!txa1.same_receiver(&txb1)); + + txa1.disconnect(); + txb1.close_channel(); + + assert!(!txa1.same_receiver(&txa2)); + assert!(txb1.same_receiver(&txb2)); +} + +#[test] +fn is_connected_to() { + let (txa, rxa) = mpsc::channel::<i32>(1); + let (txb, rxb) = mpsc::channel::<i32>(1); + + assert!(txa.is_connected_to(&rxa)); + assert!(txb.is_connected_to(&rxb)); + assert!(!txa.is_connected_to(&rxb)); + assert!(!txb.is_connected_to(&rxa)); +} + +#[test] +fn hash_receiver() { + use std::collections::hash_map::DefaultHasher; + use std::hash::Hasher; + + let mut hasher_a1 = DefaultHasher::new(); + let mut hasher_a2 = DefaultHasher::new(); + let mut hasher_b1 = DefaultHasher::new(); + let mut hasher_b2 = DefaultHasher::new(); + let (mut txa1, _) = mpsc::channel::<i32>(1); + let txa2 = txa1.clone(); + + let (mut txb1, _) = mpsc::channel::<i32>(1); + let txb2 = txb1.clone(); + + txa1.hash_receiver(&mut hasher_a1); + let hash_a1 = hasher_a1.finish(); + txa2.hash_receiver(&mut hasher_a2); + let hash_a2 = hasher_a2.finish(); + txb1.hash_receiver(&mut hasher_b1); + let hash_b1 = hasher_b1.finish(); + txb2.hash_receiver(&mut hasher_b2); + let hash_b2 = hasher_b2.finish(); + + assert_eq!(hash_a1, hash_a2); + assert_eq!(hash_b1, hash_b2); + assert!(hash_a1 != hash_b1); + + txa1.disconnect(); + txb1.close_channel(); + + let mut hasher_a1 = DefaultHasher::new(); + let mut hasher_a2 = DefaultHasher::new(); + let mut hasher_b1 = DefaultHasher::new(); + let mut hasher_b2 = DefaultHasher::new(); + + txa1.hash_receiver(&mut hasher_a1); + let hash_a1 = hasher_a1.finish(); + txa2.hash_receiver(&mut hasher_a2); + let hash_a2 = hasher_a2.finish(); + txb1.hash_receiver(&mut hasher_b1); + let hash_b1 = hasher_b1.finish(); + txb2.hash_receiver(&mut hasher_b2); + let hash_b2 = hasher_b2.finish(); + + assert!(hash_a1 != hash_a2); + assert_eq!(hash_b1, hash_b2); +} + +#[test] +fn send_backpressure() { + let (waker, counter) = new_count_waker(); + let mut cx = Context::from_waker(&waker); + + let (mut tx, mut rx) = mpsc::channel(1); + block_on(tx.send(1)).unwrap(); + + let mut task = tx.send(2); + assert_eq!(task.poll_unpin(&mut cx), Poll::Pending); + assert_eq!(counter, 0); + + let item = block_on(rx.next()).unwrap(); + assert_eq!(item, 1); + assert_eq!(counter, 1); + assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(()))); + + let item = block_on(rx.next()).unwrap(); + assert_eq!(item, 2); +} + +#[test] +fn send_backpressure_multi_senders() { + let (waker, counter) = new_count_waker(); + let mut cx = Context::from_waker(&waker); + + let (mut tx1, mut rx) = mpsc::channel(1); + let mut tx2 = tx1.clone(); + block_on(tx1.send(1)).unwrap(); + + let mut task = tx2.send(2); + assert_eq!(task.poll_unpin(&mut cx), Poll::Pending); + assert_eq!(counter, 0); + + let item = block_on(rx.next()).unwrap(); + assert_eq!(item, 1); + assert_eq!(counter, 1); + assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(()))); + + let item = block_on(rx.next()).unwrap(); + assert_eq!(item, 2); +} diff --git a/third_party/rust/futures-channel/tests/oneshot.rs b/third_party/rust/futures-channel/tests/oneshot.rs new file mode 100644 index 0000000000..6b48376dc0 --- /dev/null +++ b/third_party/rust/futures-channel/tests/oneshot.rs @@ -0,0 +1,256 @@ +use futures::channel::oneshot::{self, Sender}; +use futures::executor::block_on; +use futures::future::{poll_fn, FutureExt}; +use futures::task::{Context, Poll}; +use futures_test::task::panic_waker_ref; +use std::sync::mpsc; +use std::thread; + +#[test] +fn smoke_poll() { + let (mut tx, rx) = oneshot::channel::<u32>(); + let mut rx = Some(rx); + let f = poll_fn(|cx| { + assert!(tx.poll_canceled(cx).is_pending()); + assert!(tx.poll_canceled(cx).is_pending()); + drop(rx.take()); + assert!(tx.poll_canceled(cx).is_ready()); + assert!(tx.poll_canceled(cx).is_ready()); + Poll::Ready(()) + }); + + block_on(f); +} + +#[test] +fn cancel_notifies() { + let (mut tx, rx) = oneshot::channel::<u32>(); + + let t = thread::spawn(move || { + block_on(tx.cancellation()); + }); + drop(rx); + t.join().unwrap(); +} + +#[test] +fn cancel_lots() { + const N: usize = if cfg!(miri) { 100 } else { 20000 }; + + let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>(); + let t = thread::spawn(move || { + for (mut tx, tx2) in rx { + block_on(tx.cancellation()); + tx2.send(()).unwrap(); + } + }); + + for _ in 0..N { + let (otx, orx) = oneshot::channel::<u32>(); + let (tx2, rx2) = mpsc::channel(); + tx.send((otx, tx2)).unwrap(); + drop(orx); + rx2.recv().unwrap(); + } + drop(tx); + + t.join().unwrap(); +} + +#[test] +fn cancel_after_sender_drop_doesnt_notify() { + let (mut tx, rx) = oneshot::channel::<u32>(); + let mut cx = Context::from_waker(panic_waker_ref()); + assert_eq!(tx.poll_canceled(&mut cx), Poll::Pending); + drop(tx); + drop(rx); +} + +#[test] +fn close() { + let (mut tx, mut rx) = oneshot::channel::<u32>(); + rx.close(); + block_on(poll_fn(|cx| { + match rx.poll_unpin(cx) { + Poll::Ready(Err(_)) => {} + _ => panic!(), + }; + assert!(tx.poll_canceled(cx).is_ready()); + Poll::Ready(()) + })); +} + +#[test] +fn close_wakes() { + let (mut tx, mut rx) = oneshot::channel::<u32>(); + let (tx2, rx2) = mpsc::channel(); + let t = thread::spawn(move || { + rx.close(); + rx2.recv().unwrap(); + }); + block_on(tx.cancellation()); + tx2.send(()).unwrap(); + t.join().unwrap(); +} + +#[test] +fn is_canceled() { + let (tx, rx) = oneshot::channel::<u32>(); + assert!(!tx.is_canceled()); + drop(rx); + assert!(tx.is_canceled()); +} + +#[test] +fn cancel_sends() { + const N: usize = if cfg!(miri) { 100 } else { 20000 }; + + let (tx, rx) = mpsc::channel::<Sender<_>>(); + let t = thread::spawn(move || { + for otx in rx { + let _ = otx.send(42); + } + }); + + for _ in 0..N { + let (otx, mut orx) = oneshot::channel::<u32>(); + tx.send(otx).unwrap(); + + orx.close(); + let _ = block_on(orx); + } + + drop(tx); + t.join().unwrap(); +} + +// #[test] +// fn spawn_sends_items() { +// let core = local_executor::Core::new(); +// let future = ok::<_, ()>(1); +// let rx = spawn(future, &core); +// assert_eq!(core.run(rx).unwrap(), 1); +// } +// +// #[test] +// fn spawn_kill_dead_stream() { +// use std::thread; +// use std::time::Duration; +// use futures::future::Either; +// use futures::sync::oneshot; +// +// // a future which never returns anything (forever accepting incoming +// // connections), but dropping it leads to observable side effects +// // (like closing listening sockets, releasing limited resources, +// // ...) +// #[derive(Debug)] +// struct Dead { +// // when dropped you should get Err(oneshot::Canceled) on the +// // receiving end +// done: oneshot::Sender<()>, +// } +// impl Future for Dead { +// type Item = (); +// type Error = (); +// +// fn poll(&mut self) -> Poll<Self::Item, Self::Error> { +// Ok(Poll::Pending) +// } +// } +// +// // need to implement a timeout for the test, as it would hang +// // forever right now +// let (timeout_tx, timeout_rx) = oneshot::channel(); +// thread::spawn(move || { +// thread::sleep(Duration::from_millis(1000)); +// let _ = timeout_tx.send(()); +// }); +// +// let core = local_executor::Core::new(); +// let (done_tx, done_rx) = oneshot::channel(); +// let future = Dead{done: done_tx}; +// let rx = spawn(future, &core); +// let res = core.run( +// Ok::<_, ()>(()) +// .into_future() +// .then(move |_| { +// // now drop the spawned future: maybe some timeout exceeded, +// // or some connection on this end was closed by the remote +// // end. +// drop(rx); +// // and wait for the spawned future to release its resources +// done_rx +// }) +// .select2(timeout_rx) +// ); +// match res { +// Err(Either::A((oneshot::Canceled, _))) => (), +// Ok(Either::B(((), _))) => { +// panic!("dead future wasn't canceled (timeout)"); +// }, +// _ => { +// panic!("dead future wasn't canceled (unexpected result)"); +// }, +// } +// } +// +// #[test] +// fn spawn_dont_kill_forgot_dead_stream() { +// use std::thread; +// use std::time::Duration; +// use futures::future::Either; +// use futures::sync::oneshot; +// +// // a future which never returns anything (forever accepting incoming +// // connections), but dropping it leads to observable side effects +// // (like closing listening sockets, releasing limited resources, +// // ...) +// #[derive(Debug)] +// struct Dead { +// // when dropped you should get Err(oneshot::Canceled) on the +// // receiving end +// done: oneshot::Sender<()>, +// } +// impl Future for Dead { +// type Item = (); +// type Error = (); +// +// fn poll(&mut self) -> Poll<Self::Item, Self::Error> { +// Ok(Poll::Pending) +// } +// } +// +// // need to implement a timeout for the test, as it would hang +// // forever right now +// let (timeout_tx, timeout_rx) = oneshot::channel(); +// thread::spawn(move || { +// thread::sleep(Duration::from_millis(1000)); +// let _ = timeout_tx.send(()); +// }); +// +// let core = local_executor::Core::new(); +// let (done_tx, done_rx) = oneshot::channel(); +// let future = Dead{done: done_tx}; +// let rx = spawn(future, &core); +// let res = core.run( +// Ok::<_, ()>(()) +// .into_future() +// .then(move |_| { +// // forget the spawned future: should keep running, i.e. hit +// // the timeout below. +// rx.forget(); +// // and wait for the spawned future to release its resources +// done_rx +// }) +// .select2(timeout_rx) +// ); +// match res { +// Err(Either::A((oneshot::Canceled, _))) => { +// panic!("forgotten dead future was canceled"); +// }, +// Ok(Either::B(((), _))) => (), // reached timeout +// _ => { +// panic!("forgotten dead future was canceled (unexpected result)"); +// }, +// } +// } |