summaryrefslogtreecommitdiffstats
path: root/third_party/rust/hyper/src/common
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/hyper/src/common')
-rw-r--r--third_party/rust/hyper/src/common/buf.rs151
-rw-r--r--third_party/rust/hyper/src/common/date.rs124
-rw-r--r--third_party/rust/hyper/src/common/drain.rs217
-rw-r--r--third_party/rust/hyper/src/common/exec.rs145
-rw-r--r--third_party/rust/hyper/src/common/io/mod.rs3
-rw-r--r--third_party/rust/hyper/src/common/io/rewind.rs155
-rw-r--r--third_party/rust/hyper/src/common/lazy.rs76
-rw-r--r--third_party/rust/hyper/src/common/mod.rs39
-rw-r--r--third_party/rust/hyper/src/common/never.rs21
-rw-r--r--third_party/rust/hyper/src/common/sync_wrapper.rs110
-rw-r--r--third_party/rust/hyper/src/common/task.rs12
-rw-r--r--third_party/rust/hyper/src/common/watch.rs73
12 files changed, 1126 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/common/buf.rs b/third_party/rust/hyper/src/common/buf.rs
new file mode 100644
index 0000000000..64e9333ead
--- /dev/null
+++ b/third_party/rust/hyper/src/common/buf.rs
@@ -0,0 +1,151 @@
+use std::collections::VecDeque;
+use std::io::IoSlice;
+
+use bytes::{Buf, BufMut, Bytes, BytesMut};
+
+pub(crate) struct BufList<T> {
+ bufs: VecDeque<T>,
+}
+
+impl<T: Buf> BufList<T> {
+ pub(crate) fn new() -> BufList<T> {
+ BufList {
+ bufs: VecDeque::new(),
+ }
+ }
+
+ #[inline]
+ pub(crate) fn push(&mut self, buf: T) {
+ debug_assert!(buf.has_remaining());
+ self.bufs.push_back(buf);
+ }
+
+ #[inline]
+ #[cfg(feature = "http1")]
+ pub(crate) fn bufs_cnt(&self) -> usize {
+ self.bufs.len()
+ }
+}
+
+impl<T: Buf> Buf for BufList<T> {
+ #[inline]
+ fn remaining(&self) -> usize {
+ self.bufs.iter().map(|buf| buf.remaining()).sum()
+ }
+
+ #[inline]
+ fn chunk(&self) -> &[u8] {
+ self.bufs.front().map(Buf::chunk).unwrap_or_default()
+ }
+
+ #[inline]
+ fn advance(&mut self, mut cnt: usize) {
+ while cnt > 0 {
+ {
+ let front = &mut self.bufs[0];
+ let rem = front.remaining();
+ if rem > cnt {
+ front.advance(cnt);
+ return;
+ } else {
+ front.advance(rem);
+ cnt -= rem;
+ }
+ }
+ self.bufs.pop_front();
+ }
+ }
+
+ #[inline]
+ fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
+ if dst.is_empty() {
+ return 0;
+ }
+ let mut vecs = 0;
+ for buf in &self.bufs {
+ vecs += buf.chunks_vectored(&mut dst[vecs..]);
+ if vecs == dst.len() {
+ break;
+ }
+ }
+ vecs
+ }
+
+ #[inline]
+ fn copy_to_bytes(&mut self, len: usize) -> Bytes {
+ // Our inner buffer may have an optimized version of copy_to_bytes, and if the whole
+ // request can be fulfilled by the front buffer, we can take advantage.
+ match self.bufs.front_mut() {
+ Some(front) if front.remaining() == len => {
+ let b = front.copy_to_bytes(len);
+ self.bufs.pop_front();
+ b
+ }
+ Some(front) if front.remaining() > len => front.copy_to_bytes(len),
+ _ => {
+ assert!(len <= self.remaining(), "`len` greater than remaining");
+ let mut bm = BytesMut::with_capacity(len);
+ bm.put(self.take(len));
+ bm.freeze()
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::ptr;
+
+ use super::*;
+
+ fn hello_world_buf() -> BufList<Bytes> {
+ BufList {
+ bufs: vec![Bytes::from("Hello"), Bytes::from(" "), Bytes::from("World")].into(),
+ }
+ }
+
+ #[test]
+ fn to_bytes_shorter() {
+ let mut bufs = hello_world_buf();
+ let old_ptr = bufs.chunk().as_ptr();
+ let start = bufs.copy_to_bytes(4);
+ assert_eq!(start, "Hell");
+ assert!(ptr::eq(old_ptr, start.as_ptr()));
+ assert_eq!(bufs.chunk(), b"o");
+ assert!(ptr::eq(old_ptr.wrapping_add(4), bufs.chunk().as_ptr()));
+ assert_eq!(bufs.remaining(), 7);
+ }
+
+ #[test]
+ fn to_bytes_eq() {
+ let mut bufs = hello_world_buf();
+ let old_ptr = bufs.chunk().as_ptr();
+ let start = bufs.copy_to_bytes(5);
+ assert_eq!(start, "Hello");
+ assert!(ptr::eq(old_ptr, start.as_ptr()));
+ assert_eq!(bufs.chunk(), b" ");
+ assert_eq!(bufs.remaining(), 6);
+ }
+
+ #[test]
+ fn to_bytes_longer() {
+ let mut bufs = hello_world_buf();
+ let start = bufs.copy_to_bytes(7);
+ assert_eq!(start, "Hello W");
+ assert_eq!(bufs.remaining(), 4);
+ }
+
+ #[test]
+ fn one_long_buf_to_bytes() {
+ let mut buf = BufList::new();
+ buf.push(b"Hello World" as &[_]);
+ assert_eq!(buf.copy_to_bytes(5), "Hello");
+ assert_eq!(buf.chunk(), b" World");
+ }
+
+ #[test]
+ #[should_panic(expected = "`len` greater than remaining")]
+ fn buf_to_bytes_too_many() {
+ hello_world_buf().copy_to_bytes(42);
+ }
+}
diff --git a/third_party/rust/hyper/src/common/date.rs b/third_party/rust/hyper/src/common/date.rs
new file mode 100644
index 0000000000..a436fc07c0
--- /dev/null
+++ b/third_party/rust/hyper/src/common/date.rs
@@ -0,0 +1,124 @@
+use std::cell::RefCell;
+use std::fmt::{self, Write};
+use std::str;
+use std::time::{Duration, SystemTime};
+
+#[cfg(feature = "http2")]
+use http::header::HeaderValue;
+use httpdate::HttpDate;
+
+// "Sun, 06 Nov 1994 08:49:37 GMT".len()
+pub(crate) const DATE_VALUE_LENGTH: usize = 29;
+
+#[cfg(feature = "http1")]
+pub(crate) fn extend(dst: &mut Vec<u8>) {
+ CACHED.with(|cache| {
+ dst.extend_from_slice(cache.borrow().buffer());
+ })
+}
+
+#[cfg(feature = "http1")]
+pub(crate) fn update() {
+ CACHED.with(|cache| {
+ cache.borrow_mut().check();
+ })
+}
+
+#[cfg(feature = "http2")]
+pub(crate) fn update_and_header_value() -> HeaderValue {
+ CACHED.with(|cache| {
+ let mut cache = cache.borrow_mut();
+ cache.check();
+ HeaderValue::from_bytes(cache.buffer()).expect("Date format should be valid HeaderValue")
+ })
+}
+
+struct CachedDate {
+ bytes: [u8; DATE_VALUE_LENGTH],
+ pos: usize,
+ next_update: SystemTime,
+}
+
+thread_local!(static CACHED: RefCell<CachedDate> = RefCell::new(CachedDate::new()));
+
+impl CachedDate {
+ fn new() -> Self {
+ let mut cache = CachedDate {
+ bytes: [0; DATE_VALUE_LENGTH],
+ pos: 0,
+ next_update: SystemTime::now(),
+ };
+ cache.update(cache.next_update);
+ cache
+ }
+
+ fn buffer(&self) -> &[u8] {
+ &self.bytes[..]
+ }
+
+ fn check(&mut self) {
+ let now = SystemTime::now();
+ if now > self.next_update {
+ self.update(now);
+ }
+ }
+
+ fn update(&mut self, now: SystemTime) {
+ self.render(now);
+ self.next_update = now + Duration::new(1, 0);
+ }
+
+ fn render(&mut self, now: SystemTime) {
+ self.pos = 0;
+ let _ = write!(self, "{}", HttpDate::from(now));
+ debug_assert!(self.pos == DATE_VALUE_LENGTH);
+ }
+}
+
+impl fmt::Write for CachedDate {
+ fn write_str(&mut self, s: &str) -> fmt::Result {
+ let len = s.len();
+ self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes());
+ self.pos += len;
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[cfg(feature = "nightly")]
+ use test::Bencher;
+
+ #[test]
+ fn test_date_len() {
+ assert_eq!(DATE_VALUE_LENGTH, "Sun, 06 Nov 1994 08:49:37 GMT".len());
+ }
+
+ #[cfg(feature = "nightly")]
+ #[bench]
+ fn bench_date_check(b: &mut Bencher) {
+ let mut date = CachedDate::new();
+ // cache the first update
+ date.check();
+
+ b.iter(|| {
+ date.check();
+ });
+ }
+
+ #[cfg(feature = "nightly")]
+ #[bench]
+ fn bench_date_render(b: &mut Bencher) {
+ let mut date = CachedDate::new();
+ let now = SystemTime::now();
+ date.render(now);
+ b.bytes = date.buffer().len() as u64;
+
+ b.iter(|| {
+ date.render(now);
+ test::black_box(&date);
+ });
+ }
+}
diff --git a/third_party/rust/hyper/src/common/drain.rs b/third_party/rust/hyper/src/common/drain.rs
new file mode 100644
index 0000000000..174da876df
--- /dev/null
+++ b/third_party/rust/hyper/src/common/drain.rs
@@ -0,0 +1,217 @@
+use std::mem;
+
+use pin_project_lite::pin_project;
+use tokio::sync::watch;
+
+use super::{task, Future, Pin, Poll};
+
+pub(crate) fn channel() -> (Signal, Watch) {
+ let (tx, rx) = watch::channel(());
+ (Signal { tx }, Watch { rx })
+}
+
+pub(crate) struct Signal {
+ tx: watch::Sender<()>,
+}
+
+pub(crate) struct Draining(Pin<Box<dyn Future<Output = ()> + Send + Sync>>);
+
+#[derive(Clone)]
+pub(crate) struct Watch {
+ rx: watch::Receiver<()>,
+}
+
+pin_project! {
+ #[allow(missing_debug_implementations)]
+ pub struct Watching<F, FN> {
+ #[pin]
+ future: F,
+ state: State<FN>,
+ watch: Pin<Box<dyn Future<Output = ()> + Send + Sync>>,
+ _rx: watch::Receiver<()>,
+ }
+}
+
+enum State<F> {
+ Watch(F),
+ Draining,
+}
+
+impl Signal {
+ pub(crate) fn drain(self) -> Draining {
+ let _ = self.tx.send(());
+ Draining(Box::pin(async move { self.tx.closed().await }))
+ }
+}
+
+impl Future for Draining {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ Pin::new(&mut self.as_mut().0).poll(cx)
+ }
+}
+
+impl Watch {
+ pub(crate) fn watch<F, FN>(self, future: F, on_drain: FN) -> Watching<F, FN>
+ where
+ F: Future,
+ FN: FnOnce(Pin<&mut F>),
+ {
+ let Self { mut rx } = self;
+ let _rx = rx.clone();
+ Watching {
+ future,
+ state: State::Watch(on_drain),
+ watch: Box::pin(async move {
+ let _ = rx.changed().await;
+ }),
+ // Keep the receiver alive until the future completes, so that
+ // dropping it can signal that draining has completed.
+ _rx,
+ }
+ }
+}
+
+impl<F, FN> Future for Watching<F, FN>
+where
+ F: Future,
+ FN: FnOnce(Pin<&mut F>),
+{
+ type Output = F::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ let mut me = self.project();
+ loop {
+ match mem::replace(me.state, State::Draining) {
+ State::Watch(on_drain) => {
+ match Pin::new(&mut me.watch).poll(cx) {
+ Poll::Ready(()) => {
+ // Drain has been triggered!
+ on_drain(me.future.as_mut());
+ }
+ Poll::Pending => {
+ *me.state = State::Watch(on_drain);
+ return me.future.poll(cx);
+ }
+ }
+ }
+ State::Draining => return me.future.poll(cx),
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ struct TestMe {
+ draining: bool,
+ finished: bool,
+ poll_cnt: usize,
+ }
+
+ impl Future for TestMe {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> {
+ self.poll_cnt += 1;
+ if self.finished {
+ Poll::Ready(())
+ } else {
+ Poll::Pending
+ }
+ }
+ }
+
+ #[test]
+ fn watch() {
+ let mut mock = tokio_test::task::spawn(());
+ mock.enter(|cx, _| {
+ let (tx, rx) = channel();
+ let fut = TestMe {
+ draining: false,
+ finished: false,
+ poll_cnt: 0,
+ };
+
+ let mut watch = rx.watch(fut, |mut fut| {
+ fut.draining = true;
+ });
+
+ assert_eq!(watch.future.poll_cnt, 0);
+
+ // First poll should poll the inner future
+ assert!(Pin::new(&mut watch).poll(cx).is_pending());
+ assert_eq!(watch.future.poll_cnt, 1);
+
+ // Second poll should poll the inner future again
+ assert!(Pin::new(&mut watch).poll(cx).is_pending());
+ assert_eq!(watch.future.poll_cnt, 2);
+
+ let mut draining = tx.drain();
+ // Drain signaled, but needs another poll to be noticed.
+ assert!(!watch.future.draining);
+ assert_eq!(watch.future.poll_cnt, 2);
+
+ // Now, poll after drain has been signaled.
+ assert!(Pin::new(&mut watch).poll(cx).is_pending());
+ assert_eq!(watch.future.poll_cnt, 3);
+ assert!(watch.future.draining);
+
+ // Draining is not ready until watcher completes
+ assert!(Pin::new(&mut draining).poll(cx).is_pending());
+
+ // Finishing up the watch future
+ watch.future.finished = true;
+ assert!(Pin::new(&mut watch).poll(cx).is_ready());
+ assert_eq!(watch.future.poll_cnt, 4);
+ drop(watch);
+
+ assert!(Pin::new(&mut draining).poll(cx).is_ready());
+ })
+ }
+
+ #[test]
+ fn watch_clones() {
+ let mut mock = tokio_test::task::spawn(());
+ mock.enter(|cx, _| {
+ let (tx, rx) = channel();
+
+ let fut1 = TestMe {
+ draining: false,
+ finished: false,
+ poll_cnt: 0,
+ };
+ let fut2 = TestMe {
+ draining: false,
+ finished: false,
+ poll_cnt: 0,
+ };
+
+ let watch1 = rx.clone().watch(fut1, |mut fut| {
+ fut.draining = true;
+ });
+ let watch2 = rx.watch(fut2, |mut fut| {
+ fut.draining = true;
+ });
+
+ let mut draining = tx.drain();
+
+ // Still 2 outstanding watchers
+ assert!(Pin::new(&mut draining).poll(cx).is_pending());
+
+ // drop 1 for whatever reason
+ drop(watch1);
+
+ // Still not ready, 1 other watcher still pending
+ assert!(Pin::new(&mut draining).poll(cx).is_pending());
+
+ drop(watch2);
+
+ // Now all watchers are gone, draining is complete
+ assert!(Pin::new(&mut draining).poll(cx).is_ready());
+ });
+ }
+}
diff --git a/third_party/rust/hyper/src/common/exec.rs b/third_party/rust/hyper/src/common/exec.rs
new file mode 100644
index 0000000000..76f616184b
--- /dev/null
+++ b/third_party/rust/hyper/src/common/exec.rs
@@ -0,0 +1,145 @@
+use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+
+#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
+use crate::body::Body;
+#[cfg(feature = "server")]
+use crate::body::HttpBody;
+#[cfg(all(feature = "http2", feature = "server"))]
+use crate::proto::h2::server::H2Stream;
+use crate::rt::Executor;
+#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
+use crate::server::server::{new_svc::NewSvcTask, Watcher};
+#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
+use crate::service::HttpService;
+
+#[cfg(feature = "server")]
+pub trait ConnStreamExec<F, B: HttpBody>: Clone {
+ fn execute_h2stream(&mut self, fut: H2Stream<F, B>);
+}
+
+#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
+pub trait NewSvcExec<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>>: Clone {
+ fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>);
+}
+
+pub(crate) type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
+
+// Either the user provides an executor for background tasks, or we use
+// `tokio::spawn`.
+#[derive(Clone)]
+pub enum Exec {
+ Default,
+ Executor(Arc<dyn Executor<BoxSendFuture> + Send + Sync>),
+}
+
+// ===== impl Exec =====
+
+impl Exec {
+ pub(crate) fn execute<F>(&self, fut: F)
+ where
+ F: Future<Output = ()> + Send + 'static,
+ {
+ match *self {
+ Exec::Default => {
+ #[cfg(feature = "tcp")]
+ {
+ tokio::task::spawn(fut);
+ }
+ #[cfg(not(feature = "tcp"))]
+ {
+ // If no runtime, we need an executor!
+ panic!("executor must be set")
+ }
+ }
+ Exec::Executor(ref e) => {
+ e.execute(Box::pin(fut));
+ }
+ }
+ }
+}
+
+impl fmt::Debug for Exec {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Exec").finish()
+ }
+}
+
+#[cfg(feature = "server")]
+impl<F, B> ConnStreamExec<F, B> for Exec
+where
+ H2Stream<F, B>: Future<Output = ()> + Send + 'static,
+ B: HttpBody,
+{
+ fn execute_h2stream(&mut self, fut: H2Stream<F, B>) {
+ self.execute(fut)
+ }
+}
+
+#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
+impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for Exec
+where
+ NewSvcTask<I, N, S, E, W>: Future<Output = ()> + Send + 'static,
+ S: HttpService<Body>,
+ W: Watcher<I, S, E>,
+{
+ fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) {
+ self.execute(fut)
+ }
+}
+
+// ==== impl Executor =====
+
+#[cfg(feature = "server")]
+impl<E, F, B> ConnStreamExec<F, B> for E
+where
+ E: Executor<H2Stream<F, B>> + Clone,
+ H2Stream<F, B>: Future<Output = ()>,
+ B: HttpBody,
+{
+ fn execute_h2stream(&mut self, fut: H2Stream<F, B>) {
+ self.execute(fut)
+ }
+}
+
+#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
+impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for E
+where
+ E: Executor<NewSvcTask<I, N, S, E, W>> + Clone,
+ NewSvcTask<I, N, S, E, W>: Future<Output = ()>,
+ S: HttpService<Body>,
+ W: Watcher<I, S, E>,
+{
+ fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) {
+ self.execute(fut)
+ }
+}
+
+// If http2 is not enable, we just have a stub here, so that the trait bounds
+// that *would* have been needed are still checked. Why?
+//
+// Because enabling `http2` shouldn't suddenly add new trait bounds that cause
+// a compilation error.
+#[cfg(not(feature = "http2"))]
+#[allow(missing_debug_implementations)]
+pub struct H2Stream<F, B>(std::marker::PhantomData<(F, B)>);
+
+#[cfg(not(feature = "http2"))]
+impl<F, B, E> Future for H2Stream<F, B>
+where
+ F: Future<Output = Result<http::Response<B>, E>>,
+ B: crate::body::HttpBody,
+ B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ E: Into<Box<dyn std::error::Error + Send + Sync>>,
+{
+ type Output = ();
+
+ fn poll(
+ self: Pin<&mut Self>,
+ _cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Self::Output> {
+ unreachable!()
+ }
+}
diff --git a/third_party/rust/hyper/src/common/io/mod.rs b/third_party/rust/hyper/src/common/io/mod.rs
new file mode 100644
index 0000000000..2e6d506153
--- /dev/null
+++ b/third_party/rust/hyper/src/common/io/mod.rs
@@ -0,0 +1,3 @@
+mod rewind;
+
+pub(crate) use self::rewind::Rewind;
diff --git a/third_party/rust/hyper/src/common/io/rewind.rs b/third_party/rust/hyper/src/common/io/rewind.rs
new file mode 100644
index 0000000000..0afef5f7ea
--- /dev/null
+++ b/third_party/rust/hyper/src/common/io/rewind.rs
@@ -0,0 +1,155 @@
+use std::marker::Unpin;
+use std::{cmp, io};
+
+use bytes::{Buf, Bytes};
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+
+use crate::common::{task, Pin, Poll};
+
+/// Combine a buffer with an IO, rewinding reads to use the buffer.
+#[derive(Debug)]
+pub(crate) struct Rewind<T> {
+ pre: Option<Bytes>,
+ inner: T,
+}
+
+impl<T> Rewind<T> {
+ #[cfg(any(all(feature = "http2", feature = "server"), test))]
+ pub(crate) fn new(io: T) -> Self {
+ Rewind {
+ pre: None,
+ inner: io,
+ }
+ }
+
+ pub(crate) fn new_buffered(io: T, buf: Bytes) -> Self {
+ Rewind {
+ pre: Some(buf),
+ inner: io,
+ }
+ }
+
+ #[cfg(any(all(feature = "http1", feature = "http2", feature = "server"), test))]
+ pub(crate) fn rewind(&mut self, bs: Bytes) {
+ debug_assert!(self.pre.is_none());
+ self.pre = Some(bs);
+ }
+
+ pub(crate) fn into_inner(self) -> (T, Bytes) {
+ (self.inner, self.pre.unwrap_or_else(Bytes::new))
+ }
+
+ // pub(crate) fn get_mut(&mut self) -> &mut T {
+ // &mut self.inner
+ // }
+}
+
+impl<T> AsyncRead for Rewind<T>
+where
+ T: AsyncRead + Unpin,
+{
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ if let Some(mut prefix) = self.pre.take() {
+ // If there are no remaining bytes, let the bytes get dropped.
+ if !prefix.is_empty() {
+ let copy_len = cmp::min(prefix.len(), buf.remaining());
+ // TODO: There should be a way to do following two lines cleaner...
+ buf.put_slice(&prefix[..copy_len]);
+ prefix.advance(copy_len);
+ // Put back what's left
+ if !prefix.is_empty() {
+ self.pre = Some(prefix);
+ }
+
+ return Poll::Ready(Ok(()));
+ }
+ }
+ Pin::new(&mut self.inner).poll_read(cx, buf)
+ }
+}
+
+impl<T> AsyncWrite for Rewind<T>
+where
+ T: AsyncWrite + Unpin,
+{
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.inner).poll_write(cx, buf)
+ }
+
+ fn poll_write_vectored(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ bufs: &[io::IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_flush(cx)
+ }
+
+ fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_shutdown(cx)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ self.inner.is_write_vectored()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ // FIXME: re-implement tests with `async/await`, this import should
+ // trigger a warning to remind us
+ use super::Rewind;
+ use bytes::Bytes;
+ use tokio::io::AsyncReadExt;
+
+ #[tokio::test]
+ async fn partial_rewind() {
+ let underlying = [104, 101, 108, 108, 111];
+
+ let mock = tokio_test::io::Builder::new().read(&underlying).build();
+
+ let mut stream = Rewind::new(mock);
+
+ // Read off some bytes, ensure we filled o1
+ let mut buf = [0; 2];
+ stream.read_exact(&mut buf).await.expect("read1");
+
+ // Rewind the stream so that it is as if we never read in the first place.
+ stream.rewind(Bytes::copy_from_slice(&buf[..]));
+
+ let mut buf = [0; 5];
+ stream.read_exact(&mut buf).await.expect("read1");
+
+ // At this point we should have read everything that was in the MockStream
+ assert_eq!(&buf, &underlying);
+ }
+
+ #[tokio::test]
+ async fn full_rewind() {
+ let underlying = [104, 101, 108, 108, 111];
+
+ let mock = tokio_test::io::Builder::new().read(&underlying).build();
+
+ let mut stream = Rewind::new(mock);
+
+ let mut buf = [0; 5];
+ stream.read_exact(&mut buf).await.expect("read1");
+
+ // Rewind the stream so that it is as if we never read in the first place.
+ stream.rewind(Bytes::copy_from_slice(&buf[..]));
+
+ let mut buf = [0; 5];
+ stream.read_exact(&mut buf).await.expect("read1");
+ }
+}
diff --git a/third_party/rust/hyper/src/common/lazy.rs b/third_party/rust/hyper/src/common/lazy.rs
new file mode 100644
index 0000000000..2722077303
--- /dev/null
+++ b/third_party/rust/hyper/src/common/lazy.rs
@@ -0,0 +1,76 @@
+use pin_project_lite::pin_project;
+
+use super::{task, Future, Pin, Poll};
+
+pub(crate) trait Started: Future {
+ fn started(&self) -> bool;
+}
+
+pub(crate) fn lazy<F, R>(func: F) -> Lazy<F, R>
+where
+ F: FnOnce() -> R,
+ R: Future + Unpin,
+{
+ Lazy {
+ inner: Inner::Init { func },
+ }
+}
+
+// FIXME: allow() required due to `impl Trait` leaking types to this lint
+pin_project! {
+ #[allow(missing_debug_implementations)]
+ pub(crate) struct Lazy<F, R> {
+ #[pin]
+ inner: Inner<F, R>,
+ }
+}
+
+pin_project! {
+ #[project = InnerProj]
+ #[project_replace = InnerProjReplace]
+ enum Inner<F, R> {
+ Init { func: F },
+ Fut { #[pin] fut: R },
+ Empty,
+ }
+}
+
+impl<F, R> Started for Lazy<F, R>
+where
+ F: FnOnce() -> R,
+ R: Future,
+{
+ fn started(&self) -> bool {
+ match self.inner {
+ Inner::Init { .. } => false,
+ Inner::Fut { .. } | Inner::Empty => true,
+ }
+ }
+}
+
+impl<F, R> Future for Lazy<F, R>
+where
+ F: FnOnce() -> R,
+ R: Future,
+{
+ type Output = R::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+
+ if let InnerProj::Fut { fut } = this.inner.as_mut().project() {
+ return fut.poll(cx);
+ }
+
+ match this.inner.as_mut().project_replace(Inner::Empty) {
+ InnerProjReplace::Init { func } => {
+ this.inner.set(Inner::Fut { fut: func() });
+ if let InnerProj::Fut { fut } = this.inner.project() {
+ return fut.poll(cx);
+ }
+ unreachable!()
+ }
+ _ => unreachable!("lazy state wrong"),
+ }
+ }
+}
diff --git a/third_party/rust/hyper/src/common/mod.rs b/third_party/rust/hyper/src/common/mod.rs
new file mode 100644
index 0000000000..e38c6f5c7a
--- /dev/null
+++ b/third_party/rust/hyper/src/common/mod.rs
@@ -0,0 +1,39 @@
+macro_rules! ready {
+ ($e:expr) => {
+ match $e {
+ std::task::Poll::Ready(v) => v,
+ std::task::Poll::Pending => return std::task::Poll::Pending,
+ }
+ };
+}
+
+pub(crate) mod buf;
+#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
+pub(crate) mod date;
+#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
+pub(crate) mod drain;
+#[cfg(any(feature = "http1", feature = "http2", feature = "server"))]
+pub(crate) mod exec;
+pub(crate) mod io;
+#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
+mod lazy;
+mod never;
+#[cfg(any(
+ feature = "stream",
+ all(feature = "client", any(feature = "http1", feature = "http2"))
+))]
+pub(crate) mod sync_wrapper;
+pub(crate) mod task;
+pub(crate) mod watch;
+
+#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
+pub(crate) use self::lazy::{lazy, Started as Lazy};
+#[cfg(any(feature = "http1", feature = "http2", feature = "runtime"))]
+pub(crate) use self::never::Never;
+pub(crate) use self::task::Poll;
+
+// group up types normally needed for `Future`
+cfg_proto! {
+ pub(crate) use std::marker::Unpin;
+}
+pub(crate) use std::{future::Future, pin::Pin};
diff --git a/third_party/rust/hyper/src/common/never.rs b/third_party/rust/hyper/src/common/never.rs
new file mode 100644
index 0000000000..f143caf60f
--- /dev/null
+++ b/third_party/rust/hyper/src/common/never.rs
@@ -0,0 +1,21 @@
+//! An uninhabitable type meaning it can never happen.
+//!
+//! To be replaced with `!` once it is stable.
+
+use std::error::Error;
+use std::fmt;
+
+#[derive(Debug)]
+pub(crate) enum Never {}
+
+impl fmt::Display for Never {
+ fn fmt(&self, _: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match *self {}
+ }
+}
+
+impl Error for Never {
+ fn description(&self) -> &str {
+ match *self {}
+ }
+}
diff --git a/third_party/rust/hyper/src/common/sync_wrapper.rs b/third_party/rust/hyper/src/common/sync_wrapper.rs
new file mode 100644
index 0000000000..704d1a6712
--- /dev/null
+++ b/third_party/rust/hyper/src/common/sync_wrapper.rs
@@ -0,0 +1,110 @@
+/*
+ * This is a copy of the sync_wrapper crate.
+ */
+
+/// A mutual exclusion primitive that relies on static type information only
+///
+/// In some cases synchronization can be proven statically: whenever you hold an exclusive `&mut`
+/// reference, the Rust type system ensures that no other part of the program can hold another
+/// reference to the data. Therefore it is safe to access it even if the current thread obtained
+/// this reference via a channel. Whenever this is the case, the overhead of allocating and locking
+/// a [`Mutex`] can be avoided by using this static version.
+///
+/// One example where this is often applicable is [`Future`], which requires an exclusive reference
+/// for its [`poll`] method: While a given `Future` implementation may not be safe to access by
+/// multiple threads concurrently, the executor can only run the `Future` on one thread at any
+/// given time, making it [`Sync`] in practice as long as the implementation is `Send`. You can
+/// therefore use the sync wrapper to prove that your data structure is `Sync` even though it
+/// contains such a `Future`.
+///
+/// # Example
+///
+/// ```ignore
+/// use hyper::common::sync_wrapper::SyncWrapper;
+/// use std::future::Future;
+///
+/// struct MyThing {
+/// future: SyncWrapper<Box<dyn Future<Output = String> + Send>>,
+/// }
+///
+/// impl MyThing {
+/// // all accesses to `self.future` now require an exclusive reference or ownership
+/// }
+///
+/// fn assert_sync<T: Sync>() {}
+///
+/// assert_sync::<MyThing>();
+/// ```
+///
+/// [`Mutex`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html
+/// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html
+/// [`poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#method.poll
+/// [`Sync`]: https://doc.rust-lang.org/std/marker/trait.Sync.html
+#[repr(transparent)]
+pub(crate) struct SyncWrapper<T>(T);
+
+impl<T> SyncWrapper<T> {
+ /// Creates a new SyncWrapper containing the given value.
+ ///
+ /// # Examples
+ ///
+ /// ```ignore
+ /// use hyper::common::sync_wrapper::SyncWrapper;
+ ///
+ /// let wrapped = SyncWrapper::new(42);
+ /// ```
+ pub(crate) fn new(value: T) -> Self {
+ Self(value)
+ }
+
+ /// Acquires a reference to the protected value.
+ ///
+ /// This is safe because it requires an exclusive reference to the wrapper. Therefore this method
+ /// neither panics nor does it return an error. This is in contrast to [`Mutex::get_mut`] which
+ /// returns an error if another thread panicked while holding the lock. It is not recommended
+ /// to send an exclusive reference to a potentially damaged value to another thread for further
+ /// processing.
+ ///
+ /// [`Mutex::get_mut`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html#method.get_mut
+ ///
+ /// # Examples
+ ///
+ /// ```ignore
+ /// use hyper::common::sync_wrapper::SyncWrapper;
+ ///
+ /// let mut wrapped = SyncWrapper::new(42);
+ /// let value = wrapped.get_mut();
+ /// *value = 0;
+ /// assert_eq!(*wrapped.get_mut(), 0);
+ /// ```
+ pub(crate) fn get_mut(&mut self) -> &mut T {
+ &mut self.0
+ }
+
+ /// Consumes this wrapper, returning the underlying data.
+ ///
+ /// This is safe because it requires ownership of the wrapper, aherefore this method will neither
+ /// panic nor does it return an error. This is in contrast to [`Mutex::into_inner`] which
+ /// returns an error if another thread panicked while holding the lock. It is not recommended
+ /// to send an exclusive reference to a potentially damaged value to another thread for further
+ /// processing.
+ ///
+ /// [`Mutex::into_inner`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html#method.into_inner
+ ///
+ /// # Examples
+ ///
+ /// ```ignore
+ /// use hyper::common::sync_wrapper::SyncWrapper;
+ ///
+ /// let mut wrapped = SyncWrapper::new(42);
+ /// assert_eq!(wrapped.into_inner(), 42);
+ /// ```
+ #[allow(dead_code)]
+ pub(crate) fn into_inner(self) -> T {
+ self.0
+ }
+}
+
+// this is safe because the only operations permitted on this data structure require exclusive
+// access or ownership
+unsafe impl<T: Send> Sync for SyncWrapper<T> {}
diff --git a/third_party/rust/hyper/src/common/task.rs b/third_party/rust/hyper/src/common/task.rs
new file mode 100644
index 0000000000..ec70c957d6
--- /dev/null
+++ b/third_party/rust/hyper/src/common/task.rs
@@ -0,0 +1,12 @@
+#[cfg(feature = "http1")]
+use super::Never;
+pub(crate) use std::task::{Context, Poll};
+
+/// A function to help "yield" a future, such that it is re-scheduled immediately.
+///
+/// Useful for spin counts, so a future doesn't hog too much time.
+#[cfg(feature = "http1")]
+pub(crate) fn yield_now(cx: &mut Context<'_>) -> Poll<Never> {
+ cx.waker().wake_by_ref();
+ Poll::Pending
+}
diff --git a/third_party/rust/hyper/src/common/watch.rs b/third_party/rust/hyper/src/common/watch.rs
new file mode 100644
index 0000000000..ba17d551cb
--- /dev/null
+++ b/third_party/rust/hyper/src/common/watch.rs
@@ -0,0 +1,73 @@
+//! An SPSC broadcast channel.
+//!
+//! - The value can only be a `usize`.
+//! - The consumer is only notified if the value is different.
+//! - The value `0` is reserved for closed.
+
+use futures_util::task::AtomicWaker;
+use std::sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+};
+use std::task;
+
+type Value = usize;
+
+pub(crate) const CLOSED: usize = 0;
+
+pub(crate) fn channel(initial: Value) -> (Sender, Receiver) {
+ debug_assert!(
+ initial != CLOSED,
+ "watch::channel initial state of 0 is reserved"
+ );
+
+ let shared = Arc::new(Shared {
+ value: AtomicUsize::new(initial),
+ waker: AtomicWaker::new(),
+ });
+
+ (
+ Sender {
+ shared: shared.clone(),
+ },
+ Receiver { shared },
+ )
+}
+
+pub(crate) struct Sender {
+ shared: Arc<Shared>,
+}
+
+pub(crate) struct Receiver {
+ shared: Arc<Shared>,
+}
+
+struct Shared {
+ value: AtomicUsize,
+ waker: AtomicWaker,
+}
+
+impl Sender {
+ pub(crate) fn send(&mut self, value: Value) {
+ if self.shared.value.swap(value, Ordering::SeqCst) != value {
+ self.shared.waker.wake();
+ }
+ }
+}
+
+impl Drop for Sender {
+ fn drop(&mut self) {
+ self.send(CLOSED);
+ }
+}
+
+impl Receiver {
+ pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value {
+ self.shared.waker.register(cx.waker());
+ self.shared.value.load(Ordering::SeqCst)
+ }
+
+ pub(crate) fn peek(&self) -> Value {
+ self.shared.value.load(Ordering::Relaxed)
+ }
+}