diff options
Diffstat (limited to 'vendor/tokio/src/sync/watch.rs')
-rw-r--r-- | vendor/tokio/src/sync/watch.rs | 591 |
1 files changed, 591 insertions, 0 deletions
diff --git a/vendor/tokio/src/sync/watch.rs b/vendor/tokio/src/sync/watch.rs new file mode 100644 index 000000000..7852b0cb1 --- /dev/null +++ b/vendor/tokio/src/sync/watch.rs @@ -0,0 +1,591 @@ +#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))] + +//! A single-producer, multi-consumer channel that only retains the *last* sent +//! value. +//! +//! This channel is useful for watching for changes to a value from multiple +//! points in the code base, for example, changes to configuration values. +//! +//! # Usage +//! +//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer +//! and sender halves of the channel. The channel is created with an initial +//! value. The **latest** value stored in the channel is accessed with +//! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new +//! value to sent by the [`Sender`] half. +//! +//! # Examples +//! +//! ``` +//! use tokio::sync::watch; +//! +//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> { +//! let (tx, mut rx) = watch::channel("hello"); +//! +//! tokio::spawn(async move { +//! while rx.changed().await.is_ok() { +//! println!("received = {:?}", *rx.borrow()); +//! } +//! }); +//! +//! tx.send("world")?; +//! # Ok(()) +//! # } +//! ``` +//! +//! # Closing +//! +//! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect +//! when all [`Receiver`] handles have been dropped. This indicates that there +//! is no further interest in the values being produced and work can be stopped. +//! +//! # Thread safety +//! +//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other +//! threads and can be used in a concurrent environment. Clones of [`Receiver`] +//! handles may be moved to separate threads and also used concurrently. +//! +//! [`Sender`]: crate::sync::watch::Sender +//! [`Receiver`]: crate::sync::watch::Receiver +//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed +//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow +//! [`channel`]: crate::sync::watch::channel +//! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed +//! [`Sender::closed`]: crate::sync::watch::Sender::closed + +use crate::sync::notify::Notify; + +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::atomic::Ordering::{Relaxed, SeqCst}; +use crate::loom::sync::{Arc, RwLock, RwLockReadGuard}; +use std::ops; + +/// Receives values from the associated [`Sender`](struct@Sender). +/// +/// Instances are created by the [`channel`](fn@channel) function. +/// +/// To turn this receiver into a `Stream`, you can use the [`WatchStream`] +/// wrapper. +/// +/// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html +#[derive(Debug)] +pub struct Receiver<T> { + /// Pointer to the shared state + shared: Arc<Shared<T>>, + + /// Last observed version + version: usize, +} + +/// Sends values to the associated [`Receiver`](struct@Receiver). +/// +/// Instances are created by the [`channel`](fn@channel) function. +#[derive(Debug)] +pub struct Sender<T> { + shared: Arc<Shared<T>>, +} + +/// Returns a reference to the inner value +/// +/// Outstanding borrows hold a read lock on the inner value. This means that +/// long lived borrows could cause the produce half to block. It is recommended +/// to keep the borrow as short lived as possible. +#[derive(Debug)] +pub struct Ref<'a, T> { + inner: RwLockReadGuard<'a, T>, +} + +#[derive(Debug)] +struct Shared<T> { + /// The most recent value + value: RwLock<T>, + + /// The current version + /// + /// The lowest bit represents a "closed" state. The rest of the bits + /// represent the current version. + version: AtomicUsize, + + /// Tracks the number of `Receiver` instances + ref_count_rx: AtomicUsize, + + /// Notifies waiting receivers that the value changed. + notify_rx: Notify, + + /// Notifies any task listening for `Receiver` dropped events + notify_tx: Notify, +} + +pub mod error { + //! Watch error types + + use std::fmt; + + /// Error produced when sending a value fails. + #[derive(Debug)] + pub struct SendError<T> { + pub(crate) inner: T, + } + + // ===== impl SendError ===== + + impl<T: fmt::Debug> fmt::Display for SendError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } + } + + impl<T: fmt::Debug> std::error::Error for SendError<T> {} + + /// Error produced when receiving a change notification. + #[derive(Debug)] + pub struct RecvError(pub(super) ()); + + // ===== impl RecvError ===== + + impl fmt::Display for RecvError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } + } + + impl std::error::Error for RecvError {} +} + +const CLOSED: usize = 1; + +/// Creates a new watch channel, returning the "send" and "receive" handles. +/// +/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles. +/// Only the last value sent is made available to the [`Receiver`] half. All +/// intermediate values are dropped. +/// +/// # Examples +/// +/// ``` +/// use tokio::sync::watch; +/// +/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { +/// let (tx, mut rx) = watch::channel("hello"); +/// +/// tokio::spawn(async move { +/// while rx.changed().await.is_ok() { +/// println!("received = {:?}", *rx.borrow()); +/// } +/// }); +/// +/// tx.send("world")?; +/// # Ok(()) +/// # } +/// ``` +/// +/// [`Sender`]: struct@Sender +/// [`Receiver`]: struct@Receiver +pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) { + let shared = Arc::new(Shared { + value: RwLock::new(init), + version: AtomicUsize::new(0), + ref_count_rx: AtomicUsize::new(1), + notify_rx: Notify::new(), + notify_tx: Notify::new(), + }); + + let tx = Sender { + shared: shared.clone(), + }; + + let rx = Receiver { shared, version: 0 }; + + (tx, rx) +} + +impl<T> Receiver<T> { + fn from_shared(version: usize, shared: Arc<Shared<T>>) -> Self { + // No synchronization necessary as this is only used as a counter and + // not memory access. + shared.ref_count_rx.fetch_add(1, Relaxed); + + Self { shared, version } + } + + /// Returns a reference to the most recently sent value. + /// + /// This method does not mark the returned value as seen, so future calls to + /// [`changed`] may return immediately even if you have already seen the + /// value with a call to `borrow`. + /// + /// Outstanding borrows hold a read lock. This means that long lived borrows + /// could cause the send half to block. It is recommended to keep the borrow + /// as short lived as possible. + /// + /// [`changed`]: Receiver::changed + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// let (_, rx) = watch::channel("hello"); + /// assert_eq!(*rx.borrow(), "hello"); + /// ``` + pub fn borrow(&self) -> Ref<'_, T> { + let inner = self.shared.value.read().unwrap(); + Ref { inner } + } + + /// Returns a reference to the most recently sent value and mark that value + /// as seen. + /// + /// This method marks the value as seen, so [`changed`] will not return + /// immediately if the newest value is one previously returned by + /// `borrow_and_update`. + /// + /// Outstanding borrows hold a read lock. This means that long lived borrows + /// could cause the send half to block. It is recommended to keep the borrow + /// as short lived as possible. + /// + /// [`changed`]: Receiver::changed + pub fn borrow_and_update(&mut self) -> Ref<'_, T> { + let inner = self.shared.value.read().unwrap(); + self.version = self.shared.version.load(SeqCst) & !CLOSED; + Ref { inner } + } + + /// Wait for a change notification, then mark the newest value as seen. + /// + /// If the newest value in the channel has not yet been marked seen when + /// this method is called, the method marks that value seen and returns + /// immediately. If the newest value has already been marked seen, then the + /// method sleeps until a new message is sent by the [`Sender`] connected to + /// this `Receiver`, or until the [`Sender`] is dropped. + /// + /// This method returns an error if and only if the [`Sender`] is dropped. + /// + /// # Cancel safety + /// + /// This method is cancel safe. If you use it as the event in a + /// [`tokio::select!`](crate::select) statement and some other branch + /// completes first, then it is guaranteed that no values have been marked + /// seen by this call to `changed`. + /// + /// [`Sender`]: struct@Sender + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = watch::channel("hello"); + /// + /// tokio::spawn(async move { + /// tx.send("goodbye").unwrap(); + /// }); + /// + /// assert!(rx.changed().await.is_ok()); + /// assert_eq!(*rx.borrow(), "goodbye"); + /// + /// // The `tx` handle has been dropped + /// assert!(rx.changed().await.is_err()); + /// } + /// ``` + pub async fn changed(&mut self) -> Result<(), error::RecvError> { + loop { + // In order to avoid a race condition, we first request a notification, + // **then** check the current value's version. If a new version exists, + // the notification request is dropped. + let notified = self.shared.notify_rx.notified(); + + if let Some(ret) = maybe_changed(&self.shared, &mut self.version) { + return ret; + } + + notified.await; + // loop around again in case the wake-up was spurious + } + } + + cfg_process_driver! { + pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> { + maybe_changed(&self.shared, &mut self.version) + } + } +} + +fn maybe_changed<T>( + shared: &Shared<T>, + version: &mut usize, +) -> Option<Result<(), error::RecvError>> { + // Load the version from the state + let state = shared.version.load(SeqCst); + let new_version = state & !CLOSED; + + if *version != new_version { + // Observe the new version and return + *version = new_version; + return Some(Ok(())); + } + + if CLOSED == state & CLOSED { + // All receivers have dropped. + return Some(Err(error::RecvError(()))); + } + + None +} + +impl<T> Clone for Receiver<T> { + fn clone(&self) -> Self { + let version = self.version; + let shared = self.shared.clone(); + + Self::from_shared(version, shared) + } +} + +impl<T> Drop for Receiver<T> { + fn drop(&mut self) { + // No synchronization necessary as this is only used as a counter and + // not memory access. + if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) { + // This is the last `Receiver` handle, tasks waiting on `Sender::closed()` + self.shared.notify_tx.notify_waiters(); + } + } +} + +impl<T> Sender<T> { + /// Sends a new value via the channel, notifying all receivers. + pub fn send(&self, value: T) -> Result<(), error::SendError<T>> { + // This is pretty much only useful as a hint anyway, so synchronization isn't critical. + if 0 == self.shared.ref_count_rx.load(Relaxed) { + return Err(error::SendError { inner: value }); + } + + { + // Acquire the write lock and update the value. + let mut lock = self.shared.value.write().unwrap(); + *lock = value; + + // Update the version. 2 is used so that the CLOSED bit is not set. + self.shared.version.fetch_add(2, SeqCst); + + // Release the write lock. + // + // Incrementing the version counter while holding the lock ensures + // that receivers are able to figure out the version number of the + // value they are currently looking at. + drop(lock); + } + + // Notify all watchers + self.shared.notify_rx.notify_waiters(); + + Ok(()) + } + + /// Returns a reference to the most recently sent value + /// + /// Outstanding borrows hold a read lock. This means that long lived borrows + /// could cause the send half to block. It is recommended to keep the borrow + /// as short lived as possible. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// let (tx, _) = watch::channel("hello"); + /// assert_eq!(*tx.borrow(), "hello"); + /// ``` + pub fn borrow(&self) -> Ref<'_, T> { + let inner = self.shared.value.read().unwrap(); + Ref { inner } + } + + /// Checks if the channel has been closed. This happens when all receivers + /// have dropped. + /// + /// # Examples + /// + /// ``` + /// let (tx, rx) = tokio::sync::watch::channel(()); + /// assert!(!tx.is_closed()); + /// + /// drop(rx); + /// assert!(tx.is_closed()); + /// ``` + pub fn is_closed(&self) -> bool { + self.shared.ref_count_rx.load(Relaxed) == 0 + } + + /// Completes when all receivers have dropped. + /// + /// This allows the producer to get notified when interest in the produced + /// values is canceled and immediately stop doing work. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once the channel is closed, it stays closed + /// forever and all future calls to `closed` will return immediately. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = watch::channel("hello"); + /// + /// tokio::spawn(async move { + /// // use `rx` + /// drop(rx); + /// }); + /// + /// // Waits for `rx` to drop + /// tx.closed().await; + /// println!("the `rx` handles dropped") + /// } + /// ``` + pub async fn closed(&self) { + let notified = self.shared.notify_tx.notified(); + + if self.shared.ref_count_rx.load(Relaxed) == 0 { + return; + } + + notified.await; + debug_assert_eq!(0, self.shared.ref_count_rx.load(Relaxed)); + } + + cfg_signal_internal! { + pub(crate) fn subscribe(&self) -> Receiver<T> { + let shared = self.shared.clone(); + let version = shared.version.load(SeqCst); + + Receiver::from_shared(version, shared) + } + } + + /// Returns the number of receivers that currently exist + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx1) = watch::channel("hello"); + /// + /// assert_eq!(1, tx.receiver_count()); + /// + /// let mut _rx2 = rx1.clone(); + /// + /// assert_eq!(2, tx.receiver_count()); + /// } + /// ``` + pub fn receiver_count(&self) -> usize { + self.shared.ref_count_rx.load(Relaxed) + } +} + +impl<T> Drop for Sender<T> { + fn drop(&mut self) { + self.shared.version.fetch_or(CLOSED, SeqCst); + self.shared.notify_rx.notify_waiters(); + } +} + +// ===== impl Ref ===== + +impl<T> ops::Deref for Ref<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + self.inner.deref() + } +} + +#[cfg(all(test, loom))] +mod tests { + use futures::future::FutureExt; + use loom::thread; + + // test for https://github.com/tokio-rs/tokio/issues/3168 + #[test] + fn watch_spurious_wakeup() { + loom::model(|| { + let (send, mut recv) = crate::sync::watch::channel(0i32); + + send.send(1).unwrap(); + + let send_thread = thread::spawn(move || { + send.send(2).unwrap(); + send + }); + + recv.changed().now_or_never(); + + let send = send_thread.join().unwrap(); + let recv_thread = thread::spawn(move || { + recv.changed().now_or_never(); + recv.changed().now_or_never(); + recv + }); + + send.send(3).unwrap(); + + let mut recv = recv_thread.join().unwrap(); + let send_thread = thread::spawn(move || { + send.send(2).unwrap(); + }); + + recv.changed().now_or_never(); + + send_thread.join().unwrap(); + }); + } + + #[test] + fn watch_borrow() { + loom::model(|| { + let (send, mut recv) = crate::sync::watch::channel(0i32); + + assert!(send.borrow().eq(&0)); + assert!(recv.borrow().eq(&0)); + + send.send(1).unwrap(); + assert!(send.borrow().eq(&1)); + + let send_thread = thread::spawn(move || { + send.send(2).unwrap(); + send + }); + + recv.changed().now_or_never(); + + let send = send_thread.join().unwrap(); + let recv_thread = thread::spawn(move || { + recv.changed().now_or_never(); + recv.changed().now_or_never(); + recv + }); + + send.send(3).unwrap(); + + let recv = recv_thread.join().unwrap(); + assert!(recv.borrow().eq(&3)); + assert!(send.borrow().eq(&3)); + + send.send(2).unwrap(); + + thread::spawn(move || { + assert!(recv.borrow().eq(&2)); + }); + assert!(send.borrow().eq(&2)); + }); + } +} |