diff options
Diffstat (limited to 'third_party/rust/want')
-rw-r--r-- | third_party/rust/want/.cargo-checksum.json | 1 | ||||
-rw-r--r-- | third_party/rust/want/Cargo.toml | 32 | ||||
-rw-r--r-- | third_party/rust/want/LICENSE | 20 | ||||
-rw-r--r-- | third_party/rust/want/README.md | 24 | ||||
-rw-r--r-- | third_party/rust/want/benches/throughput.rs | 14 | ||||
-rw-r--r-- | third_party/rust/want/src/lib.rs | 585 |
6 files changed, 676 insertions, 0 deletions
diff --git a/third_party/rust/want/.cargo-checksum.json b/third_party/rust/want/.cargo-checksum.json new file mode 100644 index 0000000000..00b647d0bf --- /dev/null +++ b/third_party/rust/want/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"Cargo.toml":"aa3ff35ee3a13df045f467b87cca65a8cc3bf05fd390a1347f0d2d9673894708","LICENSE":"a65f5d0a945d267751344c95665945b90c030ea107faf5c85d518929886187da","README.md":"316ed49131b031a3b323a3d4eab8327e4207ff2728c661c3c72556b63457038b","benches/throughput.rs":"d183d648db263d8b82fa9496132820d8ab56bacc637a06c7c8746e4fadaefb49","src/lib.rs":"2d5bf55eae742f96b490838e079b05c9efb2765e7e6aa72d7fa31baa448a0fc0"},"package":"1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"}
\ No newline at end of file diff --git a/third_party/rust/want/Cargo.toml b/third_party/rust/want/Cargo.toml new file mode 100644 index 0000000000..f03cb726f5 --- /dev/null +++ b/third_party/rust/want/Cargo.toml @@ -0,0 +1,32 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies +# +# If you believe there's an error in this file please file an +# issue against the rust-lang/cargo repository. If you're +# editing this file be aware that the upstream Cargo.toml +# will likely look very different (and much more reasonable) + +[package] +edition = "2018" +name = "want" +version = "0.3.0" +authors = ["Sean McArthur <sean@seanmonstar.com>"] +description = "Detect when another Future wants a result." +documentation = "https://docs.rs/want" +keywords = ["futures", "channel", "async"] +license = "MIT" +repository = "https://github.com/seanmonstar/want" +[dependencies.log] +version = "0.4" + +[dependencies.try-lock] +version = "0.2" +[dev-dependencies.tokio-executor] +version = "0.2.0-alpha.2" + +[dev-dependencies.tokio-sync] +version = "0.2.0-alpha.2" diff --git a/third_party/rust/want/LICENSE b/third_party/rust/want/LICENSE new file mode 100644 index 0000000000..e0f0f8ac76 --- /dev/null +++ b/third_party/rust/want/LICENSE @@ -0,0 +1,20 @@ +Copyright (c) 2018-2019 Sean McArthur + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + diff --git a/third_party/rust/want/README.md b/third_party/rust/want/README.md new file mode 100644 index 0000000000..bd71b52b69 --- /dev/null +++ b/third_party/rust/want/README.md @@ -0,0 +1,24 @@ +# Want + +- [Crates.io](https://crates.io/crates/want) +- [Docs](https://docs.rs/want) + +A `Future`s channel-like utility to signal when a value is wanted. + +Futures are supposed to be lazy, and only starting work if `Future::poll` +is called. The same is true of `Stream`s, but when using a channel as +a `Stream`, it can be hard to know if the receiver is ready for the next +value. + +Put another way, given a `(tx, rx)` from `futures::sync::mpsc::channel()`, +how can the sender (`tx`) know when the receiver (`rx`) actually wants more +work to be produced? Just because there is room in the channel buffer +doesn't mean the work would be used by the receiver. + +This is where something like `want` comes in. Added to a channel, you can +make sure that the `tx` only creates the message and sends it when the `rx` +has `poll()` for it, and the buffer was empty. + +## License + +`want` is provided under the MIT license. See [LICENSE](LICENSE). diff --git a/third_party/rust/want/benches/throughput.rs b/third_party/rust/want/benches/throughput.rs new file mode 100644 index 0000000000..5acca6b1ac --- /dev/null +++ b/third_party/rust/want/benches/throughput.rs @@ -0,0 +1,14 @@ +#![feature(test)] + +extern crate test; +extern crate want; + +#[bench] +fn throughput(b: &mut test::Bencher) { + let (mut gv, mut tk) = want::new(); + + b.iter(move || { + tk.want(); + assert!(gv.poll_want().unwrap().is_ready()); + }); +} diff --git a/third_party/rust/want/src/lib.rs b/third_party/rust/want/src/lib.rs new file mode 100644 index 0000000000..7d8369894f --- /dev/null +++ b/third_party/rust/want/src/lib.rs @@ -0,0 +1,585 @@ +#![doc(html_root_url = "https://docs.rs/want/0.3.0")] +#![deny(warnings)] +#![deny(missing_docs)] +#![deny(missing_debug_implementations)] + +//! A Futures channel-like utility to signal when a value is wanted. +//! +//! Futures are supposed to be lazy, and only starting work if `Future::poll` +//! is called. The same is true of `Stream`s, but when using a channel as +//! a `Stream`, it can be hard to know if the receiver is ready for the next +//! value. +//! +//! Put another way, given a `(tx, rx)` from `futures::sync::mpsc::channel()`, +//! how can the sender (`tx`) know when the receiver (`rx`) actually wants more +//! work to be produced? Just because there is room in the channel buffer +//! doesn't mean the work would be used by the receiver. +//! +//! This is where something like `want` comes in. Added to a channel, you can +//! make sure that the `tx` only creates the message and sends it when the `rx` +//! has `poll()` for it, and the buffer was empty. +//! +//! # Example +//! +//! ```nightly +//! # //#![feature(async_await)] +//! extern crate want; +//! +//! # fn spawn<T>(_t: T) {} +//! # fn we_still_want_message() -> bool { true } +//! # fn mpsc_channel() -> (Tx, Rx) { (Tx, Rx) } +//! # struct Tx; +//! # impl Tx { fn send<T>(&mut self, _: T) {} } +//! # struct Rx; +//! # impl Rx { async fn recv(&mut self) -> Option<Expensive> { Some(Expensive) } } +//! +//! // Some message that is expensive to produce. +//! struct Expensive; +//! +//! // Some futures-aware MPSC channel... +//! let (mut tx, mut rx) = mpsc_channel(); +//! +//! // And our `want` channel! +//! let (mut gv, mut tk) = want::new(); +//! +//! +//! // Our receiving task... +//! spawn(async move { +//! // Maybe something comes up that prevents us from ever +//! // using the expensive message. +//! // +//! // Without `want`, the "send" task may have started to +//! // produce the expensive message even though we wouldn't +//! // be able to use it. +//! if !we_still_want_message() { +//! return; +//! } +//! +//! // But we can use it! So tell the `want` channel. +//! tk.want(); +//! +//! match rx.recv().await { +//! Some(_msg) => println!("got a message"), +//! None => println!("DONE"), +//! } +//! }); +//! +//! // Our sending task +//! spawn(async move { +//! // It's expensive to create a new message, so we wait until the +//! // receiving end truly *wants* the message. +//! if let Err(_closed) = gv.want().await { +//! // Looks like they will never want it... +//! return; +//! } +//! +//! // They want it, let's go! +//! tx.send(Expensive); +//! }); +//! +//! # fn main() {} +//! ``` + +#[macro_use] +extern crate log; + +use std::fmt; +use std::future::Future; +use std::mem; +use std::pin::Pin; +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +// SeqCst is the only ordering used to ensure accessing the state and +// TryLock are never re-ordered. +use std::sync::atomic::Ordering::SeqCst; +use std::task::{self, Poll, Waker}; + + +use try_lock::TryLock; + +/// Create a new `want` channel. +pub fn new() -> (Giver, Taker) { + let inner = Arc::new(Inner { + state: AtomicUsize::new(State::Idle.into()), + task: TryLock::new(None), + }); + let inner2 = inner.clone(); + ( + Giver { + inner: inner, + }, + Taker { + inner: inner2, + }, + ) +} + +/// An entity that gives a value when wanted. +pub struct Giver { + inner: Arc<Inner>, +} + +/// An entity that wants a value. +pub struct Taker { + inner: Arc<Inner>, +} + +/// A cloneable `Giver`. +/// +/// It differs from `Giver` in that you cannot poll for `want`. It's only +/// usable as a cancellation watcher. +#[derive(Clone)] +pub struct SharedGiver { + inner: Arc<Inner>, +} + +/// The `Taker` has canceled its interest in a value. +pub struct Closed { + _inner: (), +} + +#[derive(Clone, Copy, Debug)] +enum State { + Idle, + Want, + Give, + Closed, +} + +impl From<State> for usize { + fn from(s: State) -> usize { + match s { + State::Idle => 0, + State::Want => 1, + State::Give => 2, + State::Closed => 3, + } + } +} + +impl From<usize> for State { + fn from(num: usize) -> State { + match num { + 0 => State::Idle, + 1 => State::Want, + 2 => State::Give, + 3 => State::Closed, + _ => unreachable!("unknown state: {}", num), + } + } +} + +struct Inner { + state: AtomicUsize, + task: TryLock<Option<Waker>>, +} + +// ===== impl Giver ====== + +impl Giver { + /// Returns a `Future` that fulfills when the `Taker` has done some action. + pub fn want<'a>(&'a mut self) -> impl Future<Output = Result<(), Closed>> + 'a { + Want(self) + } + + /// Poll whether the `Taker` has registered interest in another value. + /// + /// - If the `Taker` has called `want()`, this returns `Async::Ready(())`. + /// - If the `Taker` has not called `want()` since last poll, this + /// returns `Async::NotReady`, and parks the current task to be notified + /// when the `Taker` does call `want()`. + /// - If the `Taker` has canceled (or dropped), this returns `Closed`. + /// + /// After knowing that the Taker is wanting, the state can be reset by + /// calling [`give`](Giver::give). + pub fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Closed>> { + loop { + let state = self.inner.state.load(SeqCst).into(); + match state { + State::Want => { + trace!("poll_want: taker wants!"); + return Poll::Ready(Ok(())); + }, + State::Closed => { + trace!("poll_want: closed"); + return Poll::Ready(Err(Closed { _inner: () })); + }, + State::Idle | State::Give => { + // Taker doesn't want anything yet, so park. + if let Some(mut locked) = self.inner.task.try_lock_order(SeqCst, SeqCst) { + + // While we have the lock, try to set to GIVE. + let old = self.inner.state.compare_and_swap( + state.into(), + State::Give.into(), + SeqCst, + ); + // If it's still the first state (Idle or Give), park current task. + if old == state.into() { + let park = locked.as_ref() + .map(|w| !w.will_wake(cx.waker())) + .unwrap_or(true); + if park { + let old = mem::replace(&mut *locked, Some(cx.waker().clone())); + drop(locked); + old.map(|prev_task| { + // there was an old task parked here. + // it might be waiting to be notified, + // so poke it before dropping. + prev_task.wake(); + }); + } + return Poll::Pending; + } + // Otherwise, something happened! Go around the loop again. + } else { + // if we couldn't take the lock, then a Taker has it. + // The *ONLY* reason is because it is in the process of notifying us + // of its want. + // + // We need to loop again to see what state it was changed to. + } + }, + } + } + } + + /// Mark the state as idle, if the Taker currently is wanting. + /// + /// Returns true if Taker was wanting, false otherwise. + #[inline] + pub fn give(&self) -> bool { + // only set to IDLE if it is still Want + self.inner.state.compare_and_swap( + State::Want.into(), + State::Idle.into(), + SeqCst, + ) == State::Want.into() + } + + /// Check if the `Taker` has called `want()` without parking a task. + /// + /// This is safe to call outside of a futures task context, but other + /// means of being notified is left to the user. + #[inline] + pub fn is_wanting(&self) -> bool { + self.inner.state.load(SeqCst) == State::Want.into() + } + + + /// Check if the `Taker` has canceled interest without parking a task. + #[inline] + pub fn is_canceled(&self) -> bool { + self.inner.state.load(SeqCst) == State::Closed.into() + } + + /// Converts this into a `SharedGiver`. + #[inline] + pub fn shared(self) -> SharedGiver { + SharedGiver { + inner: self.inner, + } + } +} + +impl fmt::Debug for Giver { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Giver") + .field("state", &self.inner.state()) + .finish() + } +} + +// ===== impl SharedGiver ====== + +impl SharedGiver { + /// Check if the `Taker` has called `want()` without parking a task. + /// + /// This is safe to call outside of a futures task context, but other + /// means of being notified is left to the user. + #[inline] + pub fn is_wanting(&self) -> bool { + self.inner.state.load(SeqCst) == State::Want.into() + } + + + /// Check if the `Taker` has canceled interest without parking a task. + #[inline] + pub fn is_canceled(&self) -> bool { + self.inner.state.load(SeqCst) == State::Closed.into() + } +} + +impl fmt::Debug for SharedGiver { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("SharedGiver") + .field("state", &self.inner.state()) + .finish() + } +} + +// ===== impl Taker ====== + +impl Taker { + /// Signal to the `Giver` that the want is canceled. + /// + /// This is useful to tell that the channel is closed if you cannot + /// drop the value yet. + #[inline] + pub fn cancel(&mut self) { + trace!("signal: {:?}", State::Closed); + self.signal(State::Closed) + } + + /// Signal to the `Giver` that a value is wanted. + #[inline] + pub fn want(&mut self) { + debug_assert!( + self.inner.state.load(SeqCst) != State::Closed.into(), + "want called after cancel" + ); + trace!("signal: {:?}", State::Want); + self.signal(State::Want) + } + + #[inline] + fn signal(&mut self, state: State) { + let old_state = self.inner.state.swap(state.into(), SeqCst).into(); + match old_state { + State::Idle | State::Want | State::Closed => (), + State::Give => { + loop { + if let Some(mut locked) = self.inner.task.try_lock_order(SeqCst, SeqCst) { + if let Some(task) = locked.take() { + drop(locked); + trace!("signal found waiting giver, notifying"); + task.wake(); + } + return; + } else { + // if we couldn't take the lock, then a Giver has it. + // The *ONLY* reason is because it is in the process of parking. + // + // We need to loop and take the lock so we can notify this task. + } + } + }, + } + } +} + +impl Drop for Taker { + #[inline] + fn drop(&mut self) { + self.signal(State::Closed); + } +} + +impl fmt::Debug for Taker { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Taker") + .field("state", &self.inner.state()) + .finish() + } +} + +// ===== impl Closed ====== + +impl fmt::Debug for Closed { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Closed") + .finish() + } +} + +// ===== impl Inner ====== + +impl Inner { + #[inline] + fn state(&self) -> State { + self.state.load(SeqCst).into() + } +} + +// ===== impl PollFn ====== + +struct Want<'a>(&'a mut Giver); + + +impl Future for Want<'_> { + type Output = Result<(), Closed>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + self.0.poll_want(cx) + } +} + +#[cfg(test)] +mod tests { + use std::thread; + use tokio_sync::oneshot; + use super::*; + + fn block_on<F: Future>(f: F) -> F::Output { + tokio_executor::enter() + .expect("block_on enter") + .block_on(f) + } + + #[test] + fn want_ready() { + let (mut gv, mut tk) = new(); + + tk.want(); + + block_on(gv.want()).unwrap(); + } + + #[test] + fn want_notify_0() { + let (mut gv, mut tk) = new(); + let (tx, rx) = oneshot::channel(); + + thread::spawn(move || { + tk.want(); + // use a oneshot to keep this thread alive + // until other thread was notified of want + block_on(rx).expect("rx"); + }); + + block_on(gv.want()).expect("want"); + + assert!(gv.is_wanting(), "still wanting after poll_want success"); + assert!(gv.give(), "give is true when wanting"); + + assert!(!gv.is_wanting(), "no longer wanting after give"); + assert!(!gv.is_canceled(), "give doesn't cancel"); + + assert!(!gv.give(), "give is false if not wanting"); + + tx.send(()).expect("tx"); + } + + /* + /// This tests that if the Giver moves tasks after parking, + /// it will still wake up the correct task. + #[test] + fn want_notify_moving_tasks() { + use std::sync::Arc; + use futures::executor::{spawn, Notify, NotifyHandle}; + + struct WantNotify; + + impl Notify for WantNotify { + fn notify(&self, _id: usize) { + } + } + + fn n() -> NotifyHandle { + Arc::new(WantNotify).into() + } + + let (mut gv, mut tk) = new(); + + let mut s = spawn(poll_fn(move || { + gv.poll_want() + })); + + // Register with t1 as the task::current() + let t1 = n(); + assert!(s.poll_future_notify(&t1, 1).unwrap().is_not_ready()); + + thread::spawn(move || { + thread::sleep(::std::time::Duration::from_millis(100)); + tk.want(); + }); + + // And now, move to a ThreadNotify task. + s.into_inner().wait().expect("poll_want"); + } + */ + + #[test] + fn cancel() { + // explicit + let (mut gv, mut tk) = new(); + + assert!(!gv.is_canceled()); + + tk.cancel(); + + assert!(gv.is_canceled()); + block_on(gv.want()).unwrap_err(); + + // implicit + let (mut gv, tk) = new(); + + assert!(!gv.is_canceled()); + + drop(tk); + + assert!(gv.is_canceled()); + block_on(gv.want()).unwrap_err(); + + // notifies + let (mut gv, tk) = new(); + + thread::spawn(move || { + let _tk = tk; + // and dropped + }); + + block_on(gv.want()).unwrap_err(); + } + + /* + #[test] + fn stress() { + let nthreads = 5; + let nwants = 100; + + for _ in 0..nthreads { + let (mut gv, mut tk) = new(); + let (mut tx, mut rx) = mpsc::channel(0); + + // rx thread + thread::spawn(move || { + let mut cnt = 0; + poll_fn(move || { + while cnt < nwants { + let n = match rx.poll().expect("rx poll") { + Async::Ready(n) => n.expect("rx opt"), + Async::NotReady => { + tk.want(); + return Ok(Async::NotReady); + }, + }; + assert_eq!(cnt, n); + cnt += 1; + } + Ok::<_, ()>(Async::Ready(())) + }).wait().expect("rx wait"); + }); + + // tx thread + thread::spawn(move || { + let mut cnt = 0; + let nsent = poll_fn(move || { + loop { + while let Ok(()) = tx.try_send(cnt) { + cnt += 1; + } + match gv.poll_want() { + Ok(Async::Ready(_)) => (), + Ok(Async::NotReady) => return Ok::<_, ()>(Async::NotReady), + Err(_) => return Ok(Async::Ready(cnt)), + } + } + }).wait().expect("tx wait"); + + assert_eq!(nsent, nwants); + }).join().expect("thread join"); + } + } + */ +} |