summaryrefslogtreecommitdiffstats
path: root/third_party/rust/hyper/src/body
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/hyper/src/body')
-rw-r--r--third_party/rust/hyper/src/body/aggregate.rs31
-rw-r--r--third_party/rust/hyper/src/body/body.rs785
-rw-r--r--third_party/rust/hyper/src/body/length.rs123
-rw-r--r--third_party/rust/hyper/src/body/mod.rs65
-rw-r--r--third_party/rust/hyper/src/body/to_bytes.rs82
5 files changed, 1086 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/body/aggregate.rs b/third_party/rust/hyper/src/body/aggregate.rs
new file mode 100644
index 0000000000..99662419d3
--- /dev/null
+++ b/third_party/rust/hyper/src/body/aggregate.rs
@@ -0,0 +1,31 @@
+use bytes::Buf;
+
+use super::HttpBody;
+use crate::common::buf::BufList;
+
+/// Aggregate the data buffers from a body asynchronously.
+///
+/// The returned `impl Buf` groups the `Buf`s from the `HttpBody` without
+/// copying them. This is ideal if you don't require a contiguous buffer.
+///
+/// # Note
+///
+/// Care needs to be taken if the remote is untrusted. The function doesn't implement any length
+/// checks and an malicious peer might make it consume arbitrary amounts of memory. Checking the
+/// `Content-Length` is a possibility, but it is not strictly mandated to be present.
+pub async fn aggregate<T>(body: T) -> Result<impl Buf, T::Error>
+where
+ T: HttpBody,
+{
+ let mut bufs = BufList::new();
+
+ futures_util::pin_mut!(body);
+ while let Some(buf) = body.data().await {
+ let buf = buf?;
+ if buf.has_remaining() {
+ bufs.push(buf);
+ }
+ }
+
+ Ok(bufs)
+}
diff --git a/third_party/rust/hyper/src/body/body.rs b/third_party/rust/hyper/src/body/body.rs
new file mode 100644
index 0000000000..9dc1a034f9
--- /dev/null
+++ b/third_party/rust/hyper/src/body/body.rs
@@ -0,0 +1,785 @@
+use std::borrow::Cow;
+#[cfg(feature = "stream")]
+use std::error::Error as StdError;
+use std::fmt;
+
+use bytes::Bytes;
+use futures_channel::mpsc;
+use futures_channel::oneshot;
+use futures_core::Stream; // for mpsc::Receiver
+#[cfg(feature = "stream")]
+use futures_util::TryStreamExt;
+use http::HeaderMap;
+use http_body::{Body as HttpBody, SizeHint};
+
+use super::DecodedLength;
+#[cfg(feature = "stream")]
+use crate::common::sync_wrapper::SyncWrapper;
+use crate::common::Future;
+#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
+use crate::common::Never;
+use crate::common::{task, watch, Pin, Poll};
+#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
+use crate::proto::h2::ping;
+
+type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
+type TrailersSender = oneshot::Sender<HeaderMap>;
+
+/// A stream of `Bytes`, used when receiving bodies.
+///
+/// A good default [`HttpBody`](crate::body::HttpBody) to use in many
+/// applications.
+///
+/// Note: To read the full body, use [`body::to_bytes`](crate::body::to_bytes)
+/// or [`body::aggregate`](crate::body::aggregate).
+#[must_use = "streams do nothing unless polled"]
+pub struct Body {
+ kind: Kind,
+ /// Keep the extra bits in an `Option<Box<Extra>>`, so that
+ /// Body stays small in the common case (no extras needed).
+ extra: Option<Box<Extra>>,
+}
+
+enum Kind {
+ Once(Option<Bytes>),
+ Chan {
+ content_length: DecodedLength,
+ want_tx: watch::Sender,
+ data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
+ trailers_rx: oneshot::Receiver<HeaderMap>,
+ },
+ #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
+ H2 {
+ ping: ping::Recorder,
+ content_length: DecodedLength,
+ recv: h2::RecvStream,
+ },
+ #[cfg(feature = "ffi")]
+ Ffi(crate::ffi::UserBody),
+ #[cfg(feature = "stream")]
+ Wrapped(
+ SyncWrapper<
+ Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
+ >,
+ ),
+}
+
+struct Extra {
+ /// Allow the client to pass a future to delay the `Body` from returning
+ /// EOF. This allows the `Client` to try to put the idle connection
+ /// back into the pool before the body is "finished".
+ ///
+ /// The reason for this is so that creating a new request after finishing
+ /// streaming the body of a response could sometimes result in creating
+ /// a brand new connection, since the pool didn't know about the idle
+ /// connection yet.
+ delayed_eof: Option<DelayEof>,
+}
+
+#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
+type DelayEofUntil = oneshot::Receiver<Never>;
+
+enum DelayEof {
+ /// Initial state, stream hasn't seen EOF yet.
+ #[cfg(any(feature = "http1", feature = "http2"))]
+ #[cfg(feature = "client")]
+ NotEof(DelayEofUntil),
+ /// Transitions to this state once we've seen `poll` try to
+ /// return EOF (`None`). This future is then polled, and
+ /// when it completes, the Body finally returns EOF (`None`).
+ #[cfg(any(feature = "http1", feature = "http2"))]
+ #[cfg(feature = "client")]
+ Eof(DelayEofUntil),
+}
+
+/// A sender half created through [`Body::channel()`].
+///
+/// Useful when wanting to stream chunks from another thread.
+///
+/// ## Body Closing
+///
+/// Note that the request body will always be closed normally when the sender is dropped (meaning
+/// that the empty terminating chunk will be sent to the remote). If you desire to close the
+/// connection with an incomplete response (e.g. in the case of an error during asynchronous
+/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion.
+///
+/// [`Body::channel()`]: struct.Body.html#method.channel
+/// [`Sender::abort()`]: struct.Sender.html#method.abort
+#[must_use = "Sender does nothing unless sent on"]
+pub struct Sender {
+ want_rx: watch::Receiver,
+ data_tx: BodySender,
+ trailers_tx: Option<TrailersSender>,
+}
+
+const WANT_PENDING: usize = 1;
+const WANT_READY: usize = 2;
+
+impl Body {
+ /// Create an empty `Body` stream.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// use hyper::{Body, Request};
+ ///
+ /// // create a `GET /` request
+ /// let get = Request::new(Body::empty());
+ /// ```
+ #[inline]
+ pub fn empty() -> Body {
+ Body::new(Kind::Once(None))
+ }
+
+ /// Create a `Body` stream with an associated sender half.
+ ///
+ /// Useful when wanting to stream chunks from another thread.
+ #[inline]
+ pub fn channel() -> (Sender, Body) {
+ Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
+ }
+
+ pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) {
+ let (data_tx, data_rx) = mpsc::channel(0);
+ let (trailers_tx, trailers_rx) = oneshot::channel();
+
+ // If wanter is true, `Sender::poll_ready()` won't becoming ready
+ // until the `Body` has been polled for data once.
+ let want = if wanter { WANT_PENDING } else { WANT_READY };
+
+ let (want_tx, want_rx) = watch::channel(want);
+
+ let tx = Sender {
+ want_rx,
+ data_tx,
+ trailers_tx: Some(trailers_tx),
+ };
+ let rx = Body::new(Kind::Chan {
+ content_length,
+ want_tx,
+ data_rx,
+ trailers_rx,
+ });
+
+ (tx, rx)
+ }
+
+ /// Wrap a futures `Stream` in a box inside `Body`.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// # use hyper::Body;
+ /// let chunks: Vec<Result<_, std::io::Error>> = vec![
+ /// Ok("hello"),
+ /// Ok(" "),
+ /// Ok("world"),
+ /// ];
+ ///
+ /// let stream = futures_util::stream::iter(chunks);
+ ///
+ /// let body = Body::wrap_stream(stream);
+ /// ```
+ ///
+ /// # Optional
+ ///
+ /// This function requires enabling the `stream` feature in your
+ /// `Cargo.toml`.
+ #[cfg(feature = "stream")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
+ pub fn wrap_stream<S, O, E>(stream: S) -> Body
+ where
+ S: Stream<Item = Result<O, E>> + Send + 'static,
+ O: Into<Bytes> + 'static,
+ E: Into<Box<dyn StdError + Send + Sync>> + 'static,
+ {
+ let mapped = stream.map_ok(Into::into).map_err(Into::into);
+ Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
+ }
+
+ fn new(kind: Kind) -> Body {
+ Body { kind, extra: None }
+ }
+
+ #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
+ pub(crate) fn h2(
+ recv: h2::RecvStream,
+ mut content_length: DecodedLength,
+ ping: ping::Recorder,
+ ) -> Self {
+ // If the stream is already EOS, then the "unknown length" is clearly
+ // actually ZERO.
+ if !content_length.is_exact() && recv.is_end_stream() {
+ content_length = DecodedLength::ZERO;
+ }
+ let body = Body::new(Kind::H2 {
+ ping,
+ content_length,
+ recv,
+ });
+
+ body
+ }
+
+ #[cfg(any(feature = "http1", feature = "http2"))]
+ #[cfg(feature = "client")]
+ pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
+ self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut));
+ }
+
+ fn take_delayed_eof(&mut self) -> Option<DelayEof> {
+ self.extra
+ .as_mut()
+ .and_then(|extra| extra.delayed_eof.take())
+ }
+
+ #[cfg(any(feature = "http1", feature = "http2"))]
+ fn extra_mut(&mut self) -> &mut Extra {
+ self.extra
+ .get_or_insert_with(|| Box::new(Extra { delayed_eof: None }))
+ }
+
+ fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
+ match self.take_delayed_eof() {
+ #[cfg(any(feature = "http1", feature = "http2"))]
+ #[cfg(feature = "client")]
+ Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) {
+ ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => {
+ self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay));
+ ok
+ }
+ Poll::Ready(None) => match Pin::new(&mut delay).poll(cx) {
+ Poll::Ready(Ok(never)) => match never {},
+ Poll::Pending => {
+ self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
+ Poll::Pending
+ }
+ Poll::Ready(Err(_done)) => Poll::Ready(None),
+ },
+ Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
+ },
+ #[cfg(any(feature = "http1", feature = "http2"))]
+ #[cfg(feature = "client")]
+ Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) {
+ Poll::Ready(Ok(never)) => match never {},
+ Poll::Pending => {
+ self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
+ Poll::Pending
+ }
+ Poll::Ready(Err(_done)) => Poll::Ready(None),
+ },
+ #[cfg(any(
+ not(any(feature = "http1", feature = "http2")),
+ not(feature = "client")
+ ))]
+ Some(delay_eof) => match delay_eof {},
+ None => self.poll_inner(cx),
+ }
+ }
+
+ #[cfg(feature = "ffi")]
+ pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
+ match self.kind {
+ Kind::Ffi(ref mut body) => return body,
+ _ => {
+ self.kind = Kind::Ffi(crate::ffi::UserBody::new());
+ }
+ }
+
+ match self.kind {
+ Kind::Ffi(ref mut body) => body,
+ _ => unreachable!(),
+ }
+ }
+
+ fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
+ match self.kind {
+ Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)),
+ Kind::Chan {
+ content_length: ref mut len,
+ ref mut data_rx,
+ ref mut want_tx,
+ ..
+ } => {
+ want_tx.send(WANT_READY);
+
+ match ready!(Pin::new(data_rx).poll_next(cx)?) {
+ Some(chunk) => {
+ len.sub_if(chunk.len() as u64);
+ Poll::Ready(Some(Ok(chunk)))
+ }
+ None => Poll::Ready(None),
+ }
+ }
+ #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
+ Kind::H2 {
+ ref ping,
+ recv: ref mut h2,
+ content_length: ref mut len,
+ } => match ready!(h2.poll_data(cx)) {
+ Some(Ok(bytes)) => {
+ let _ = h2.flow_control().release_capacity(bytes.len());
+ len.sub_if(bytes.len() as u64);
+ ping.record_data(bytes.len());
+ Poll::Ready(Some(Ok(bytes)))
+ }
+ Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
+ None => Poll::Ready(None),
+ },
+
+ #[cfg(feature = "ffi")]
+ Kind::Ffi(ref mut body) => body.poll_data(cx),
+
+ #[cfg(feature = "stream")]
+ Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
+ Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
+ None => Poll::Ready(None),
+ },
+ }
+ }
+
+ #[cfg(feature = "http1")]
+ pub(super) fn take_full_data(&mut self) -> Option<Bytes> {
+ if let Kind::Once(ref mut chunk) = self.kind {
+ chunk.take()
+ } else {
+ None
+ }
+ }
+}
+
+impl Default for Body {
+ /// Returns `Body::empty()`.
+ #[inline]
+ fn default() -> Body {
+ Body::empty()
+ }
+}
+
+impl HttpBody for Body {
+ type Data = Bytes;
+ type Error = crate::Error;
+
+ fn poll_data(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+ self.poll_eof(cx)
+ }
+
+ fn poll_trailers(
+ #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>,
+ #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>,
+ ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
+ match self.kind {
+ #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
+ Kind::H2 {
+ recv: ref mut h2,
+ ref ping,
+ ..
+ } => match ready!(h2.poll_trailers(cx)) {
+ Ok(t) => {
+ ping.record_non_data();
+ Poll::Ready(Ok(t))
+ }
+ Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
+ },
+ Kind::Chan {
+ ref mut trailers_rx,
+ ..
+ } => match ready!(Pin::new(trailers_rx).poll(cx)) {
+ Ok(t) => Poll::Ready(Ok(Some(t))),
+ Err(_) => Poll::Ready(Ok(None)),
+ },
+ #[cfg(feature = "ffi")]
+ Kind::Ffi(ref mut body) => body.poll_trailers(cx),
+ _ => Poll::Ready(Ok(None)),
+ }
+ }
+
+ fn is_end_stream(&self) -> bool {
+ match self.kind {
+ Kind::Once(ref val) => val.is_none(),
+ Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
+ #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
+ Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
+ #[cfg(feature = "ffi")]
+ Kind::Ffi(..) => false,
+ #[cfg(feature = "stream")]
+ Kind::Wrapped(..) => false,
+ }
+ }
+
+ fn size_hint(&self) -> SizeHint {
+ macro_rules! opt_len {
+ ($content_length:expr) => {{
+ let mut hint = SizeHint::default();
+
+ if let Some(content_length) = $content_length.into_opt() {
+ hint.set_exact(content_length);
+ }
+
+ hint
+ }};
+ }
+
+ match self.kind {
+ Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64),
+ Kind::Once(None) => SizeHint::with_exact(0),
+ #[cfg(feature = "stream")]
+ Kind::Wrapped(..) => SizeHint::default(),
+ Kind::Chan { content_length, .. } => opt_len!(content_length),
+ #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
+ Kind::H2 { content_length, .. } => opt_len!(content_length),
+ #[cfg(feature = "ffi")]
+ Kind::Ffi(..) => SizeHint::default(),
+ }
+ }
+}
+
+impl fmt::Debug for Body {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ #[derive(Debug)]
+ struct Streaming;
+ #[derive(Debug)]
+ struct Empty;
+ #[derive(Debug)]
+ struct Full<'a>(&'a Bytes);
+
+ let mut builder = f.debug_tuple("Body");
+ match self.kind {
+ Kind::Once(None) => builder.field(&Empty),
+ Kind::Once(Some(ref chunk)) => builder.field(&Full(chunk)),
+ _ => builder.field(&Streaming),
+ };
+
+ builder.finish()
+ }
+}
+
+/// # Optional
+///
+/// This function requires enabling the `stream` feature in your
+/// `Cargo.toml`.
+#[cfg(feature = "stream")]
+impl Stream for Body {
+ type Item = crate::Result<Bytes>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
+ HttpBody::poll_data(self, cx)
+ }
+}
+
+/// # Optional
+///
+/// This function requires enabling the `stream` feature in your
+/// `Cargo.toml`.
+#[cfg(feature = "stream")]
+impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
+ #[inline]
+ fn from(
+ stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
+ ) -> Body {
+ Body::new(Kind::Wrapped(SyncWrapper::new(stream.into())))
+ }
+}
+
+impl From<Bytes> for Body {
+ #[inline]
+ fn from(chunk: Bytes) -> Body {
+ if chunk.is_empty() {
+ Body::empty()
+ } else {
+ Body::new(Kind::Once(Some(chunk)))
+ }
+ }
+}
+
+impl From<Vec<u8>> for Body {
+ #[inline]
+ fn from(vec: Vec<u8>) -> Body {
+ Body::from(Bytes::from(vec))
+ }
+}
+
+impl From<&'static [u8]> for Body {
+ #[inline]
+ fn from(slice: &'static [u8]) -> Body {
+ Body::from(Bytes::from(slice))
+ }
+}
+
+impl From<Cow<'static, [u8]>> for Body {
+ #[inline]
+ fn from(cow: Cow<'static, [u8]>) -> Body {
+ match cow {
+ Cow::Borrowed(b) => Body::from(b),
+ Cow::Owned(o) => Body::from(o),
+ }
+ }
+}
+
+impl From<String> for Body {
+ #[inline]
+ fn from(s: String) -> Body {
+ Body::from(Bytes::from(s.into_bytes()))
+ }
+}
+
+impl From<&'static str> for Body {
+ #[inline]
+ fn from(slice: &'static str) -> Body {
+ Body::from(Bytes::from(slice.as_bytes()))
+ }
+}
+
+impl From<Cow<'static, str>> for Body {
+ #[inline]
+ fn from(cow: Cow<'static, str>) -> Body {
+ match cow {
+ Cow::Borrowed(b) => Body::from(b),
+ Cow::Owned(o) => Body::from(o),
+ }
+ }
+}
+
+impl Sender {
+ /// Check to see if this `Sender` can send more data.
+ pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
+ // Check if the receiver end has tried polling for the body yet
+ ready!(self.poll_want(cx)?);
+ self.data_tx
+ .poll_ready(cx)
+ .map_err(|_| crate::Error::new_closed())
+ }
+
+ fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
+ match self.want_rx.load(cx) {
+ WANT_READY => Poll::Ready(Ok(())),
+ WANT_PENDING => Poll::Pending,
+ watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
+ unexpected => unreachable!("want_rx value: {}", unexpected),
+ }
+ }
+
+ async fn ready(&mut self) -> crate::Result<()> {
+ futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
+ }
+
+ /// Send data on data channel when it is ready.
+ pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
+ self.ready().await?;
+ self.data_tx
+ .try_send(Ok(chunk))
+ .map_err(|_| crate::Error::new_closed())
+ }
+
+ /// Send trailers on trailers channel.
+ pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
+ let tx = match self.trailers_tx.take() {
+ Some(tx) => tx,
+ None => return Err(crate::Error::new_closed()),
+ };
+ tx.send(trailers).map_err(|_| crate::Error::new_closed())
+ }
+
+ /// Try to send data on this channel.
+ ///
+ /// # Errors
+ ///
+ /// Returns `Err(Bytes)` if the channel could not (currently) accept
+ /// another `Bytes`.
+ ///
+ /// # Note
+ ///
+ /// This is mostly useful for when trying to send from some other thread
+ /// that doesn't have an async context. If in an async context, prefer
+ /// `send_data()` instead.
+ pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
+ self.data_tx
+ .try_send(Ok(chunk))
+ .map_err(|err| err.into_inner().expect("just sent Ok"))
+ }
+
+ /// Aborts the body in an abnormal fashion.
+ pub fn abort(self) {
+ let _ = self
+ .data_tx
+ // clone so the send works even if buffer is full
+ .clone()
+ .try_send(Err(crate::Error::new_body_write_aborted()));
+ }
+
+ #[cfg(feature = "http1")]
+ pub(crate) fn send_error(&mut self, err: crate::Error) {
+ let _ = self.data_tx.try_send(Err(err));
+ }
+}
+
+impl fmt::Debug for Sender {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ #[derive(Debug)]
+ struct Open;
+ #[derive(Debug)]
+ struct Closed;
+
+ let mut builder = f.debug_tuple("Sender");
+ match self.want_rx.peek() {
+ watch::CLOSED => builder.field(&Closed),
+ _ => builder.field(&Open),
+ };
+
+ builder.finish()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::mem;
+ use std::task::Poll;
+
+ use super::{Body, DecodedLength, HttpBody, Sender, SizeHint};
+
+ #[test]
+ fn test_size_of() {
+ // These are mostly to help catch *accidentally* increasing
+ // the size by too much.
+
+ let body_size = mem::size_of::<Body>();
+ let body_expected_size = mem::size_of::<u64>() * 6;
+ assert!(
+ body_size <= body_expected_size,
+ "Body size = {} <= {}",
+ body_size,
+ body_expected_size,
+ );
+
+ assert_eq!(body_size, mem::size_of::<Option<Body>>(), "Option<Body>");
+
+ assert_eq!(
+ mem::size_of::<Sender>(),
+ mem::size_of::<usize>() * 5,
+ "Sender"
+ );
+
+ assert_eq!(
+ mem::size_of::<Sender>(),
+ mem::size_of::<Option<Sender>>(),
+ "Option<Sender>"
+ );
+ }
+
+ #[test]
+ fn size_hint() {
+ fn eq(body: Body, b: SizeHint, note: &str) {
+ let a = body.size_hint();
+ assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
+ assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
+ }
+
+ eq(Body::from("Hello"), SizeHint::with_exact(5), "from str");
+
+ eq(Body::empty(), SizeHint::with_exact(0), "empty");
+
+ eq(Body::channel().1, SizeHint::new(), "channel");
+
+ eq(
+ Body::new_channel(DecodedLength::new(4), /*wanter =*/ false).1,
+ SizeHint::with_exact(4),
+ "channel with length",
+ );
+ }
+
+ #[tokio::test]
+ async fn channel_abort() {
+ let (tx, mut rx) = Body::channel();
+
+ tx.abort();
+
+ let err = rx.data().await.unwrap().unwrap_err();
+ assert!(err.is_body_write_aborted(), "{:?}", err);
+ }
+
+ #[tokio::test]
+ async fn channel_abort_when_buffer_is_full() {
+ let (mut tx, mut rx) = Body::channel();
+
+ tx.try_send_data("chunk 1".into()).expect("send 1");
+ // buffer is full, but can still send abort
+ tx.abort();
+
+ let chunk1 = rx.data().await.expect("item 1").expect("chunk 1");
+ assert_eq!(chunk1, "chunk 1");
+
+ let err = rx.data().await.unwrap().unwrap_err();
+ assert!(err.is_body_write_aborted(), "{:?}", err);
+ }
+
+ #[test]
+ fn channel_buffers_one() {
+ let (mut tx, _rx) = Body::channel();
+
+ tx.try_send_data("chunk 1".into()).expect("send 1");
+
+ // buffer is now full
+ let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2");
+ assert_eq!(chunk2, "chunk 2");
+ }
+
+ #[tokio::test]
+ async fn channel_empty() {
+ let (_, mut rx) = Body::channel();
+
+ assert!(rx.data().await.is_none());
+ }
+
+ #[test]
+ fn channel_ready() {
+ let (mut tx, _rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);
+
+ let mut tx_ready = tokio_test::task::spawn(tx.ready());
+
+ assert!(tx_ready.poll().is_ready(), "tx is ready immediately");
+ }
+
+ #[test]
+ fn channel_wanter() {
+ let (mut tx, mut rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
+
+ let mut tx_ready = tokio_test::task::spawn(tx.ready());
+ let mut rx_data = tokio_test::task::spawn(rx.data());
+
+ assert!(
+ tx_ready.poll().is_pending(),
+ "tx isn't ready before rx has been polled"
+ );
+
+ assert!(rx_data.poll().is_pending(), "poll rx.data");
+ assert!(tx_ready.is_woken(), "rx poll wakes tx");
+
+ assert!(
+ tx_ready.poll().is_ready(),
+ "tx is ready after rx has been polled"
+ );
+ }
+
+ #[test]
+ fn channel_notices_closure() {
+ let (mut tx, rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
+
+ let mut tx_ready = tokio_test::task::spawn(tx.ready());
+
+ assert!(
+ tx_ready.poll().is_pending(),
+ "tx isn't ready before rx has been polled"
+ );
+
+ drop(rx);
+ assert!(tx_ready.is_woken(), "dropping rx wakes tx");
+
+ match tx_ready.poll() {
+ Poll::Ready(Err(ref e)) if e.is_closed() => (),
+ unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
+ }
+ }
+}
diff --git a/third_party/rust/hyper/src/body/length.rs b/third_party/rust/hyper/src/body/length.rs
new file mode 100644
index 0000000000..e2bbee8039
--- /dev/null
+++ b/third_party/rust/hyper/src/body/length.rs
@@ -0,0 +1,123 @@
+use std::fmt;
+
+#[derive(Clone, Copy, PartialEq, Eq)]
+pub(crate) struct DecodedLength(u64);
+
+#[cfg(any(feature = "http1", feature = "http2"))]
+impl From<Option<u64>> for DecodedLength {
+ fn from(len: Option<u64>) -> Self {
+ len.and_then(|len| {
+ // If the length is u64::MAX, oh well, just reported chunked.
+ Self::checked_new(len).ok()
+ })
+ .unwrap_or(DecodedLength::CHUNKED)
+ }
+}
+
+#[cfg(any(feature = "http1", feature = "http2", test))]
+const MAX_LEN: u64 = std::u64::MAX - 2;
+
+impl DecodedLength {
+ pub(crate) const CLOSE_DELIMITED: DecodedLength = DecodedLength(::std::u64::MAX);
+ pub(crate) const CHUNKED: DecodedLength = DecodedLength(::std::u64::MAX - 1);
+ pub(crate) const ZERO: DecodedLength = DecodedLength(0);
+
+ #[cfg(test)]
+ pub(crate) fn new(len: u64) -> Self {
+ debug_assert!(len <= MAX_LEN);
+ DecodedLength(len)
+ }
+
+ /// Takes the length as a content-length without other checks.
+ ///
+ /// Should only be called if previously confirmed this isn't
+ /// CLOSE_DELIMITED or CHUNKED.
+ #[inline]
+ #[cfg(feature = "http1")]
+ pub(crate) fn danger_len(self) -> u64 {
+ debug_assert!(self.0 < Self::CHUNKED.0);
+ self.0
+ }
+
+ /// Converts to an Option<u64> representing a Known or Unknown length.
+ pub(crate) fn into_opt(self) -> Option<u64> {
+ match self {
+ DecodedLength::CHUNKED | DecodedLength::CLOSE_DELIMITED => None,
+ DecodedLength(known) => Some(known),
+ }
+ }
+
+ /// Checks the `u64` is within the maximum allowed for content-length.
+ #[cfg(any(feature = "http1", feature = "http2"))]
+ pub(crate) fn checked_new(len: u64) -> Result<Self, crate::error::Parse> {
+ use tracing::warn;
+
+ if len <= MAX_LEN {
+ Ok(DecodedLength(len))
+ } else {
+ warn!("content-length bigger than maximum: {} > {}", len, MAX_LEN);
+ Err(crate::error::Parse::TooLarge)
+ }
+ }
+
+ pub(crate) fn sub_if(&mut self, amt: u64) {
+ match *self {
+ DecodedLength::CHUNKED | DecodedLength::CLOSE_DELIMITED => (),
+ DecodedLength(ref mut known) => {
+ *known -= amt;
+ }
+ }
+ }
+
+ /// Returns whether this represents an exact length.
+ ///
+ /// This includes 0, which of course is an exact known length.
+ ///
+ /// It would return false if "chunked" or otherwise size-unknown.
+ #[cfg(feature = "http2")]
+ pub(crate) fn is_exact(&self) -> bool {
+ self.0 <= MAX_LEN
+ }
+}
+
+impl fmt::Debug for DecodedLength {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match *self {
+ DecodedLength::CLOSE_DELIMITED => f.write_str("CLOSE_DELIMITED"),
+ DecodedLength::CHUNKED => f.write_str("CHUNKED"),
+ DecodedLength(n) => f.debug_tuple("DecodedLength").field(&n).finish(),
+ }
+ }
+}
+
+impl fmt::Display for DecodedLength {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match *self {
+ DecodedLength::CLOSE_DELIMITED => f.write_str("close-delimited"),
+ DecodedLength::CHUNKED => f.write_str("chunked encoding"),
+ DecodedLength::ZERO => f.write_str("empty"),
+ DecodedLength(n) => write!(f, "content-length ({} bytes)", n),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn sub_if_known() {
+ let mut len = DecodedLength::new(30);
+ len.sub_if(20);
+
+ assert_eq!(len.0, 10);
+ }
+
+ #[test]
+ fn sub_if_chunked() {
+ let mut len = DecodedLength::CHUNKED;
+ len.sub_if(20);
+
+ assert_eq!(len, DecodedLength::CHUNKED);
+ }
+}
diff --git a/third_party/rust/hyper/src/body/mod.rs b/third_party/rust/hyper/src/body/mod.rs
new file mode 100644
index 0000000000..5e2181e941
--- /dev/null
+++ b/third_party/rust/hyper/src/body/mod.rs
@@ -0,0 +1,65 @@
+//! Streaming bodies for Requests and Responses
+//!
+//! For both [Clients](crate::client) and [Servers](crate::server), requests and
+//! responses use streaming bodies, instead of complete buffering. This
+//! allows applications to not use memory they don't need, and allows exerting
+//! back-pressure on connections by only reading when asked.
+//!
+//! There are two pieces to this in hyper:
+//!
+//! - **The [`HttpBody`](HttpBody) trait** describes all possible bodies.
+//! hyper allows any body type that implements `HttpBody`, allowing
+//! applications to have fine-grained control over their streaming.
+//! - **The [`Body`](Body) concrete type**, which is an implementation of
+//! `HttpBody`, and returned by hyper as a "receive stream" (so, for server
+//! requests and client responses). It is also a decent default implementation
+//! if you don't have very custom needs of your send streams.
+
+pub use bytes::{Buf, Bytes};
+pub use http_body::Body as HttpBody;
+pub use http_body::SizeHint;
+
+pub use self::aggregate::aggregate;
+pub use self::body::{Body, Sender};
+pub(crate) use self::length::DecodedLength;
+pub use self::to_bytes::to_bytes;
+
+mod aggregate;
+mod body;
+mod length;
+mod to_bytes;
+
+/// An optimization to try to take a full body if immediately available.
+///
+/// This is currently limited to *only* `hyper::Body`s.
+#[cfg(feature = "http1")]
+pub(crate) fn take_full_data<T: HttpBody + 'static>(body: &mut T) -> Option<T::Data> {
+ use std::any::{Any, TypeId};
+
+ // This static type check can be optimized at compile-time.
+ if TypeId::of::<T>() == TypeId::of::<Body>() {
+ let mut full = (body as &mut dyn Any)
+ .downcast_mut::<Body>()
+ .expect("must be Body")
+ .take_full_data();
+ // This second cast is required to make the type system happy.
+ // Without it, the compiler cannot reason that the type is actually
+ // `T::Data`. Oh wells.
+ //
+ // It's still a measurable win!
+ (&mut full as &mut dyn Any)
+ .downcast_mut::<Option<T::Data>>()
+ .expect("must be T::Data")
+ .take()
+ } else {
+ None
+ }
+}
+
+fn _assert_send_sync() {
+ fn _assert_send<T: Send>() {}
+ fn _assert_sync<T: Sync>() {}
+
+ _assert_send::<Body>();
+ _assert_sync::<Body>();
+}
diff --git a/third_party/rust/hyper/src/body/to_bytes.rs b/third_party/rust/hyper/src/body/to_bytes.rs
new file mode 100644
index 0000000000..038c6fd0f3
--- /dev/null
+++ b/third_party/rust/hyper/src/body/to_bytes.rs
@@ -0,0 +1,82 @@
+use bytes::{Buf, BufMut, Bytes};
+
+use super::HttpBody;
+
+/// Concatenate the buffers from a body into a single `Bytes` asynchronously.
+///
+/// This may require copying the data into a single buffer. If you don't need
+/// a contiguous buffer, prefer the [`aggregate`](crate::body::aggregate())
+/// function.
+///
+/// # Note
+///
+/// Care needs to be taken if the remote is untrusted. The function doesn't implement any length
+/// checks and an malicious peer might make it consume arbitrary amounts of memory. Checking the
+/// `Content-Length` is a possibility, but it is not strictly mandated to be present.
+///
+/// # Example
+///
+/// ```
+/// # #[cfg(all(feature = "client", feature = "tcp", any(feature = "http1", feature = "http2")))]
+/// # async fn doc() -> hyper::Result<()> {
+/// use hyper::{body::HttpBody};
+///
+/// # let request = hyper::Request::builder()
+/// # .method(hyper::Method::POST)
+/// # .uri("http://httpbin.org/post")
+/// # .header("content-type", "application/json")
+/// # .body(hyper::Body::from(r#"{"library":"hyper"}"#)).unwrap();
+/// # let client = hyper::Client::new();
+/// let response = client.request(request).await?;
+///
+/// const MAX_ALLOWED_RESPONSE_SIZE: u64 = 1024;
+///
+/// let response_content_length = match response.body().size_hint().upper() {
+/// Some(v) => v,
+/// None => MAX_ALLOWED_RESPONSE_SIZE + 1 // Just to protect ourselves from a malicious response
+/// };
+///
+/// if response_content_length < MAX_ALLOWED_RESPONSE_SIZE {
+/// let body_bytes = hyper::body::to_bytes(response.into_body()).await?;
+/// println!("body: {:?}", body_bytes);
+/// }
+///
+/// # Ok(())
+/// # }
+/// ```
+pub async fn to_bytes<T>(body: T) -> Result<Bytes, T::Error>
+where
+ T: HttpBody,
+{
+ futures_util::pin_mut!(body);
+
+ // If there's only 1 chunk, we can just return Buf::to_bytes()
+ let mut first = if let Some(buf) = body.data().await {
+ buf?
+ } else {
+ return Ok(Bytes::new());
+ };
+
+ let second = if let Some(buf) = body.data().await {
+ buf?
+ } else {
+ return Ok(first.copy_to_bytes(first.remaining()));
+ };
+
+ // Don't pre-emptively reserve *too* much.
+ let rest = (body.size_hint().lower() as usize).min(1024 * 16);
+ let cap = first
+ .remaining()
+ .saturating_add(second.remaining())
+ .saturating_add(rest);
+ // With more than 1 buf, we gotta flatten into a Vec first.
+ let mut vec = Vec::with_capacity(cap);
+ vec.put(first);
+ vec.put(second);
+
+ while let Some(buf) = body.data().await {
+ vec.put(buf?);
+ }
+
+ Ok(vec.into())
+}