#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))] //! A single-producer, multi-consumer channel that only retains the *last* sent //! value. //! //! This channel is useful for watching for changes to a value from multiple //! points in the code base, for example, changes to configuration values. //! //! # Usage //! //! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer //! and consumer halves of the channel. The channel is created with an initial //! value. The **latest** value stored in the channel is accessed with //! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new //! value to be sent by the [`Sender`] half. //! //! # Examples //! //! ``` //! use tokio::sync::watch; //! //! # async fn dox() -> Result<(), Box> { //! let (tx, mut rx) = watch::channel("hello"); //! //! tokio::spawn(async move { //! while rx.changed().await.is_ok() { //! println!("received = {:?}", *rx.borrow()); //! } //! }); //! //! tx.send("world")?; //! # Ok(()) //! # } //! ``` //! //! # Closing //! //! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect //! when all [`Receiver`] handles have been dropped. This indicates that there //! is no further interest in the values being produced and work can be stopped. //! //! The value in the channel will not be dropped until the sender and all receivers //! have been dropped. //! //! # Thread safety //! //! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other //! threads and can be used in a concurrent environment. Clones of [`Receiver`] //! handles may be moved to separate threads and also used concurrently. //! //! [`Sender`]: crate::sync::watch::Sender //! [`Receiver`]: crate::sync::watch::Receiver //! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed //! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow //! [`channel`]: crate::sync::watch::channel //! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed //! [`Sender::closed`]: crate::sync::watch::Sender::closed use crate::sync::notify::Notify; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::atomic::Ordering::Relaxed; use crate::loom::sync::{Arc, RwLock, RwLockReadGuard}; use std::fmt; use std::mem; use std::ops; use std::panic; /// Receives values from the associated [`Sender`](struct@Sender). /// /// Instances are created by the [`channel`](fn@channel) function. /// /// To turn this receiver into a `Stream`, you can use the [`WatchStream`] /// wrapper. /// /// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html #[derive(Debug)] pub struct Receiver { /// Pointer to the shared state shared: Arc>, /// Last observed version version: Version, } /// Sends values to the associated [`Receiver`](struct@Receiver). /// /// Instances are created by the [`channel`](fn@channel) function. #[derive(Debug)] pub struct Sender { shared: Arc>, } /// Returns a reference to the inner value. /// /// Outstanding borrows hold a read lock on the inner value. This means that /// long-lived borrows could cause the producer half to block. It is recommended /// to keep the borrow as short-lived as possible. Additionally, if you are /// running in an environment that allows `!Send` futures, you must ensure that /// the returned `Ref` type is never held alive across an `.await` point, /// otherwise, it can lead to a deadlock. /// /// The priority policy of the lock is dependent on the underlying lock /// implementation, and this type does not guarantee that any particular policy /// will be used. In particular, a producer which is waiting to acquire the lock /// in `send` might or might not block concurrent calls to `borrow`, e.g.: /// ///
Potential deadlock example /// /// ```text /// // Task 1 (on thread A) | // Task 2 (on thread B) /// let _ref1 = rx.borrow(); | /// | // will block /// | let _ = tx.send(()); /// // may deadlock | /// let _ref2 = rx.borrow(); | /// ``` ///
#[derive(Debug)] pub struct Ref<'a, T> { inner: RwLockReadGuard<'a, T>, has_changed: bool, } impl<'a, T> Ref<'a, T> { /// Indicates if the borrowed value is considered as _changed_ since the last /// time it has been marked as seen. /// /// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed. /// /// When borrowed from the [`Sender`] this function will always return `false`. /// /// # Examples /// /// ``` /// use tokio::sync::watch; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx) = watch::channel("hello"); /// /// tx.send("goodbye").unwrap(); /// // The sender does never consider the value as changed. /// assert!(!tx.borrow().has_changed()); /// /// // Drop the sender immediately, just for testing purposes. /// drop(tx); /// /// // Even if the sender has already been dropped... /// assert!(rx.has_changed().is_err()); /// // ...the modified value is still readable and detected as changed. /// assert_eq!(*rx.borrow(), "goodbye"); /// assert!(rx.borrow().has_changed()); /// /// // Read the changed value and mark it as seen. /// { /// let received = rx.borrow_and_update(); /// assert_eq!(*received, "goodbye"); /// assert!(received.has_changed()); /// // Release the read lock when leaving this scope. /// } /// /// // Now the value has already been marked as seen and could /// // never be modified again (after the sender has been dropped). /// assert!(!rx.borrow().has_changed()); /// } /// ``` pub fn has_changed(&self) -> bool { self.has_changed } } struct Shared { /// The most recent value. value: RwLock, /// The current version. /// /// The lowest bit represents a "closed" state. The rest of the bits /// represent the current version. state: AtomicState, /// Tracks the number of `Receiver` instances. ref_count_rx: AtomicUsize, /// Notifies waiting receivers that the value changed. notify_rx: big_notify::BigNotify, /// Notifies any task listening for `Receiver` dropped events. notify_tx: Notify, } impl fmt::Debug for Shared { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let state = self.state.load(); f.debug_struct("Shared") .field("value", &self.value) .field("version", &state.version()) .field("is_closed", &state.is_closed()) .field("ref_count_rx", &self.ref_count_rx) .finish() } } pub mod error { //! Watch error types. use std::fmt; /// Error produced when sending a value fails. #[derive(PartialEq, Eq, Clone, Copy)] pub struct SendError(pub T); // ===== impl SendError ===== impl fmt::Debug for SendError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("SendError").finish_non_exhaustive() } } impl fmt::Display for SendError { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { write!(fmt, "channel closed") } } impl std::error::Error for SendError {} /// Error produced when receiving a change notification. #[derive(Debug, Clone)] pub struct RecvError(pub(super) ()); // ===== impl RecvError ===== impl fmt::Display for RecvError { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { write!(fmt, "channel closed") } } impl std::error::Error for RecvError {} } mod big_notify { use super::*; use crate::sync::notify::Notified; // To avoid contention on the lock inside the `Notify`, we store multiple // copies of it. Then, we use either circular access or randomness to spread // out threads over different `Notify` objects. // // Some simple benchmarks show that randomness performs slightly better than // circular access (probably due to contention on `next`), so we prefer to // use randomness when Tokio is compiled with a random number generator. // // When the random number generator is not available, we fall back to // circular access. pub(super) struct BigNotify { #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))] next: AtomicUsize, inner: [Notify; 8], } impl BigNotify { pub(super) fn new() -> Self { Self { #[cfg(not(all( not(loom), feature = "sync", any(feature = "rt", feature = "macros") )))] next: AtomicUsize::new(0), inner: Default::default(), } } pub(super) fn notify_waiters(&self) { for notify in &self.inner { notify.notify_waiters(); } } /// This function implements the case where randomness is not available. #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))] pub(super) fn notified(&self) -> Notified<'_> { let i = self.next.fetch_add(1, Relaxed) % 8; self.inner[i].notified() } /// This function implements the case where randomness is available. #[cfg(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros")))] pub(super) fn notified(&self) -> Notified<'_> { let i = crate::runtime::context::thread_rng_n(8) as usize; self.inner[i].notified() } } } use self::state::{AtomicState, Version}; mod state { use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::atomic::Ordering::SeqCst; const CLOSED: usize = 1; /// The version part of the state. The lowest bit is always zero. #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub(super) struct Version(usize); /// Snapshot of the state. The first bit is used as the CLOSED bit. /// The remaining bits are used as the version. /// /// The CLOSED bit tracks whether the Sender has been dropped. Dropping all /// receivers does not set it. #[derive(Copy, Clone, Debug)] pub(super) struct StateSnapshot(usize); /// The state stored in an atomic integer. #[derive(Debug)] pub(super) struct AtomicState(AtomicUsize); impl Version { /// Get the initial version when creating the channel. pub(super) fn initial() -> Self { Version(0) } } impl StateSnapshot { /// Extract the version from the state. pub(super) fn version(self) -> Version { Version(self.0 & !CLOSED) } /// Is the closed bit set? pub(super) fn is_closed(self) -> bool { (self.0 & CLOSED) == CLOSED } } impl AtomicState { /// Create a new `AtomicState` that is not closed and which has the /// version set to `Version::initial()`. pub(super) fn new() -> Self { AtomicState(AtomicUsize::new(0)) } /// Load the current value of the state. pub(super) fn load(&self) -> StateSnapshot { StateSnapshot(self.0.load(SeqCst)) } /// Increment the version counter. pub(super) fn increment_version(&self) { // Increment by two to avoid touching the CLOSED bit. self.0.fetch_add(2, SeqCst); } /// Set the closed bit in the state. pub(super) fn set_closed(&self) { self.0.fetch_or(CLOSED, SeqCst); } } } /// Creates a new watch channel, returning the "send" and "receive" handles. /// /// All values sent by [`Sender`] will become visible to the [`Receiver`] handles. /// Only the last value sent is made available to the [`Receiver`] half. All /// intermediate values are dropped. /// /// # Examples /// /// ``` /// use tokio::sync::watch; /// /// # async fn dox() -> Result<(), Box> { /// let (tx, mut rx) = watch::channel("hello"); /// /// tokio::spawn(async move { /// while rx.changed().await.is_ok() { /// println!("received = {:?}", *rx.borrow()); /// } /// }); /// /// tx.send("world")?; /// # Ok(()) /// # } /// ``` /// /// [`Sender`]: struct@Sender /// [`Receiver`]: struct@Receiver pub fn channel(init: T) -> (Sender, Receiver) { let shared = Arc::new(Shared { value: RwLock::new(init), state: AtomicState::new(), ref_count_rx: AtomicUsize::new(1), notify_rx: big_notify::BigNotify::new(), notify_tx: Notify::new(), }); let tx = Sender { shared: shared.clone(), }; let rx = Receiver { shared, version: Version::initial(), }; (tx, rx) } impl Receiver { fn from_shared(version: Version, shared: Arc>) -> Self { // No synchronization necessary as this is only used as a counter and // not memory access. shared.ref_count_rx.fetch_add(1, Relaxed); Self { shared, version } } /// Returns a reference to the most recently sent value. /// /// This method does not mark the returned value as seen, so future calls to /// [`changed`] may return immediately even if you have already seen the /// value with a call to `borrow`. /// /// Outstanding borrows hold a read lock on the inner value. This means that /// long-lived borrows could cause the producer half to block. It is recommended /// to keep the borrow as short-lived as possible. Additionally, if you are /// running in an environment that allows `!Send` futures, you must ensure that /// the returned `Ref` type is never held alive across an `.await` point, /// otherwise, it can lead to a deadlock. /// /// The priority policy of the lock is dependent on the underlying lock /// implementation, and this type does not guarantee that any particular policy /// will be used. In particular, a producer which is waiting to acquire the lock /// in `send` might or might not block concurrent calls to `borrow`, e.g.: /// ///
Potential deadlock example /// /// ```text /// // Task 1 (on thread A) | // Task 2 (on thread B) /// let _ref1 = rx.borrow(); | /// | // will block /// | let _ = tx.send(()); /// // may deadlock | /// let _ref2 = rx.borrow(); | /// ``` ///
/// /// [`changed`]: Receiver::changed /// /// # Examples /// /// ``` /// use tokio::sync::watch; /// /// let (_, rx) = watch::channel("hello"); /// assert_eq!(*rx.borrow(), "hello"); /// ``` pub fn borrow(&self) -> Ref<'_, T> { let inner = self.shared.value.read().unwrap(); // After obtaining a read-lock no concurrent writes could occur // and the loaded version matches that of the borrowed reference. let new_version = self.shared.state.load().version(); let has_changed = self.version != new_version; Ref { inner, has_changed } } /// Returns a reference to the most recently sent value and marks that value /// as seen. /// /// This method marks the current value as seen. Subsequent calls to [`changed`] /// will not return immediately until the [`Sender`] has modified the shared /// value again. /// /// Outstanding borrows hold a read lock on the inner value. This means that /// long-lived borrows could cause the producer half to block. It is recommended /// to keep the borrow as short-lived as possible. Additionally, if you are /// running in an environment that allows `!Send` futures, you must ensure that /// the returned `Ref` type is never held alive across an `.await` point, /// otherwise, it can lead to a deadlock. /// /// The priority policy of the lock is dependent on the underlying lock /// implementation, and this type does not guarantee that any particular policy /// will be used. In particular, a producer which is waiting to acquire the lock /// in `send` might or might not block concurrent calls to `borrow`, e.g.: /// ///
Potential deadlock example /// /// ```text /// // Task 1 (on thread A) | // Task 2 (on thread B) /// let _ref1 = rx1.borrow_and_update(); | /// | // will block /// | let _ = tx.send(()); /// // may deadlock | /// let _ref2 = rx2.borrow_and_update(); | /// ``` ///
/// /// [`changed`]: Receiver::changed pub fn borrow_and_update(&mut self) -> Ref<'_, T> { let inner = self.shared.value.read().unwrap(); // After obtaining a read-lock no concurrent writes could occur // and the loaded version matches that of the borrowed reference. let new_version = self.shared.state.load().version(); let has_changed = self.version != new_version; // Mark the shared value as seen by updating the version self.version = new_version; Ref { inner, has_changed } } /// Checks if this channel contains a message that this receiver has not yet /// seen. The new value is not marked as seen. /// /// Although this method is called `has_changed`, it does not check new /// messages for equality, so this call will return true even if the new /// message is equal to the old message. /// /// Returns an error if the channel has been closed. /// # Examples /// /// ``` /// use tokio::sync::watch; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx) = watch::channel("hello"); /// /// tx.send("goodbye").unwrap(); /// /// assert!(rx.has_changed().unwrap()); /// assert_eq!(*rx.borrow_and_update(), "goodbye"); /// /// // The value has been marked as seen /// assert!(!rx.has_changed().unwrap()); /// /// drop(tx); /// // The `tx` handle has been dropped /// assert!(rx.has_changed().is_err()); /// } /// ``` pub fn has_changed(&self) -> Result { // Load the version from the state let state = self.shared.state.load(); if state.is_closed() { // The sender has dropped. return Err(error::RecvError(())); } let new_version = state.version(); Ok(self.version != new_version) } /// Waits for a change notification, then marks the newest value as seen. /// /// If the newest value in the channel has not yet been marked seen when /// this method is called, the method marks that value seen and returns /// immediately. If the newest value has already been marked seen, then the /// method sleeps until a new message is sent by the [`Sender`] connected to /// this `Receiver`, or until the [`Sender`] is dropped. /// /// This method returns an error if and only if the [`Sender`] is dropped. /// /// # Cancel safety /// /// This method is cancel safe. If you use it as the event in a /// [`tokio::select!`](crate::select) statement and some other branch /// completes first, then it is guaranteed that no values have been marked /// seen by this call to `changed`. /// /// [`Sender`]: struct@Sender /// /// # Examples /// /// ``` /// use tokio::sync::watch; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx) = watch::channel("hello"); /// /// tokio::spawn(async move { /// tx.send("goodbye").unwrap(); /// }); /// /// assert!(rx.changed().await.is_ok()); /// assert_eq!(*rx.borrow(), "goodbye"); /// /// // The `tx` handle has been dropped /// assert!(rx.changed().await.is_err()); /// } /// ``` pub async fn changed(&mut self) -> Result<(), error::RecvError> { changed_impl(&self.shared, &mut self.version).await } /// Waits for a value that satisifes the provided condition. /// /// This method will call the provided closure whenever something is sent on /// the channel. Once the closure returns `true`, this method will return a /// reference to the value that was passed to the closure. /// /// Before `wait_for` starts waiting for changes, it will call the closure /// on the current value. If the closure returns `true` when given the /// current value, then `wait_for` will immediately return a reference to /// the current value. This is the case even if the current value is already /// considered seen. /// /// The watch channel only keeps track of the most recent value, so if /// several messages are sent faster than `wait_for` is able to call the /// closure, then it may skip some updates. Whenever the closure is called, /// it will be called with the most recent value. /// /// When this function returns, the value that was passed to the closure /// when it returned `true` will be considered seen. /// /// If the channel is closed, then `wait_for` will return a `RecvError`. /// Once this happens, no more messages can ever be sent on the channel. /// When an error is returned, it is guaranteed that the closure has been /// called on the last value, and that it returned `false` for that value. /// (If the closure returned `true`, then the last value would have been /// returned instead of the error.) /// /// Like the `borrow` method, the returned borrow holds a read lock on the /// inner value. This means that long-lived borrows could cause the producer /// half to block. It is recommended to keep the borrow as short-lived as /// possible. See the documentation of `borrow` for more information on /// this. /// /// [`Receiver::changed()`]: crate::sync::watch::Receiver::changed /// /// # Examples /// /// ``` /// use tokio::sync::watch; /// /// #[tokio::main] /// /// async fn main() { /// let (tx, _rx) = watch::channel("hello"); /// /// tx.send("goodbye").unwrap(); /// /// // here we subscribe to a second receiver /// // now in case of using `changed` we would have /// // to first check the current value and then wait /// // for changes or else `changed` would hang. /// let mut rx2 = tx.subscribe(); /// /// // in place of changed we have use `wait_for` /// // which would automatically check the current value /// // and wait for changes until the closure returns true. /// assert!(rx2.wait_for(|val| *val == "goodbye").await.is_ok()); /// assert_eq!(*rx2.borrow(), "goodbye"); /// } /// ``` pub async fn wait_for( &mut self, mut f: impl FnMut(&T) -> bool, ) -> Result, error::RecvError> { let mut closed = false; loop { { let inner = self.shared.value.read().unwrap(); let new_version = self.shared.state.load().version(); let has_changed = self.version != new_version; self.version = new_version; if (!closed || has_changed) && f(&inner) { return Ok(Ref { inner, has_changed }); } } if closed { return Err(error::RecvError(())); } // Wait for the value to change. closed = changed_impl(&self.shared, &mut self.version).await.is_err(); } } /// Returns `true` if receivers belong to the same channel. /// /// # Examples /// /// ``` /// let (tx, rx) = tokio::sync::watch::channel(true); /// let rx2 = rx.clone(); /// assert!(rx.same_channel(&rx2)); /// /// let (tx3, rx3) = tokio::sync::watch::channel(true); /// assert!(!rx3.same_channel(&rx2)); /// ``` pub fn same_channel(&self, other: &Self) -> bool { Arc::ptr_eq(&self.shared, &other.shared) } cfg_process_driver! { pub(crate) fn try_has_changed(&mut self) -> Option> { maybe_changed(&self.shared, &mut self.version) } } } fn maybe_changed( shared: &Shared, version: &mut Version, ) -> Option> { // Load the version from the state let state = shared.state.load(); let new_version = state.version(); if *version != new_version { // Observe the new version and return *version = new_version; return Some(Ok(())); } if state.is_closed() { // All receivers have dropped. return Some(Err(error::RecvError(()))); } None } async fn changed_impl( shared: &Shared, version: &mut Version, ) -> Result<(), error::RecvError> { crate::trace::async_trace_leaf().await; loop { // In order to avoid a race condition, we first request a notification, // **then** check the current value's version. If a new version exists, // the notification request is dropped. let notified = shared.notify_rx.notified(); if let Some(ret) = maybe_changed(shared, version) { return ret; } notified.await; // loop around again in case the wake-up was spurious } } impl Clone for Receiver { fn clone(&self) -> Self { let version = self.version; let shared = self.shared.clone(); Self::from_shared(version, shared) } } impl Drop for Receiver { fn drop(&mut self) { // No synchronization necessary as this is only used as a counter and // not memory access. if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) { // This is the last `Receiver` handle, tasks waiting on `Sender::closed()` self.shared.notify_tx.notify_waiters(); } } } impl Sender { /// Sends a new value via the channel, notifying all receivers. /// /// This method fails if the channel is closed, which is the case when /// every receiver has been dropped. It is possible to reopen the channel /// using the [`subscribe`] method. However, when `send` fails, the value /// isn't made available for future receivers (but returned with the /// [`SendError`]). /// /// To always make a new value available for future receivers, even if no /// receiver currently exists, one of the other send methods /// ([`send_if_modified`], [`send_modify`], or [`send_replace`]) can be /// used instead. /// /// [`subscribe`]: Sender::subscribe /// [`SendError`]: error::SendError /// [`send_if_modified`]: Sender::send_if_modified /// [`send_modify`]: Sender::send_modify /// [`send_replace`]: Sender::send_replace pub fn send(&self, value: T) -> Result<(), error::SendError> { // This is pretty much only useful as a hint anyway, so synchronization isn't critical. if 0 == self.receiver_count() { return Err(error::SendError(value)); } self.send_replace(value); Ok(()) } /// Modifies the watched value **unconditionally** in-place, /// notifying all receivers. /// /// This can be useful for modifying the watched value, without /// having to allocate a new instance. Additionally, this /// method permits sending values even when there are no receivers. /// /// Prefer to use the more versatile function [`Self::send_if_modified()`] /// if the value is only modified conditionally during the mutable borrow /// to prevent unneeded change notifications for unmodified values. /// /// # Panics /// /// This function panics when the invocation of the `modify` closure panics. /// No receivers are notified when panicking. All changes of the watched /// value applied by the closure before panicking will be visible in /// subsequent calls to `borrow`. /// /// # Examples /// /// ``` /// use tokio::sync::watch; /// /// struct State { /// counter: usize, /// } /// let (state_tx, state_rx) = watch::channel(State { counter: 0 }); /// state_tx.send_modify(|state| state.counter += 1); /// assert_eq!(state_rx.borrow().counter, 1); /// ``` pub fn send_modify(&self, modify: F) where F: FnOnce(&mut T), { self.send_if_modified(|value| { modify(value); true }); } /// Modifies the watched value **conditionally** in-place, /// notifying all receivers only if modified. /// /// This can be useful for modifying the watched value, without /// having to allocate a new instance. Additionally, this /// method permits sending values even when there are no receivers. /// /// The `modify` closure must return `true` if the value has actually /// been modified during the mutable borrow. It should only return `false` /// if the value is guaranteed to be unmodified despite the mutable /// borrow. /// /// Receivers are only notified if the closure returned `true`. If the /// closure has modified the value but returned `false` this results /// in a *silent modification*, i.e. the modified value will be visible /// in subsequent calls to `borrow`, but receivers will not receive /// a change notification. /// /// Returns the result of the closure, i.e. `true` if the value has /// been modified and `false` otherwise. /// /// # Panics /// /// This function panics when the invocation of the `modify` closure panics. /// No receivers are notified when panicking. All changes of the watched /// value applied by the closure before panicking will be visible in /// subsequent calls to `borrow`. /// /// # Examples /// /// ``` /// use tokio::sync::watch; /// /// struct State { /// counter: usize, /// } /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 }); /// let inc_counter_if_odd = |state: &mut State| { /// if state.counter % 2 == 1 { /// state.counter += 1; /// return true; /// } /// false /// }; /// /// assert_eq!(state_rx.borrow().counter, 1); /// /// assert!(!state_rx.has_changed().unwrap()); /// assert!(state_tx.send_if_modified(inc_counter_if_odd)); /// assert!(state_rx.has_changed().unwrap()); /// assert_eq!(state_rx.borrow_and_update().counter, 2); /// /// assert!(!state_rx.has_changed().unwrap()); /// assert!(!state_tx.send_if_modified(inc_counter_if_odd)); /// assert!(!state_rx.has_changed().unwrap()); /// assert_eq!(state_rx.borrow_and_update().counter, 2); /// ``` pub fn send_if_modified(&self, modify: F) -> bool where F: FnOnce(&mut T) -> bool, { { // Acquire the write lock and update the value. let mut lock = self.shared.value.write().unwrap(); // Update the value and catch possible panic inside func. let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock))); match result { Ok(modified) => { if !modified { // Abort, i.e. don't notify receivers if unmodified return false; } // Continue if modified } Err(panicked) => { // Drop the lock to avoid poisoning it. drop(lock); // Forward the panic to the caller. panic::resume_unwind(panicked); // Unreachable } }; self.shared.state.increment_version(); // Release the write lock. // // Incrementing the version counter while holding the lock ensures // that receivers are able to figure out the version number of the // value they are currently looking at. drop(lock); } self.shared.notify_rx.notify_waiters(); true } /// Sends a new value via the channel, notifying all receivers and returning /// the previous value in the channel. /// /// This can be useful for reusing the buffers inside a watched value. /// Additionally, this method permits sending values even when there are no /// receivers. /// /// # Examples /// /// ``` /// use tokio::sync::watch; /// /// let (tx, _rx) = watch::channel(1); /// assert_eq!(tx.send_replace(2), 1); /// assert_eq!(tx.send_replace(3), 2); /// ``` pub fn send_replace(&self, mut value: T) -> T { // swap old watched value with the new one self.send_modify(|old| mem::swap(old, &mut value)); value } /// Returns a reference to the most recently sent value /// /// Outstanding borrows hold a read lock on the inner value. This means that /// long-lived borrows could cause the producer half to block. It is recommended /// to keep the borrow as short-lived as possible. Additionally, if you are /// running in an environment that allows `!Send` futures, you must ensure that /// the returned `Ref` type is never held alive across an `.await` point, /// otherwise, it can lead to a deadlock. /// /// # Examples /// /// ``` /// use tokio::sync::watch; /// /// let (tx, _) = watch::channel("hello"); /// assert_eq!(*tx.borrow(), "hello"); /// ``` pub fn borrow(&self) -> Ref<'_, T> { let inner = self.shared.value.read().unwrap(); // The sender/producer always sees the current version let has_changed = false; Ref { inner, has_changed } } /// Checks if the channel has been closed. This happens when all receivers /// have dropped. /// /// # Examples /// /// ``` /// let (tx, rx) = tokio::sync::watch::channel(()); /// assert!(!tx.is_closed()); /// /// drop(rx); /// assert!(tx.is_closed()); /// ``` pub fn is_closed(&self) -> bool { self.receiver_count() == 0 } /// Completes when all receivers have dropped. /// /// This allows the producer to get notified when interest in the produced /// values is canceled and immediately stop doing work. /// /// # Cancel safety /// /// This method is cancel safe. Once the channel is closed, it stays closed /// forever and all future calls to `closed` will return immediately. /// /// # Examples /// /// ``` /// use tokio::sync::watch; /// /// #[tokio::main] /// async fn main() { /// let (tx, rx) = watch::channel("hello"); /// /// tokio::spawn(async move { /// // use `rx` /// drop(rx); /// }); /// /// // Waits for `rx` to drop /// tx.closed().await; /// println!("the `rx` handles dropped") /// } /// ``` pub async fn closed(&self) { crate::trace::async_trace_leaf().await; while self.receiver_count() > 0 { let notified = self.shared.notify_tx.notified(); if self.receiver_count() == 0 { return; } notified.await; // The channel could have been reopened in the meantime by calling // `subscribe`, so we loop again. } } /// Creates a new [`Receiver`] connected to this `Sender`. /// /// All messages sent before this call to `subscribe` are initially marked /// as seen by the new `Receiver`. /// /// This method can be called even if there are no other receivers. In this /// case, the channel is reopened. /// /// # Examples /// /// The new channel will receive messages sent on this `Sender`. /// /// ``` /// use tokio::sync::watch; /// /// #[tokio::main] /// async fn main() { /// let (tx, _rx) = watch::channel(0u64); /// /// tx.send(5).unwrap(); /// /// let rx = tx.subscribe(); /// assert_eq!(5, *rx.borrow()); /// /// tx.send(10).unwrap(); /// assert_eq!(10, *rx.borrow()); /// } /// ``` /// /// The most recent message is considered seen by the channel, so this test /// is guaranteed to pass. /// /// ``` /// use tokio::sync::watch; /// use tokio::time::Duration; /// /// #[tokio::main] /// async fn main() { /// let (tx, _rx) = watch::channel(0u64); /// tx.send(5).unwrap(); /// let mut rx = tx.subscribe(); /// /// tokio::spawn(async move { /// // by spawning and sleeping, the message is sent after `main` /// // hits the call to `changed`. /// # if false { /// tokio::time::sleep(Duration::from_millis(10)).await; /// # } /// tx.send(100).unwrap(); /// }); /// /// rx.changed().await.unwrap(); /// assert_eq!(100, *rx.borrow()); /// } /// ``` pub fn subscribe(&self) -> Receiver { let shared = self.shared.clone(); let version = shared.state.load().version(); // The CLOSED bit in the state tracks only whether the sender is // dropped, so we do not need to unset it if this reopens the channel. Receiver::from_shared(version, shared) } /// Returns the number of receivers that currently exist. /// /// # Examples /// /// ``` /// use tokio::sync::watch; /// /// #[tokio::main] /// async fn main() { /// let (tx, rx1) = watch::channel("hello"); /// /// assert_eq!(1, tx.receiver_count()); /// /// let mut _rx2 = rx1.clone(); /// /// assert_eq!(2, tx.receiver_count()); /// } /// ``` pub fn receiver_count(&self) -> usize { self.shared.ref_count_rx.load(Relaxed) } } impl Drop for Sender { fn drop(&mut self) { self.shared.state.set_closed(); self.shared.notify_rx.notify_waiters(); } } // ===== impl Ref ===== impl ops::Deref for Ref<'_, T> { type Target = T; fn deref(&self) -> &T { self.inner.deref() } } #[cfg(all(test, loom))] mod tests { use futures::future::FutureExt; use loom::thread; // test for https://github.com/tokio-rs/tokio/issues/3168 #[test] fn watch_spurious_wakeup() { loom::model(|| { let (send, mut recv) = crate::sync::watch::channel(0i32); send.send(1).unwrap(); let send_thread = thread::spawn(move || { send.send(2).unwrap(); send }); recv.changed().now_or_never(); let send = send_thread.join().unwrap(); let recv_thread = thread::spawn(move || { recv.changed().now_or_never(); recv.changed().now_or_never(); recv }); send.send(3).unwrap(); let mut recv = recv_thread.join().unwrap(); let send_thread = thread::spawn(move || { send.send(2).unwrap(); }); recv.changed().now_or_never(); send_thread.join().unwrap(); }); } #[test] fn watch_borrow() { loom::model(|| { let (send, mut recv) = crate::sync::watch::channel(0i32); assert!(send.borrow().eq(&0)); assert!(recv.borrow().eq(&0)); send.send(1).unwrap(); assert!(send.borrow().eq(&1)); let send_thread = thread::spawn(move || { send.send(2).unwrap(); send }); recv.changed().now_or_never(); let send = send_thread.join().unwrap(); let recv_thread = thread::spawn(move || { recv.changed().now_or_never(); recv.changed().now_or_never(); recv }); send.send(3).unwrap(); let recv = recv_thread.join().unwrap(); assert!(recv.borrow().eq(&3)); assert!(send.borrow().eq(&3)); send.send(2).unwrap(); thread::spawn(move || { assert!(recv.borrow().eq(&2)); }); assert!(send.borrow().eq(&2)); }); } }