#![doc(html_root_url = "https://docs.rs/want/0.3.0")] #![deny(warnings)] #![deny(missing_docs)] #![deny(missing_debug_implementations)] //! A Futures channel-like utility to signal when a value is wanted. //! //! Futures are supposed to be lazy, and only starting work if `Future::poll` //! is called. The same is true of `Stream`s, but when using a channel as //! a `Stream`, it can be hard to know if the receiver is ready for the next //! value. //! //! Put another way, given a `(tx, rx)` from `futures::sync::mpsc::channel()`, //! how can the sender (`tx`) know when the receiver (`rx`) actually wants more //! work to be produced? Just because there is room in the channel buffer //! doesn't mean the work would be used by the receiver. //! //! This is where something like `want` comes in. Added to a channel, you can //! make sure that the `tx` only creates the message and sends it when the `rx` //! has `poll()` for it, and the buffer was empty. //! //! # Example //! //! ```nightly //! # //#![feature(async_await)] //! extern crate want; //! //! # fn spawn(_t: T) {} //! # fn we_still_want_message() -> bool { true } //! # fn mpsc_channel() -> (Tx, Rx) { (Tx, Rx) } //! # struct Tx; //! # impl Tx { fn send(&mut self, _: T) {} } //! # struct Rx; //! # impl Rx { async fn recv(&mut self) -> Option { Some(Expensive) } } //! //! // Some message that is expensive to produce. //! struct Expensive; //! //! // Some futures-aware MPSC channel... //! let (mut tx, mut rx) = mpsc_channel(); //! //! // And our `want` channel! //! let (mut gv, mut tk) = want::new(); //! //! //! // Our receiving task... //! spawn(async move { //! // Maybe something comes up that prevents us from ever //! // using the expensive message. //! // //! // Without `want`, the "send" task may have started to //! // produce the expensive message even though we wouldn't //! // be able to use it. //! if !we_still_want_message() { //! return; //! } //! //! // But we can use it! So tell the `want` channel. //! tk.want(); //! //! match rx.recv().await { //! Some(_msg) => println!("got a message"), //! None => println!("DONE"), //! } //! }); //! //! // Our sending task //! spawn(async move { //! // It's expensive to create a new message, so we wait until the //! // receiving end truly *wants* the message. //! if let Err(_closed) = gv.want().await { //! // Looks like they will never want it... //! return; //! } //! //! // They want it, let's go! //! tx.send(Expensive); //! }); //! //! # fn main() {} //! ``` #[macro_use] extern crate log; use std::fmt; use std::future::Future; use std::mem; use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::AtomicUsize; // SeqCst is the only ordering used to ensure accessing the state and // TryLock are never re-ordered. use std::sync::atomic::Ordering::SeqCst; use std::task::{self, Poll, Waker}; use try_lock::TryLock; /// Create a new `want` channel. pub fn new() -> (Giver, Taker) { let inner = Arc::new(Inner { state: AtomicUsize::new(State::Idle.into()), task: TryLock::new(None), }); let inner2 = inner.clone(); ( Giver { inner: inner, }, Taker { inner: inner2, }, ) } /// An entity that gives a value when wanted. pub struct Giver { inner: Arc, } /// An entity that wants a value. pub struct Taker { inner: Arc, } /// A cloneable `Giver`. /// /// It differs from `Giver` in that you cannot poll for `want`. It's only /// usable as a cancellation watcher. #[derive(Clone)] pub struct SharedGiver { inner: Arc, } /// The `Taker` has canceled its interest in a value. pub struct Closed { _inner: (), } #[derive(Clone, Copy, Debug)] enum State { Idle, Want, Give, Closed, } impl From for usize { fn from(s: State) -> usize { match s { State::Idle => 0, State::Want => 1, State::Give => 2, State::Closed => 3, } } } impl From for State { fn from(num: usize) -> State { match num { 0 => State::Idle, 1 => State::Want, 2 => State::Give, 3 => State::Closed, _ => unreachable!("unknown state: {}", num), } } } struct Inner { state: AtomicUsize, task: TryLock>, } // ===== impl Giver ====== impl Giver { /// Returns a `Future` that fulfills when the `Taker` has done some action. pub fn want<'a>(&'a mut self) -> impl Future> + 'a { Want(self) } /// Poll whether the `Taker` has registered interest in another value. /// /// - If the `Taker` has called `want()`, this returns `Async::Ready(())`. /// - If the `Taker` has not called `want()` since last poll, this /// returns `Async::NotReady`, and parks the current task to be notified /// when the `Taker` does call `want()`. /// - If the `Taker` has canceled (or dropped), this returns `Closed`. /// /// After knowing that the Taker is wanting, the state can be reset by /// calling [`give`](Giver::give). pub fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll> { loop { let state = self.inner.state.load(SeqCst).into(); match state { State::Want => { trace!("poll_want: taker wants!"); return Poll::Ready(Ok(())); }, State::Closed => { trace!("poll_want: closed"); return Poll::Ready(Err(Closed { _inner: () })); }, State::Idle | State::Give => { // Taker doesn't want anything yet, so park. if let Some(mut locked) = self.inner.task.try_lock_order(SeqCst, SeqCst) { // While we have the lock, try to set to GIVE. let old = self.inner.state.compare_and_swap( state.into(), State::Give.into(), SeqCst, ); // If it's still the first state (Idle or Give), park current task. if old == state.into() { let park = locked.as_ref() .map(|w| !w.will_wake(cx.waker())) .unwrap_or(true); if park { let old = mem::replace(&mut *locked, Some(cx.waker().clone())); drop(locked); old.map(|prev_task| { // there was an old task parked here. // it might be waiting to be notified, // so poke it before dropping. prev_task.wake(); }); } return Poll::Pending; } // Otherwise, something happened! Go around the loop again. } else { // if we couldn't take the lock, then a Taker has it. // The *ONLY* reason is because it is in the process of notifying us // of its want. // // We need to loop again to see what state it was changed to. } }, } } } /// Mark the state as idle, if the Taker currently is wanting. /// /// Returns true if Taker was wanting, false otherwise. #[inline] pub fn give(&self) -> bool { // only set to IDLE if it is still Want self.inner.state.compare_and_swap( State::Want.into(), State::Idle.into(), SeqCst, ) == State::Want.into() } /// Check if the `Taker` has called `want()` without parking a task. /// /// This is safe to call outside of a futures task context, but other /// means of being notified is left to the user. #[inline] pub fn is_wanting(&self) -> bool { self.inner.state.load(SeqCst) == State::Want.into() } /// Check if the `Taker` has canceled interest without parking a task. #[inline] pub fn is_canceled(&self) -> bool { self.inner.state.load(SeqCst) == State::Closed.into() } /// Converts this into a `SharedGiver`. #[inline] pub fn shared(self) -> SharedGiver { SharedGiver { inner: self.inner, } } } impl fmt::Debug for Giver { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Giver") .field("state", &self.inner.state()) .finish() } } // ===== impl SharedGiver ====== impl SharedGiver { /// Check if the `Taker` has called `want()` without parking a task. /// /// This is safe to call outside of a futures task context, but other /// means of being notified is left to the user. #[inline] pub fn is_wanting(&self) -> bool { self.inner.state.load(SeqCst) == State::Want.into() } /// Check if the `Taker` has canceled interest without parking a task. #[inline] pub fn is_canceled(&self) -> bool { self.inner.state.load(SeqCst) == State::Closed.into() } } impl fmt::Debug for SharedGiver { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("SharedGiver") .field("state", &self.inner.state()) .finish() } } // ===== impl Taker ====== impl Taker { /// Signal to the `Giver` that the want is canceled. /// /// This is useful to tell that the channel is closed if you cannot /// drop the value yet. #[inline] pub fn cancel(&mut self) { trace!("signal: {:?}", State::Closed); self.signal(State::Closed) } /// Signal to the `Giver` that a value is wanted. #[inline] pub fn want(&mut self) { debug_assert!( self.inner.state.load(SeqCst) != State::Closed.into(), "want called after cancel" ); trace!("signal: {:?}", State::Want); self.signal(State::Want) } #[inline] fn signal(&mut self, state: State) { let old_state = self.inner.state.swap(state.into(), SeqCst).into(); match old_state { State::Idle | State::Want | State::Closed => (), State::Give => { loop { if let Some(mut locked) = self.inner.task.try_lock_order(SeqCst, SeqCst) { if let Some(task) = locked.take() { drop(locked); trace!("signal found waiting giver, notifying"); task.wake(); } return; } else { // if we couldn't take the lock, then a Giver has it. // The *ONLY* reason is because it is in the process of parking. // // We need to loop and take the lock so we can notify this task. } } }, } } } impl Drop for Taker { #[inline] fn drop(&mut self) { self.signal(State::Closed); } } impl fmt::Debug for Taker { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Taker") .field("state", &self.inner.state()) .finish() } } // ===== impl Closed ====== impl fmt::Debug for Closed { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Closed") .finish() } } // ===== impl Inner ====== impl Inner { #[inline] fn state(&self) -> State { self.state.load(SeqCst).into() } } // ===== impl PollFn ====== struct Want<'a>(&'a mut Giver); impl Future for Want<'_> { type Output = Result<(), Closed>; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { self.0.poll_want(cx) } } #[cfg(test)] mod tests { use std::thread; use tokio_sync::oneshot; use super::*; fn block_on(f: F) -> F::Output { tokio_executor::enter() .expect("block_on enter") .block_on(f) } #[test] fn want_ready() { let (mut gv, mut tk) = new(); tk.want(); block_on(gv.want()).unwrap(); } #[test] fn want_notify_0() { let (mut gv, mut tk) = new(); let (tx, rx) = oneshot::channel(); thread::spawn(move || { tk.want(); // use a oneshot to keep this thread alive // until other thread was notified of want block_on(rx).expect("rx"); }); block_on(gv.want()).expect("want"); assert!(gv.is_wanting(), "still wanting after poll_want success"); assert!(gv.give(), "give is true when wanting"); assert!(!gv.is_wanting(), "no longer wanting after give"); assert!(!gv.is_canceled(), "give doesn't cancel"); assert!(!gv.give(), "give is false if not wanting"); tx.send(()).expect("tx"); } /* /// This tests that if the Giver moves tasks after parking, /// it will still wake up the correct task. #[test] fn want_notify_moving_tasks() { use std::sync::Arc; use futures::executor::{spawn, Notify, NotifyHandle}; struct WantNotify; impl Notify for WantNotify { fn notify(&self, _id: usize) { } } fn n() -> NotifyHandle { Arc::new(WantNotify).into() } let (mut gv, mut tk) = new(); let mut s = spawn(poll_fn(move || { gv.poll_want() })); // Register with t1 as the task::current() let t1 = n(); assert!(s.poll_future_notify(&t1, 1).unwrap().is_not_ready()); thread::spawn(move || { thread::sleep(::std::time::Duration::from_millis(100)); tk.want(); }); // And now, move to a ThreadNotify task. s.into_inner().wait().expect("poll_want"); } */ #[test] fn cancel() { // explicit let (mut gv, mut tk) = new(); assert!(!gv.is_canceled()); tk.cancel(); assert!(gv.is_canceled()); block_on(gv.want()).unwrap_err(); // implicit let (mut gv, tk) = new(); assert!(!gv.is_canceled()); drop(tk); assert!(gv.is_canceled()); block_on(gv.want()).unwrap_err(); // notifies let (mut gv, tk) = new(); thread::spawn(move || { let _tk = tk; // and dropped }); block_on(gv.want()).unwrap_err(); } /* #[test] fn stress() { let nthreads = 5; let nwants = 100; for _ in 0..nthreads { let (mut gv, mut tk) = new(); let (mut tx, mut rx) = mpsc::channel(0); // rx thread thread::spawn(move || { let mut cnt = 0; poll_fn(move || { while cnt < nwants { let n = match rx.poll().expect("rx poll") { Async::Ready(n) => n.expect("rx opt"), Async::NotReady => { tk.want(); return Ok(Async::NotReady); }, }; assert_eq!(cnt, n); cnt += 1; } Ok::<_, ()>(Async::Ready(())) }).wait().expect("rx wait"); }); // tx thread thread::spawn(move || { let mut cnt = 0; let nsent = poll_fn(move || { loop { while let Ok(()) = tx.try_send(cnt) { cnt += 1; } match gv.poll_want() { Ok(Async::Ready(_)) => (), Ok(Async::NotReady) => return Ok::<_, ()>(Async::NotReady), Err(_) => return Ok(Async::Ready(cnt)), } } }).wait().expect("tx wait"); assert_eq!(nsent, nwants); }).join().expect("thread join"); } } */ }