summaryrefslogtreecommitdiffstats
path: root/third_party/rust/hyper/src/proto/h2/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/hyper/src/proto/h2/mod.rs')
-rw-r--r--third_party/rust/hyper/src/proto/h2/mod.rs471
1 files changed, 471 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/proto/h2/mod.rs b/third_party/rust/hyper/src/proto/h2/mod.rs
new file mode 100644
index 0000000000..5857c919d1
--- /dev/null
+++ b/third_party/rust/hyper/src/proto/h2/mod.rs
@@ -0,0 +1,471 @@
+use bytes::{Buf, Bytes};
+use h2::{Reason, RecvStream, SendStream};
+use http::header::{HeaderName, CONNECTION, TE, TRAILER, TRANSFER_ENCODING, UPGRADE};
+use http::HeaderMap;
+use pin_project_lite::pin_project;
+use std::error::Error as StdError;
+use std::io::{self, Cursor, IoSlice};
+use std::mem;
+use std::task::Context;
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+use tracing::{debug, trace, warn};
+
+use crate::body::HttpBody;
+use crate::common::{task, Future, Pin, Poll};
+use crate::proto::h2::ping::Recorder;
+
+pub(crate) mod ping;
+
+cfg_client! {
+ pub(crate) mod client;
+ pub(crate) use self::client::ClientTask;
+}
+
+cfg_server! {
+ pub(crate) mod server;
+ pub(crate) use self::server::Server;
+}
+
+/// Default initial stream window size defined in HTTP2 spec.
+pub(crate) const SPEC_WINDOW_SIZE: u32 = 65_535;
+
+fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) {
+ // List of connection headers from:
+ // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection
+ //
+ // TE headers are allowed in HTTP/2 requests as long as the value is "trailers", so they're
+ // tested separately.
+ let connection_headers = [
+ HeaderName::from_lowercase(b"keep-alive").unwrap(),
+ HeaderName::from_lowercase(b"proxy-connection").unwrap(),
+ TRAILER,
+ TRANSFER_ENCODING,
+ UPGRADE,
+ ];
+
+ for header in connection_headers.iter() {
+ if headers.remove(header).is_some() {
+ warn!("Connection header illegal in HTTP/2: {}", header.as_str());
+ }
+ }
+
+ if is_request {
+ if headers
+ .get(TE)
+ .map(|te_header| te_header != "trailers")
+ .unwrap_or(false)
+ {
+ warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests");
+ headers.remove(TE);
+ }
+ } else if headers.remove(TE).is_some() {
+ warn!("TE headers illegal in HTTP/2 responses");
+ }
+
+ if let Some(header) = headers.remove(CONNECTION) {
+ warn!(
+ "Connection header illegal in HTTP/2: {}",
+ CONNECTION.as_str()
+ );
+ let header_contents = header.to_str().unwrap();
+
+ // A `Connection` header may have a comma-separated list of names of other headers that
+ // are meant for only this specific connection.
+ //
+ // Iterate these names and remove them as headers. Connection-specific headers are
+ // forbidden in HTTP2, as that information has been moved into frame types of the h2
+ // protocol.
+ for name in header_contents.split(',') {
+ let name = name.trim();
+ headers.remove(name);
+ }
+ }
+}
+
+// body adapters used by both Client and Server
+
+pin_project! {
+ struct PipeToSendStream<S>
+ where
+ S: HttpBody,
+ {
+ body_tx: SendStream<SendBuf<S::Data>>,
+ data_done: bool,
+ #[pin]
+ stream: S,
+ }
+}
+
+impl<S> PipeToSendStream<S>
+where
+ S: HttpBody,
+{
+ fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> {
+ PipeToSendStream {
+ body_tx: tx,
+ data_done: false,
+ stream,
+ }
+ }
+}
+
+impl<S> Future for PipeToSendStream<S>
+where
+ S: HttpBody,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+{
+ type Output = crate::Result<()>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ let mut me = self.project();
+ loop {
+ if !*me.data_done {
+ // we don't have the next chunk of data yet, so just reserve 1 byte to make
+ // sure there's some capacity available. h2 will handle the capacity management
+ // for the actual body chunk.
+ me.body_tx.reserve_capacity(1);
+
+ if me.body_tx.capacity() == 0 {
+ loop {
+ match ready!(me.body_tx.poll_capacity(cx)) {
+ Some(Ok(0)) => {}
+ Some(Ok(_)) => break,
+ Some(Err(e)) => {
+ return Poll::Ready(Err(crate::Error::new_body_write(e)))
+ }
+ None => {
+ // None means the stream is no longer in a
+ // streaming state, we either finished it
+ // somehow, or the remote reset us.
+ return Poll::Ready(Err(crate::Error::new_body_write(
+ "send stream capacity unexpectedly closed",
+ )));
+ }
+ }
+ }
+ } else if let Poll::Ready(reason) = me
+ .body_tx
+ .poll_reset(cx)
+ .map_err(crate::Error::new_body_write)?
+ {
+ debug!("stream received RST_STREAM: {:?}", reason);
+ return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(
+ reason,
+ ))));
+ }
+
+ match ready!(me.stream.as_mut().poll_data(cx)) {
+ Some(Ok(chunk)) => {
+ let is_eos = me.stream.is_end_stream();
+ trace!(
+ "send body chunk: {} bytes, eos={}",
+ chunk.remaining(),
+ is_eos,
+ );
+
+ let buf = SendBuf::Buf(chunk);
+ me.body_tx
+ .send_data(buf, is_eos)
+ .map_err(crate::Error::new_body_write)?;
+
+ if is_eos {
+ return Poll::Ready(Ok(()));
+ }
+ }
+ Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
+ None => {
+ me.body_tx.reserve_capacity(0);
+ let is_eos = me.stream.is_end_stream();
+ if is_eos {
+ return Poll::Ready(me.body_tx.send_eos_frame());
+ } else {
+ *me.data_done = true;
+ // loop again to poll_trailers
+ }
+ }
+ }
+ } else {
+ if let Poll::Ready(reason) = me
+ .body_tx
+ .poll_reset(cx)
+ .map_err(crate::Error::new_body_write)?
+ {
+ debug!("stream received RST_STREAM: {:?}", reason);
+ return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(
+ reason,
+ ))));
+ }
+
+ match ready!(me.stream.poll_trailers(cx)) {
+ Ok(Some(trailers)) => {
+ me.body_tx
+ .send_trailers(trailers)
+ .map_err(crate::Error::new_body_write)?;
+ return Poll::Ready(Ok(()));
+ }
+ Ok(None) => {
+ // There were no trailers, so send an empty DATA frame...
+ return Poll::Ready(me.body_tx.send_eos_frame());
+ }
+ Err(e) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
+ }
+ }
+ }
+ }
+}
+
+trait SendStreamExt {
+ fn on_user_err<E>(&mut self, err: E) -> crate::Error
+ where
+ E: Into<Box<dyn std::error::Error + Send + Sync>>;
+ fn send_eos_frame(&mut self) -> crate::Result<()>;
+}
+
+impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> {
+ fn on_user_err<E>(&mut self, err: E) -> crate::Error
+ where
+ E: Into<Box<dyn std::error::Error + Send + Sync>>,
+ {
+ let err = crate::Error::new_user_body(err);
+ debug!("send body user stream error: {}", err);
+ self.send_reset(err.h2_reason());
+ err
+ }
+
+ fn send_eos_frame(&mut self) -> crate::Result<()> {
+ trace!("send body eos");
+ self.send_data(SendBuf::None, true)
+ .map_err(crate::Error::new_body_write)
+ }
+}
+
+#[repr(usize)]
+enum SendBuf<B> {
+ Buf(B),
+ Cursor(Cursor<Box<[u8]>>),
+ None,
+}
+
+impl<B: Buf> Buf for SendBuf<B> {
+ #[inline]
+ fn remaining(&self) -> usize {
+ match *self {
+ Self::Buf(ref b) => b.remaining(),
+ Self::Cursor(ref c) => Buf::remaining(c),
+ Self::None => 0,
+ }
+ }
+
+ #[inline]
+ fn chunk(&self) -> &[u8] {
+ match *self {
+ Self::Buf(ref b) => b.chunk(),
+ Self::Cursor(ref c) => c.chunk(),
+ Self::None => &[],
+ }
+ }
+
+ #[inline]
+ fn advance(&mut self, cnt: usize) {
+ match *self {
+ Self::Buf(ref mut b) => b.advance(cnt),
+ Self::Cursor(ref mut c) => c.advance(cnt),
+ Self::None => {}
+ }
+ }
+
+ fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
+ match *self {
+ Self::Buf(ref b) => b.chunks_vectored(dst),
+ Self::Cursor(ref c) => c.chunks_vectored(dst),
+ Self::None => 0,
+ }
+ }
+}
+
+struct H2Upgraded<B>
+where
+ B: Buf,
+{
+ ping: Recorder,
+ send_stream: UpgradedSendStream<B>,
+ recv_stream: RecvStream,
+ buf: Bytes,
+}
+
+impl<B> AsyncRead for H2Upgraded<B>
+where
+ B: Buf,
+{
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ read_buf: &mut ReadBuf<'_>,
+ ) -> Poll<Result<(), io::Error>> {
+ if self.buf.is_empty() {
+ self.buf = loop {
+ match ready!(self.recv_stream.poll_data(cx)) {
+ None => return Poll::Ready(Ok(())),
+ Some(Ok(buf)) if buf.is_empty() && !self.recv_stream.is_end_stream() => {
+ continue
+ }
+ Some(Ok(buf)) => {
+ self.ping.record_data(buf.len());
+ break buf;
+ }
+ Some(Err(e)) => {
+ return Poll::Ready(match e.reason() {
+ Some(Reason::NO_ERROR) | Some(Reason::CANCEL) => Ok(()),
+ Some(Reason::STREAM_CLOSED) => {
+ Err(io::Error::new(io::ErrorKind::BrokenPipe, e))
+ }
+ _ => Err(h2_to_io_error(e)),
+ })
+ }
+ }
+ };
+ }
+ let cnt = std::cmp::min(self.buf.len(), read_buf.remaining());
+ read_buf.put_slice(&self.buf[..cnt]);
+ self.buf.advance(cnt);
+ let _ = self.recv_stream.flow_control().release_capacity(cnt);
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl<B> AsyncWrite for H2Upgraded<B>
+where
+ B: Buf,
+{
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<Result<usize, io::Error>> {
+ if buf.is_empty() {
+ return Poll::Ready(Ok(0));
+ }
+ self.send_stream.reserve_capacity(buf.len());
+
+ // We ignore all errors returned by `poll_capacity` and `write`, as we
+ // will get the correct from `poll_reset` anyway.
+ let cnt = match ready!(self.send_stream.poll_capacity(cx)) {
+ None => Some(0),
+ Some(Ok(cnt)) => self
+ .send_stream
+ .write(&buf[..cnt], false)
+ .ok()
+ .map(|()| cnt),
+ Some(Err(_)) => None,
+ };
+
+ if let Some(cnt) = cnt {
+ return Poll::Ready(Ok(cnt));
+ }
+
+ Poll::Ready(Err(h2_to_io_error(
+ match ready!(self.send_stream.poll_reset(cx)) {
+ Ok(Reason::NO_ERROR) | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
+ return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()))
+ }
+ Ok(reason) => reason.into(),
+ Err(e) => e,
+ },
+ )))
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), io::Error>> {
+ if self.send_stream.write(&[], true).is_ok() {
+ return Poll::Ready(Ok(()))
+ }
+
+ Poll::Ready(Err(h2_to_io_error(
+ match ready!(self.send_stream.poll_reset(cx)) {
+ Ok(Reason::NO_ERROR) => {
+ return Poll::Ready(Ok(()))
+ }
+ Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
+ return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()))
+ }
+ Ok(reason) => reason.into(),
+ Err(e) => e,
+ },
+ )))
+ }
+}
+
+fn h2_to_io_error(e: h2::Error) -> io::Error {
+ if e.is_io() {
+ e.into_io().unwrap()
+ } else {
+ io::Error::new(io::ErrorKind::Other, e)
+ }
+}
+
+struct UpgradedSendStream<B>(SendStream<SendBuf<Neutered<B>>>);
+
+impl<B> UpgradedSendStream<B>
+where
+ B: Buf,
+{
+ unsafe fn new(inner: SendStream<SendBuf<B>>) -> Self {
+ assert_eq!(mem::size_of::<B>(), mem::size_of::<Neutered<B>>());
+ Self(mem::transmute(inner))
+ }
+
+ fn reserve_capacity(&mut self, cnt: usize) {
+ unsafe { self.as_inner_unchecked().reserve_capacity(cnt) }
+ }
+
+ fn poll_capacity(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<usize, h2::Error>>> {
+ unsafe { self.as_inner_unchecked().poll_capacity(cx) }
+ }
+
+ fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<h2::Reason, h2::Error>> {
+ unsafe { self.as_inner_unchecked().poll_reset(cx) }
+ }
+
+ fn write(&mut self, buf: &[u8], end_of_stream: bool) -> Result<(), io::Error> {
+ let send_buf = SendBuf::Cursor(Cursor::new(buf.into()));
+ unsafe {
+ self.as_inner_unchecked()
+ .send_data(send_buf, end_of_stream)
+ .map_err(h2_to_io_error)
+ }
+ }
+
+ unsafe fn as_inner_unchecked(&mut self) -> &mut SendStream<SendBuf<B>> {
+ &mut *(&mut self.0 as *mut _ as *mut _)
+ }
+}
+
+#[repr(transparent)]
+struct Neutered<B> {
+ _inner: B,
+ impossible: Impossible,
+}
+
+enum Impossible {}
+
+unsafe impl<B> Send for Neutered<B> {}
+
+impl<B> Buf for Neutered<B> {
+ fn remaining(&self) -> usize {
+ match self.impossible {}
+ }
+
+ fn chunk(&self) -> &[u8] {
+ match self.impossible {}
+ }
+
+ fn advance(&mut self, _cnt: usize) {
+ match self.impossible {}
+ }
+}