//! An SPSC broadcast channel. //! //! - The value can only be a `usize`. //! - The consumer is only notified if the value is different. //! - The value `0` is reserved for closed. use futures_util::task::AtomicWaker; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; use std::task; type Value = usize; pub(crate) const CLOSED: usize = 0; pub(crate) fn channel(initial: Value) -> (Sender, Receiver) { debug_assert!( initial != CLOSED, "watch::channel initial state of 0 is reserved" ); let shared = Arc::new(Shared { value: AtomicUsize::new(initial), waker: AtomicWaker::new(), }); ( Sender { shared: shared.clone(), }, Receiver { shared }, ) } pub(crate) struct Sender { shared: Arc, } pub(crate) struct Receiver { shared: Arc, } struct Shared { value: AtomicUsize, waker: AtomicWaker, } impl Sender { pub(crate) fn send(&mut self, value: Value) { if self.shared.value.swap(value, Ordering::SeqCst) != value { self.shared.waker.wake(); } } } impl Drop for Sender { fn drop(&mut self) { self.send(CLOSED); } } impl Receiver { pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value { self.shared.waker.register(cx.waker()); self.shared.value.load(Ordering::SeqCst) } pub(crate) fn peek(&self) -> Value { self.shared.value.load(Ordering::Relaxed) } }