summaryrefslogtreecommitdiffstats
path: root/third_party/rust/hyper/src/common/watch.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/hyper/src/common/watch.rs')
-rw-r--r--third_party/rust/hyper/src/common/watch.rs73
1 files changed, 73 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/common/watch.rs b/third_party/rust/hyper/src/common/watch.rs
new file mode 100644
index 0000000000..ba17d551cb
--- /dev/null
+++ b/third_party/rust/hyper/src/common/watch.rs
@@ -0,0 +1,73 @@
+//! An SPSC broadcast channel.
+//!
+//! - The value can only be a `usize`.
+//! - The consumer is only notified if the value is different.
+//! - The value `0` is reserved for closed.
+
+use futures_util::task::AtomicWaker;
+use std::sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+};
+use std::task;
+
+type Value = usize;
+
+pub(crate) const CLOSED: usize = 0;
+
+pub(crate) fn channel(initial: Value) -> (Sender, Receiver) {
+ debug_assert!(
+ initial != CLOSED,
+ "watch::channel initial state of 0 is reserved"
+ );
+
+ let shared = Arc::new(Shared {
+ value: AtomicUsize::new(initial),
+ waker: AtomicWaker::new(),
+ });
+
+ (
+ Sender {
+ shared: shared.clone(),
+ },
+ Receiver { shared },
+ )
+}
+
+pub(crate) struct Sender {
+ shared: Arc<Shared>,
+}
+
+pub(crate) struct Receiver {
+ shared: Arc<Shared>,
+}
+
+struct Shared {
+ value: AtomicUsize,
+ waker: AtomicWaker,
+}
+
+impl Sender {
+ pub(crate) fn send(&mut self, value: Value) {
+ if self.shared.value.swap(value, Ordering::SeqCst) != value {
+ self.shared.waker.wake();
+ }
+ }
+}
+
+impl Drop for Sender {
+ fn drop(&mut self) {
+ self.send(CLOSED);
+ }
+}
+
+impl Receiver {
+ pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value {
+ self.shared.waker.register(cx.waker());
+ self.shared.value.load(Ordering::SeqCst)
+ }
+
+ pub(crate) fn peek(&self) -> Value {
+ self.shared.value.load(Ordering::Relaxed)
+ }
+}