diff options
Diffstat (limited to 'third_party/rust/hyper/src/common')
-rw-r--r-- | third_party/rust/hyper/src/common/buf.rs | 72 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/drain.rs | 232 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/exec.rs | 111 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/io/mod.rs | 3 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/io/rewind.rs | 153 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/lazy.rs | 69 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/mod.rs | 26 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/never.rs | 21 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/task.rs | 10 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/watch.rs | 73 |
10 files changed, 770 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/common/buf.rs b/third_party/rust/hyper/src/common/buf.rs new file mode 100644 index 0000000000..8f71b7bbad --- /dev/null +++ b/third_party/rust/hyper/src/common/buf.rs @@ -0,0 +1,72 @@ +use std::collections::VecDeque; +use std::io::IoSlice; + +use bytes::Buf; + +pub(crate) struct BufList<T> { + bufs: VecDeque<T>, +} + +impl<T: Buf> BufList<T> { + pub(crate) fn new() -> BufList<T> { + BufList { + bufs: VecDeque::new(), + } + } + + #[inline] + pub(crate) fn push(&mut self, buf: T) { + debug_assert!(buf.has_remaining()); + self.bufs.push_back(buf); + } + + #[inline] + pub(crate) fn bufs_cnt(&self) -> usize { + self.bufs.len() + } +} + +impl<T: Buf> Buf for BufList<T> { + #[inline] + fn remaining(&self) -> usize { + self.bufs.iter().map(|buf| buf.remaining()).sum() + } + + #[inline] + fn bytes(&self) -> &[u8] { + self.bufs.front().map(Buf::bytes).unwrap_or_default() + } + + #[inline] + fn advance(&mut self, mut cnt: usize) { + while cnt > 0 { + { + let front = &mut self.bufs[0]; + let rem = front.remaining(); + if rem > cnt { + front.advance(cnt); + return; + } else { + front.advance(rem); + cnt -= rem; + } + } + self.bufs.pop_front(); + } + } + + #[inline] + fn bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize { + if dst.is_empty() { + return 0; + } + let mut vecs = 0; + for buf in &self.bufs { + vecs += buf.bytes_vectored(&mut dst[vecs..]); + if vecs == dst.len() { + break; + } + } + vecs + } +} diff --git a/third_party/rust/hyper/src/common/drain.rs b/third_party/rust/hyper/src/common/drain.rs new file mode 100644 index 0000000000..7abb9f9ded --- /dev/null +++ b/third_party/rust/hyper/src/common/drain.rs @@ -0,0 +1,232 @@ +use std::mem; + +use pin_project::pin_project; +use tokio::sync::{mpsc, watch}; + +use super::{task, Future, Never, Pin, Poll}; + +// Sentinel value signaling that the watch is still open +#[derive(Clone, Copy)] +enum Action { + Open, + // Closed isn't sent via the `Action` type, but rather once + // the watch::Sender is dropped. +} + +pub fn channel() -> (Signal, Watch) { + let (tx, rx) = watch::channel(Action::Open); + let (drained_tx, drained_rx) = mpsc::channel(1); + ( + Signal { + drained_rx, + _tx: tx, + }, + Watch { drained_tx, rx }, + ) +} + +pub struct Signal { + drained_rx: mpsc::Receiver<Never>, + _tx: watch::Sender<Action>, +} + +pub struct Draining { + drained_rx: mpsc::Receiver<Never>, +} + +#[derive(Clone)] +pub struct Watch { + drained_tx: mpsc::Sender<Never>, + rx: watch::Receiver<Action>, +} + +#[allow(missing_debug_implementations)] +#[pin_project] +pub struct Watching<F, FN> { + #[pin] + future: F, + state: State<FN>, + watch: Watch, +} + +enum State<F> { + Watch(F), + Draining, +} + +impl Signal { + pub fn drain(self) -> Draining { + // Simply dropping `self.tx` will signal the watchers + Draining { + drained_rx: self.drained_rx, + } + } +} + +impl Future for Draining { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + match ready!(self.drained_rx.poll_recv(cx)) { + Some(never) => match never {}, + None => Poll::Ready(()), + } + } +} + +impl Watch { + pub fn watch<F, FN>(self, future: F, on_drain: FN) -> Watching<F, FN> + where + F: Future, + FN: FnOnce(Pin<&mut F>), + { + Watching { + future, + state: State::Watch(on_drain), + watch: self, + } + } +} + +impl<F, FN> Future for Watching<F, FN> +where + F: Future, + FN: FnOnce(Pin<&mut F>), +{ + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + let mut me = self.project(); + loop { + match mem::replace(me.state, State::Draining) { + State::Watch(on_drain) => { + match me.watch.rx.poll_recv_ref(cx) { + Poll::Ready(None) => { + // Drain has been triggered! + on_drain(me.future.as_mut()); + } + Poll::Ready(Some(_ /*State::Open*/)) | Poll::Pending => { + *me.state = State::Watch(on_drain); + return me.future.poll(cx); + } + } + } + State::Draining => return me.future.poll(cx), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + struct TestMe { + draining: bool, + finished: bool, + poll_cnt: usize, + } + + impl Future for TestMe { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> { + self.poll_cnt += 1; + if self.finished { + Poll::Ready(()) + } else { + Poll::Pending + } + } + } + + #[test] + fn watch() { + let mut mock = tokio_test::task::spawn(()); + mock.enter(|cx, _| { + let (tx, rx) = channel(); + let fut = TestMe { + draining: false, + finished: false, + poll_cnt: 0, + }; + + let mut watch = rx.watch(fut, |mut fut| { + fut.draining = true; + }); + + assert_eq!(watch.future.poll_cnt, 0); + + // First poll should poll the inner future + assert!(Pin::new(&mut watch).poll(cx).is_pending()); + assert_eq!(watch.future.poll_cnt, 1); + + // Second poll should poll the inner future again + assert!(Pin::new(&mut watch).poll(cx).is_pending()); + assert_eq!(watch.future.poll_cnt, 2); + + let mut draining = tx.drain(); + // Drain signaled, but needs another poll to be noticed. + assert!(!watch.future.draining); + assert_eq!(watch.future.poll_cnt, 2); + + // Now, poll after drain has been signaled. + assert!(Pin::new(&mut watch).poll(cx).is_pending()); + assert_eq!(watch.future.poll_cnt, 3); + assert!(watch.future.draining); + + // Draining is not ready until watcher completes + assert!(Pin::new(&mut draining).poll(cx).is_pending()); + + // Finishing up the watch future + watch.future.finished = true; + assert!(Pin::new(&mut watch).poll(cx).is_ready()); + assert_eq!(watch.future.poll_cnt, 4); + drop(watch); + + assert!(Pin::new(&mut draining).poll(cx).is_ready()); + }) + } + + #[test] + fn watch_clones() { + let mut mock = tokio_test::task::spawn(()); + mock.enter(|cx, _| { + let (tx, rx) = channel(); + + let fut1 = TestMe { + draining: false, + finished: false, + poll_cnt: 0, + }; + let fut2 = TestMe { + draining: false, + finished: false, + poll_cnt: 0, + }; + + let watch1 = rx.clone().watch(fut1, |mut fut| { + fut.draining = true; + }); + let watch2 = rx.watch(fut2, |mut fut| { + fut.draining = true; + }); + + let mut draining = tx.drain(); + + // Still 2 outstanding watchers + assert!(Pin::new(&mut draining).poll(cx).is_pending()); + + // drop 1 for whatever reason + drop(watch1); + + // Still not ready, 1 other watcher still pending + assert!(Pin::new(&mut draining).poll(cx).is_pending()); + + drop(watch2); + + // Now all watchers are gone, draining is complete + assert!(Pin::new(&mut draining).poll(cx).is_ready()); + }); + } +} diff --git a/third_party/rust/hyper/src/common/exec.rs b/third_party/rust/hyper/src/common/exec.rs new file mode 100644 index 0000000000..94ad4610a2 --- /dev/null +++ b/third_party/rust/hyper/src/common/exec.rs @@ -0,0 +1,111 @@ +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +use crate::body::{Body, Payload}; +use crate::proto::h2::server::H2Stream; +use crate::server::conn::spawn_all::{NewSvcTask, Watcher}; +use crate::service::HttpService; + +/// An executor of futures. +pub trait Executor<Fut> { + /// Place the future into the executor to be run. + fn execute(&self, fut: Fut); +} + +pub trait H2Exec<F, B: Payload>: Clone { + fn execute_h2stream(&mut self, fut: H2Stream<F, B>); +} + +pub trait NewSvcExec<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>>: Clone { + fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>); +} + +pub type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>; + +// Either the user provides an executor for background tasks, or we use +// `tokio::spawn`. +#[derive(Clone)] +pub enum Exec { + Default, + Executor(Arc<dyn Executor<BoxSendFuture> + Send + Sync>), +} + +// ===== impl Exec ===== + +impl Exec { + pub(crate) fn execute<F>(&self, fut: F) + where + F: Future<Output = ()> + Send + 'static, + { + match *self { + Exec::Default => { + #[cfg(feature = "tcp")] + { + tokio::task::spawn(fut); + } + #[cfg(not(feature = "tcp"))] + { + // If no runtime, we need an executor! + panic!("executor must be set") + } + } + Exec::Executor(ref e) => { + e.execute(Box::pin(fut)); + } + } + } +} + +impl fmt::Debug for Exec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Exec").finish() + } +} + +impl<F, B> H2Exec<F, B> for Exec +where + H2Stream<F, B>: Future<Output = ()> + Send + 'static, + B: Payload, +{ + fn execute_h2stream(&mut self, fut: H2Stream<F, B>) { + self.execute(fut) + } +} + +impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for Exec +where + NewSvcTask<I, N, S, E, W>: Future<Output = ()> + Send + 'static, + S: HttpService<Body>, + W: Watcher<I, S, E>, +{ + fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) { + self.execute(fut) + } +} + +// ==== impl Executor ===== + +impl<E, F, B> H2Exec<F, B> for E +where + E: Executor<H2Stream<F, B>> + Clone, + H2Stream<F, B>: Future<Output = ()>, + B: Payload, +{ + fn execute_h2stream(&mut self, fut: H2Stream<F, B>) { + self.execute(fut) + } +} + +impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for E +where + E: Executor<NewSvcTask<I, N, S, E, W>> + Clone, + NewSvcTask<I, N, S, E, W>: Future<Output = ()>, + S: HttpService<Body>, + W: Watcher<I, S, E>, +{ + fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) { + self.execute(fut) + } +} diff --git a/third_party/rust/hyper/src/common/io/mod.rs b/third_party/rust/hyper/src/common/io/mod.rs new file mode 100644 index 0000000000..2e6d506153 --- /dev/null +++ b/third_party/rust/hyper/src/common/io/mod.rs @@ -0,0 +1,3 @@ +mod rewind; + +pub(crate) use self::rewind::Rewind; diff --git a/third_party/rust/hyper/src/common/io/rewind.rs b/third_party/rust/hyper/src/common/io/rewind.rs new file mode 100644 index 0000000000..14650697c3 --- /dev/null +++ b/third_party/rust/hyper/src/common/io/rewind.rs @@ -0,0 +1,153 @@ +use std::marker::Unpin; +use std::{cmp, io}; + +use bytes::{Buf, Bytes}; +use tokio::io::{AsyncRead, AsyncWrite}; + +use crate::common::{task, Pin, Poll}; + +/// Combine a buffer with an IO, rewinding reads to use the buffer. +#[derive(Debug)] +pub(crate) struct Rewind<T> { + pre: Option<Bytes>, + inner: T, +} + +impl<T> Rewind<T> { + pub(crate) fn new(io: T) -> Self { + Rewind { + pre: None, + inner: io, + } + } + + pub(crate) fn new_buffered(io: T, buf: Bytes) -> Self { + Rewind { + pre: Some(buf), + inner: io, + } + } + + pub(crate) fn rewind(&mut self, bs: Bytes) { + debug_assert!(self.pre.is_none()); + self.pre = Some(bs); + } + + pub(crate) fn into_inner(self) -> (T, Bytes) { + (self.inner, self.pre.unwrap_or_else(Bytes::new)) + } + + pub(crate) fn get_mut(&mut self) -> &mut T { + &mut self.inner + } +} + +impl<T> AsyncRead for Rewind<T> +where + T: AsyncRead + Unpin, +{ + #[inline] + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [std::mem::MaybeUninit<u8>]) -> bool { + self.inner.prepare_uninitialized_buffer(buf) + } + + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + if let Some(mut prefix) = self.pre.take() { + // If there are no remaining bytes, let the bytes get dropped. + if !prefix.is_empty() { + let copy_len = cmp::min(prefix.len(), buf.len()); + prefix.copy_to_slice(&mut buf[..copy_len]); + // Put back whats left + if !prefix.is_empty() { + self.pre = Some(prefix); + } + + return Poll::Ready(Ok(copy_len)); + } + } + Pin::new(&mut self.inner).poll_read(cx, buf) + } +} + +impl<T> AsyncWrite for Rewind<T> +where + T: AsyncWrite + Unpin, +{ + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + Pin::new(&mut self.inner).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } + + #[inline] + fn poll_write_buf<B: Buf>( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut B, + ) -> Poll<io::Result<usize>> { + Pin::new(&mut self.inner).poll_write_buf(cx, buf) + } +} + +#[cfg(test)] +mod tests { + // FIXME: re-implement tests with `async/await`, this import should + // trigger a warning to remind us + use super::Rewind; + use bytes::Bytes; + use tokio::io::AsyncReadExt; + + #[tokio::test] + async fn partial_rewind() { + let underlying = [104, 101, 108, 108, 111]; + + let mock = tokio_test::io::Builder::new().read(&underlying).build(); + + let mut stream = Rewind::new(mock); + + // Read off some bytes, ensure we filled o1 + let mut buf = [0; 2]; + stream.read_exact(&mut buf).await.expect("read1"); + + // Rewind the stream so that it is as if we never read in the first place. + stream.rewind(Bytes::copy_from_slice(&buf[..])); + + let mut buf = [0; 5]; + stream.read_exact(&mut buf).await.expect("read1"); + + // At this point we should have read everything that was in the MockStream + assert_eq!(&buf, &underlying); + } + + #[tokio::test] + async fn full_rewind() { + let underlying = [104, 101, 108, 108, 111]; + + let mock = tokio_test::io::Builder::new().read(&underlying).build(); + + let mut stream = Rewind::new(mock); + + let mut buf = [0; 5]; + stream.read_exact(&mut buf).await.expect("read1"); + + // Rewind the stream so that it is as if we never read in the first place. + stream.rewind(Bytes::copy_from_slice(&buf[..])); + + let mut buf = [0; 5]; + stream.read_exact(&mut buf).await.expect("read1"); + } +} diff --git a/third_party/rust/hyper/src/common/lazy.rs b/third_party/rust/hyper/src/common/lazy.rs new file mode 100644 index 0000000000..4d2e322c2c --- /dev/null +++ b/third_party/rust/hyper/src/common/lazy.rs @@ -0,0 +1,69 @@ +use std::mem; + +use super::{task, Future, Pin, Poll}; + +pub(crate) trait Started: Future { + fn started(&self) -> bool; +} + +pub(crate) fn lazy<F, R>(func: F) -> Lazy<F, R> +where + F: FnOnce() -> R, + R: Future + Unpin, +{ + Lazy { + inner: Inner::Init(func), + } +} + +// FIXME: allow() required due to `impl Trait` leaking types to this lint +#[allow(missing_debug_implementations)] +pub(crate) struct Lazy<F, R> { + inner: Inner<F, R>, +} + +enum Inner<F, R> { + Init(F), + Fut(R), + Empty, +} + +impl<F, R> Started for Lazy<F, R> +where + F: FnOnce() -> R, + R: Future + Unpin, +{ + fn started(&self) -> bool { + match self.inner { + Inner::Init(_) => false, + Inner::Fut(_) | Inner::Empty => true, + } + } +} + +impl<F, R> Future for Lazy<F, R> +where + F: FnOnce() -> R, + R: Future + Unpin, +{ + type Output = R::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + if let Inner::Fut(ref mut f) = self.inner { + return Pin::new(f).poll(cx); + } + + match mem::replace(&mut self.inner, Inner::Empty) { + Inner::Init(func) => { + let mut fut = func(); + let ret = Pin::new(&mut fut).poll(cx); + self.inner = Inner::Fut(fut); + ret + } + _ => unreachable!("lazy state wrong"), + } + } +} + +// The closure `F` is never pinned +impl<F, R: Unpin> Unpin for Lazy<F, R> {} diff --git a/third_party/rust/hyper/src/common/mod.rs b/third_party/rust/hyper/src/common/mod.rs new file mode 100644 index 0000000000..e436fe5e2d --- /dev/null +++ b/third_party/rust/hyper/src/common/mod.rs @@ -0,0 +1,26 @@ +macro_rules! ready { + ($e:expr) => { + match $e { + std::task::Poll::Ready(v) => v, + std::task::Poll::Pending => return std::task::Poll::Pending, + } + }; +} + +pub(crate) mod buf; +pub(crate) mod drain; +pub(crate) mod exec; +pub(crate) mod io; +mod lazy; +mod never; +pub(crate) mod task; +pub(crate) mod watch; + +pub use self::exec::Executor; +pub(crate) use self::exec::{BoxSendFuture, Exec}; +pub(crate) use self::lazy::{lazy, Started as Lazy}; +pub use self::never::Never; +pub(crate) use self::task::Poll; + +// group up types normally needed for `Future` +pub(crate) use std::{future::Future, marker::Unpin, pin::Pin}; diff --git a/third_party/rust/hyper/src/common/never.rs b/third_party/rust/hyper/src/common/never.rs new file mode 100644 index 0000000000..f4fdb95ddd --- /dev/null +++ b/third_party/rust/hyper/src/common/never.rs @@ -0,0 +1,21 @@ +//! An uninhabitable type meaning it can never happen. +//! +//! To be replaced with `!` once it is stable. + +use std::error::Error; +use std::fmt; + +#[derive(Debug)] +pub enum Never {} + +impl fmt::Display for Never { + fn fmt(&self, _: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self {} + } +} + +impl Error for Never { + fn description(&self) -> &str { + match *self {} + } +} diff --git a/third_party/rust/hyper/src/common/task.rs b/third_party/rust/hyper/src/common/task.rs new file mode 100644 index 0000000000..bfccfe3bfe --- /dev/null +++ b/third_party/rust/hyper/src/common/task.rs @@ -0,0 +1,10 @@ +use super::Never; +pub(crate) use std::task::{Context, Poll}; + +/// A function to help "yield" a future, such that it is re-scheduled immediately. +/// +/// Useful for spin counts, so a future doesn't hog too much time. +pub(crate) fn yield_now(cx: &mut Context<'_>) -> Poll<Never> { + cx.waker().wake_by_ref(); + Poll::Pending +} diff --git a/third_party/rust/hyper/src/common/watch.rs b/third_party/rust/hyper/src/common/watch.rs new file mode 100644 index 0000000000..ba17d551cb --- /dev/null +++ b/third_party/rust/hyper/src/common/watch.rs @@ -0,0 +1,73 @@ +//! An SPSC broadcast channel. +//! +//! - The value can only be a `usize`. +//! - The consumer is only notified if the value is different. +//! - The value `0` is reserved for closed. + +use futures_util::task::AtomicWaker; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; +use std::task; + +type Value = usize; + +pub(crate) const CLOSED: usize = 0; + +pub(crate) fn channel(initial: Value) -> (Sender, Receiver) { + debug_assert!( + initial != CLOSED, + "watch::channel initial state of 0 is reserved" + ); + + let shared = Arc::new(Shared { + value: AtomicUsize::new(initial), + waker: AtomicWaker::new(), + }); + + ( + Sender { + shared: shared.clone(), + }, + Receiver { shared }, + ) +} + +pub(crate) struct Sender { + shared: Arc<Shared>, +} + +pub(crate) struct Receiver { + shared: Arc<Shared>, +} + +struct Shared { + value: AtomicUsize, + waker: AtomicWaker, +} + +impl Sender { + pub(crate) fn send(&mut self, value: Value) { + if self.shared.value.swap(value, Ordering::SeqCst) != value { + self.shared.waker.wake(); + } + } +} + +impl Drop for Sender { + fn drop(&mut self) { + self.send(CLOSED); + } +} + +impl Receiver { + pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value { + self.shared.waker.register(cx.waker()); + self.shared.value.load(Ordering::SeqCst) + } + + pub(crate) fn peek(&self) -> Value { + self.shared.value.load(Ordering::Relaxed) + } +} |