summaryrefslogtreecommitdiffstats
path: root/third_party/rust/hyper/src/common
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/hyper/src/common')
-rw-r--r--third_party/rust/hyper/src/common/buf.rs72
-rw-r--r--third_party/rust/hyper/src/common/drain.rs232
-rw-r--r--third_party/rust/hyper/src/common/exec.rs111
-rw-r--r--third_party/rust/hyper/src/common/io/mod.rs3
-rw-r--r--third_party/rust/hyper/src/common/io/rewind.rs153
-rw-r--r--third_party/rust/hyper/src/common/lazy.rs69
-rw-r--r--third_party/rust/hyper/src/common/mod.rs26
-rw-r--r--third_party/rust/hyper/src/common/never.rs21
-rw-r--r--third_party/rust/hyper/src/common/task.rs10
-rw-r--r--third_party/rust/hyper/src/common/watch.rs73
10 files changed, 770 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/common/buf.rs b/third_party/rust/hyper/src/common/buf.rs
new file mode 100644
index 0000000000..8f71b7bbad
--- /dev/null
+++ b/third_party/rust/hyper/src/common/buf.rs
@@ -0,0 +1,72 @@
+use std::collections::VecDeque;
+use std::io::IoSlice;
+
+use bytes::Buf;
+
+pub(crate) struct BufList<T> {
+ bufs: VecDeque<T>,
+}
+
+impl<T: Buf> BufList<T> {
+ pub(crate) fn new() -> BufList<T> {
+ BufList {
+ bufs: VecDeque::new(),
+ }
+ }
+
+ #[inline]
+ pub(crate) fn push(&mut self, buf: T) {
+ debug_assert!(buf.has_remaining());
+ self.bufs.push_back(buf);
+ }
+
+ #[inline]
+ pub(crate) fn bufs_cnt(&self) -> usize {
+ self.bufs.len()
+ }
+}
+
+impl<T: Buf> Buf for BufList<T> {
+ #[inline]
+ fn remaining(&self) -> usize {
+ self.bufs.iter().map(|buf| buf.remaining()).sum()
+ }
+
+ #[inline]
+ fn bytes(&self) -> &[u8] {
+ self.bufs.front().map(Buf::bytes).unwrap_or_default()
+ }
+
+ #[inline]
+ fn advance(&mut self, mut cnt: usize) {
+ while cnt > 0 {
+ {
+ let front = &mut self.bufs[0];
+ let rem = front.remaining();
+ if rem > cnt {
+ front.advance(cnt);
+ return;
+ } else {
+ front.advance(rem);
+ cnt -= rem;
+ }
+ }
+ self.bufs.pop_front();
+ }
+ }
+
+ #[inline]
+ fn bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
+ if dst.is_empty() {
+ return 0;
+ }
+ let mut vecs = 0;
+ for buf in &self.bufs {
+ vecs += buf.bytes_vectored(&mut dst[vecs..]);
+ if vecs == dst.len() {
+ break;
+ }
+ }
+ vecs
+ }
+}
diff --git a/third_party/rust/hyper/src/common/drain.rs b/third_party/rust/hyper/src/common/drain.rs
new file mode 100644
index 0000000000..7abb9f9ded
--- /dev/null
+++ b/third_party/rust/hyper/src/common/drain.rs
@@ -0,0 +1,232 @@
+use std::mem;
+
+use pin_project::pin_project;
+use tokio::sync::{mpsc, watch};
+
+use super::{task, Future, Never, Pin, Poll};
+
+// Sentinel value signaling that the watch is still open
+#[derive(Clone, Copy)]
+enum Action {
+ Open,
+ // Closed isn't sent via the `Action` type, but rather once
+ // the watch::Sender is dropped.
+}
+
+pub fn channel() -> (Signal, Watch) {
+ let (tx, rx) = watch::channel(Action::Open);
+ let (drained_tx, drained_rx) = mpsc::channel(1);
+ (
+ Signal {
+ drained_rx,
+ _tx: tx,
+ },
+ Watch { drained_tx, rx },
+ )
+}
+
+pub struct Signal {
+ drained_rx: mpsc::Receiver<Never>,
+ _tx: watch::Sender<Action>,
+}
+
+pub struct Draining {
+ drained_rx: mpsc::Receiver<Never>,
+}
+
+#[derive(Clone)]
+pub struct Watch {
+ drained_tx: mpsc::Sender<Never>,
+ rx: watch::Receiver<Action>,
+}
+
+#[allow(missing_debug_implementations)]
+#[pin_project]
+pub struct Watching<F, FN> {
+ #[pin]
+ future: F,
+ state: State<FN>,
+ watch: Watch,
+}
+
+enum State<F> {
+ Watch(F),
+ Draining,
+}
+
+impl Signal {
+ pub fn drain(self) -> Draining {
+ // Simply dropping `self.tx` will signal the watchers
+ Draining {
+ drained_rx: self.drained_rx,
+ }
+ }
+}
+
+impl Future for Draining {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ match ready!(self.drained_rx.poll_recv(cx)) {
+ Some(never) => match never {},
+ None => Poll::Ready(()),
+ }
+ }
+}
+
+impl Watch {
+ pub fn watch<F, FN>(self, future: F, on_drain: FN) -> Watching<F, FN>
+ where
+ F: Future,
+ FN: FnOnce(Pin<&mut F>),
+ {
+ Watching {
+ future,
+ state: State::Watch(on_drain),
+ watch: self,
+ }
+ }
+}
+
+impl<F, FN> Future for Watching<F, FN>
+where
+ F: Future,
+ FN: FnOnce(Pin<&mut F>),
+{
+ type Output = F::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ let mut me = self.project();
+ loop {
+ match mem::replace(me.state, State::Draining) {
+ State::Watch(on_drain) => {
+ match me.watch.rx.poll_recv_ref(cx) {
+ Poll::Ready(None) => {
+ // Drain has been triggered!
+ on_drain(me.future.as_mut());
+ }
+ Poll::Ready(Some(_ /*State::Open*/)) | Poll::Pending => {
+ *me.state = State::Watch(on_drain);
+ return me.future.poll(cx);
+ }
+ }
+ }
+ State::Draining => return me.future.poll(cx),
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ struct TestMe {
+ draining: bool,
+ finished: bool,
+ poll_cnt: usize,
+ }
+
+ impl Future for TestMe {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> {
+ self.poll_cnt += 1;
+ if self.finished {
+ Poll::Ready(())
+ } else {
+ Poll::Pending
+ }
+ }
+ }
+
+ #[test]
+ fn watch() {
+ let mut mock = tokio_test::task::spawn(());
+ mock.enter(|cx, _| {
+ let (tx, rx) = channel();
+ let fut = TestMe {
+ draining: false,
+ finished: false,
+ poll_cnt: 0,
+ };
+
+ let mut watch = rx.watch(fut, |mut fut| {
+ fut.draining = true;
+ });
+
+ assert_eq!(watch.future.poll_cnt, 0);
+
+ // First poll should poll the inner future
+ assert!(Pin::new(&mut watch).poll(cx).is_pending());
+ assert_eq!(watch.future.poll_cnt, 1);
+
+ // Second poll should poll the inner future again
+ assert!(Pin::new(&mut watch).poll(cx).is_pending());
+ assert_eq!(watch.future.poll_cnt, 2);
+
+ let mut draining = tx.drain();
+ // Drain signaled, but needs another poll to be noticed.
+ assert!(!watch.future.draining);
+ assert_eq!(watch.future.poll_cnt, 2);
+
+ // Now, poll after drain has been signaled.
+ assert!(Pin::new(&mut watch).poll(cx).is_pending());
+ assert_eq!(watch.future.poll_cnt, 3);
+ assert!(watch.future.draining);
+
+ // Draining is not ready until watcher completes
+ assert!(Pin::new(&mut draining).poll(cx).is_pending());
+
+ // Finishing up the watch future
+ watch.future.finished = true;
+ assert!(Pin::new(&mut watch).poll(cx).is_ready());
+ assert_eq!(watch.future.poll_cnt, 4);
+ drop(watch);
+
+ assert!(Pin::new(&mut draining).poll(cx).is_ready());
+ })
+ }
+
+ #[test]
+ fn watch_clones() {
+ let mut mock = tokio_test::task::spawn(());
+ mock.enter(|cx, _| {
+ let (tx, rx) = channel();
+
+ let fut1 = TestMe {
+ draining: false,
+ finished: false,
+ poll_cnt: 0,
+ };
+ let fut2 = TestMe {
+ draining: false,
+ finished: false,
+ poll_cnt: 0,
+ };
+
+ let watch1 = rx.clone().watch(fut1, |mut fut| {
+ fut.draining = true;
+ });
+ let watch2 = rx.watch(fut2, |mut fut| {
+ fut.draining = true;
+ });
+
+ let mut draining = tx.drain();
+
+ // Still 2 outstanding watchers
+ assert!(Pin::new(&mut draining).poll(cx).is_pending());
+
+ // drop 1 for whatever reason
+ drop(watch1);
+
+ // Still not ready, 1 other watcher still pending
+ assert!(Pin::new(&mut draining).poll(cx).is_pending());
+
+ drop(watch2);
+
+ // Now all watchers are gone, draining is complete
+ assert!(Pin::new(&mut draining).poll(cx).is_ready());
+ });
+ }
+}
diff --git a/third_party/rust/hyper/src/common/exec.rs b/third_party/rust/hyper/src/common/exec.rs
new file mode 100644
index 0000000000..94ad4610a2
--- /dev/null
+++ b/third_party/rust/hyper/src/common/exec.rs
@@ -0,0 +1,111 @@
+use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use crate::body::{Body, Payload};
+use crate::proto::h2::server::H2Stream;
+use crate::server::conn::spawn_all::{NewSvcTask, Watcher};
+use crate::service::HttpService;
+
+/// An executor of futures.
+pub trait Executor<Fut> {
+ /// Place the future into the executor to be run.
+ fn execute(&self, fut: Fut);
+}
+
+pub trait H2Exec<F, B: Payload>: Clone {
+ fn execute_h2stream(&mut self, fut: H2Stream<F, B>);
+}
+
+pub trait NewSvcExec<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>>: Clone {
+ fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>);
+}
+
+pub type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
+
+// Either the user provides an executor for background tasks, or we use
+// `tokio::spawn`.
+#[derive(Clone)]
+pub enum Exec {
+ Default,
+ Executor(Arc<dyn Executor<BoxSendFuture> + Send + Sync>),
+}
+
+// ===== impl Exec =====
+
+impl Exec {
+ pub(crate) fn execute<F>(&self, fut: F)
+ where
+ F: Future<Output = ()> + Send + 'static,
+ {
+ match *self {
+ Exec::Default => {
+ #[cfg(feature = "tcp")]
+ {
+ tokio::task::spawn(fut);
+ }
+ #[cfg(not(feature = "tcp"))]
+ {
+ // If no runtime, we need an executor!
+ panic!("executor must be set")
+ }
+ }
+ Exec::Executor(ref e) => {
+ e.execute(Box::pin(fut));
+ }
+ }
+ }
+}
+
+impl fmt::Debug for Exec {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Exec").finish()
+ }
+}
+
+impl<F, B> H2Exec<F, B> for Exec
+where
+ H2Stream<F, B>: Future<Output = ()> + Send + 'static,
+ B: Payload,
+{
+ fn execute_h2stream(&mut self, fut: H2Stream<F, B>) {
+ self.execute(fut)
+ }
+}
+
+impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for Exec
+where
+ NewSvcTask<I, N, S, E, W>: Future<Output = ()> + Send + 'static,
+ S: HttpService<Body>,
+ W: Watcher<I, S, E>,
+{
+ fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) {
+ self.execute(fut)
+ }
+}
+
+// ==== impl Executor =====
+
+impl<E, F, B> H2Exec<F, B> for E
+where
+ E: Executor<H2Stream<F, B>> + Clone,
+ H2Stream<F, B>: Future<Output = ()>,
+ B: Payload,
+{
+ fn execute_h2stream(&mut self, fut: H2Stream<F, B>) {
+ self.execute(fut)
+ }
+}
+
+impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for E
+where
+ E: Executor<NewSvcTask<I, N, S, E, W>> + Clone,
+ NewSvcTask<I, N, S, E, W>: Future<Output = ()>,
+ S: HttpService<Body>,
+ W: Watcher<I, S, E>,
+{
+ fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) {
+ self.execute(fut)
+ }
+}
diff --git a/third_party/rust/hyper/src/common/io/mod.rs b/third_party/rust/hyper/src/common/io/mod.rs
new file mode 100644
index 0000000000..2e6d506153
--- /dev/null
+++ b/third_party/rust/hyper/src/common/io/mod.rs
@@ -0,0 +1,3 @@
+mod rewind;
+
+pub(crate) use self::rewind::Rewind;
diff --git a/third_party/rust/hyper/src/common/io/rewind.rs b/third_party/rust/hyper/src/common/io/rewind.rs
new file mode 100644
index 0000000000..14650697c3
--- /dev/null
+++ b/third_party/rust/hyper/src/common/io/rewind.rs
@@ -0,0 +1,153 @@
+use std::marker::Unpin;
+use std::{cmp, io};
+
+use bytes::{Buf, Bytes};
+use tokio::io::{AsyncRead, AsyncWrite};
+
+use crate::common::{task, Pin, Poll};
+
+/// Combine a buffer with an IO, rewinding reads to use the buffer.
+#[derive(Debug)]
+pub(crate) struct Rewind<T> {
+ pre: Option<Bytes>,
+ inner: T,
+}
+
+impl<T> Rewind<T> {
+ pub(crate) fn new(io: T) -> Self {
+ Rewind {
+ pre: None,
+ inner: io,
+ }
+ }
+
+ pub(crate) fn new_buffered(io: T, buf: Bytes) -> Self {
+ Rewind {
+ pre: Some(buf),
+ inner: io,
+ }
+ }
+
+ pub(crate) fn rewind(&mut self, bs: Bytes) {
+ debug_assert!(self.pre.is_none());
+ self.pre = Some(bs);
+ }
+
+ pub(crate) fn into_inner(self) -> (T, Bytes) {
+ (self.inner, self.pre.unwrap_or_else(Bytes::new))
+ }
+
+ pub(crate) fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+}
+
+impl<T> AsyncRead for Rewind<T>
+where
+ T: AsyncRead + Unpin,
+{
+ #[inline]
+ unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [std::mem::MaybeUninit<u8>]) -> bool {
+ self.inner.prepare_uninitialized_buffer(buf)
+ }
+
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ if let Some(mut prefix) = self.pre.take() {
+ // If there are no remaining bytes, let the bytes get dropped.
+ if !prefix.is_empty() {
+ let copy_len = cmp::min(prefix.len(), buf.len());
+ prefix.copy_to_slice(&mut buf[..copy_len]);
+ // Put back whats left
+ if !prefix.is_empty() {
+ self.pre = Some(prefix);
+ }
+
+ return Poll::Ready(Ok(copy_len));
+ }
+ }
+ Pin::new(&mut self.inner).poll_read(cx, buf)
+ }
+}
+
+impl<T> AsyncWrite for Rewind<T>
+where
+ T: AsyncWrite + Unpin,
+{
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.inner).poll_write(cx, buf)
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_flush(cx)
+ }
+
+ fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_shutdown(cx)
+ }
+
+ #[inline]
+ fn poll_write_buf<B: Buf>(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &mut B,
+ ) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.inner).poll_write_buf(cx, buf)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ // FIXME: re-implement tests with `async/await`, this import should
+ // trigger a warning to remind us
+ use super::Rewind;
+ use bytes::Bytes;
+ use tokio::io::AsyncReadExt;
+
+ #[tokio::test]
+ async fn partial_rewind() {
+ let underlying = [104, 101, 108, 108, 111];
+
+ let mock = tokio_test::io::Builder::new().read(&underlying).build();
+
+ let mut stream = Rewind::new(mock);
+
+ // Read off some bytes, ensure we filled o1
+ let mut buf = [0; 2];
+ stream.read_exact(&mut buf).await.expect("read1");
+
+ // Rewind the stream so that it is as if we never read in the first place.
+ stream.rewind(Bytes::copy_from_slice(&buf[..]));
+
+ let mut buf = [0; 5];
+ stream.read_exact(&mut buf).await.expect("read1");
+
+ // At this point we should have read everything that was in the MockStream
+ assert_eq!(&buf, &underlying);
+ }
+
+ #[tokio::test]
+ async fn full_rewind() {
+ let underlying = [104, 101, 108, 108, 111];
+
+ let mock = tokio_test::io::Builder::new().read(&underlying).build();
+
+ let mut stream = Rewind::new(mock);
+
+ let mut buf = [0; 5];
+ stream.read_exact(&mut buf).await.expect("read1");
+
+ // Rewind the stream so that it is as if we never read in the first place.
+ stream.rewind(Bytes::copy_from_slice(&buf[..]));
+
+ let mut buf = [0; 5];
+ stream.read_exact(&mut buf).await.expect("read1");
+ }
+}
diff --git a/third_party/rust/hyper/src/common/lazy.rs b/third_party/rust/hyper/src/common/lazy.rs
new file mode 100644
index 0000000000..4d2e322c2c
--- /dev/null
+++ b/third_party/rust/hyper/src/common/lazy.rs
@@ -0,0 +1,69 @@
+use std::mem;
+
+use super::{task, Future, Pin, Poll};
+
+pub(crate) trait Started: Future {
+ fn started(&self) -> bool;
+}
+
+pub(crate) fn lazy<F, R>(func: F) -> Lazy<F, R>
+where
+ F: FnOnce() -> R,
+ R: Future + Unpin,
+{
+ Lazy {
+ inner: Inner::Init(func),
+ }
+}
+
+// FIXME: allow() required due to `impl Trait` leaking types to this lint
+#[allow(missing_debug_implementations)]
+pub(crate) struct Lazy<F, R> {
+ inner: Inner<F, R>,
+}
+
+enum Inner<F, R> {
+ Init(F),
+ Fut(R),
+ Empty,
+}
+
+impl<F, R> Started for Lazy<F, R>
+where
+ F: FnOnce() -> R,
+ R: Future + Unpin,
+{
+ fn started(&self) -> bool {
+ match self.inner {
+ Inner::Init(_) => false,
+ Inner::Fut(_) | Inner::Empty => true,
+ }
+ }
+}
+
+impl<F, R> Future for Lazy<F, R>
+where
+ F: FnOnce() -> R,
+ R: Future + Unpin,
+{
+ type Output = R::Output;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ if let Inner::Fut(ref mut f) = self.inner {
+ return Pin::new(f).poll(cx);
+ }
+
+ match mem::replace(&mut self.inner, Inner::Empty) {
+ Inner::Init(func) => {
+ let mut fut = func();
+ let ret = Pin::new(&mut fut).poll(cx);
+ self.inner = Inner::Fut(fut);
+ ret
+ }
+ _ => unreachable!("lazy state wrong"),
+ }
+ }
+}
+
+// The closure `F` is never pinned
+impl<F, R: Unpin> Unpin for Lazy<F, R> {}
diff --git a/third_party/rust/hyper/src/common/mod.rs b/third_party/rust/hyper/src/common/mod.rs
new file mode 100644
index 0000000000..e436fe5e2d
--- /dev/null
+++ b/third_party/rust/hyper/src/common/mod.rs
@@ -0,0 +1,26 @@
+macro_rules! ready {
+ ($e:expr) => {
+ match $e {
+ std::task::Poll::Ready(v) => v,
+ std::task::Poll::Pending => return std::task::Poll::Pending,
+ }
+ };
+}
+
+pub(crate) mod buf;
+pub(crate) mod drain;
+pub(crate) mod exec;
+pub(crate) mod io;
+mod lazy;
+mod never;
+pub(crate) mod task;
+pub(crate) mod watch;
+
+pub use self::exec::Executor;
+pub(crate) use self::exec::{BoxSendFuture, Exec};
+pub(crate) use self::lazy::{lazy, Started as Lazy};
+pub use self::never::Never;
+pub(crate) use self::task::Poll;
+
+// group up types normally needed for `Future`
+pub(crate) use std::{future::Future, marker::Unpin, pin::Pin};
diff --git a/third_party/rust/hyper/src/common/never.rs b/third_party/rust/hyper/src/common/never.rs
new file mode 100644
index 0000000000..f4fdb95ddd
--- /dev/null
+++ b/third_party/rust/hyper/src/common/never.rs
@@ -0,0 +1,21 @@
+//! An uninhabitable type meaning it can never happen.
+//!
+//! To be replaced with `!` once it is stable.
+
+use std::error::Error;
+use std::fmt;
+
+#[derive(Debug)]
+pub enum Never {}
+
+impl fmt::Display for Never {
+ fn fmt(&self, _: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match *self {}
+ }
+}
+
+impl Error for Never {
+ fn description(&self) -> &str {
+ match *self {}
+ }
+}
diff --git a/third_party/rust/hyper/src/common/task.rs b/third_party/rust/hyper/src/common/task.rs
new file mode 100644
index 0000000000..bfccfe3bfe
--- /dev/null
+++ b/third_party/rust/hyper/src/common/task.rs
@@ -0,0 +1,10 @@
+use super::Never;
+pub(crate) use std::task::{Context, Poll};
+
+/// A function to help "yield" a future, such that it is re-scheduled immediately.
+///
+/// Useful for spin counts, so a future doesn't hog too much time.
+pub(crate) fn yield_now(cx: &mut Context<'_>) -> Poll<Never> {
+ cx.waker().wake_by_ref();
+ Poll::Pending
+}
diff --git a/third_party/rust/hyper/src/common/watch.rs b/third_party/rust/hyper/src/common/watch.rs
new file mode 100644
index 0000000000..ba17d551cb
--- /dev/null
+++ b/third_party/rust/hyper/src/common/watch.rs
@@ -0,0 +1,73 @@
+//! An SPSC broadcast channel.
+//!
+//! - The value can only be a `usize`.
+//! - The consumer is only notified if the value is different.
+//! - The value `0` is reserved for closed.
+
+use futures_util::task::AtomicWaker;
+use std::sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+};
+use std::task;
+
+type Value = usize;
+
+pub(crate) const CLOSED: usize = 0;
+
+pub(crate) fn channel(initial: Value) -> (Sender, Receiver) {
+ debug_assert!(
+ initial != CLOSED,
+ "watch::channel initial state of 0 is reserved"
+ );
+
+ let shared = Arc::new(Shared {
+ value: AtomicUsize::new(initial),
+ waker: AtomicWaker::new(),
+ });
+
+ (
+ Sender {
+ shared: shared.clone(),
+ },
+ Receiver { shared },
+ )
+}
+
+pub(crate) struct Sender {
+ shared: Arc<Shared>,
+}
+
+pub(crate) struct Receiver {
+ shared: Arc<Shared>,
+}
+
+struct Shared {
+ value: AtomicUsize,
+ waker: AtomicWaker,
+}
+
+impl Sender {
+ pub(crate) fn send(&mut self, value: Value) {
+ if self.shared.value.swap(value, Ordering::SeqCst) != value {
+ self.shared.waker.wake();
+ }
+ }
+}
+
+impl Drop for Sender {
+ fn drop(&mut self) {
+ self.send(CLOSED);
+ }
+}
+
+impl Receiver {
+ pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value {
+ self.shared.waker.register(cx.waker());
+ self.shared.value.load(Ordering::SeqCst)
+ }
+
+ pub(crate) fn peek(&self) -> Value {
+ self.shared.value.load(Ordering::Relaxed)
+ }
+}