diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/hyper/src/body | |
parent | Initial commit. (diff) | |
download | firefox-upstream.tar.xz firefox-upstream.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/hyper/src/body')
-rw-r--r-- | third_party/rust/hyper/src/body/aggregate.rs | 31 | ||||
-rw-r--r-- | third_party/rust/hyper/src/body/body.rs | 785 | ||||
-rw-r--r-- | third_party/rust/hyper/src/body/length.rs | 123 | ||||
-rw-r--r-- | third_party/rust/hyper/src/body/mod.rs | 65 | ||||
-rw-r--r-- | third_party/rust/hyper/src/body/to_bytes.rs | 77 |
5 files changed, 1081 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/body/aggregate.rs b/third_party/rust/hyper/src/body/aggregate.rs new file mode 100644 index 0000000000..99662419d3 --- /dev/null +++ b/third_party/rust/hyper/src/body/aggregate.rs @@ -0,0 +1,31 @@ +use bytes::Buf; + +use super::HttpBody; +use crate::common::buf::BufList; + +/// Aggregate the data buffers from a body asynchronously. +/// +/// The returned `impl Buf` groups the `Buf`s from the `HttpBody` without +/// copying them. This is ideal if you don't require a contiguous buffer. +/// +/// # Note +/// +/// Care needs to be taken if the remote is untrusted. The function doesn't implement any length +/// checks and an malicious peer might make it consume arbitrary amounts of memory. Checking the +/// `Content-Length` is a possibility, but it is not strictly mandated to be present. +pub async fn aggregate<T>(body: T) -> Result<impl Buf, T::Error> +where + T: HttpBody, +{ + let mut bufs = BufList::new(); + + futures_util::pin_mut!(body); + while let Some(buf) = body.data().await { + let buf = buf?; + if buf.has_remaining() { + bufs.push(buf); + } + } + + Ok(bufs) +} diff --git a/third_party/rust/hyper/src/body/body.rs b/third_party/rust/hyper/src/body/body.rs new file mode 100644 index 0000000000..9dc1a034f9 --- /dev/null +++ b/third_party/rust/hyper/src/body/body.rs @@ -0,0 +1,785 @@ +use std::borrow::Cow; +#[cfg(feature = "stream")] +use std::error::Error as StdError; +use std::fmt; + +use bytes::Bytes; +use futures_channel::mpsc; +use futures_channel::oneshot; +use futures_core::Stream; // for mpsc::Receiver +#[cfg(feature = "stream")] +use futures_util::TryStreamExt; +use http::HeaderMap; +use http_body::{Body as HttpBody, SizeHint}; + +use super::DecodedLength; +#[cfg(feature = "stream")] +use crate::common::sync_wrapper::SyncWrapper; +use crate::common::Future; +#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] +use crate::common::Never; +use crate::common::{task, watch, Pin, Poll}; +#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] +use crate::proto::h2::ping; + +type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>; +type TrailersSender = oneshot::Sender<HeaderMap>; + +/// A stream of `Bytes`, used when receiving bodies. +/// +/// A good default [`HttpBody`](crate::body::HttpBody) to use in many +/// applications. +/// +/// Note: To read the full body, use [`body::to_bytes`](crate::body::to_bytes) +/// or [`body::aggregate`](crate::body::aggregate). +#[must_use = "streams do nothing unless polled"] +pub struct Body { + kind: Kind, + /// Keep the extra bits in an `Option<Box<Extra>>`, so that + /// Body stays small in the common case (no extras needed). + extra: Option<Box<Extra>>, +} + +enum Kind { + Once(Option<Bytes>), + Chan { + content_length: DecodedLength, + want_tx: watch::Sender, + data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>, + trailers_rx: oneshot::Receiver<HeaderMap>, + }, + #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] + H2 { + ping: ping::Recorder, + content_length: DecodedLength, + recv: h2::RecvStream, + }, + #[cfg(feature = "ffi")] + Ffi(crate::ffi::UserBody), + #[cfg(feature = "stream")] + Wrapped( + SyncWrapper< + Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>, + >, + ), +} + +struct Extra { + /// Allow the client to pass a future to delay the `Body` from returning + /// EOF. This allows the `Client` to try to put the idle connection + /// back into the pool before the body is "finished". + /// + /// The reason for this is so that creating a new request after finishing + /// streaming the body of a response could sometimes result in creating + /// a brand new connection, since the pool didn't know about the idle + /// connection yet. + delayed_eof: Option<DelayEof>, +} + +#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] +type DelayEofUntil = oneshot::Receiver<Never>; + +enum DelayEof { + /// Initial state, stream hasn't seen EOF yet. + #[cfg(any(feature = "http1", feature = "http2"))] + #[cfg(feature = "client")] + NotEof(DelayEofUntil), + /// Transitions to this state once we've seen `poll` try to + /// return EOF (`None`). This future is then polled, and + /// when it completes, the Body finally returns EOF (`None`). + #[cfg(any(feature = "http1", feature = "http2"))] + #[cfg(feature = "client")] + Eof(DelayEofUntil), +} + +/// A sender half created through [`Body::channel()`]. +/// +/// Useful when wanting to stream chunks from another thread. +/// +/// ## Body Closing +/// +/// Note that the request body will always be closed normally when the sender is dropped (meaning +/// that the empty terminating chunk will be sent to the remote). If you desire to close the +/// connection with an incomplete response (e.g. in the case of an error during asynchronous +/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion. +/// +/// [`Body::channel()`]: struct.Body.html#method.channel +/// [`Sender::abort()`]: struct.Sender.html#method.abort +#[must_use = "Sender does nothing unless sent on"] +pub struct Sender { + want_rx: watch::Receiver, + data_tx: BodySender, + trailers_tx: Option<TrailersSender>, +} + +const WANT_PENDING: usize = 1; +const WANT_READY: usize = 2; + +impl Body { + /// Create an empty `Body` stream. + /// + /// # Example + /// + /// ``` + /// use hyper::{Body, Request}; + /// + /// // create a `GET /` request + /// let get = Request::new(Body::empty()); + /// ``` + #[inline] + pub fn empty() -> Body { + Body::new(Kind::Once(None)) + } + + /// Create a `Body` stream with an associated sender half. + /// + /// Useful when wanting to stream chunks from another thread. + #[inline] + pub fn channel() -> (Sender, Body) { + Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false) + } + + pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) { + let (data_tx, data_rx) = mpsc::channel(0); + let (trailers_tx, trailers_rx) = oneshot::channel(); + + // If wanter is true, `Sender::poll_ready()` won't becoming ready + // until the `Body` has been polled for data once. + let want = if wanter { WANT_PENDING } else { WANT_READY }; + + let (want_tx, want_rx) = watch::channel(want); + + let tx = Sender { + want_rx, + data_tx, + trailers_tx: Some(trailers_tx), + }; + let rx = Body::new(Kind::Chan { + content_length, + want_tx, + data_rx, + trailers_rx, + }); + + (tx, rx) + } + + /// Wrap a futures `Stream` in a box inside `Body`. + /// + /// # Example + /// + /// ``` + /// # use hyper::Body; + /// let chunks: Vec<Result<_, std::io::Error>> = vec![ + /// Ok("hello"), + /// Ok(" "), + /// Ok("world"), + /// ]; + /// + /// let stream = futures_util::stream::iter(chunks); + /// + /// let body = Body::wrap_stream(stream); + /// ``` + /// + /// # Optional + /// + /// This function requires enabling the `stream` feature in your + /// `Cargo.toml`. + #[cfg(feature = "stream")] + #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] + pub fn wrap_stream<S, O, E>(stream: S) -> Body + where + S: Stream<Item = Result<O, E>> + Send + 'static, + O: Into<Bytes> + 'static, + E: Into<Box<dyn StdError + Send + Sync>> + 'static, + { + let mapped = stream.map_ok(Into::into).map_err(Into::into); + Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped)))) + } + + fn new(kind: Kind) -> Body { + Body { kind, extra: None } + } + + #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] + pub(crate) fn h2( + recv: h2::RecvStream, + mut content_length: DecodedLength, + ping: ping::Recorder, + ) -> Self { + // If the stream is already EOS, then the "unknown length" is clearly + // actually ZERO. + if !content_length.is_exact() && recv.is_end_stream() { + content_length = DecodedLength::ZERO; + } + let body = Body::new(Kind::H2 { + ping, + content_length, + recv, + }); + + body + } + + #[cfg(any(feature = "http1", feature = "http2"))] + #[cfg(feature = "client")] + pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) { + self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut)); + } + + fn take_delayed_eof(&mut self) -> Option<DelayEof> { + self.extra + .as_mut() + .and_then(|extra| extra.delayed_eof.take()) + } + + #[cfg(any(feature = "http1", feature = "http2"))] + fn extra_mut(&mut self) -> &mut Extra { + self.extra + .get_or_insert_with(|| Box::new(Extra { delayed_eof: None })) + } + + fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> { + match self.take_delayed_eof() { + #[cfg(any(feature = "http1", feature = "http2"))] + #[cfg(feature = "client")] + Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) { + ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => { + self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay)); + ok + } + Poll::Ready(None) => match Pin::new(&mut delay).poll(cx) { + Poll::Ready(Ok(never)) => match never {}, + Poll::Pending => { + self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay)); + Poll::Pending + } + Poll::Ready(Err(_done)) => Poll::Ready(None), + }, + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), + }, + #[cfg(any(feature = "http1", feature = "http2"))] + #[cfg(feature = "client")] + Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) { + Poll::Ready(Ok(never)) => match never {}, + Poll::Pending => { + self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay)); + Poll::Pending + } + Poll::Ready(Err(_done)) => Poll::Ready(None), + }, + #[cfg(any( + not(any(feature = "http1", feature = "http2")), + not(feature = "client") + ))] + Some(delay_eof) => match delay_eof {}, + None => self.poll_inner(cx), + } + } + + #[cfg(feature = "ffi")] + pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody { + match self.kind { + Kind::Ffi(ref mut body) => return body, + _ => { + self.kind = Kind::Ffi(crate::ffi::UserBody::new()); + } + } + + match self.kind { + Kind::Ffi(ref mut body) => body, + _ => unreachable!(), + } + } + + fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> { + match self.kind { + Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)), + Kind::Chan { + content_length: ref mut len, + ref mut data_rx, + ref mut want_tx, + .. + } => { + want_tx.send(WANT_READY); + + match ready!(Pin::new(data_rx).poll_next(cx)?) { + Some(chunk) => { + len.sub_if(chunk.len() as u64); + Poll::Ready(Some(Ok(chunk))) + } + None => Poll::Ready(None), + } + } + #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] + Kind::H2 { + ref ping, + recv: ref mut h2, + content_length: ref mut len, + } => match ready!(h2.poll_data(cx)) { + Some(Ok(bytes)) => { + let _ = h2.flow_control().release_capacity(bytes.len()); + len.sub_if(bytes.len() as u64); + ping.record_data(bytes.len()); + Poll::Ready(Some(Ok(bytes))) + } + Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))), + None => Poll::Ready(None), + }, + + #[cfg(feature = "ffi")] + Kind::Ffi(ref mut body) => body.poll_data(cx), + + #[cfg(feature = "stream")] + Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) { + Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))), + None => Poll::Ready(None), + }, + } + } + + #[cfg(feature = "http1")] + pub(super) fn take_full_data(&mut self) -> Option<Bytes> { + if let Kind::Once(ref mut chunk) = self.kind { + chunk.take() + } else { + None + } + } +} + +impl Default for Body { + /// Returns `Body::empty()`. + #[inline] + fn default() -> Body { + Body::empty() + } +} + +impl HttpBody for Body { + type Data = Bytes; + type Error = crate::Error; + + fn poll_data( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll<Option<Result<Self::Data, Self::Error>>> { + self.poll_eof(cx) + } + + fn poll_trailers( + #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>, + #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>, + ) -> Poll<Result<Option<HeaderMap>, Self::Error>> { + match self.kind { + #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] + Kind::H2 { + recv: ref mut h2, + ref ping, + .. + } => match ready!(h2.poll_trailers(cx)) { + Ok(t) => { + ping.record_non_data(); + Poll::Ready(Ok(t)) + } + Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))), + }, + Kind::Chan { + ref mut trailers_rx, + .. + } => match ready!(Pin::new(trailers_rx).poll(cx)) { + Ok(t) => Poll::Ready(Ok(Some(t))), + Err(_) => Poll::Ready(Ok(None)), + }, + #[cfg(feature = "ffi")] + Kind::Ffi(ref mut body) => body.poll_trailers(cx), + _ => Poll::Ready(Ok(None)), + } + } + + fn is_end_stream(&self) -> bool { + match self.kind { + Kind::Once(ref val) => val.is_none(), + Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO, + #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] + Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(), + #[cfg(feature = "ffi")] + Kind::Ffi(..) => false, + #[cfg(feature = "stream")] + Kind::Wrapped(..) => false, + } + } + + fn size_hint(&self) -> SizeHint { + macro_rules! opt_len { + ($content_length:expr) => {{ + let mut hint = SizeHint::default(); + + if let Some(content_length) = $content_length.into_opt() { + hint.set_exact(content_length); + } + + hint + }}; + } + + match self.kind { + Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64), + Kind::Once(None) => SizeHint::with_exact(0), + #[cfg(feature = "stream")] + Kind::Wrapped(..) => SizeHint::default(), + Kind::Chan { content_length, .. } => opt_len!(content_length), + #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] + Kind::H2 { content_length, .. } => opt_len!(content_length), + #[cfg(feature = "ffi")] + Kind::Ffi(..) => SizeHint::default(), + } + } +} + +impl fmt::Debug for Body { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + #[derive(Debug)] + struct Streaming; + #[derive(Debug)] + struct Empty; + #[derive(Debug)] + struct Full<'a>(&'a Bytes); + + let mut builder = f.debug_tuple("Body"); + match self.kind { + Kind::Once(None) => builder.field(&Empty), + Kind::Once(Some(ref chunk)) => builder.field(&Full(chunk)), + _ => builder.field(&Streaming), + }; + + builder.finish() + } +} + +/// # Optional +/// +/// This function requires enabling the `stream` feature in your +/// `Cargo.toml`. +#[cfg(feature = "stream")] +impl Stream for Body { + type Item = crate::Result<Bytes>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { + HttpBody::poll_data(self, cx) + } +} + +/// # Optional +/// +/// This function requires enabling the `stream` feature in your +/// `Cargo.toml`. +#[cfg(feature = "stream")] +impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body { + #[inline] + fn from( + stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>, + ) -> Body { + Body::new(Kind::Wrapped(SyncWrapper::new(stream.into()))) + } +} + +impl From<Bytes> for Body { + #[inline] + fn from(chunk: Bytes) -> Body { + if chunk.is_empty() { + Body::empty() + } else { + Body::new(Kind::Once(Some(chunk))) + } + } +} + +impl From<Vec<u8>> for Body { + #[inline] + fn from(vec: Vec<u8>) -> Body { + Body::from(Bytes::from(vec)) + } +} + +impl From<&'static [u8]> for Body { + #[inline] + fn from(slice: &'static [u8]) -> Body { + Body::from(Bytes::from(slice)) + } +} + +impl From<Cow<'static, [u8]>> for Body { + #[inline] + fn from(cow: Cow<'static, [u8]>) -> Body { + match cow { + Cow::Borrowed(b) => Body::from(b), + Cow::Owned(o) => Body::from(o), + } + } +} + +impl From<String> for Body { + #[inline] + fn from(s: String) -> Body { + Body::from(Bytes::from(s.into_bytes())) + } +} + +impl From<&'static str> for Body { + #[inline] + fn from(slice: &'static str) -> Body { + Body::from(Bytes::from(slice.as_bytes())) + } +} + +impl From<Cow<'static, str>> for Body { + #[inline] + fn from(cow: Cow<'static, str>) -> Body { + match cow { + Cow::Borrowed(b) => Body::from(b), + Cow::Owned(o) => Body::from(o), + } + } +} + +impl Sender { + /// Check to see if this `Sender` can send more data. + pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { + // Check if the receiver end has tried polling for the body yet + ready!(self.poll_want(cx)?); + self.data_tx + .poll_ready(cx) + .map_err(|_| crate::Error::new_closed()) + } + + fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { + match self.want_rx.load(cx) { + WANT_READY => Poll::Ready(Ok(())), + WANT_PENDING => Poll::Pending, + watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())), + unexpected => unreachable!("want_rx value: {}", unexpected), + } + } + + async fn ready(&mut self) -> crate::Result<()> { + futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await + } + + /// Send data on data channel when it is ready. + pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> { + self.ready().await?; + self.data_tx + .try_send(Ok(chunk)) + .map_err(|_| crate::Error::new_closed()) + } + + /// Send trailers on trailers channel. + pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> { + let tx = match self.trailers_tx.take() { + Some(tx) => tx, + None => return Err(crate::Error::new_closed()), + }; + tx.send(trailers).map_err(|_| crate::Error::new_closed()) + } + + /// Try to send data on this channel. + /// + /// # Errors + /// + /// Returns `Err(Bytes)` if the channel could not (currently) accept + /// another `Bytes`. + /// + /// # Note + /// + /// This is mostly useful for when trying to send from some other thread + /// that doesn't have an async context. If in an async context, prefer + /// `send_data()` instead. + pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> { + self.data_tx + .try_send(Ok(chunk)) + .map_err(|err| err.into_inner().expect("just sent Ok")) + } + + /// Aborts the body in an abnormal fashion. + pub fn abort(self) { + let _ = self + .data_tx + // clone so the send works even if buffer is full + .clone() + .try_send(Err(crate::Error::new_body_write_aborted())); + } + + #[cfg(feature = "http1")] + pub(crate) fn send_error(&mut self, err: crate::Error) { + let _ = self.data_tx.try_send(Err(err)); + } +} + +impl fmt::Debug for Sender { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + #[derive(Debug)] + struct Open; + #[derive(Debug)] + struct Closed; + + let mut builder = f.debug_tuple("Sender"); + match self.want_rx.peek() { + watch::CLOSED => builder.field(&Closed), + _ => builder.field(&Open), + }; + + builder.finish() + } +} + +#[cfg(test)] +mod tests { + use std::mem; + use std::task::Poll; + + use super::{Body, DecodedLength, HttpBody, Sender, SizeHint}; + + #[test] + fn test_size_of() { + // These are mostly to help catch *accidentally* increasing + // the size by too much. + + let body_size = mem::size_of::<Body>(); + let body_expected_size = mem::size_of::<u64>() * 6; + assert!( + body_size <= body_expected_size, + "Body size = {} <= {}", + body_size, + body_expected_size, + ); + + assert_eq!(body_size, mem::size_of::<Option<Body>>(), "Option<Body>"); + + assert_eq!( + mem::size_of::<Sender>(), + mem::size_of::<usize>() * 5, + "Sender" + ); + + assert_eq!( + mem::size_of::<Sender>(), + mem::size_of::<Option<Sender>>(), + "Option<Sender>" + ); + } + + #[test] + fn size_hint() { + fn eq(body: Body, b: SizeHint, note: &str) { + let a = body.size_hint(); + assert_eq!(a.lower(), b.lower(), "lower for {:?}", note); + assert_eq!(a.upper(), b.upper(), "upper for {:?}", note); + } + + eq(Body::from("Hello"), SizeHint::with_exact(5), "from str"); + + eq(Body::empty(), SizeHint::with_exact(0), "empty"); + + eq(Body::channel().1, SizeHint::new(), "channel"); + + eq( + Body::new_channel(DecodedLength::new(4), /*wanter =*/ false).1, + SizeHint::with_exact(4), + "channel with length", + ); + } + + #[tokio::test] + async fn channel_abort() { + let (tx, mut rx) = Body::channel(); + + tx.abort(); + + let err = rx.data().await.unwrap().unwrap_err(); + assert!(err.is_body_write_aborted(), "{:?}", err); + } + + #[tokio::test] + async fn channel_abort_when_buffer_is_full() { + let (mut tx, mut rx) = Body::channel(); + + tx.try_send_data("chunk 1".into()).expect("send 1"); + // buffer is full, but can still send abort + tx.abort(); + + let chunk1 = rx.data().await.expect("item 1").expect("chunk 1"); + assert_eq!(chunk1, "chunk 1"); + + let err = rx.data().await.unwrap().unwrap_err(); + assert!(err.is_body_write_aborted(), "{:?}", err); + } + + #[test] + fn channel_buffers_one() { + let (mut tx, _rx) = Body::channel(); + + tx.try_send_data("chunk 1".into()).expect("send 1"); + + // buffer is now full + let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2"); + assert_eq!(chunk2, "chunk 2"); + } + + #[tokio::test] + async fn channel_empty() { + let (_, mut rx) = Body::channel(); + + assert!(rx.data().await.is_none()); + } + + #[test] + fn channel_ready() { + let (mut tx, _rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ false); + + let mut tx_ready = tokio_test::task::spawn(tx.ready()); + + assert!(tx_ready.poll().is_ready(), "tx is ready immediately"); + } + + #[test] + fn channel_wanter() { + let (mut tx, mut rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); + + let mut tx_ready = tokio_test::task::spawn(tx.ready()); + let mut rx_data = tokio_test::task::spawn(rx.data()); + + assert!( + tx_ready.poll().is_pending(), + "tx isn't ready before rx has been polled" + ); + + assert!(rx_data.poll().is_pending(), "poll rx.data"); + assert!(tx_ready.is_woken(), "rx poll wakes tx"); + + assert!( + tx_ready.poll().is_ready(), + "tx is ready after rx has been polled" + ); + } + + #[test] + fn channel_notices_closure() { + let (mut tx, rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); + + let mut tx_ready = tokio_test::task::spawn(tx.ready()); + + assert!( + tx_ready.poll().is_pending(), + "tx isn't ready before rx has been polled" + ); + + drop(rx); + assert!(tx_ready.is_woken(), "dropping rx wakes tx"); + + match tx_ready.poll() { + Poll::Ready(Err(ref e)) if e.is_closed() => (), + unexpected => panic!("tx poll ready unexpected: {:?}", unexpected), + } + } +} diff --git a/third_party/rust/hyper/src/body/length.rs b/third_party/rust/hyper/src/body/length.rs new file mode 100644 index 0000000000..e2bbee8039 --- /dev/null +++ b/third_party/rust/hyper/src/body/length.rs @@ -0,0 +1,123 @@ +use std::fmt; + +#[derive(Clone, Copy, PartialEq, Eq)] +pub(crate) struct DecodedLength(u64); + +#[cfg(any(feature = "http1", feature = "http2"))] +impl From<Option<u64>> for DecodedLength { + fn from(len: Option<u64>) -> Self { + len.and_then(|len| { + // If the length is u64::MAX, oh well, just reported chunked. + Self::checked_new(len).ok() + }) + .unwrap_or(DecodedLength::CHUNKED) + } +} + +#[cfg(any(feature = "http1", feature = "http2", test))] +const MAX_LEN: u64 = std::u64::MAX - 2; + +impl DecodedLength { + pub(crate) const CLOSE_DELIMITED: DecodedLength = DecodedLength(::std::u64::MAX); + pub(crate) const CHUNKED: DecodedLength = DecodedLength(::std::u64::MAX - 1); + pub(crate) const ZERO: DecodedLength = DecodedLength(0); + + #[cfg(test)] + pub(crate) fn new(len: u64) -> Self { + debug_assert!(len <= MAX_LEN); + DecodedLength(len) + } + + /// Takes the length as a content-length without other checks. + /// + /// Should only be called if previously confirmed this isn't + /// CLOSE_DELIMITED or CHUNKED. + #[inline] + #[cfg(feature = "http1")] + pub(crate) fn danger_len(self) -> u64 { + debug_assert!(self.0 < Self::CHUNKED.0); + self.0 + } + + /// Converts to an Option<u64> representing a Known or Unknown length. + pub(crate) fn into_opt(self) -> Option<u64> { + match self { + DecodedLength::CHUNKED | DecodedLength::CLOSE_DELIMITED => None, + DecodedLength(known) => Some(known), + } + } + + /// Checks the `u64` is within the maximum allowed for content-length. + #[cfg(any(feature = "http1", feature = "http2"))] + pub(crate) fn checked_new(len: u64) -> Result<Self, crate::error::Parse> { + use tracing::warn; + + if len <= MAX_LEN { + Ok(DecodedLength(len)) + } else { + warn!("content-length bigger than maximum: {} > {}", len, MAX_LEN); + Err(crate::error::Parse::TooLarge) + } + } + + pub(crate) fn sub_if(&mut self, amt: u64) { + match *self { + DecodedLength::CHUNKED | DecodedLength::CLOSE_DELIMITED => (), + DecodedLength(ref mut known) => { + *known -= amt; + } + } + } + + /// Returns whether this represents an exact length. + /// + /// This includes 0, which of course is an exact known length. + /// + /// It would return false if "chunked" or otherwise size-unknown. + #[cfg(feature = "http2")] + pub(crate) fn is_exact(&self) -> bool { + self.0 <= MAX_LEN + } +} + +impl fmt::Debug for DecodedLength { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + DecodedLength::CLOSE_DELIMITED => f.write_str("CLOSE_DELIMITED"), + DecodedLength::CHUNKED => f.write_str("CHUNKED"), + DecodedLength(n) => f.debug_tuple("DecodedLength").field(&n).finish(), + } + } +} + +impl fmt::Display for DecodedLength { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + DecodedLength::CLOSE_DELIMITED => f.write_str("close-delimited"), + DecodedLength::CHUNKED => f.write_str("chunked encoding"), + DecodedLength::ZERO => f.write_str("empty"), + DecodedLength(n) => write!(f, "content-length ({} bytes)", n), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn sub_if_known() { + let mut len = DecodedLength::new(30); + len.sub_if(20); + + assert_eq!(len.0, 10); + } + + #[test] + fn sub_if_chunked() { + let mut len = DecodedLength::CHUNKED; + len.sub_if(20); + + assert_eq!(len, DecodedLength::CHUNKED); + } +} diff --git a/third_party/rust/hyper/src/body/mod.rs b/third_party/rust/hyper/src/body/mod.rs new file mode 100644 index 0000000000..5e2181e941 --- /dev/null +++ b/third_party/rust/hyper/src/body/mod.rs @@ -0,0 +1,65 @@ +//! Streaming bodies for Requests and Responses +//! +//! For both [Clients](crate::client) and [Servers](crate::server), requests and +//! responses use streaming bodies, instead of complete buffering. This +//! allows applications to not use memory they don't need, and allows exerting +//! back-pressure on connections by only reading when asked. +//! +//! There are two pieces to this in hyper: +//! +//! - **The [`HttpBody`](HttpBody) trait** describes all possible bodies. +//! hyper allows any body type that implements `HttpBody`, allowing +//! applications to have fine-grained control over their streaming. +//! - **The [`Body`](Body) concrete type**, which is an implementation of +//! `HttpBody`, and returned by hyper as a "receive stream" (so, for server +//! requests and client responses). It is also a decent default implementation +//! if you don't have very custom needs of your send streams. + +pub use bytes::{Buf, Bytes}; +pub use http_body::Body as HttpBody; +pub use http_body::SizeHint; + +pub use self::aggregate::aggregate; +pub use self::body::{Body, Sender}; +pub(crate) use self::length::DecodedLength; +pub use self::to_bytes::to_bytes; + +mod aggregate; +mod body; +mod length; +mod to_bytes; + +/// An optimization to try to take a full body if immediately available. +/// +/// This is currently limited to *only* `hyper::Body`s. +#[cfg(feature = "http1")] +pub(crate) fn take_full_data<T: HttpBody + 'static>(body: &mut T) -> Option<T::Data> { + use std::any::{Any, TypeId}; + + // This static type check can be optimized at compile-time. + if TypeId::of::<T>() == TypeId::of::<Body>() { + let mut full = (body as &mut dyn Any) + .downcast_mut::<Body>() + .expect("must be Body") + .take_full_data(); + // This second cast is required to make the type system happy. + // Without it, the compiler cannot reason that the type is actually + // `T::Data`. Oh wells. + // + // It's still a measurable win! + (&mut full as &mut dyn Any) + .downcast_mut::<Option<T::Data>>() + .expect("must be T::Data") + .take() + } else { + None + } +} + +fn _assert_send_sync() { + fn _assert_send<T: Send>() {} + fn _assert_sync<T: Sync>() {} + + _assert_send::<Body>(); + _assert_sync::<Body>(); +} diff --git a/third_party/rust/hyper/src/body/to_bytes.rs b/third_party/rust/hyper/src/body/to_bytes.rs new file mode 100644 index 0000000000..62b15a54a9 --- /dev/null +++ b/third_party/rust/hyper/src/body/to_bytes.rs @@ -0,0 +1,77 @@ +use bytes::{Buf, BufMut, Bytes}; + +use super::HttpBody; + +/// Concatenate the buffers from a body into a single `Bytes` asynchronously. +/// +/// This may require copying the data into a single buffer. If you don't need +/// a contiguous buffer, prefer the [`aggregate`](crate::body::aggregate()) +/// function. +/// +/// # Note +/// +/// Care needs to be taken if the remote is untrusted. The function doesn't implement any length +/// checks and an malicious peer might make it consume arbitrary amounts of memory. Checking the +/// `Content-Length` is a possibility, but it is not strictly mandated to be present. +/// +/// # Example +/// +/// ``` +/// # #[cfg(all(feature = "client", feature = "tcp", any(feature = "http1", feature = "http2")))] +/// # async fn doc() -> hyper::Result<()> { +/// use hyper::{body::HttpBody}; +/// +/// # let request = hyper::Request::builder() +/// # .method(hyper::Method::POST) +/// # .uri("http://httpbin.org/post") +/// # .header("content-type", "application/json") +/// # .body(hyper::Body::from(r#"{"library":"hyper"}"#)).unwrap(); +/// # let client = hyper::Client::new(); +/// let response = client.request(request).await?; +/// +/// const MAX_ALLOWED_RESPONSE_SIZE: u64 = 1024; +/// +/// let response_content_length = match response.body().size_hint().upper() { +/// Some(v) => v, +/// None => MAX_ALLOWED_RESPONSE_SIZE + 1 // Just to protect ourselves from a malicious response +/// }; +/// +/// if response_content_length < MAX_ALLOWED_RESPONSE_SIZE { +/// let body_bytes = hyper::body::to_bytes(response.into_body()).await?; +/// println!("body: {:?}", body_bytes); +/// } +/// +/// # Ok(()) +/// # } +/// ``` +pub async fn to_bytes<T>(body: T) -> Result<Bytes, T::Error> +where + T: HttpBody, +{ + futures_util::pin_mut!(body); + + // If there's only 1 chunk, we can just return Buf::to_bytes() + let mut first = if let Some(buf) = body.data().await { + buf? + } else { + return Ok(Bytes::new()); + }; + + let second = if let Some(buf) = body.data().await { + buf? + } else { + return Ok(first.copy_to_bytes(first.remaining())); + }; + + // With more than 1 buf, we gotta flatten into a Vec first. + let cap = first.remaining() + second.remaining() + body.size_hint().lower() as usize; + let mut vec = Vec::with_capacity(cap); + vec.put(first); + vec.put(second); + + while let Some(buf) = body.data().await { + vec.put(buf?); + } + + Ok(vec.into()) +} |