summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/src/sync/watch.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/tokio/src/sync/watch.rs')
-rw-r--r--vendor/tokio/src/sync/watch.rs591
1 files changed, 591 insertions, 0 deletions
diff --git a/vendor/tokio/src/sync/watch.rs b/vendor/tokio/src/sync/watch.rs
new file mode 100644
index 000000000..7852b0cb1
--- /dev/null
+++ b/vendor/tokio/src/sync/watch.rs
@@ -0,0 +1,591 @@
+#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
+
+//! A single-producer, multi-consumer channel that only retains the *last* sent
+//! value.
+//!
+//! This channel is useful for watching for changes to a value from multiple
+//! points in the code base, for example, changes to configuration values.
+//!
+//! # Usage
+//!
+//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
+//! and sender halves of the channel. The channel is created with an initial
+//! value. The **latest** value stored in the channel is accessed with
+//! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new
+//! value to sent by the [`Sender`] half.
+//!
+//! # Examples
+//!
+//! ```
+//! use tokio::sync::watch;
+//!
+//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+//! let (tx, mut rx) = watch::channel("hello");
+//!
+//! tokio::spawn(async move {
+//! while rx.changed().await.is_ok() {
+//! println!("received = {:?}", *rx.borrow());
+//! }
+//! });
+//!
+//! tx.send("world")?;
+//! # Ok(())
+//! # }
+//! ```
+//!
+//! # Closing
+//!
+//! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
+//! when all [`Receiver`] handles have been dropped. This indicates that there
+//! is no further interest in the values being produced and work can be stopped.
+//!
+//! # Thread safety
+//!
+//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
+//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
+//! handles may be moved to separate threads and also used concurrently.
+//!
+//! [`Sender`]: crate::sync::watch::Sender
+//! [`Receiver`]: crate::sync::watch::Receiver
+//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
+//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
+//! [`channel`]: crate::sync::watch::channel
+//! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
+//! [`Sender::closed`]: crate::sync::watch::Sender::closed
+
+use crate::sync::notify::Notify;
+
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::loom::sync::atomic::Ordering::{Relaxed, SeqCst};
+use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
+use std::ops;
+
+/// Receives values from the associated [`Sender`](struct@Sender).
+///
+/// Instances are created by the [`channel`](fn@channel) function.
+///
+/// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
+/// wrapper.
+///
+/// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
+#[derive(Debug)]
+pub struct Receiver<T> {
+ /// Pointer to the shared state
+ shared: Arc<Shared<T>>,
+
+ /// Last observed version
+ version: usize,
+}
+
+/// Sends values to the associated [`Receiver`](struct@Receiver).
+///
+/// Instances are created by the [`channel`](fn@channel) function.
+#[derive(Debug)]
+pub struct Sender<T> {
+ shared: Arc<Shared<T>>,
+}
+
+/// Returns a reference to the inner value
+///
+/// Outstanding borrows hold a read lock on the inner value. This means that
+/// long lived borrows could cause the produce half to block. It is recommended
+/// to keep the borrow as short lived as possible.
+#[derive(Debug)]
+pub struct Ref<'a, T> {
+ inner: RwLockReadGuard<'a, T>,
+}
+
+#[derive(Debug)]
+struct Shared<T> {
+ /// The most recent value
+ value: RwLock<T>,
+
+ /// The current version
+ ///
+ /// The lowest bit represents a "closed" state. The rest of the bits
+ /// represent the current version.
+ version: AtomicUsize,
+
+ /// Tracks the number of `Receiver` instances
+ ref_count_rx: AtomicUsize,
+
+ /// Notifies waiting receivers that the value changed.
+ notify_rx: Notify,
+
+ /// Notifies any task listening for `Receiver` dropped events
+ notify_tx: Notify,
+}
+
+pub mod error {
+ //! Watch error types
+
+ use std::fmt;
+
+ /// Error produced when sending a value fails.
+ #[derive(Debug)]
+ pub struct SendError<T> {
+ pub(crate) inner: T,
+ }
+
+ // ===== impl SendError =====
+
+ impl<T: fmt::Debug> fmt::Display for SendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "channel closed")
+ }
+ }
+
+ impl<T: fmt::Debug> std::error::Error for SendError<T> {}
+
+ /// Error produced when receiving a change notification.
+ #[derive(Debug)]
+ pub struct RecvError(pub(super) ());
+
+ // ===== impl RecvError =====
+
+ impl fmt::Display for RecvError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "channel closed")
+ }
+ }
+
+ impl std::error::Error for RecvError {}
+}
+
+const CLOSED: usize = 1;
+
+/// Creates a new watch channel, returning the "send" and "receive" handles.
+///
+/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
+/// Only the last value sent is made available to the [`Receiver`] half. All
+/// intermediate values are dropped.
+///
+/// # Examples
+///
+/// ```
+/// use tokio::sync::watch;
+///
+/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+/// let (tx, mut rx) = watch::channel("hello");
+///
+/// tokio::spawn(async move {
+/// while rx.changed().await.is_ok() {
+/// println!("received = {:?}", *rx.borrow());
+/// }
+/// });
+///
+/// tx.send("world")?;
+/// # Ok(())
+/// # }
+/// ```
+///
+/// [`Sender`]: struct@Sender
+/// [`Receiver`]: struct@Receiver
+pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
+ let shared = Arc::new(Shared {
+ value: RwLock::new(init),
+ version: AtomicUsize::new(0),
+ ref_count_rx: AtomicUsize::new(1),
+ notify_rx: Notify::new(),
+ notify_tx: Notify::new(),
+ });
+
+ let tx = Sender {
+ shared: shared.clone(),
+ };
+
+ let rx = Receiver { shared, version: 0 };
+
+ (tx, rx)
+}
+
+impl<T> Receiver<T> {
+ fn from_shared(version: usize, shared: Arc<Shared<T>>) -> Self {
+ // No synchronization necessary as this is only used as a counter and
+ // not memory access.
+ shared.ref_count_rx.fetch_add(1, Relaxed);
+
+ Self { shared, version }
+ }
+
+ /// Returns a reference to the most recently sent value.
+ ///
+ /// This method does not mark the returned value as seen, so future calls to
+ /// [`changed`] may return immediately even if you have already seen the
+ /// value with a call to `borrow`.
+ ///
+ /// Outstanding borrows hold a read lock. This means that long lived borrows
+ /// could cause the send half to block. It is recommended to keep the borrow
+ /// as short lived as possible.
+ ///
+ /// [`changed`]: Receiver::changed
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::watch;
+ ///
+ /// let (_, rx) = watch::channel("hello");
+ /// assert_eq!(*rx.borrow(), "hello");
+ /// ```
+ pub fn borrow(&self) -> Ref<'_, T> {
+ let inner = self.shared.value.read().unwrap();
+ Ref { inner }
+ }
+
+ /// Returns a reference to the most recently sent value and mark that value
+ /// as seen.
+ ///
+ /// This method marks the value as seen, so [`changed`] will not return
+ /// immediately if the newest value is one previously returned by
+ /// `borrow_and_update`.
+ ///
+ /// Outstanding borrows hold a read lock. This means that long lived borrows
+ /// could cause the send half to block. It is recommended to keep the borrow
+ /// as short lived as possible.
+ ///
+ /// [`changed`]: Receiver::changed
+ pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
+ let inner = self.shared.value.read().unwrap();
+ self.version = self.shared.version.load(SeqCst) & !CLOSED;
+ Ref { inner }
+ }
+
+ /// Wait for a change notification, then mark the newest value as seen.
+ ///
+ /// If the newest value in the channel has not yet been marked seen when
+ /// this method is called, the method marks that value seen and returns
+ /// immediately. If the newest value has already been marked seen, then the
+ /// method sleeps until a new message is sent by the [`Sender`] connected to
+ /// this `Receiver`, or until the [`Sender`] is dropped.
+ ///
+ /// This method returns an error if and only if the [`Sender`] is dropped.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. If you use it as the event in a
+ /// [`tokio::select!`](crate::select) statement and some other branch
+ /// completes first, then it is guaranteed that no values have been marked
+ /// seen by this call to `changed`.
+ ///
+ /// [`Sender`]: struct@Sender
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::watch;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = watch::channel("hello");
+ ///
+ /// tokio::spawn(async move {
+ /// tx.send("goodbye").unwrap();
+ /// });
+ ///
+ /// assert!(rx.changed().await.is_ok());
+ /// assert_eq!(*rx.borrow(), "goodbye");
+ ///
+ /// // The `tx` handle has been dropped
+ /// assert!(rx.changed().await.is_err());
+ /// }
+ /// ```
+ pub async fn changed(&mut self) -> Result<(), error::RecvError> {
+ loop {
+ // In order to avoid a race condition, we first request a notification,
+ // **then** check the current value's version. If a new version exists,
+ // the notification request is dropped.
+ let notified = self.shared.notify_rx.notified();
+
+ if let Some(ret) = maybe_changed(&self.shared, &mut self.version) {
+ return ret;
+ }
+
+ notified.await;
+ // loop around again in case the wake-up was spurious
+ }
+ }
+
+ cfg_process_driver! {
+ pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
+ maybe_changed(&self.shared, &mut self.version)
+ }
+ }
+}
+
+fn maybe_changed<T>(
+ shared: &Shared<T>,
+ version: &mut usize,
+) -> Option<Result<(), error::RecvError>> {
+ // Load the version from the state
+ let state = shared.version.load(SeqCst);
+ let new_version = state & !CLOSED;
+
+ if *version != new_version {
+ // Observe the new version and return
+ *version = new_version;
+ return Some(Ok(()));
+ }
+
+ if CLOSED == state & CLOSED {
+ // All receivers have dropped.
+ return Some(Err(error::RecvError(())));
+ }
+
+ None
+}
+
+impl<T> Clone for Receiver<T> {
+ fn clone(&self) -> Self {
+ let version = self.version;
+ let shared = self.shared.clone();
+
+ Self::from_shared(version, shared)
+ }
+}
+
+impl<T> Drop for Receiver<T> {
+ fn drop(&mut self) {
+ // No synchronization necessary as this is only used as a counter and
+ // not memory access.
+ if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
+ // This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
+ self.shared.notify_tx.notify_waiters();
+ }
+ }
+}
+
+impl<T> Sender<T> {
+ /// Sends a new value via the channel, notifying all receivers.
+ pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
+ // This is pretty much only useful as a hint anyway, so synchronization isn't critical.
+ if 0 == self.shared.ref_count_rx.load(Relaxed) {
+ return Err(error::SendError { inner: value });
+ }
+
+ {
+ // Acquire the write lock and update the value.
+ let mut lock = self.shared.value.write().unwrap();
+ *lock = value;
+
+ // Update the version. 2 is used so that the CLOSED bit is not set.
+ self.shared.version.fetch_add(2, SeqCst);
+
+ // Release the write lock.
+ //
+ // Incrementing the version counter while holding the lock ensures
+ // that receivers are able to figure out the version number of the
+ // value they are currently looking at.
+ drop(lock);
+ }
+
+ // Notify all watchers
+ self.shared.notify_rx.notify_waiters();
+
+ Ok(())
+ }
+
+ /// Returns a reference to the most recently sent value
+ ///
+ /// Outstanding borrows hold a read lock. This means that long lived borrows
+ /// could cause the send half to block. It is recommended to keep the borrow
+ /// as short lived as possible.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::watch;
+ ///
+ /// let (tx, _) = watch::channel("hello");
+ /// assert_eq!(*tx.borrow(), "hello");
+ /// ```
+ pub fn borrow(&self) -> Ref<'_, T> {
+ let inner = self.shared.value.read().unwrap();
+ Ref { inner }
+ }
+
+ /// Checks if the channel has been closed. This happens when all receivers
+ /// have dropped.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// let (tx, rx) = tokio::sync::watch::channel(());
+ /// assert!(!tx.is_closed());
+ ///
+ /// drop(rx);
+ /// assert!(tx.is_closed());
+ /// ```
+ pub fn is_closed(&self) -> bool {
+ self.shared.ref_count_rx.load(Relaxed) == 0
+ }
+
+ /// Completes when all receivers have dropped.
+ ///
+ /// This allows the producer to get notified when interest in the produced
+ /// values is canceled and immediately stop doing work.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once the channel is closed, it stays closed
+ /// forever and all future calls to `closed` will return immediately.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::watch;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, rx) = watch::channel("hello");
+ ///
+ /// tokio::spawn(async move {
+ /// // use `rx`
+ /// drop(rx);
+ /// });
+ ///
+ /// // Waits for `rx` to drop
+ /// tx.closed().await;
+ /// println!("the `rx` handles dropped")
+ /// }
+ /// ```
+ pub async fn closed(&self) {
+ let notified = self.shared.notify_tx.notified();
+
+ if self.shared.ref_count_rx.load(Relaxed) == 0 {
+ return;
+ }
+
+ notified.await;
+ debug_assert_eq!(0, self.shared.ref_count_rx.load(Relaxed));
+ }
+
+ cfg_signal_internal! {
+ pub(crate) fn subscribe(&self) -> Receiver<T> {
+ let shared = self.shared.clone();
+ let version = shared.version.load(SeqCst);
+
+ Receiver::from_shared(version, shared)
+ }
+ }
+
+ /// Returns the number of receivers that currently exist
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::watch;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, rx1) = watch::channel("hello");
+ ///
+ /// assert_eq!(1, tx.receiver_count());
+ ///
+ /// let mut _rx2 = rx1.clone();
+ ///
+ /// assert_eq!(2, tx.receiver_count());
+ /// }
+ /// ```
+ pub fn receiver_count(&self) -> usize {
+ self.shared.ref_count_rx.load(Relaxed)
+ }
+}
+
+impl<T> Drop for Sender<T> {
+ fn drop(&mut self) {
+ self.shared.version.fetch_or(CLOSED, SeqCst);
+ self.shared.notify_rx.notify_waiters();
+ }
+}
+
+// ===== impl Ref =====
+
+impl<T> ops::Deref for Ref<'_, T> {
+ type Target = T;
+
+ fn deref(&self) -> &T {
+ self.inner.deref()
+ }
+}
+
+#[cfg(all(test, loom))]
+mod tests {
+ use futures::future::FutureExt;
+ use loom::thread;
+
+ // test for https://github.com/tokio-rs/tokio/issues/3168
+ #[test]
+ fn watch_spurious_wakeup() {
+ loom::model(|| {
+ let (send, mut recv) = crate::sync::watch::channel(0i32);
+
+ send.send(1).unwrap();
+
+ let send_thread = thread::spawn(move || {
+ send.send(2).unwrap();
+ send
+ });
+
+ recv.changed().now_or_never();
+
+ let send = send_thread.join().unwrap();
+ let recv_thread = thread::spawn(move || {
+ recv.changed().now_or_never();
+ recv.changed().now_or_never();
+ recv
+ });
+
+ send.send(3).unwrap();
+
+ let mut recv = recv_thread.join().unwrap();
+ let send_thread = thread::spawn(move || {
+ send.send(2).unwrap();
+ });
+
+ recv.changed().now_or_never();
+
+ send_thread.join().unwrap();
+ });
+ }
+
+ #[test]
+ fn watch_borrow() {
+ loom::model(|| {
+ let (send, mut recv) = crate::sync::watch::channel(0i32);
+
+ assert!(send.borrow().eq(&0));
+ assert!(recv.borrow().eq(&0));
+
+ send.send(1).unwrap();
+ assert!(send.borrow().eq(&1));
+
+ let send_thread = thread::spawn(move || {
+ send.send(2).unwrap();
+ send
+ });
+
+ recv.changed().now_or_never();
+
+ let send = send_thread.join().unwrap();
+ let recv_thread = thread::spawn(move || {
+ recv.changed().now_or_never();
+ recv.changed().now_or_never();
+ recv
+ });
+
+ send.send(3).unwrap();
+
+ let recv = recv_thread.join().unwrap();
+ assert!(recv.borrow().eq(&3));
+ assert!(send.borrow().eq(&3));
+
+ send.send(2).unwrap();
+
+ thread::spawn(move || {
+ assert!(recv.borrow().eq(&2));
+ });
+ assert!(send.borrow().eq(&2));
+ });
+ }
+}