diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/hyper/src/common | |
parent | Initial commit. (diff) | |
download | firefox-upstream.tar.xz firefox-upstream.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/hyper/src/common')
-rw-r--r-- | third_party/rust/hyper/src/common/buf.rs | 151 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/date.rs | 124 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/drain.rs | 217 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/exec.rs | 145 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/io/mod.rs | 3 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/io/rewind.rs | 155 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/lazy.rs | 76 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/mod.rs | 39 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/never.rs | 21 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/sync_wrapper.rs | 110 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/task.rs | 12 | ||||
-rw-r--r-- | third_party/rust/hyper/src/common/watch.rs | 73 |
12 files changed, 1126 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/common/buf.rs b/third_party/rust/hyper/src/common/buf.rs new file mode 100644 index 0000000000..64e9333ead --- /dev/null +++ b/third_party/rust/hyper/src/common/buf.rs @@ -0,0 +1,151 @@ +use std::collections::VecDeque; +use std::io::IoSlice; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; + +pub(crate) struct BufList<T> { + bufs: VecDeque<T>, +} + +impl<T: Buf> BufList<T> { + pub(crate) fn new() -> BufList<T> { + BufList { + bufs: VecDeque::new(), + } + } + + #[inline] + pub(crate) fn push(&mut self, buf: T) { + debug_assert!(buf.has_remaining()); + self.bufs.push_back(buf); + } + + #[inline] + #[cfg(feature = "http1")] + pub(crate) fn bufs_cnt(&self) -> usize { + self.bufs.len() + } +} + +impl<T: Buf> Buf for BufList<T> { + #[inline] + fn remaining(&self) -> usize { + self.bufs.iter().map(|buf| buf.remaining()).sum() + } + + #[inline] + fn chunk(&self) -> &[u8] { + self.bufs.front().map(Buf::chunk).unwrap_or_default() + } + + #[inline] + fn advance(&mut self, mut cnt: usize) { + while cnt > 0 { + { + let front = &mut self.bufs[0]; + let rem = front.remaining(); + if rem > cnt { + front.advance(cnt); + return; + } else { + front.advance(rem); + cnt -= rem; + } + } + self.bufs.pop_front(); + } + } + + #[inline] + fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize { + if dst.is_empty() { + return 0; + } + let mut vecs = 0; + for buf in &self.bufs { + vecs += buf.chunks_vectored(&mut dst[vecs..]); + if vecs == dst.len() { + break; + } + } + vecs + } + + #[inline] + fn copy_to_bytes(&mut self, len: usize) -> Bytes { + // Our inner buffer may have an optimized version of copy_to_bytes, and if the whole + // request can be fulfilled by the front buffer, we can take advantage. + match self.bufs.front_mut() { + Some(front) if front.remaining() == len => { + let b = front.copy_to_bytes(len); + self.bufs.pop_front(); + b + } + Some(front) if front.remaining() > len => front.copy_to_bytes(len), + _ => { + assert!(len <= self.remaining(), "`len` greater than remaining"); + let mut bm = BytesMut::with_capacity(len); + bm.put(self.take(len)); + bm.freeze() + } + } + } +} + +#[cfg(test)] +mod tests { + use std::ptr; + + use super::*; + + fn hello_world_buf() -> BufList<Bytes> { + BufList { + bufs: vec![Bytes::from("Hello"), Bytes::from(" "), Bytes::from("World")].into(), + } + } + + #[test] + fn to_bytes_shorter() { + let mut bufs = hello_world_buf(); + let old_ptr = bufs.chunk().as_ptr(); + let start = bufs.copy_to_bytes(4); + assert_eq!(start, "Hell"); + assert!(ptr::eq(old_ptr, start.as_ptr())); + assert_eq!(bufs.chunk(), b"o"); + assert!(ptr::eq(old_ptr.wrapping_add(4), bufs.chunk().as_ptr())); + assert_eq!(bufs.remaining(), 7); + } + + #[test] + fn to_bytes_eq() { + let mut bufs = hello_world_buf(); + let old_ptr = bufs.chunk().as_ptr(); + let start = bufs.copy_to_bytes(5); + assert_eq!(start, "Hello"); + assert!(ptr::eq(old_ptr, start.as_ptr())); + assert_eq!(bufs.chunk(), b" "); + assert_eq!(bufs.remaining(), 6); + } + + #[test] + fn to_bytes_longer() { + let mut bufs = hello_world_buf(); + let start = bufs.copy_to_bytes(7); + assert_eq!(start, "Hello W"); + assert_eq!(bufs.remaining(), 4); + } + + #[test] + fn one_long_buf_to_bytes() { + let mut buf = BufList::new(); + buf.push(b"Hello World" as &[_]); + assert_eq!(buf.copy_to_bytes(5), "Hello"); + assert_eq!(buf.chunk(), b" World"); + } + + #[test] + #[should_panic(expected = "`len` greater than remaining")] + fn buf_to_bytes_too_many() { + hello_world_buf().copy_to_bytes(42); + } +} diff --git a/third_party/rust/hyper/src/common/date.rs b/third_party/rust/hyper/src/common/date.rs new file mode 100644 index 0000000000..a436fc07c0 --- /dev/null +++ b/third_party/rust/hyper/src/common/date.rs @@ -0,0 +1,124 @@ +use std::cell::RefCell; +use std::fmt::{self, Write}; +use std::str; +use std::time::{Duration, SystemTime}; + +#[cfg(feature = "http2")] +use http::header::HeaderValue; +use httpdate::HttpDate; + +// "Sun, 06 Nov 1994 08:49:37 GMT".len() +pub(crate) const DATE_VALUE_LENGTH: usize = 29; + +#[cfg(feature = "http1")] +pub(crate) fn extend(dst: &mut Vec<u8>) { + CACHED.with(|cache| { + dst.extend_from_slice(cache.borrow().buffer()); + }) +} + +#[cfg(feature = "http1")] +pub(crate) fn update() { + CACHED.with(|cache| { + cache.borrow_mut().check(); + }) +} + +#[cfg(feature = "http2")] +pub(crate) fn update_and_header_value() -> HeaderValue { + CACHED.with(|cache| { + let mut cache = cache.borrow_mut(); + cache.check(); + HeaderValue::from_bytes(cache.buffer()).expect("Date format should be valid HeaderValue") + }) +} + +struct CachedDate { + bytes: [u8; DATE_VALUE_LENGTH], + pos: usize, + next_update: SystemTime, +} + +thread_local!(static CACHED: RefCell<CachedDate> = RefCell::new(CachedDate::new())); + +impl CachedDate { + fn new() -> Self { + let mut cache = CachedDate { + bytes: [0; DATE_VALUE_LENGTH], + pos: 0, + next_update: SystemTime::now(), + }; + cache.update(cache.next_update); + cache + } + + fn buffer(&self) -> &[u8] { + &self.bytes[..] + } + + fn check(&mut self) { + let now = SystemTime::now(); + if now > self.next_update { + self.update(now); + } + } + + fn update(&mut self, now: SystemTime) { + self.render(now); + self.next_update = now + Duration::new(1, 0); + } + + fn render(&mut self, now: SystemTime) { + self.pos = 0; + let _ = write!(self, "{}", HttpDate::from(now)); + debug_assert!(self.pos == DATE_VALUE_LENGTH); + } +} + +impl fmt::Write for CachedDate { + fn write_str(&mut self, s: &str) -> fmt::Result { + let len = s.len(); + self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes()); + self.pos += len; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[cfg(feature = "nightly")] + use test::Bencher; + + #[test] + fn test_date_len() { + assert_eq!(DATE_VALUE_LENGTH, "Sun, 06 Nov 1994 08:49:37 GMT".len()); + } + + #[cfg(feature = "nightly")] + #[bench] + fn bench_date_check(b: &mut Bencher) { + let mut date = CachedDate::new(); + // cache the first update + date.check(); + + b.iter(|| { + date.check(); + }); + } + + #[cfg(feature = "nightly")] + #[bench] + fn bench_date_render(b: &mut Bencher) { + let mut date = CachedDate::new(); + let now = SystemTime::now(); + date.render(now); + b.bytes = date.buffer().len() as u64; + + b.iter(|| { + date.render(now); + test::black_box(&date); + }); + } +} diff --git a/third_party/rust/hyper/src/common/drain.rs b/third_party/rust/hyper/src/common/drain.rs new file mode 100644 index 0000000000..174da876df --- /dev/null +++ b/third_party/rust/hyper/src/common/drain.rs @@ -0,0 +1,217 @@ +use std::mem; + +use pin_project_lite::pin_project; +use tokio::sync::watch; + +use super::{task, Future, Pin, Poll}; + +pub(crate) fn channel() -> (Signal, Watch) { + let (tx, rx) = watch::channel(()); + (Signal { tx }, Watch { rx }) +} + +pub(crate) struct Signal { + tx: watch::Sender<()>, +} + +pub(crate) struct Draining(Pin<Box<dyn Future<Output = ()> + Send + Sync>>); + +#[derive(Clone)] +pub(crate) struct Watch { + rx: watch::Receiver<()>, +} + +pin_project! { + #[allow(missing_debug_implementations)] + pub struct Watching<F, FN> { + #[pin] + future: F, + state: State<FN>, + watch: Pin<Box<dyn Future<Output = ()> + Send + Sync>>, + _rx: watch::Receiver<()>, + } +} + +enum State<F> { + Watch(F), + Draining, +} + +impl Signal { + pub(crate) fn drain(self) -> Draining { + let _ = self.tx.send(()); + Draining(Box::pin(async move { self.tx.closed().await })) + } +} + +impl Future for Draining { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + Pin::new(&mut self.as_mut().0).poll(cx) + } +} + +impl Watch { + pub(crate) fn watch<F, FN>(self, future: F, on_drain: FN) -> Watching<F, FN> + where + F: Future, + FN: FnOnce(Pin<&mut F>), + { + let Self { mut rx } = self; + let _rx = rx.clone(); + Watching { + future, + state: State::Watch(on_drain), + watch: Box::pin(async move { + let _ = rx.changed().await; + }), + // Keep the receiver alive until the future completes, so that + // dropping it can signal that draining has completed. + _rx, + } + } +} + +impl<F, FN> Future for Watching<F, FN> +where + F: Future, + FN: FnOnce(Pin<&mut F>), +{ + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + let mut me = self.project(); + loop { + match mem::replace(me.state, State::Draining) { + State::Watch(on_drain) => { + match Pin::new(&mut me.watch).poll(cx) { + Poll::Ready(()) => { + // Drain has been triggered! + on_drain(me.future.as_mut()); + } + Poll::Pending => { + *me.state = State::Watch(on_drain); + return me.future.poll(cx); + } + } + } + State::Draining => return me.future.poll(cx), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + struct TestMe { + draining: bool, + finished: bool, + poll_cnt: usize, + } + + impl Future for TestMe { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> { + self.poll_cnt += 1; + if self.finished { + Poll::Ready(()) + } else { + Poll::Pending + } + } + } + + #[test] + fn watch() { + let mut mock = tokio_test::task::spawn(()); + mock.enter(|cx, _| { + let (tx, rx) = channel(); + let fut = TestMe { + draining: false, + finished: false, + poll_cnt: 0, + }; + + let mut watch = rx.watch(fut, |mut fut| { + fut.draining = true; + }); + + assert_eq!(watch.future.poll_cnt, 0); + + // First poll should poll the inner future + assert!(Pin::new(&mut watch).poll(cx).is_pending()); + assert_eq!(watch.future.poll_cnt, 1); + + // Second poll should poll the inner future again + assert!(Pin::new(&mut watch).poll(cx).is_pending()); + assert_eq!(watch.future.poll_cnt, 2); + + let mut draining = tx.drain(); + // Drain signaled, but needs another poll to be noticed. + assert!(!watch.future.draining); + assert_eq!(watch.future.poll_cnt, 2); + + // Now, poll after drain has been signaled. + assert!(Pin::new(&mut watch).poll(cx).is_pending()); + assert_eq!(watch.future.poll_cnt, 3); + assert!(watch.future.draining); + + // Draining is not ready until watcher completes + assert!(Pin::new(&mut draining).poll(cx).is_pending()); + + // Finishing up the watch future + watch.future.finished = true; + assert!(Pin::new(&mut watch).poll(cx).is_ready()); + assert_eq!(watch.future.poll_cnt, 4); + drop(watch); + + assert!(Pin::new(&mut draining).poll(cx).is_ready()); + }) + } + + #[test] + fn watch_clones() { + let mut mock = tokio_test::task::spawn(()); + mock.enter(|cx, _| { + let (tx, rx) = channel(); + + let fut1 = TestMe { + draining: false, + finished: false, + poll_cnt: 0, + }; + let fut2 = TestMe { + draining: false, + finished: false, + poll_cnt: 0, + }; + + let watch1 = rx.clone().watch(fut1, |mut fut| { + fut.draining = true; + }); + let watch2 = rx.watch(fut2, |mut fut| { + fut.draining = true; + }); + + let mut draining = tx.drain(); + + // Still 2 outstanding watchers + assert!(Pin::new(&mut draining).poll(cx).is_pending()); + + // drop 1 for whatever reason + drop(watch1); + + // Still not ready, 1 other watcher still pending + assert!(Pin::new(&mut draining).poll(cx).is_pending()); + + drop(watch2); + + // Now all watchers are gone, draining is complete + assert!(Pin::new(&mut draining).poll(cx).is_ready()); + }); + } +} diff --git a/third_party/rust/hyper/src/common/exec.rs b/third_party/rust/hyper/src/common/exec.rs new file mode 100644 index 0000000000..76f616184b --- /dev/null +++ b/third_party/rust/hyper/src/common/exec.rs @@ -0,0 +1,145 @@ +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] +use crate::body::Body; +#[cfg(feature = "server")] +use crate::body::HttpBody; +#[cfg(all(feature = "http2", feature = "server"))] +use crate::proto::h2::server::H2Stream; +use crate::rt::Executor; +#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] +use crate::server::server::{new_svc::NewSvcTask, Watcher}; +#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] +use crate::service::HttpService; + +#[cfg(feature = "server")] +pub trait ConnStreamExec<F, B: HttpBody>: Clone { + fn execute_h2stream(&mut self, fut: H2Stream<F, B>); +} + +#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] +pub trait NewSvcExec<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>>: Clone { + fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>); +} + +pub(crate) type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>; + +// Either the user provides an executor for background tasks, or we use +// `tokio::spawn`. +#[derive(Clone)] +pub enum Exec { + Default, + Executor(Arc<dyn Executor<BoxSendFuture> + Send + Sync>), +} + +// ===== impl Exec ===== + +impl Exec { + pub(crate) fn execute<F>(&self, fut: F) + where + F: Future<Output = ()> + Send + 'static, + { + match *self { + Exec::Default => { + #[cfg(feature = "tcp")] + { + tokio::task::spawn(fut); + } + #[cfg(not(feature = "tcp"))] + { + // If no runtime, we need an executor! + panic!("executor must be set") + } + } + Exec::Executor(ref e) => { + e.execute(Box::pin(fut)); + } + } + } +} + +impl fmt::Debug for Exec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Exec").finish() + } +} + +#[cfg(feature = "server")] +impl<F, B> ConnStreamExec<F, B> for Exec +where + H2Stream<F, B>: Future<Output = ()> + Send + 'static, + B: HttpBody, +{ + fn execute_h2stream(&mut self, fut: H2Stream<F, B>) { + self.execute(fut) + } +} + +#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] +impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for Exec +where + NewSvcTask<I, N, S, E, W>: Future<Output = ()> + Send + 'static, + S: HttpService<Body>, + W: Watcher<I, S, E>, +{ + fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) { + self.execute(fut) + } +} + +// ==== impl Executor ===== + +#[cfg(feature = "server")] +impl<E, F, B> ConnStreamExec<F, B> for E +where + E: Executor<H2Stream<F, B>> + Clone, + H2Stream<F, B>: Future<Output = ()>, + B: HttpBody, +{ + fn execute_h2stream(&mut self, fut: H2Stream<F, B>) { + self.execute(fut) + } +} + +#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] +impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for E +where + E: Executor<NewSvcTask<I, N, S, E, W>> + Clone, + NewSvcTask<I, N, S, E, W>: Future<Output = ()>, + S: HttpService<Body>, + W: Watcher<I, S, E>, +{ + fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) { + self.execute(fut) + } +} + +// If http2 is not enable, we just have a stub here, so that the trait bounds +// that *would* have been needed are still checked. Why? +// +// Because enabling `http2` shouldn't suddenly add new trait bounds that cause +// a compilation error. +#[cfg(not(feature = "http2"))] +#[allow(missing_debug_implementations)] +pub struct H2Stream<F, B>(std::marker::PhantomData<(F, B)>); + +#[cfg(not(feature = "http2"))] +impl<F, B, E> Future for H2Stream<F, B> +where + F: Future<Output = Result<http::Response<B>, E>>, + B: crate::body::HttpBody, + B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, + E: Into<Box<dyn std::error::Error + Send + Sync>>, +{ + type Output = (); + + fn poll( + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Self::Output> { + unreachable!() + } +} diff --git a/third_party/rust/hyper/src/common/io/mod.rs b/third_party/rust/hyper/src/common/io/mod.rs new file mode 100644 index 0000000000..2e6d506153 --- /dev/null +++ b/third_party/rust/hyper/src/common/io/mod.rs @@ -0,0 +1,3 @@ +mod rewind; + +pub(crate) use self::rewind::Rewind; diff --git a/third_party/rust/hyper/src/common/io/rewind.rs b/third_party/rust/hyper/src/common/io/rewind.rs new file mode 100644 index 0000000000..0afef5f7ea --- /dev/null +++ b/third_party/rust/hyper/src/common/io/rewind.rs @@ -0,0 +1,155 @@ +use std::marker::Unpin; +use std::{cmp, io}; + +use bytes::{Buf, Bytes}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + +use crate::common::{task, Pin, Poll}; + +/// Combine a buffer with an IO, rewinding reads to use the buffer. +#[derive(Debug)] +pub(crate) struct Rewind<T> { + pre: Option<Bytes>, + inner: T, +} + +impl<T> Rewind<T> { + #[cfg(any(all(feature = "http2", feature = "server"), test))] + pub(crate) fn new(io: T) -> Self { + Rewind { + pre: None, + inner: io, + } + } + + pub(crate) fn new_buffered(io: T, buf: Bytes) -> Self { + Rewind { + pre: Some(buf), + inner: io, + } + } + + #[cfg(any(all(feature = "http1", feature = "http2", feature = "server"), test))] + pub(crate) fn rewind(&mut self, bs: Bytes) { + debug_assert!(self.pre.is_none()); + self.pre = Some(bs); + } + + pub(crate) fn into_inner(self) -> (T, Bytes) { + (self.inner, self.pre.unwrap_or_else(Bytes::new)) + } + + // pub(crate) fn get_mut(&mut self) -> &mut T { + // &mut self.inner + // } +} + +impl<T> AsyncRead for Rewind<T> +where + T: AsyncRead + Unpin, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + if let Some(mut prefix) = self.pre.take() { + // If there are no remaining bytes, let the bytes get dropped. + if !prefix.is_empty() { + let copy_len = cmp::min(prefix.len(), buf.remaining()); + // TODO: There should be a way to do following two lines cleaner... + buf.put_slice(&prefix[..copy_len]); + prefix.advance(copy_len); + // Put back what's left + if !prefix.is_empty() { + self.pre = Some(prefix); + } + + return Poll::Ready(Ok(())); + } + } + Pin::new(&mut self.inner).poll_read(cx, buf) + } +} + +impl<T> AsyncWrite for Rewind<T> +where + T: AsyncWrite + Unpin, +{ + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + Pin::new(&mut self.inner).poll_write(cx, buf) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll<io::Result<usize>> { + Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } + + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } +} + +#[cfg(test)] +mod tests { + // FIXME: re-implement tests with `async/await`, this import should + // trigger a warning to remind us + use super::Rewind; + use bytes::Bytes; + use tokio::io::AsyncReadExt; + + #[tokio::test] + async fn partial_rewind() { + let underlying = [104, 101, 108, 108, 111]; + + let mock = tokio_test::io::Builder::new().read(&underlying).build(); + + let mut stream = Rewind::new(mock); + + // Read off some bytes, ensure we filled o1 + let mut buf = [0; 2]; + stream.read_exact(&mut buf).await.expect("read1"); + + // Rewind the stream so that it is as if we never read in the first place. + stream.rewind(Bytes::copy_from_slice(&buf[..])); + + let mut buf = [0; 5]; + stream.read_exact(&mut buf).await.expect("read1"); + + // At this point we should have read everything that was in the MockStream + assert_eq!(&buf, &underlying); + } + + #[tokio::test] + async fn full_rewind() { + let underlying = [104, 101, 108, 108, 111]; + + let mock = tokio_test::io::Builder::new().read(&underlying).build(); + + let mut stream = Rewind::new(mock); + + let mut buf = [0; 5]; + stream.read_exact(&mut buf).await.expect("read1"); + + // Rewind the stream so that it is as if we never read in the first place. + stream.rewind(Bytes::copy_from_slice(&buf[..])); + + let mut buf = [0; 5]; + stream.read_exact(&mut buf).await.expect("read1"); + } +} diff --git a/third_party/rust/hyper/src/common/lazy.rs b/third_party/rust/hyper/src/common/lazy.rs new file mode 100644 index 0000000000..2722077303 --- /dev/null +++ b/third_party/rust/hyper/src/common/lazy.rs @@ -0,0 +1,76 @@ +use pin_project_lite::pin_project; + +use super::{task, Future, Pin, Poll}; + +pub(crate) trait Started: Future { + fn started(&self) -> bool; +} + +pub(crate) fn lazy<F, R>(func: F) -> Lazy<F, R> +where + F: FnOnce() -> R, + R: Future + Unpin, +{ + Lazy { + inner: Inner::Init { func }, + } +} + +// FIXME: allow() required due to `impl Trait` leaking types to this lint +pin_project! { + #[allow(missing_debug_implementations)] + pub(crate) struct Lazy<F, R> { + #[pin] + inner: Inner<F, R>, + } +} + +pin_project! { + #[project = InnerProj] + #[project_replace = InnerProjReplace] + enum Inner<F, R> { + Init { func: F }, + Fut { #[pin] fut: R }, + Empty, + } +} + +impl<F, R> Started for Lazy<F, R> +where + F: FnOnce() -> R, + R: Future, +{ + fn started(&self) -> bool { + match self.inner { + Inner::Init { .. } => false, + Inner::Fut { .. } | Inner::Empty => true, + } + } +} + +impl<F, R> Future for Lazy<F, R> +where + F: FnOnce() -> R, + R: Future, +{ + type Output = R::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + + if let InnerProj::Fut { fut } = this.inner.as_mut().project() { + return fut.poll(cx); + } + + match this.inner.as_mut().project_replace(Inner::Empty) { + InnerProjReplace::Init { func } => { + this.inner.set(Inner::Fut { fut: func() }); + if let InnerProj::Fut { fut } = this.inner.project() { + return fut.poll(cx); + } + unreachable!() + } + _ => unreachable!("lazy state wrong"), + } + } +} diff --git a/third_party/rust/hyper/src/common/mod.rs b/third_party/rust/hyper/src/common/mod.rs new file mode 100644 index 0000000000..e38c6f5c7a --- /dev/null +++ b/third_party/rust/hyper/src/common/mod.rs @@ -0,0 +1,39 @@ +macro_rules! ready { + ($e:expr) => { + match $e { + std::task::Poll::Ready(v) => v, + std::task::Poll::Pending => return std::task::Poll::Pending, + } + }; +} + +pub(crate) mod buf; +#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] +pub(crate) mod date; +#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] +pub(crate) mod drain; +#[cfg(any(feature = "http1", feature = "http2", feature = "server"))] +pub(crate) mod exec; +pub(crate) mod io; +#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] +mod lazy; +mod never; +#[cfg(any( + feature = "stream", + all(feature = "client", any(feature = "http1", feature = "http2")) +))] +pub(crate) mod sync_wrapper; +pub(crate) mod task; +pub(crate) mod watch; + +#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] +pub(crate) use self::lazy::{lazy, Started as Lazy}; +#[cfg(any(feature = "http1", feature = "http2", feature = "runtime"))] +pub(crate) use self::never::Never; +pub(crate) use self::task::Poll; + +// group up types normally needed for `Future` +cfg_proto! { + pub(crate) use std::marker::Unpin; +} +pub(crate) use std::{future::Future, pin::Pin}; diff --git a/third_party/rust/hyper/src/common/never.rs b/third_party/rust/hyper/src/common/never.rs new file mode 100644 index 0000000000..f143caf60f --- /dev/null +++ b/third_party/rust/hyper/src/common/never.rs @@ -0,0 +1,21 @@ +//! An uninhabitable type meaning it can never happen. +//! +//! To be replaced with `!` once it is stable. + +use std::error::Error; +use std::fmt; + +#[derive(Debug)] +pub(crate) enum Never {} + +impl fmt::Display for Never { + fn fmt(&self, _: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self {} + } +} + +impl Error for Never { + fn description(&self) -> &str { + match *self {} + } +} diff --git a/third_party/rust/hyper/src/common/sync_wrapper.rs b/third_party/rust/hyper/src/common/sync_wrapper.rs new file mode 100644 index 0000000000..704d1a6712 --- /dev/null +++ b/third_party/rust/hyper/src/common/sync_wrapper.rs @@ -0,0 +1,110 @@ +/* + * This is a copy of the sync_wrapper crate. + */ + +/// A mutual exclusion primitive that relies on static type information only +/// +/// In some cases synchronization can be proven statically: whenever you hold an exclusive `&mut` +/// reference, the Rust type system ensures that no other part of the program can hold another +/// reference to the data. Therefore it is safe to access it even if the current thread obtained +/// this reference via a channel. Whenever this is the case, the overhead of allocating and locking +/// a [`Mutex`] can be avoided by using this static version. +/// +/// One example where this is often applicable is [`Future`], which requires an exclusive reference +/// for its [`poll`] method: While a given `Future` implementation may not be safe to access by +/// multiple threads concurrently, the executor can only run the `Future` on one thread at any +/// given time, making it [`Sync`] in practice as long as the implementation is `Send`. You can +/// therefore use the sync wrapper to prove that your data structure is `Sync` even though it +/// contains such a `Future`. +/// +/// # Example +/// +/// ```ignore +/// use hyper::common::sync_wrapper::SyncWrapper; +/// use std::future::Future; +/// +/// struct MyThing { +/// future: SyncWrapper<Box<dyn Future<Output = String> + Send>>, +/// } +/// +/// impl MyThing { +/// // all accesses to `self.future` now require an exclusive reference or ownership +/// } +/// +/// fn assert_sync<T: Sync>() {} +/// +/// assert_sync::<MyThing>(); +/// ``` +/// +/// [`Mutex`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html +/// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html +/// [`poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#method.poll +/// [`Sync`]: https://doc.rust-lang.org/std/marker/trait.Sync.html +#[repr(transparent)] +pub(crate) struct SyncWrapper<T>(T); + +impl<T> SyncWrapper<T> { + /// Creates a new SyncWrapper containing the given value. + /// + /// # Examples + /// + /// ```ignore + /// use hyper::common::sync_wrapper::SyncWrapper; + /// + /// let wrapped = SyncWrapper::new(42); + /// ``` + pub(crate) fn new(value: T) -> Self { + Self(value) + } + + /// Acquires a reference to the protected value. + /// + /// This is safe because it requires an exclusive reference to the wrapper. Therefore this method + /// neither panics nor does it return an error. This is in contrast to [`Mutex::get_mut`] which + /// returns an error if another thread panicked while holding the lock. It is not recommended + /// to send an exclusive reference to a potentially damaged value to another thread for further + /// processing. + /// + /// [`Mutex::get_mut`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html#method.get_mut + /// + /// # Examples + /// + /// ```ignore + /// use hyper::common::sync_wrapper::SyncWrapper; + /// + /// let mut wrapped = SyncWrapper::new(42); + /// let value = wrapped.get_mut(); + /// *value = 0; + /// assert_eq!(*wrapped.get_mut(), 0); + /// ``` + pub(crate) fn get_mut(&mut self) -> &mut T { + &mut self.0 + } + + /// Consumes this wrapper, returning the underlying data. + /// + /// This is safe because it requires ownership of the wrapper, aherefore this method will neither + /// panic nor does it return an error. This is in contrast to [`Mutex::into_inner`] which + /// returns an error if another thread panicked while holding the lock. It is not recommended + /// to send an exclusive reference to a potentially damaged value to another thread for further + /// processing. + /// + /// [`Mutex::into_inner`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html#method.into_inner + /// + /// # Examples + /// + /// ```ignore + /// use hyper::common::sync_wrapper::SyncWrapper; + /// + /// let mut wrapped = SyncWrapper::new(42); + /// assert_eq!(wrapped.into_inner(), 42); + /// ``` + #[allow(dead_code)] + pub(crate) fn into_inner(self) -> T { + self.0 + } +} + +// this is safe because the only operations permitted on this data structure require exclusive +// access or ownership +unsafe impl<T: Send> Sync for SyncWrapper<T> {} diff --git a/third_party/rust/hyper/src/common/task.rs b/third_party/rust/hyper/src/common/task.rs new file mode 100644 index 0000000000..ec70c957d6 --- /dev/null +++ b/third_party/rust/hyper/src/common/task.rs @@ -0,0 +1,12 @@ +#[cfg(feature = "http1")] +use super::Never; +pub(crate) use std::task::{Context, Poll}; + +/// A function to help "yield" a future, such that it is re-scheduled immediately. +/// +/// Useful for spin counts, so a future doesn't hog too much time. +#[cfg(feature = "http1")] +pub(crate) fn yield_now(cx: &mut Context<'_>) -> Poll<Never> { + cx.waker().wake_by_ref(); + Poll::Pending +} diff --git a/third_party/rust/hyper/src/common/watch.rs b/third_party/rust/hyper/src/common/watch.rs new file mode 100644 index 0000000000..ba17d551cb --- /dev/null +++ b/third_party/rust/hyper/src/common/watch.rs @@ -0,0 +1,73 @@ +//! An SPSC broadcast channel. +//! +//! - The value can only be a `usize`. +//! - The consumer is only notified if the value is different. +//! - The value `0` is reserved for closed. + +use futures_util::task::AtomicWaker; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; +use std::task; + +type Value = usize; + +pub(crate) const CLOSED: usize = 0; + +pub(crate) fn channel(initial: Value) -> (Sender, Receiver) { + debug_assert!( + initial != CLOSED, + "watch::channel initial state of 0 is reserved" + ); + + let shared = Arc::new(Shared { + value: AtomicUsize::new(initial), + waker: AtomicWaker::new(), + }); + + ( + Sender { + shared: shared.clone(), + }, + Receiver { shared }, + ) +} + +pub(crate) struct Sender { + shared: Arc<Shared>, +} + +pub(crate) struct Receiver { + shared: Arc<Shared>, +} + +struct Shared { + value: AtomicUsize, + waker: AtomicWaker, +} + +impl Sender { + pub(crate) fn send(&mut self, value: Value) { + if self.shared.value.swap(value, Ordering::SeqCst) != value { + self.shared.waker.wake(); + } + } +} + +impl Drop for Sender { + fn drop(&mut self) { + self.send(CLOSED); + } +} + +impl Receiver { + pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value { + self.shared.waker.register(cx.waker()); + self.shared.value.load(Ordering::SeqCst) + } + + pub(crate) fn peek(&self) -> Value { + self.shared.value.load(Ordering::Relaxed) + } +} |